commit cc93029c25d975ee7e3fd1460948efeef631e1f1 from: Jelmer Vernooij date: Sun Apr 14 10:42:59 2024 UTC Re-add dulwich.tests.utils and dulwich.tests.test_object_store for use with e.g. breezy commit - 245331a60d743b7b73ba3a8b15e6f4648273369f commit + cc93029c25d975ee7e3fd1460948efeef631e1f1 blob - /dev/null blob + 99da26a53a2f182153745c44decc728309e16bda (mode 644) --- /dev/null +++ dulwich/tests/__init__.py @@ -0,0 +1,21 @@ +# __init__.py -- The tests for dulwich +# Copyright (C) 2024 Jelmer Vernooij +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License +# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache +# License, Version 2.0. +# + +"""Tests for Dulwich.""" blob - /dev/null blob + a33bf67871e5d4e129fced36f06c4b16035ff2cc (mode 644) --- /dev/null +++ dulwich/tests/test_object_store.py @@ -0,0 +1,299 @@ +# test_object_store.py -- tests for object_store.py +# Copyright (C) 2008 Jelmer Vernooij +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. 
+# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License +# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache +# License, Version 2.0. +# + +"""Tests for the object store interface.""" + +from unittest import skipUnless + +from dulwich.index import commit_tree +from dulwich.object_store import ( + iter_tree_contents, + peel_sha, +) +from dulwich.objects import ( + Blob, + TreeEntry, +) +from dulwich.protocol import DEPTH_INFINITE + +from .utils import make_object, make_tag + +try: + from unittest.mock import patch +except ImportError: + patch = None # type: ignore + + +testobject = make_object(Blob, data=b"yummy data") + + +class ObjectStoreTests: + def test_determine_wants_all(self): + self.assertEqual( + [b"1" * 40], + self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40}), + ) + + def test_determine_wants_all_zero(self): + self.assertEqual( + [], self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40}) + ) + + @skipUnless(patch, "Required mock.patch") + def test_determine_wants_all_depth(self): + self.store.add_object(testobject) + refs = {b"refs/heads/foo": testobject.id} + with patch.object(self.store, "_get_depth", return_value=1) as m: + self.assertEqual([], self.store.determine_wants_all(refs, depth=0)) + self.assertEqual( + [testobject.id], + self.store.determine_wants_all(refs, depth=DEPTH_INFINITE), + ) + m.assert_not_called() + + self.assertEqual([], self.store.determine_wants_all(refs, depth=1)) + m.assert_called_with(testobject.id) + self.assertEqual( + [testobject.id], self.store.determine_wants_all(refs, depth=2) + ) + + def test_get_depth(self): + self.assertEqual(0, 
self.store._get_depth(testobject.id)) + + self.store.add_object(testobject) + self.assertEqual( + 1, self.store._get_depth(testobject.id, get_parents=lambda x: []) + ) + + parent = make_object(Blob, data=b"parent data") + self.store.add_object(parent) + self.assertEqual( + 2, + self.store._get_depth( + testobject.id, + get_parents=lambda x: [parent.id] if x == testobject else [], + ), + ) + + def test_iter(self): + self.assertEqual([], list(self.store)) + + def test_get_nonexistant(self): + self.assertRaises(KeyError, lambda: self.store[b"a" * 40]) + + def test_contains_nonexistant(self): + self.assertNotIn(b"a" * 40, self.store) + + def test_add_objects_empty(self): + self.store.add_objects([]) + + def test_add_commit(self): + # TODO: Argh, no way to construct Git commit objects without + # access to a serialized form. + self.store.add_objects([]) + + def test_store_resilience(self): + """Test if updating an existing stored object doesn't erase the + object from the store. + """ + test_object = make_object(Blob, data=b"data") + + self.store.add_object(test_object) + test_object_id = test_object.id + test_object.data = test_object.data + b"update" + stored_test_object = self.store[test_object_id] + + self.assertNotEqual(test_object.id, stored_test_object.id) + self.assertEqual(stored_test_object.id, test_object_id) + + def test_add_object(self): + self.store.add_object(testobject) + self.assertEqual({testobject.id}, set(self.store)) + self.assertIn(testobject.id, self.store) + r = self.store[testobject.id] + self.assertEqual(r, testobject) + + def test_add_objects(self): + data = [(testobject, "mypath")] + self.store.add_objects(data) + self.assertEqual({testobject.id}, set(self.store)) + self.assertIn(testobject.id, self.store) + r = self.store[testobject.id] + self.assertEqual(r, testobject) + + def test_tree_changes(self): + blob_a1 = make_object(Blob, data=b"a1") + blob_a2 = make_object(Blob, data=b"a2") + blob_b = make_object(Blob, data=b"b") + for blob in 
[blob_a1, blob_a2, blob_b]: + self.store.add_object(blob) + + blobs_1 = [(b"a", blob_a1.id, 0o100644), (b"b", blob_b.id, 0o100644)] + tree1_id = commit_tree(self.store, blobs_1) + blobs_2 = [(b"a", blob_a2.id, 0o100644), (b"b", blob_b.id, 0o100644)] + tree2_id = commit_tree(self.store, blobs_2) + change_a = ( + (b"a", b"a"), + (0o100644, 0o100644), + (blob_a1.id, blob_a2.id), + ) + self.assertEqual([change_a], list(self.store.tree_changes(tree1_id, tree2_id))) + self.assertEqual( + [ + change_a, + ((b"b", b"b"), (0o100644, 0o100644), (blob_b.id, blob_b.id)), + ], + list(self.store.tree_changes(tree1_id, tree2_id, want_unchanged=True)), + ) + + def test_iter_tree_contents(self): + blob_a = make_object(Blob, data=b"a") + blob_b = make_object(Blob, data=b"b") + blob_c = make_object(Blob, data=b"c") + for blob in [blob_a, blob_b, blob_c]: + self.store.add_object(blob) + + blobs = [ + (b"a", blob_a.id, 0o100644), + (b"ad/b", blob_b.id, 0o100644), + (b"ad/bd/c", blob_c.id, 0o100755), + (b"ad/c", blob_c.id, 0o100644), + (b"c", blob_c.id, 0o100644), + ] + tree_id = commit_tree(self.store, blobs) + self.assertEqual( + [TreeEntry(p, m, h) for (p, h, m) in blobs], + list(iter_tree_contents(self.store, tree_id)), + ) + self.assertEqual([], list(iter_tree_contents(self.store, None))) + + def test_iter_tree_contents_include_trees(self): + blob_a = make_object(Blob, data=b"a") + blob_b = make_object(Blob, data=b"b") + blob_c = make_object(Blob, data=b"c") + for blob in [blob_a, blob_b, blob_c]: + self.store.add_object(blob) + + blobs = [ + (b"a", blob_a.id, 0o100644), + (b"ad/b", blob_b.id, 0o100644), + (b"ad/bd/c", blob_c.id, 0o100755), + ] + tree_id = commit_tree(self.store, blobs) + tree = self.store[tree_id] + tree_ad = self.store[tree[b"ad"][1]] + tree_bd = self.store[tree_ad[b"bd"][1]] + + expected = [ + TreeEntry(b"", 0o040000, tree_id), + TreeEntry(b"a", 0o100644, blob_a.id), + TreeEntry(b"ad", 0o040000, tree_ad.id), + TreeEntry(b"ad/b", 0o100644, blob_b.id), + 
TreeEntry(b"ad/bd", 0o040000, tree_bd.id), + TreeEntry(b"ad/bd/c", 0o100755, blob_c.id), + ] + actual = iter_tree_contents(self.store, tree_id, include_trees=True) + self.assertEqual(expected, list(actual)) + + def make_tag(self, name, obj): + tag = make_tag(obj, name=name) + self.store.add_object(tag) + return tag + + def test_peel_sha(self): + self.store.add_object(testobject) + tag1 = self.make_tag(b"1", testobject) + tag2 = self.make_tag(b"2", testobject) + tag3 = self.make_tag(b"3", testobject) + for obj in [testobject, tag1, tag2, tag3]: + self.assertEqual((obj, testobject), peel_sha(self.store, obj.id)) + + def test_get_raw(self): + self.store.add_object(testobject) + self.assertEqual( + (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id) + ) + + def test_close(self): + # For now, just check that close doesn't barf. + self.store.add_object(testobject) + self.store.close() + + +class PackBasedObjectStoreTests(ObjectStoreTests): + def tearDown(self): + for pack in self.store.packs: + pack.close() + + def test_empty_packs(self): + self.assertEqual([], list(self.store.packs)) + + def test_pack_loose_objects(self): + b1 = make_object(Blob, data=b"yummy data") + self.store.add_object(b1) + b2 = make_object(Blob, data=b"more yummy data") + self.store.add_object(b2) + b3 = make_object(Blob, data=b"even more yummy data") + b4 = make_object(Blob, data=b"and more yummy data") + self.store.add_objects([(b3, None), (b4, None)]) + self.assertEqual({b1.id, b2.id, b3.id, b4.id}, set(self.store)) + self.assertEqual(1, len(self.store.packs)) + self.assertEqual(2, self.store.pack_loose_objects()) + self.assertNotEqual([], list(self.store.packs)) + self.assertEqual(0, self.store.pack_loose_objects()) + + def test_repack(self): + b1 = make_object(Blob, data=b"yummy data") + self.store.add_object(b1) + b2 = make_object(Blob, data=b"more yummy data") + self.store.add_object(b2) + b3 = make_object(Blob, data=b"even more yummy data") + b4 = make_object(Blob, data=b"and 
more yummy data") + self.store.add_objects([(b3, None), (b4, None)]) + b5 = make_object(Blob, data=b"and more data") + b6 = make_object(Blob, data=b"and some more data") + self.store.add_objects([(b5, None), (b6, None)]) + self.assertEqual({b1.id, b2.id, b3.id, b4.id, b5.id, b6.id}, set(self.store)) + self.assertEqual(2, len(self.store.packs)) + self.assertEqual(6, self.store.repack()) + self.assertEqual(1, len(self.store.packs)) + self.assertEqual(0, self.store.pack_loose_objects()) + + def test_repack_existing(self): + b1 = make_object(Blob, data=b"yummy data") + self.store.add_object(b1) + b2 = make_object(Blob, data=b"more yummy data") + self.store.add_object(b2) + self.store.add_objects([(b1, None), (b2, None)]) + self.store.add_objects([(b2, None)]) + self.assertEqual({b1.id, b2.id}, set(self.store)) + self.assertEqual(2, len(self.store.packs)) + self.assertEqual(2, self.store.repack()) + self.assertEqual(1, len(self.store.packs)) + self.assertEqual(0, self.store.pack_loose_objects()) + + self.assertEqual({b1.id, b2.id}, set(self.store)) + self.assertEqual(1, len(self.store.packs)) + self.assertEqual(2, self.store.repack()) + self.assertEqual(1, len(self.store.packs)) + self.assertEqual(0, self.store.pack_loose_objects()) + + + blob - /dev/null blob + a616de5187b46416952df73385617109d18b3ff4 (mode 644) --- /dev/null +++ dulwich/tests/utils.py @@ -0,0 +1,365 @@ +# utils.py -- Test utilities for Dulwich. +# Copyright (C) 2010 Google, Inc. +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License +# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache +# License, Version 2.0. +# + +"""Utility functions common to Dulwich tests.""" + +import datetime +import os +import shutil +import tempfile +import time +import types +import warnings +from unittest import SkipTest + +from dulwich.index import commit_tree +from dulwich.objects import Commit, FixedSha, Tag, object_class +from dulwich.pack import ( + DELTA_TYPES, + OFS_DELTA, + REF_DELTA, + SHA1Writer, + create_delta, + obj_sha, + write_pack_header, + write_pack_object, +) +from dulwich.repo import Repo + +# Plain files are very frequently used in tests, so let the mode be very short. +F = 0o100644 # Shorthand mode for Files. + + +def open_repo(name, temp_dir=None): + """Open a copy of a repo in a temporary directory. + + Use this function for accessing repos in dulwich/tests/data/repos to avoid + accidentally or intentionally modifying those repos in place. Use + tear_down_repo to delete any temp files created. + + Args: + name: The name of the repository, relative to + dulwich/tests/data/repos + temp_dir: temporary directory to initialize to. If not provided, a + temporary directory will be created. + Returns: An initialized Repo object that lives in a temporary directory. + """ + if temp_dir is None: + temp_dir = tempfile.mkdtemp() + repo_dir = os.path.join( + os.path.dirname(__file__), "..", "..", "testdata", "repos", name + ) + temp_repo_dir = os.path.join(temp_dir, name) + shutil.copytree(repo_dir, temp_repo_dir, symlinks=True) + return Repo(temp_repo_dir) + + +def tear_down_repo(repo): + """Tear down a test repository.""" + repo.close() + temp_dir = os.path.dirname(repo.path.rstrip(os.sep)) + shutil.rmtree(temp_dir) + + +def make_object(cls, **attrs): + """Make an object for testing and assign some members. 
+ + This method creates a new subclass to allow arbitrary attribute + reassignment, which is not otherwise possible with objects having + __slots__. + + Args: + attrs: dict of attributes to set on the new object. + Returns: A newly initialized object of type cls. + """ + + class TestObject(cls): + """Class that inherits from the given class, but without __slots__. + + Note that classes with __slots__ can't have arbitrary attributes + monkey-patched in, so this is a class that is exactly the same only + with a __dict__ instead of __slots__. + """ + + TestObject.__name__ = "TestObject_" + cls.__name__ + + obj = TestObject() + for name, value in attrs.items(): + if name == "id": + # id property is read-only, so we overwrite sha instead. + sha = FixedSha(value) + obj.sha = lambda: sha + else: + setattr(obj, name, value) + return obj + + +def make_commit(**attrs): + """Make a Commit object with a default set of members. + + Args: + attrs: dict of attributes to overwrite from the default values. + Returns: A newly initialized Commit object. + """ + default_time = 1262304000 # 2010-01-01 00:00:00 + all_attrs = { + "author": b"Test Author <test@nodomain.com>", + "author_time": default_time, + "author_timezone": 0, + "committer": b"Test Committer <test@nodomain.com>", + "commit_time": default_time, + "commit_timezone": 0, + "message": b"Test message.", + "parents": [], + "tree": b"0" * 40, + } + all_attrs.update(attrs) + return make_object(Commit, **all_attrs) + + +def make_tag(target, **attrs): + """Make a Tag object with a default set of values. + + Args: + target: object to be tagged (Commit, Blob, Tree, etc) + attrs: dict of attributes to overwrite from the default values. + Returns: A newly initialized Tag object. 
+ """ + target_id = target.id + target_type = object_class(target.type_name) + default_time = int(time.mktime(datetime.datetime(2010, 1, 1).timetuple())) + all_attrs = { + "tagger": b"Test Author <test@nodomain.com>", + "tag_time": default_time, + "tag_timezone": 0, + "message": b"Test message.", + "object": (target_type, target_id), + "name": b"Test Tag", + } + all_attrs.update(attrs) + return make_object(Tag, **all_attrs) + + +def functest_builder(method, func): + """Generate a test method that tests the given function.""" + + def do_test(self): + method(self, func) + + return do_test + + +def ext_functest_builder(method, func): + """Generate a test method that tests the given extension function. + + This is intended to generate test methods that test both a pure-Python + version and an extension version using common test code. The extension test + will raise SkipTest if the extension is not found. + + Sample usage: + + class MyTest(TestCase); + def _do_some_test(self, func_impl): + self.assertEqual('foo', func_impl()) + + test_foo = functest_builder(_do_some_test, foo_py) + test_foo_extension = ext_functest_builder(_do_some_test, _foo_c) + + Args: + method: The method to run. It must must two parameters, self and the + function implementation to test. + func: The function implementation to pass to method. + """ + + def do_test(self): + if not isinstance(func, types.BuiltinFunctionType): + raise SkipTest("%s extension not found" % func) + method(self, func) + + return do_test + + +def build_pack(f, objects_spec, store=None): + """Write test pack data from a concise spec. + + Args: + f: A file-like object to write the pack to. + objects_spec: A list of (type_num, obj). For non-delta types, obj + is the string of that object's data. + For delta types, obj is a tuple of (base, data), where: + + * base can be either an index in objects_spec of the base for that + * delta; or for a ref delta, a SHA, in which case the resulting pack + * will be thin and the base will be an external ref. 
+ * data is a string of the full, non-deltified data for that object. + + Note that offsets/refs and deltas are computed within this function. + store: An optional ObjectStore for looking up external refs. + Returns: A list of tuples in the order specified by objects_spec: + (offset, type num, data, sha, CRC32) + """ + sf = SHA1Writer(f) + num_objects = len(objects_spec) + write_pack_header(sf.write, num_objects) + + full_objects = {} + offsets = {} + crc32s = {} + + while len(full_objects) < num_objects: + for i, (type_num, data) in enumerate(objects_spec): + if type_num not in DELTA_TYPES: + full_objects[i] = (type_num, data, obj_sha(type_num, [data])) + continue + base, data = data + if isinstance(base, int): + if base not in full_objects: + continue + base_type_num, _, _ = full_objects[base] + else: + base_type_num, _ = store.get_raw(base) + full_objects[i] = ( + base_type_num, + data, + obj_sha(base_type_num, [data]), + ) + + for i, (type_num, obj) in enumerate(objects_spec): + offset = f.tell() + if type_num == OFS_DELTA: + base_index, data = obj + base = offset - offsets[base_index] + _, base_data, _ = full_objects[base_index] + obj = (base, list(create_delta(base_data, data))) + elif type_num == REF_DELTA: + base_ref, data = obj + if isinstance(base_ref, int): + _, base_data, base = full_objects[base_ref] + else: + base_type_num, base_data = store.get_raw(base_ref) + base = obj_sha(base_type_num, base_data) + obj = (base, list(create_delta(base_data, data))) + + crc32 = write_pack_object(sf.write, type_num, obj) + offsets[i] = offset + crc32s[i] = crc32 + + expected = [] + for i in range(num_objects): + type_num, data, sha = full_objects[i] + assert len(sha) == 20 + expected.append((offsets[i], type_num, data, sha, crc32s[i])) + + sf.write_sha() + f.seek(0) + return expected + + +def build_commit_graph(object_store, commit_spec, trees=None, attrs=None): + """Build a commit graph from a concise specification. 
+ + Sample usage: + >>> c1, c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]]) + >>> store[store[c3].parents[0]] == c1 + True + >>> store[store[c3].parents[1]] == c2 + True + + If not otherwise specified, commits will refer to the empty tree and have + commit times increasing in the same order as the commit spec. + + Args: + object_store: An ObjectStore to commit objects to. + commit_spec: An iterable of iterables of ints defining the commit + graph. Each entry defines one commit, and entries must be in + topological order. The first element of each entry is a commit number, + and the remaining elements are its parents. The commit numbers are only + meaningful for the call to make_commits; since real commit objects are + created, they will get created with real, opaque SHAs. + trees: An optional dict of commit number -> tree spec for building + trees for commits. The tree spec is an iterable of (path, blob, mode) + or (path, blob) entries; if mode is omitted, it defaults to the normal + file mode (0100644). + attrs: A dict of commit number -> (dict of attribute -> value) for + assigning additional values to the commits. + Returns: The list of commit objects created. + + Raises: + ValueError: If an undefined commit identifier is listed as a parent. 
+ """ + if trees is None: + trees = {} + if attrs is None: + attrs = {} + commit_time = 0 + nums = {} + commits = [] + + for commit in commit_spec: + commit_num = commit[0] + try: + parent_ids = [nums[pn] for pn in commit[1:]] + except KeyError as exc: + (missing_parent,) = exc.args + raise ValueError("Unknown parent %i" % missing_parent) from exc + + blobs = [] + for entry in trees.get(commit_num, []): + if len(entry) == 2: + path, blob = entry + entry = (path, blob, F) + path, blob, mode = entry + blobs.append((path, blob.id, mode)) + object_store.add_object(blob) + tree_id = commit_tree(object_store, blobs) + + commit_attrs = { + "message": ("Commit %i" % commit_num).encode("ascii"), + "parents": parent_ids, + "tree": tree_id, + "commit_time": commit_time, + } + commit_attrs.update(attrs.get(commit_num, {})) + commit_obj = make_commit(**commit_attrs) + + # By default, increment the time by a lot. Out-of-order commits should + # be closer together than this because their main cause is clock skew. 
+ commit_time = commit_attrs["commit_time"] + 100 + nums[commit_num] = commit_obj.id + object_store.add_object(commit_obj) + commits.append(commit_obj) + + return commits + + +def setup_warning_catcher(): + """Wrap warnings.showwarning with code that records warnings.""" + caught_warnings = [] + original_showwarning = warnings.showwarning + + def custom_showwarning(*args, **kwargs): + caught_warnings.append(args[0]) + + warnings.showwarning = custom_showwarning + + def restore_showwarning(): + warnings.showwarning = original_showwarning + + return caught_warnings, restore_showwarning blob - cd06126c084b381fb590de441314ee90ec4626ba blob + ec82a88b2bfaac6721b1fd429dff08a23730be2a --- tests/compat/server_utils.py +++ tests/compat/server_utils.py @@ -30,8 +30,8 @@ from dulwich.objects import hex_to_sha from dulwich.protocol import CAPABILITY_SIDE_BAND_64K from dulwich.repo import Repo from dulwich.server import ReceivePackHandler +from dulwich.tests.utils import tear_down_repo -from ..utils import tear_down_repo from .utils import require_git_version, run_git_or_fail blob - 5f81e137166827f3e0c0019e93c17d4972c548c4 blob + aed8dfca66ee6e3239f690d12d29f788c264eb02 --- tests/compat/test_porcelain.py +++ tests/compat/test_porcelain.py @@ -26,9 +26,9 @@ import sys from unittest import skipIf from dulwich import porcelain +from dulwich.tests.utils import build_commit_graph from ..test_porcelain import PorcelainGpgTestCase -from ..utils import build_commit_graph from .utils import CompatTestCase, run_git_or_fail blob - 4f6622b40b6cf24486cb3142832ad6d37ab5c914 blob + 38439e030e1c9b30990663e800a6d952250f2aed --- tests/contrib/test_release_robot.py +++ tests/contrib/test_release_robot.py @@ -30,9 +30,8 @@ from typing import ClassVar, Dict, List, Optional, Tup from dulwich.contrib import release_robot from dulwich.repo import Repo +from dulwich.tests.utils import make_commit, make_tag -from ..utils import make_commit, make_tag - BASEDIR = os.path.abspath(os.path.dirname(__file__)) 
# this directory blob - a0a0508865cbb93ca48f9cceafe74f23b4e45921 blob + 98988cdd57bc992f8c59b13dae1befa61544a444 --- tests/test_archive.py +++ tests/test_archive.py @@ -28,9 +28,9 @@ from unittest import skipUnless from dulwich.archive import tar_stream from dulwich.object_store import MemoryObjectStore from dulwich.objects import Blob, Tree +from dulwich.tests.utils import build_commit_graph from . import TestCase -from .utils import build_commit_graph try: from unittest.mock import patch blob - 3ada349921cf98083d73749174ad67ec17466072 blob + 6f4820c6bf6c2729d4862687ec0b1a0d0a20493d --- tests/test_client.py +++ tests/test_client.py @@ -60,9 +60,9 @@ from dulwich.objects import Commit, Tree from dulwich.pack import pack_objects_to_data, write_pack_data, write_pack_objects from dulwich.protocol import TCP_GIT_PORT, Protocol from dulwich.repo import MemoryRepo, Repo +from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo from . import TestCase, skipIf -from .utils import open_repo, setup_warning_catcher, tear_down_repo class DummyClient(TraditionalGitClient): blob - dd985227bcf8c1e1c739ed63f615bc842478592b blob + 5fb0afd8fda7c4ae99ee06ff3f2fdca383fc66f3 --- tests/test_config.py +++ tests/test_config.py @@ -312,7 +312,7 @@ class StackedConfigTests(TestCase): @skipIf(sys.platform != "win32", "Windows specific config location.") def test_windows_config_from_path(self): - from ..config import get_win_system_paths + from dulwich.config import get_win_system_paths install_dir = os.path.join("C:", "foo", "Git") self.overrideEnv("PATH", os.path.join(install_dir, "cmd")) @@ -330,7 +330,7 @@ class StackedConfigTests(TestCase): def test_windows_config_from_reg(self): import winreg - from ..config import get_win_system_paths + from dulwich.config import get_win_system_paths self.overrideEnv("PATH", None) install_dir = os.path.join("C:", "foo", "Git") blob - 04c83d053ffecff9cf77799285e182bcfecd06bb blob + 09823abd4a7a1dd3dc870a4490edb38609ef6729 --- 
tests/test_diff_tree.py +++ tests/test_diff_tree.py @@ -43,9 +43,9 @@ from dulwich.diff_tree import ( from dulwich.index import commit_tree from dulwich.object_store import MemoryObjectStore from dulwich.objects import Blob, ShaFile, Tree, TreeEntry +from dulwich.tests.utils import F, ext_functest_builder, functest_builder, make_object from . import TestCase -from .utils import F, ext_functest_builder, functest_builder, make_object class DiffTestCase(TestCase): blob - 5076f7767b6d517b08f1dd34c6515586e7763118 blob + 86e3a0565c1854025b7751fe299ec918fd86137e --- tests/test_fastexport.py +++ tests/test_fastexport.py @@ -24,9 +24,9 @@ from io import BytesIO from dulwich.object_store import MemoryObjectStore from dulwich.objects import ZERO_SHA, Blob, Commit, Tree from dulwich.repo import MemoryRepo +from dulwich.tests.utils import build_commit_graph from . import SkipTest, TestCase -from .utils import build_commit_graph class GitFastExporterTests(TestCase): blob - 0d14b5e356fca3b51749d7c540ef9d6a0b00d93f blob + c6ac3fd3af34af142063418f75383baf546a650f --- tests/test_graph.py +++ tests/test_graph.py @@ -21,9 +21,9 @@ from dulwich.graph import WorkList, _find_lcas, can_fast_forward from dulwich.repo import MemoryRepo +from dulwich.tests.utils import make_commit from . import TestCase -from .utils import make_commit class FindMergeBaseTests(TestCase): blob - 7a6b79b5230c062a11cb89eb3f93c4ad0487879b blob + b222099a21c3181c15644db2c9d38acbbb92139e --- tests/test_missing_obj_finder.py +++ tests/test_missing_obj_finder.py @@ -20,9 +20,9 @@ from dulwich.object_store import MemoryObjectStore, MissingObjectFinder from dulwich.objects import Blob +from dulwich.tests.utils import build_commit_graph, make_object, make_tag from . 
import TestCase -from .utils import build_commit_graph, make_object, make_tag class MissingObjectFinderTest(TestCase): blob - b9d78e8aa227c57b0163118e3a4690b9e562d896 blob + ac4cb9edb926f58bc7371bd5c5db25fc9f6bd2cc --- tests/test_object_store.py +++ tests/test_object_store.py @@ -27,7 +27,6 @@ import sys import tempfile from contextlib import closing from io import BytesIO -from unittest import skipUnless from dulwich.errors import NotTreeError from dulwich.index import commit_tree @@ -37,8 +36,6 @@ from dulwich.object_store import ( ObjectStoreGraphWalker, OverlayObjectStore, commit_tree_changes, - iter_tree_contents, - peel_sha, read_packs_file, tree_lookup_path, ) @@ -52,213 +49,14 @@ from dulwich.objects import ( sha_to_hex, ) from dulwich.pack import REF_DELTA, write_pack_objects -from dulwich.protocol import DEPTH_INFINITE +from dulwich.tests.test_object_store import ObjectStoreTests, PackBasedObjectStoreTests +from dulwich.tests.utils import build_pack, make_object from . import TestCase -from .utils import build_pack, make_object, make_tag -try: - from unittest.mock import patch -except ImportError: - patch = None # type: ignore - - testobject = make_object(Blob, data=b"yummy data") -class ObjectStoreTests: - def test_determine_wants_all(self): - self.assertEqual( - [b"1" * 40], - self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40}), - ) - - def test_determine_wants_all_zero(self): - self.assertEqual( - [], self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40}) - ) - - @skipUnless(patch, "Required mock.patch") - def test_determine_wants_all_depth(self): - self.store.add_object(testobject) - refs = {b"refs/heads/foo": testobject.id} - with patch.object(self.store, "_get_depth", return_value=1) as m: - self.assertEqual([], self.store.determine_wants_all(refs, depth=0)) - self.assertEqual( - [testobject.id], - self.store.determine_wants_all(refs, depth=DEPTH_INFINITE), - ) - m.assert_not_called() - - self.assertEqual([], 
self.store.determine_wants_all(refs, depth=1)) - m.assert_called_with(testobject.id) - self.assertEqual( - [testobject.id], self.store.determine_wants_all(refs, depth=2) - ) - - def test_get_depth(self): - self.assertEqual(0, self.store._get_depth(testobject.id)) - - self.store.add_object(testobject) - self.assertEqual( - 1, self.store._get_depth(testobject.id, get_parents=lambda x: []) - ) - - parent = make_object(Blob, data=b"parent data") - self.store.add_object(parent) - self.assertEqual( - 2, - self.store._get_depth( - testobject.id, - get_parents=lambda x: [parent.id] if x == testobject else [], - ), - ) - - def test_iter(self): - self.assertEqual([], list(self.store)) - - def test_get_nonexistant(self): - self.assertRaises(KeyError, lambda: self.store[b"a" * 40]) - - def test_contains_nonexistant(self): - self.assertNotIn(b"a" * 40, self.store) - - def test_add_objects_empty(self): - self.store.add_objects([]) - - def test_add_commit(self): - # TODO: Argh, no way to construct Git commit objects without - # access to a serialized form. - self.store.add_objects([]) - - def test_store_resilience(self): - """Test if updating an existing stored object doesn't erase the - object from the store. 
- """ - test_object = make_object(Blob, data=b"data") - - self.store.add_object(test_object) - test_object_id = test_object.id - test_object.data = test_object.data + b"update" - stored_test_object = self.store[test_object_id] - - self.assertNotEqual(test_object.id, stored_test_object.id) - self.assertEqual(stored_test_object.id, test_object_id) - - def test_add_object(self): - self.store.add_object(testobject) - self.assertEqual({testobject.id}, set(self.store)) - self.assertIn(testobject.id, self.store) - r = self.store[testobject.id] - self.assertEqual(r, testobject) - - def test_add_objects(self): - data = [(testobject, "mypath")] - self.store.add_objects(data) - self.assertEqual({testobject.id}, set(self.store)) - self.assertIn(testobject.id, self.store) - r = self.store[testobject.id] - self.assertEqual(r, testobject) - - def test_tree_changes(self): - blob_a1 = make_object(Blob, data=b"a1") - blob_a2 = make_object(Blob, data=b"a2") - blob_b = make_object(Blob, data=b"b") - for blob in [blob_a1, blob_a2, blob_b]: - self.store.add_object(blob) - - blobs_1 = [(b"a", blob_a1.id, 0o100644), (b"b", blob_b.id, 0o100644)] - tree1_id = commit_tree(self.store, blobs_1) - blobs_2 = [(b"a", blob_a2.id, 0o100644), (b"b", blob_b.id, 0o100644)] - tree2_id = commit_tree(self.store, blobs_2) - change_a = ( - (b"a", b"a"), - (0o100644, 0o100644), - (blob_a1.id, blob_a2.id), - ) - self.assertEqual([change_a], list(self.store.tree_changes(tree1_id, tree2_id))) - self.assertEqual( - [ - change_a, - ((b"b", b"b"), (0o100644, 0o100644), (blob_b.id, blob_b.id)), - ], - list(self.store.tree_changes(tree1_id, tree2_id, want_unchanged=True)), - ) - - def test_iter_tree_contents(self): - blob_a = make_object(Blob, data=b"a") - blob_b = make_object(Blob, data=b"b") - blob_c = make_object(Blob, data=b"c") - for blob in [blob_a, blob_b, blob_c]: - self.store.add_object(blob) - - blobs = [ - (b"a", blob_a.id, 0o100644), - (b"ad/b", blob_b.id, 0o100644), - (b"ad/bd/c", blob_c.id, 0o100755), 
- (b"ad/c", blob_c.id, 0o100644), - (b"c", blob_c.id, 0o100644), - ] - tree_id = commit_tree(self.store, blobs) - self.assertEqual( - [TreeEntry(p, m, h) for (p, h, m) in blobs], - list(iter_tree_contents(self.store, tree_id)), - ) - self.assertEqual([], list(iter_tree_contents(self.store, None))) - - def test_iter_tree_contents_include_trees(self): - blob_a = make_object(Blob, data=b"a") - blob_b = make_object(Blob, data=b"b") - blob_c = make_object(Blob, data=b"c") - for blob in [blob_a, blob_b, blob_c]: - self.store.add_object(blob) - - blobs = [ - (b"a", blob_a.id, 0o100644), - (b"ad/b", blob_b.id, 0o100644), - (b"ad/bd/c", blob_c.id, 0o100755), - ] - tree_id = commit_tree(self.store, blobs) - tree = self.store[tree_id] - tree_ad = self.store[tree[b"ad"][1]] - tree_bd = self.store[tree_ad[b"bd"][1]] - - expected = [ - TreeEntry(b"", 0o040000, tree_id), - TreeEntry(b"a", 0o100644, blob_a.id), - TreeEntry(b"ad", 0o040000, tree_ad.id), - TreeEntry(b"ad/b", 0o100644, blob_b.id), - TreeEntry(b"ad/bd", 0o040000, tree_bd.id), - TreeEntry(b"ad/bd/c", 0o100755, blob_c.id), - ] - actual = iter_tree_contents(self.store, tree_id, include_trees=True) - self.assertEqual(expected, list(actual)) - - def make_tag(self, name, obj): - tag = make_tag(obj, name=name) - self.store.add_object(tag) - return tag - - def test_peel_sha(self): - self.store.add_object(testobject) - tag1 = self.make_tag(b"1", testobject) - tag2 = self.make_tag(b"2", testobject) - tag3 = self.make_tag(b"3", testobject) - for obj in [testobject, tag1, tag2, tag3]: - self.assertEqual((obj, testobject), peel_sha(self.store, obj.id)) - - def test_get_raw(self): - self.store.add_object(testobject) - self.assertEqual( - (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id) - ) - - def test_close(self): - # For now, just check that close doesn't barf. 
- self.store.add_object(testobject) - self.store.close() - - class OverlayObjectStoreTests(ObjectStoreTests, TestCase): def setUp(self): TestCase.setUp(self) @@ -316,65 +114,6 @@ class MemoryObjectStoreTests(ObjectStoreTests, TestCas o.add_thin_pack(f.read, None) -class PackBasedObjectStoreTests(ObjectStoreTests): - def tearDown(self): - for pack in self.store.packs: - pack.close() - - def test_empty_packs(self): - self.assertEqual([], list(self.store.packs)) - - def test_pack_loose_objects(self): - b1 = make_object(Blob, data=b"yummy data") - self.store.add_object(b1) - b2 = make_object(Blob, data=b"more yummy data") - self.store.add_object(b2) - b3 = make_object(Blob, data=b"even more yummy data") - b4 = make_object(Blob, data=b"and more yummy data") - self.store.add_objects([(b3, None), (b4, None)]) - self.assertEqual({b1.id, b2.id, b3.id, b4.id}, set(self.store)) - self.assertEqual(1, len(self.store.packs)) - self.assertEqual(2, self.store.pack_loose_objects()) - self.assertNotEqual([], list(self.store.packs)) - self.assertEqual(0, self.store.pack_loose_objects()) - - def test_repack(self): - b1 = make_object(Blob, data=b"yummy data") - self.store.add_object(b1) - b2 = make_object(Blob, data=b"more yummy data") - self.store.add_object(b2) - b3 = make_object(Blob, data=b"even more yummy data") - b4 = make_object(Blob, data=b"and more yummy data") - self.store.add_objects([(b3, None), (b4, None)]) - b5 = make_object(Blob, data=b"and more data") - b6 = make_object(Blob, data=b"and some more data") - self.store.add_objects([(b5, None), (b6, None)]) - self.assertEqual({b1.id, b2.id, b3.id, b4.id, b5.id, b6.id}, set(self.store)) - self.assertEqual(2, len(self.store.packs)) - self.assertEqual(6, self.store.repack()) - self.assertEqual(1, len(self.store.packs)) - self.assertEqual(0, self.store.pack_loose_objects()) - - def test_repack_existing(self): - b1 = make_object(Blob, data=b"yummy data") - self.store.add_object(b1) - b2 = make_object(Blob, data=b"more yummy 
data") - self.store.add_object(b2) - self.store.add_objects([(b1, None), (b2, None)]) - self.store.add_objects([(b2, None)]) - self.assertEqual({b1.id, b2.id}, set(self.store)) - self.assertEqual(2, len(self.store.packs)) - self.assertEqual(2, self.store.repack()) - self.assertEqual(1, len(self.store.packs)) - self.assertEqual(0, self.store.pack_loose_objects()) - - self.assertEqual({b1.id, b2.id}, set(self.store)) - self.assertEqual(1, len(self.store.packs)) - self.assertEqual(2, self.store.repack()) - self.assertEqual(1, len(self.store.packs)) - self.assertEqual(0, self.store.pack_loose_objects()) - - class DiskObjectStoreTests(PackBasedObjectStoreTests, TestCase): def setUp(self): TestCase.setUp(self) blob - 9c4874bf5f054d0c4232b6c25748f973daed45c6 blob + 0d907f1e1b9529f39144093d2d8e354aae42278e --- tests/test_objects.py +++ tests/test_objects.py @@ -52,9 +52,14 @@ from dulwich.objects import ( sha_to_hex, sorted_tree_items, ) +from dulwich.tests.utils import ( + ext_functest_builder, + functest_builder, + make_commit, + make_object, +) from . import TestCase -from .utils import ext_functest_builder, functest_builder, make_commit, make_object a_sha = b"6f670c0fb53f9463760b7295fbb814e965fb20c8" b_sha = b"2969be3e8ee1c0222396a5611407e4769f14e54b" blob - 185c1fe9b84465db4ab3418148e562c98e85716c blob + d21e3eee40eeb55d544e83d8ae55c8775eb10d33 --- tests/test_objectspec.py +++ tests/test_objectspec.py @@ -34,9 +34,9 @@ from dulwich.objectspec import ( parse_tree, ) from dulwich.repo import MemoryRepo +from dulwich.tests.utils import build_commit_graph from . import TestCase -from .utils import build_commit_graph class ParseObjectTests(TestCase): blob - c796657042e5c56de556eed051de27fafc1be7e2 blob + 609ce6db00e1b3bf81dbc7d10139a803753e48a6 --- tests/test_pack.py +++ tests/test_pack.py @@ -59,9 +59,9 @@ from dulwich.pack import ( write_pack_index_v2, write_pack_object, ) +from dulwich.tests.utils import build_pack, make_object from . 
import TestCase -from .utils import build_pack, make_object pack1_sha = b"bc63ddad95e7321ee734ea11a7a62d314e0d7481" blob - 7a1c26f38ecff24e70b2549428e85f2e02d464da blob + 95220a6c4b673d084ac5e66bc81f53b3820dfbf6 --- tests/test_porcelain.py +++ tests/test_porcelain.py @@ -42,10 +42,10 @@ from dulwich.objects import ZERO_SHA, Blob, Tag, Tree from dulwich.porcelain import CheckoutError from dulwich.repo import NoIndexPresent, Repo from dulwich.server import DictBackend +from dulwich.tests.utils import build_commit_graph, make_commit, make_object from dulwich.web import make_server, make_wsgi_chain from . import TestCase -from .utils import build_commit_graph, make_commit, make_object try: import gpg blob - 31e3e37254f6b8267c0129b2abd21c4b81ddec7e blob + 572be9d8043de99402ee1aca3561a3c9dd412a1c --- tests/test_refs.py +++ tests/test_refs.py @@ -42,9 +42,9 @@ from dulwich.refs import ( write_packed_refs, ) from dulwich.repo import Repo +from dulwich.tests.utils import open_repo, tear_down_repo from . import SkipTest, TestCase -from .utils import open_repo, tear_down_repo class CheckRefFormatTests(TestCase): blob - 80605d1b70ff8bbce62d274533238b05e3b62a82 blob + a063c45760ce8294805392e458913e61f34800cf --- tests/test_repository.py +++ tests/test_repository.py @@ -41,9 +41,9 @@ from dulwich.repo import ( UnsupportedVersion, check_user_identity, ) +from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo from . import TestCase, skipIf -from .utils import open_repo, setup_warning_catcher, tear_down_repo missing_sha = b"b91fa4d900e17e99b433218e988c4eb4a3e9a097" blob - 2fe005ccd8948db3b6714924a37e34384bbee91e blob + 7ecb7122f7ff8076cf7cb9a42f9f6fa888e2b4fe --- tests/test_server.py +++ tests/test_server.py @@ -53,9 +53,9 @@ from dulwich.server import ( serve_command, update_server_info, ) +from dulwich.tests.utils import make_commit, make_tag from . 
import TestCase -from .utils import make_commit, make_tag ONE = b"1" * 40 TWO = b"2" * 40 blob - e1814e94f13685e284387f640e55734c82e3c2c0 blob + c22667fb7967f4d22b4aad12faf6943fc6ff8cf3 --- tests/test_utils.py +++ tests/test_utils.py @@ -22,9 +22,9 @@ from dulwich.object_store import MemoryObjectStore from dulwich.objects import Blob +from dulwich.tests.utils import build_commit_graph, make_object from . import TestCase -from .utils import build_commit_graph, make_object class BuildCommitGraphTest(TestCase): blob - 79650c423517555b700212e11291946743e18e13 blob + f803b668fe45b13662e08c054559695caf701dc4 --- tests/test_walk.py +++ tests/test_walk.py @@ -27,10 +27,10 @@ from dulwich.diff_tree import CHANGE_MODIFY, CHANGE_RE from dulwich.errors import MissingCommitError from dulwich.object_store import MemoryObjectStore from dulwich.objects import Blob, Commit +from dulwich.tests.utils import F, build_commit_graph, make_object, make_tag from dulwich.walk import ORDER_TOPO, WalkEntry, Walker, _topo_reorder from . import TestCase -from .utils import F, build_commit_graph, make_object, make_tag class TestWalkEntry: blob - d3ea62d6030b4d807c5d1c44f677f0c58c309c12 blob + 968511b3c5f6aece5807b9d9a580e482b03668d5 --- tests/test_web.py +++ tests/test_web.py @@ -30,6 +30,7 @@ from dulwich.object_store import MemoryObjectStore from dulwich.objects import Blob from dulwich.repo import BaseRepo, MemoryRepo from dulwich.server import DictBackend +from dulwich.tests.utils import make_object, make_tag from dulwich.web import ( HTTP_ERROR, HTTP_FORBIDDEN, @@ -50,7 +51,6 @@ from dulwich.web import ( ) from . import TestCase -from .utils import make_object, make_tag class MinimalistWSGIInputStream: blob - 13fe5a4542c3d02b3dcf7fd70194a01a39c6f7fd (mode 644) blob + /dev/null --- tests/utils.py +++ /dev/null @@ -1,366 +0,0 @@ -# utils.py -- Test utilities for Dulwich. -# Copyright (C) 2010 Google, Inc. 
-# -# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU -# General Public License as public by the Free Software Foundation; version 2.0 -# or (at your option) any later version. You can redistribute it and/or -# modify it under the terms of either of these two licenses. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# You should have received a copy of the licenses; if not, see -# for a copy of the GNU General Public License -# and for a copy of the Apache -# License, Version 2.0. -# - -"""Utility functions common to Dulwich tests.""" - -import datetime -import os -import shutil -import tempfile -import time -import types -import warnings - -from dulwich.index import commit_tree -from dulwich.objects import Commit, FixedSha, Tag, object_class -from dulwich.pack import ( - DELTA_TYPES, - OFS_DELTA, - REF_DELTA, - SHA1Writer, - create_delta, - obj_sha, - write_pack_header, - write_pack_object, -) -from dulwich.repo import Repo - -from . import SkipTest - -# Plain files are very frequently used in tests, so let the mode be very short. -F = 0o100644 # Shorthand mode for Files. - - -def open_repo(name, temp_dir=None): - """Open a copy of a repo in a temporary directory. - - Use this function for accessing repos in dulwich/tests/data/repos to avoid - accidentally or intentionally modifying those repos in place. Use - tear_down_repo to delete any temp files created. - - Args: - name: The name of the repository, relative to - dulwich/tests/data/repos - temp_dir: temporary directory to initialize to. If not provided, a - temporary directory will be created. - Returns: An initialized Repo object that lives in a temporary directory. 
- """ - if temp_dir is None: - temp_dir = tempfile.mkdtemp() - repo_dir = os.path.join( - os.path.dirname(__file__), "..", "testdata", "repos", name - ) - temp_repo_dir = os.path.join(temp_dir, name) - shutil.copytree(repo_dir, temp_repo_dir, symlinks=True) - return Repo(temp_repo_dir) - - -def tear_down_repo(repo): - """Tear down a test repository.""" - repo.close() - temp_dir = os.path.dirname(repo.path.rstrip(os.sep)) - shutil.rmtree(temp_dir) - - -def make_object(cls, **attrs): - """Make an object for testing and assign some members. - - This method creates a new subclass to allow arbitrary attribute - reassignment, which is not otherwise possible with objects having - __slots__. - - Args: - attrs: dict of attributes to set on the new object. - Returns: A newly initialized object of type cls. - """ - - class TestObject(cls): - """Class that inherits from the given class, but without __slots__. - - Note that classes with __slots__ can't have arbitrary attributes - monkey-patched in, so this is a class that is exactly the same only - with a __dict__ instead of __slots__. - """ - - TestObject.__name__ = "TestObject_" + cls.__name__ - - obj = TestObject() - for name, value in attrs.items(): - if name == "id": - # id property is read-only, so we overwrite sha instead. - sha = FixedSha(value) - obj.sha = lambda: sha - else: - setattr(obj, name, value) - return obj - - -def make_commit(**attrs): - """Make a Commit object with a default set of members. - - Args: - attrs: dict of attributes to overwrite from the default values. - Returns: A newly initialized Commit object. 
- """ - default_time = 1262304000 # 2010-01-01 00:00:00 - all_attrs = { - "author": b"Test Author ", - "author_time": default_time, - "author_timezone": 0, - "committer": b"Test Committer ", - "commit_time": default_time, - "commit_timezone": 0, - "message": b"Test message.", - "parents": [], - "tree": b"0" * 40, - } - all_attrs.update(attrs) - return make_object(Commit, **all_attrs) - - -def make_tag(target, **attrs): - """Make a Tag object with a default set of values. - - Args: - target: object to be tagged (Commit, Blob, Tree, etc) - attrs: dict of attributes to overwrite from the default values. - Returns: A newly initialized Tag object. - """ - target_id = target.id - target_type = object_class(target.type_name) - default_time = int(time.mktime(datetime.datetime(2010, 1, 1).timetuple())) - all_attrs = { - "tagger": b"Test Author ", - "tag_time": default_time, - "tag_timezone": 0, - "message": b"Test message.", - "object": (target_type, target_id), - "name": b"Test Tag", - } - all_attrs.update(attrs) - return make_object(Tag, **all_attrs) - - -def functest_builder(method, func): - """Generate a test method that tests the given function.""" - - def do_test(self): - method(self, func) - - return do_test - - -def ext_functest_builder(method, func): - """Generate a test method that tests the given extension function. - - This is intended to generate test methods that test both a pure-Python - version and an extension version using common test code. The extension test - will raise SkipTest if the extension is not found. - - Sample usage: - - class MyTest(TestCase); - def _do_some_test(self, func_impl): - self.assertEqual('foo', func_impl()) - - test_foo = functest_builder(_do_some_test, foo_py) - test_foo_extension = ext_functest_builder(_do_some_test, _foo_c) - - Args: - method: The method to run. It must must two parameters, self and the - function implementation to test. - func: The function implementation to pass to method. 
- """ - - def do_test(self): - if not isinstance(func, types.BuiltinFunctionType): - raise SkipTest("%s extension not found" % func) - method(self, func) - - return do_test - - -def build_pack(f, objects_spec, store=None): - """Write test pack data from a concise spec. - - Args: - f: A file-like object to write the pack to. - objects_spec: A list of (type_num, obj). For non-delta types, obj - is the string of that object's data. - For delta types, obj is a tuple of (base, data), where: - - * base can be either an index in objects_spec of the base for that - * delta; or for a ref delta, a SHA, in which case the resulting pack - * will be thin and the base will be an external ref. - * data is a string of the full, non-deltified data for that object. - - Note that offsets/refs and deltas are computed within this function. - store: An optional ObjectStore for looking up external refs. - Returns: A list of tuples in the order specified by objects_spec: - (offset, type num, data, sha, CRC32) - """ - sf = SHA1Writer(f) - num_objects = len(objects_spec) - write_pack_header(sf.write, num_objects) - - full_objects = {} - offsets = {} - crc32s = {} - - while len(full_objects) < num_objects: - for i, (type_num, data) in enumerate(objects_spec): - if type_num not in DELTA_TYPES: - full_objects[i] = (type_num, data, obj_sha(type_num, [data])) - continue - base, data = data - if isinstance(base, int): - if base not in full_objects: - continue - base_type_num, _, _ = full_objects[base] - else: - base_type_num, _ = store.get_raw(base) - full_objects[i] = ( - base_type_num, - data, - obj_sha(base_type_num, [data]), - ) - - for i, (type_num, obj) in enumerate(objects_spec): - offset = f.tell() - if type_num == OFS_DELTA: - base_index, data = obj - base = offset - offsets[base_index] - _, base_data, _ = full_objects[base_index] - obj = (base, list(create_delta(base_data, data))) - elif type_num == REF_DELTA: - base_ref, data = obj - if isinstance(base_ref, int): - _, base_data, base = 
full_objects[base_ref] - else: - base_type_num, base_data = store.get_raw(base_ref) - base = obj_sha(base_type_num, base_data) - obj = (base, list(create_delta(base_data, data))) - - crc32 = write_pack_object(sf.write, type_num, obj) - offsets[i] = offset - crc32s[i] = crc32 - - expected = [] - for i in range(num_objects): - type_num, data, sha = full_objects[i] - assert len(sha) == 20 - expected.append((offsets[i], type_num, data, sha, crc32s[i])) - - sf.write_sha() - f.seek(0) - return expected - - -def build_commit_graph(object_store, commit_spec, trees=None, attrs=None): - """Build a commit graph from a concise specification. - - Sample usage: - >>> c1, c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]]) - >>> store[store[c3].parents[0]] == c1 - True - >>> store[store[c3].parents[1]] == c2 - True - - If not otherwise specified, commits will refer to the empty tree and have - commit times increasing in the same order as the commit spec. - - Args: - object_store: An ObjectStore to commit objects to. - commit_spec: An iterable of iterables of ints defining the commit - graph. Each entry defines one commit, and entries must be in - topological order. The first element of each entry is a commit number, - and the remaining elements are its parents. The commit numbers are only - meaningful for the call to make_commits; since real commit objects are - created, they will get created with real, opaque SHAs. - trees: An optional dict of commit number -> tree spec for building - trees for commits. The tree spec is an iterable of (path, blob, mode) - or (path, blob) entries; if mode is omitted, it defaults to the normal - file mode (0100644). - attrs: A dict of commit number -> (dict of attribute -> value) for - assigning additional values to the commits. - Returns: The list of commit objects created. - - Raises: - ValueError: If an undefined commit identifier is listed as a parent. 
- """ - if trees is None: - trees = {} - if attrs is None: - attrs = {} - commit_time = 0 - nums = {} - commits = [] - - for commit in commit_spec: - commit_num = commit[0] - try: - parent_ids = [nums[pn] for pn in commit[1:]] - except KeyError as exc: - (missing_parent,) = exc.args - raise ValueError("Unknown parent %i" % missing_parent) from exc - - blobs = [] - for entry in trees.get(commit_num, []): - if len(entry) == 2: - path, blob = entry - entry = (path, blob, F) - path, blob, mode = entry - blobs.append((path, blob.id, mode)) - object_store.add_object(blob) - tree_id = commit_tree(object_store, blobs) - - commit_attrs = { - "message": ("Commit %i" % commit_num).encode("ascii"), - "parents": parent_ids, - "tree": tree_id, - "commit_time": commit_time, - } - commit_attrs.update(attrs.get(commit_num, {})) - commit_obj = make_commit(**commit_attrs) - - # By default, increment the time by a lot. Out-of-order commits should - # be closer together than this because their main cause is clock skew. - commit_time = commit_attrs["commit_time"] + 100 - nums[commit_num] = commit_obj.id - object_store.add_object(commit_obj) - commits.append(commit_obj) - - return commits - - -def setup_warning_catcher(): - """Wrap warnings.showwarning with code that records warnings.""" - caught_warnings = [] - original_showwarning = warnings.showwarning - - def custom_showwarning(*args, **kwargs): - caught_warnings.append(args[0]) - - warnings.showwarning = custom_showwarning - - def restore_showwarning(): - warnings.showwarning = original_showwarning - - return caught_warnings, restore_showwarning