commit - 245331a60d743b7b73ba3a8b15e6f4648273369f
commit + cc93029c25d975ee7e3fd1460948efeef631e1f1
blob - /dev/null
blob + 99da26a53a2f182153745c44decc728309e16bda (mode 644)
--- /dev/null
+++ dulwich/tests/__init__.py
+# __init__.py -- The tests for dulwich
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for Dulwich."""
blob - /dev/null
blob + a33bf67871e5d4e129fced36f06c4b16035ff2cc (mode 644)
--- /dev/null
+++ dulwich/tests/test_object_store.py
+# test_object_store.py -- tests for object_store.py
+# Copyright (C) 2008 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for the object store interface."""
+
+from unittest import skipUnless
+
+from dulwich.index import commit_tree
+from dulwich.object_store import (
+ iter_tree_contents,
+ peel_sha,
+)
+from dulwich.objects import (
+ Blob,
+ TreeEntry,
+)
+from dulwich.protocol import DEPTH_INFINITE
+
+from .utils import make_object, make_tag
+
+try:
+ from unittest.mock import patch
+except ImportError:
+ patch = None # type: ignore
+
+
+testobject = make_object(Blob, data=b"yummy data")
+
+
+class ObjectStoreTests:
+ def test_determine_wants_all(self):
+ self.assertEqual(
+ [b"1" * 40],
+ self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40}),
+ )
+
+ def test_determine_wants_all_zero(self):
+ self.assertEqual(
+ [], self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40})
+ )
+
+ @skipUnless(patch, "Required mock.patch")
+ def test_determine_wants_all_depth(self):
+ self.store.add_object(testobject)
+ refs = {b"refs/heads/foo": testobject.id}
+ with patch.object(self.store, "_get_depth", return_value=1) as m:
+ self.assertEqual([], self.store.determine_wants_all(refs, depth=0))
+ self.assertEqual(
+ [testobject.id],
+ self.store.determine_wants_all(refs, depth=DEPTH_INFINITE),
+ )
+ m.assert_not_called()
+
+ self.assertEqual([], self.store.determine_wants_all(refs, depth=1))
+ m.assert_called_with(testobject.id)
+ self.assertEqual(
+ [testobject.id], self.store.determine_wants_all(refs, depth=2)
+ )
+
+ def test_get_depth(self):
+ self.assertEqual(0, self.store._get_depth(testobject.id))
+
+ self.store.add_object(testobject)
+ self.assertEqual(
+ 1, self.store._get_depth(testobject.id, get_parents=lambda x: [])
+ )
+
+ parent = make_object(Blob, data=b"parent data")
+ self.store.add_object(parent)
+ self.assertEqual(
+ 2,
+ self.store._get_depth(
+ testobject.id,
+ get_parents=lambda x: [parent.id] if x == testobject else [],
+ ),
+ )
+
+ def test_iter(self):
+ self.assertEqual([], list(self.store))
+
+ def test_get_nonexistant(self):
+ self.assertRaises(KeyError, lambda: self.store[b"a" * 40])
+
+ def test_contains_nonexistant(self):
+ self.assertNotIn(b"a" * 40, self.store)
+
+ def test_add_objects_empty(self):
+ self.store.add_objects([])
+
+ def test_add_commit(self):
+ # TODO: Argh, no way to construct Git commit objects without
+ # access to a serialized form.
+ self.store.add_objects([])
+
+ def test_store_resilience(self):
+ """Test if updating an existing stored object doesn't erase the
+ object from the store.
+ """
+ test_object = make_object(Blob, data=b"data")
+
+ self.store.add_object(test_object)
+ test_object_id = test_object.id
+ test_object.data = test_object.data + b"update"
+ stored_test_object = self.store[test_object_id]
+
+ self.assertNotEqual(test_object.id, stored_test_object.id)
+ self.assertEqual(stored_test_object.id, test_object_id)
+
+ def test_add_object(self):
+ self.store.add_object(testobject)
+ self.assertEqual({testobject.id}, set(self.store))
+ self.assertIn(testobject.id, self.store)
+ r = self.store[testobject.id]
+ self.assertEqual(r, testobject)
+
+ def test_add_objects(self):
+ data = [(testobject, "mypath")]
+ self.store.add_objects(data)
+ self.assertEqual({testobject.id}, set(self.store))
+ self.assertIn(testobject.id, self.store)
+ r = self.store[testobject.id]
+ self.assertEqual(r, testobject)
+
+ def test_tree_changes(self):
+ blob_a1 = make_object(Blob, data=b"a1")
+ blob_a2 = make_object(Blob, data=b"a2")
+ blob_b = make_object(Blob, data=b"b")
+ for blob in [blob_a1, blob_a2, blob_b]:
+ self.store.add_object(blob)
+
+ blobs_1 = [(b"a", blob_a1.id, 0o100644), (b"b", blob_b.id, 0o100644)]
+ tree1_id = commit_tree(self.store, blobs_1)
+ blobs_2 = [(b"a", blob_a2.id, 0o100644), (b"b", blob_b.id, 0o100644)]
+ tree2_id = commit_tree(self.store, blobs_2)
+ change_a = (
+ (b"a", b"a"),
+ (0o100644, 0o100644),
+ (blob_a1.id, blob_a2.id),
+ )
+ self.assertEqual([change_a], list(self.store.tree_changes(tree1_id, tree2_id)))
+ self.assertEqual(
+ [
+ change_a,
+ ((b"b", b"b"), (0o100644, 0o100644), (blob_b.id, blob_b.id)),
+ ],
+ list(self.store.tree_changes(tree1_id, tree2_id, want_unchanged=True)),
+ )
+
+ def test_iter_tree_contents(self):
+ blob_a = make_object(Blob, data=b"a")
+ blob_b = make_object(Blob, data=b"b")
+ blob_c = make_object(Blob, data=b"c")
+ for blob in [blob_a, blob_b, blob_c]:
+ self.store.add_object(blob)
+
+ blobs = [
+ (b"a", blob_a.id, 0o100644),
+ (b"ad/b", blob_b.id, 0o100644),
+ (b"ad/bd/c", blob_c.id, 0o100755),
+ (b"ad/c", blob_c.id, 0o100644),
+ (b"c", blob_c.id, 0o100644),
+ ]
+ tree_id = commit_tree(self.store, blobs)
+ self.assertEqual(
+ [TreeEntry(p, m, h) for (p, h, m) in blobs],
+ list(iter_tree_contents(self.store, tree_id)),
+ )
+ self.assertEqual([], list(iter_tree_contents(self.store, None)))
+
+ def test_iter_tree_contents_include_trees(self):
+ blob_a = make_object(Blob, data=b"a")
+ blob_b = make_object(Blob, data=b"b")
+ blob_c = make_object(Blob, data=b"c")
+ for blob in [blob_a, blob_b, blob_c]:
+ self.store.add_object(blob)
+
+ blobs = [
+ (b"a", blob_a.id, 0o100644),
+ (b"ad/b", blob_b.id, 0o100644),
+ (b"ad/bd/c", blob_c.id, 0o100755),
+ ]
+ tree_id = commit_tree(self.store, blobs)
+ tree = self.store[tree_id]
+ tree_ad = self.store[tree[b"ad"][1]]
+ tree_bd = self.store[tree_ad[b"bd"][1]]
+
+ expected = [
+ TreeEntry(b"", 0o040000, tree_id),
+ TreeEntry(b"a", 0o100644, blob_a.id),
+ TreeEntry(b"ad", 0o040000, tree_ad.id),
+ TreeEntry(b"ad/b", 0o100644, blob_b.id),
+ TreeEntry(b"ad/bd", 0o040000, tree_bd.id),
+ TreeEntry(b"ad/bd/c", 0o100755, blob_c.id),
+ ]
+ actual = iter_tree_contents(self.store, tree_id, include_trees=True)
+ self.assertEqual(expected, list(actual))
+
+ def make_tag(self, name, obj):
+ tag = make_tag(obj, name=name)
+ self.store.add_object(tag)
+ return tag
+
+ def test_peel_sha(self):
+ self.store.add_object(testobject)
+ tag1 = self.make_tag(b"1", testobject)
+ tag2 = self.make_tag(b"2", testobject)
+ tag3 = self.make_tag(b"3", testobject)
+ for obj in [testobject, tag1, tag2, tag3]:
+ self.assertEqual((obj, testobject), peel_sha(self.store, obj.id))
+
+ def test_get_raw(self):
+ self.store.add_object(testobject)
+ self.assertEqual(
+ (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id)
+ )
+
+ def test_close(self):
+ # For now, just check that close doesn't barf.
+ self.store.add_object(testobject)
+ self.store.close()
+
+
+class PackBasedObjectStoreTests(ObjectStoreTests):
+ def tearDown(self):
+ for pack in self.store.packs:
+ pack.close()
+
+ def test_empty_packs(self):
+ self.assertEqual([], list(self.store.packs))
+
+ def test_pack_loose_objects(self):
+ b1 = make_object(Blob, data=b"yummy data")
+ self.store.add_object(b1)
+ b2 = make_object(Blob, data=b"more yummy data")
+ self.store.add_object(b2)
+ b3 = make_object(Blob, data=b"even more yummy data")
+ b4 = make_object(Blob, data=b"and more yummy data")
+ self.store.add_objects([(b3, None), (b4, None)])
+ self.assertEqual({b1.id, b2.id, b3.id, b4.id}, set(self.store))
+ self.assertEqual(1, len(self.store.packs))
+ self.assertEqual(2, self.store.pack_loose_objects())
+ self.assertNotEqual([], list(self.store.packs))
+ self.assertEqual(0, self.store.pack_loose_objects())
+
+ def test_repack(self):
+ b1 = make_object(Blob, data=b"yummy data")
+ self.store.add_object(b1)
+ b2 = make_object(Blob, data=b"more yummy data")
+ self.store.add_object(b2)
+ b3 = make_object(Blob, data=b"even more yummy data")
+ b4 = make_object(Blob, data=b"and more yummy data")
+ self.store.add_objects([(b3, None), (b4, None)])
+ b5 = make_object(Blob, data=b"and more data")
+ b6 = make_object(Blob, data=b"and some more data")
+ self.store.add_objects([(b5, None), (b6, None)])
+ self.assertEqual({b1.id, b2.id, b3.id, b4.id, b5.id, b6.id}, set(self.store))
+ self.assertEqual(2, len(self.store.packs))
+ self.assertEqual(6, self.store.repack())
+ self.assertEqual(1, len(self.store.packs))
+ self.assertEqual(0, self.store.pack_loose_objects())
+
+ def test_repack_existing(self):
+ b1 = make_object(Blob, data=b"yummy data")
+ self.store.add_object(b1)
+ b2 = make_object(Blob, data=b"more yummy data")
+ self.store.add_object(b2)
+ self.store.add_objects([(b1, None), (b2, None)])
+ self.store.add_objects([(b2, None)])
+ self.assertEqual({b1.id, b2.id}, set(self.store))
+ self.assertEqual(2, len(self.store.packs))
+ self.assertEqual(2, self.store.repack())
+ self.assertEqual(1, len(self.store.packs))
+ self.assertEqual(0, self.store.pack_loose_objects())
+
+ self.assertEqual({b1.id, b2.id}, set(self.store))
+ self.assertEqual(1, len(self.store.packs))
+ self.assertEqual(2, self.store.repack())
+ self.assertEqual(1, len(self.store.packs))
+ self.assertEqual(0, self.store.pack_loose_objects())
+
+
+
blob - /dev/null
blob + a616de5187b46416952df73385617109d18b3ff4 (mode 644)
--- /dev/null
+++ dulwich/tests/utils.py
+# utils.py -- Test utilities for Dulwich.
+# Copyright (C) 2010 Google, Inc.
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Utility functions common to Dulwich tests."""
+
+import datetime
+import os
+import shutil
+import tempfile
+import time
+import types
+import warnings
+from unittest import SkipTest
+
+from dulwich.index import commit_tree
+from dulwich.objects import Commit, FixedSha, Tag, object_class
+from dulwich.pack import (
+ DELTA_TYPES,
+ OFS_DELTA,
+ REF_DELTA,
+ SHA1Writer,
+ create_delta,
+ obj_sha,
+ write_pack_header,
+ write_pack_object,
+)
+from dulwich.repo import Repo
+
+# Plain files are very frequently used in tests, so let the mode be very short.
+F = 0o100644 # Shorthand mode for Files.
+
+
+def open_repo(name, temp_dir=None):
+ """Open a copy of a repo in a temporary directory.
+
+ Use this function for accessing repos in dulwich/tests/data/repos to avoid
+ accidentally or intentionally modifying those repos in place. Use
+ tear_down_repo to delete any temp files created.
+
+ Args:
+ name: The name of the repository, relative to
+ dulwich/tests/data/repos
+ temp_dir: temporary directory to initialize to. If not provided, a
+ temporary directory will be created.
+ Returns: An initialized Repo object that lives in a temporary directory.
+ """
+ if temp_dir is None:
+ temp_dir = tempfile.mkdtemp()
+ repo_dir = os.path.join(
+ os.path.dirname(__file__), "..", "..", "testdata", "repos", name
+ )
+ temp_repo_dir = os.path.join(temp_dir, name)
+ shutil.copytree(repo_dir, temp_repo_dir, symlinks=True)
+ return Repo(temp_repo_dir)
+
+
+def tear_down_repo(repo):
+ """Tear down a test repository."""
+ repo.close()
+ temp_dir = os.path.dirname(repo.path.rstrip(os.sep))
+ shutil.rmtree(temp_dir)
+
+
+def make_object(cls, **attrs):
+ """Make an object for testing and assign some members.
+
+ This method creates a new subclass to allow arbitrary attribute
+ reassignment, which is not otherwise possible with objects having
+ __slots__.
+
+ Args:
+ attrs: dict of attributes to set on the new object.
+ Returns: A newly initialized object of type cls.
+ """
+
+ class TestObject(cls):
+ """Class that inherits from the given class, but without __slots__.
+
+ Note that classes with __slots__ can't have arbitrary attributes
+ monkey-patched in, so this is a class that is exactly the same only
+ with a __dict__ instead of __slots__.
+ """
+
+ TestObject.__name__ = "TestObject_" + cls.__name__
+
+ obj = TestObject()
+ for name, value in attrs.items():
+ if name == "id":
+ # id property is read-only, so we overwrite sha instead.
+ sha = FixedSha(value)
+ obj.sha = lambda: sha
+ else:
+ setattr(obj, name, value)
+ return obj
+
+
+def make_commit(**attrs):
+ """Make a Commit object with a default set of members.
+
+ Args:
+ attrs: dict of attributes to overwrite from the default values.
+ Returns: A newly initialized Commit object.
+ """
+ default_time = 1262304000 # 2010-01-01 00:00:00
+ all_attrs = {
+ "author": b"Test Author <test@nodomain.com>",
+ "author_time": default_time,
+ "author_timezone": 0,
+ "committer": b"Test Committer <test@nodomain.com>",
+ "commit_time": default_time,
+ "commit_timezone": 0,
+ "message": b"Test message.",
+ "parents": [],
+ "tree": b"0" * 40,
+ }
+ all_attrs.update(attrs)
+ return make_object(Commit, **all_attrs)
+
+
+def make_tag(target, **attrs):
+ """Make a Tag object with a default set of values.
+
+ Args:
+ target: object to be tagged (Commit, Blob, Tree, etc)
+ attrs: dict of attributes to overwrite from the default values.
+ Returns: A newly initialized Tag object.
+ """
+ target_id = target.id
+ target_type = object_class(target.type_name)
+ default_time = int(time.mktime(datetime.datetime(2010, 1, 1).timetuple()))
+ all_attrs = {
+ "tagger": b"Test Author <test@nodomain.com>",
+ "tag_time": default_time,
+ "tag_timezone": 0,
+ "message": b"Test message.",
+ "object": (target_type, target_id),
+ "name": b"Test Tag",
+ }
+ all_attrs.update(attrs)
+ return make_object(Tag, **all_attrs)
+
+
+def functest_builder(method, func):
+ """Generate a test method that tests the given function."""
+
+ def do_test(self):
+ method(self, func)
+
+ return do_test
+
+
+def ext_functest_builder(method, func):
+ """Generate a test method that tests the given extension function.
+
+ This is intended to generate test methods that test both a pure-Python
+ version and an extension version using common test code. The extension test
+ will raise SkipTest if the extension is not found.
+
+ Sample usage:
+
+ class MyTest(TestCase);
+ def _do_some_test(self, func_impl):
+ self.assertEqual('foo', func_impl())
+
+ test_foo = functest_builder(_do_some_test, foo_py)
+ test_foo_extension = ext_functest_builder(_do_some_test, _foo_c)
+
+ Args:
+ method: The method to run. It must must two parameters, self and the
+ function implementation to test.
+ func: The function implementation to pass to method.
+ """
+
+ def do_test(self):
+ if not isinstance(func, types.BuiltinFunctionType):
+ raise SkipTest("%s extension not found" % func)
+ method(self, func)
+
+ return do_test
+
+
+def build_pack(f, objects_spec, store=None):
+ """Write test pack data from a concise spec.
+
+ Args:
+ f: A file-like object to write the pack to.
+ objects_spec: A list of (type_num, obj). For non-delta types, obj
+ is the string of that object's data.
+ For delta types, obj is a tuple of (base, data), where:
+
+ * base can be either an index in objects_spec of the base for that
+ * delta; or for a ref delta, a SHA, in which case the resulting pack
+ * will be thin and the base will be an external ref.
+ * data is a string of the full, non-deltified data for that object.
+
+ Note that offsets/refs and deltas are computed within this function.
+ store: An optional ObjectStore for looking up external refs.
+ Returns: A list of tuples in the order specified by objects_spec:
+ (offset, type num, data, sha, CRC32)
+ """
+ sf = SHA1Writer(f)
+ num_objects = len(objects_spec)
+ write_pack_header(sf.write, num_objects)
+
+ full_objects = {}
+ offsets = {}
+ crc32s = {}
+
+ while len(full_objects) < num_objects:
+ for i, (type_num, data) in enumerate(objects_spec):
+ if type_num not in DELTA_TYPES:
+ full_objects[i] = (type_num, data, obj_sha(type_num, [data]))
+ continue
+ base, data = data
+ if isinstance(base, int):
+ if base not in full_objects:
+ continue
+ base_type_num, _, _ = full_objects[base]
+ else:
+ base_type_num, _ = store.get_raw(base)
+ full_objects[i] = (
+ base_type_num,
+ data,
+ obj_sha(base_type_num, [data]),
+ )
+
+ for i, (type_num, obj) in enumerate(objects_spec):
+ offset = f.tell()
+ if type_num == OFS_DELTA:
+ base_index, data = obj
+ base = offset - offsets[base_index]
+ _, base_data, _ = full_objects[base_index]
+ obj = (base, list(create_delta(base_data, data)))
+ elif type_num == REF_DELTA:
+ base_ref, data = obj
+ if isinstance(base_ref, int):
+ _, base_data, base = full_objects[base_ref]
+ else:
+ base_type_num, base_data = store.get_raw(base_ref)
+ base = obj_sha(base_type_num, base_data)
+ obj = (base, list(create_delta(base_data, data)))
+
+ crc32 = write_pack_object(sf.write, type_num, obj)
+ offsets[i] = offset
+ crc32s[i] = crc32
+
+ expected = []
+ for i in range(num_objects):
+ type_num, data, sha = full_objects[i]
+ assert len(sha) == 20
+ expected.append((offsets[i], type_num, data, sha, crc32s[i]))
+
+ sf.write_sha()
+ f.seek(0)
+ return expected
+
+
+def build_commit_graph(object_store, commit_spec, trees=None, attrs=None):
+ """Build a commit graph from a concise specification.
+
+ Sample usage:
+ >>> c1, c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]])
+ >>> store[store[c3].parents[0]] == c1
+ True
+ >>> store[store[c3].parents[1]] == c2
+ True
+
+ If not otherwise specified, commits will refer to the empty tree and have
+ commit times increasing in the same order as the commit spec.
+
+ Args:
+ object_store: An ObjectStore to commit objects to.
+ commit_spec: An iterable of iterables of ints defining the commit
+ graph. Each entry defines one commit, and entries must be in
+ topological order. The first element of each entry is a commit number,
+ and the remaining elements are its parents. The commit numbers are only
+ meaningful for the call to make_commits; since real commit objects are
+ created, they will get created with real, opaque SHAs.
+ trees: An optional dict of commit number -> tree spec for building
+ trees for commits. The tree spec is an iterable of (path, blob, mode)
+ or (path, blob) entries; if mode is omitted, it defaults to the normal
+ file mode (0100644).
+ attrs: A dict of commit number -> (dict of attribute -> value) for
+ assigning additional values to the commits.
+ Returns: The list of commit objects created.
+
+ Raises:
+ ValueError: If an undefined commit identifier is listed as a parent.
+ """
+ if trees is None:
+ trees = {}
+ if attrs is None:
+ attrs = {}
+ commit_time = 0
+ nums = {}
+ commits = []
+
+ for commit in commit_spec:
+ commit_num = commit[0]
+ try:
+ parent_ids = [nums[pn] for pn in commit[1:]]
+ except KeyError as exc:
+ (missing_parent,) = exc.args
+ raise ValueError("Unknown parent %i" % missing_parent) from exc
+
+ blobs = []
+ for entry in trees.get(commit_num, []):
+ if len(entry) == 2:
+ path, blob = entry
+ entry = (path, blob, F)
+ path, blob, mode = entry
+ blobs.append((path, blob.id, mode))
+ object_store.add_object(blob)
+ tree_id = commit_tree(object_store, blobs)
+
+ commit_attrs = {
+ "message": ("Commit %i" % commit_num).encode("ascii"),
+ "parents": parent_ids,
+ "tree": tree_id,
+ "commit_time": commit_time,
+ }
+ commit_attrs.update(attrs.get(commit_num, {}))
+ commit_obj = make_commit(**commit_attrs)
+
+ # By default, increment the time by a lot. Out-of-order commits should
+ # be closer together than this because their main cause is clock skew.
+ commit_time = commit_attrs["commit_time"] + 100
+ nums[commit_num] = commit_obj.id
+ object_store.add_object(commit_obj)
+ commits.append(commit_obj)
+
+ return commits
+
+
+def setup_warning_catcher():
+ """Wrap warnings.showwarning with code that records warnings."""
+ caught_warnings = []
+ original_showwarning = warnings.showwarning
+
+ def custom_showwarning(*args, **kwargs):
+ caught_warnings.append(args[0])
+
+ warnings.showwarning = custom_showwarning
+
+ def restore_showwarning():
+ warnings.showwarning = original_showwarning
+
+ return caught_warnings, restore_showwarning
blob - cd06126c084b381fb590de441314ee90ec4626ba
blob + ec82a88b2bfaac6721b1fd429dff08a23730be2a
--- tests/compat/server_utils.py
+++ tests/compat/server_utils.py
from dulwich.protocol import CAPABILITY_SIDE_BAND_64K
from dulwich.repo import Repo
from dulwich.server import ReceivePackHandler
+from dulwich.tests.utils import tear_down_repo
-from ..utils import tear_down_repo
from .utils import require_git_version, run_git_or_fail
blob - 5f81e137166827f3e0c0019e93c17d4972c548c4
blob + aed8dfca66ee6e3239f690d12d29f788c264eb02
--- tests/compat/test_porcelain.py
+++ tests/compat/test_porcelain.py
from unittest import skipIf
from dulwich import porcelain
+from dulwich.tests.utils import build_commit_graph
from ..test_porcelain import PorcelainGpgTestCase
-from ..utils import build_commit_graph
from .utils import CompatTestCase, run_git_or_fail
blob - 4f6622b40b6cf24486cb3142832ad6d37ab5c914
blob + 38439e030e1c9b30990663e800a6d952250f2aed
--- tests/contrib/test_release_robot.py
+++ tests/contrib/test_release_robot.py
from dulwich.contrib import release_robot
from dulwich.repo import Repo
+from dulwich.tests.utils import make_commit, make_tag
-from ..utils import make_commit, make_tag
-
BASEDIR = os.path.abspath(os.path.dirname(__file__)) # this directory
blob - a0a0508865cbb93ca48f9cceafe74f23b4e45921
blob + 98988cdd57bc992f8c59b13dae1befa61544a444
--- tests/test_archive.py
+++ tests/test_archive.py
from dulwich.archive import tar_stream
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob, Tree
+from dulwich.tests.utils import build_commit_graph
from . import TestCase
-from .utils import build_commit_graph
try:
from unittest.mock import patch
blob - 3ada349921cf98083d73749174ad67ec17466072
blob + 6f4820c6bf6c2729d4862687ec0b1a0d0a20493d
--- tests/test_client.py
+++ tests/test_client.py
from dulwich.pack import pack_objects_to_data, write_pack_data, write_pack_objects
from dulwich.protocol import TCP_GIT_PORT, Protocol
from dulwich.repo import MemoryRepo, Repo
+from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
from . import TestCase, skipIf
-from .utils import open_repo, setup_warning_catcher, tear_down_repo
class DummyClient(TraditionalGitClient):
blob - dd985227bcf8c1e1c739ed63f615bc842478592b
blob + 5fb0afd8fda7c4ae99ee06ff3f2fdca383fc66f3
--- tests/test_config.py
+++ tests/test_config.py
@skipIf(sys.platform != "win32", "Windows specific config location.")
def test_windows_config_from_path(self):
- from ..config import get_win_system_paths
+ from dulwich.config import get_win_system_paths
install_dir = os.path.join("C:", "foo", "Git")
self.overrideEnv("PATH", os.path.join(install_dir, "cmd"))
def test_windows_config_from_reg(self):
import winreg
- from ..config import get_win_system_paths
+ from dulwich.config import get_win_system_paths
self.overrideEnv("PATH", None)
install_dir = os.path.join("C:", "foo", "Git")
blob - 04c83d053ffecff9cf77799285e182bcfecd06bb
blob + 09823abd4a7a1dd3dc870a4490edb38609ef6729
--- tests/test_diff_tree.py
+++ tests/test_diff_tree.py
from dulwich.index import commit_tree
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob, ShaFile, Tree, TreeEntry
+from dulwich.tests.utils import F, ext_functest_builder, functest_builder, make_object
from . import TestCase
-from .utils import F, ext_functest_builder, functest_builder, make_object
class DiffTestCase(TestCase):
blob - 5076f7767b6d517b08f1dd34c6515586e7763118
blob + 86e3a0565c1854025b7751fe299ec918fd86137e
--- tests/test_fastexport.py
+++ tests/test_fastexport.py
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import ZERO_SHA, Blob, Commit, Tree
from dulwich.repo import MemoryRepo
+from dulwich.tests.utils import build_commit_graph
from . import SkipTest, TestCase
-from .utils import build_commit_graph
class GitFastExporterTests(TestCase):
blob - 0d14b5e356fca3b51749d7c540ef9d6a0b00d93f
blob + c6ac3fd3af34af142063418f75383baf546a650f
--- tests/test_graph.py
+++ tests/test_graph.py
from dulwich.graph import WorkList, _find_lcas, can_fast_forward
from dulwich.repo import MemoryRepo
+from dulwich.tests.utils import make_commit
from . import TestCase
-from .utils import make_commit
class FindMergeBaseTests(TestCase):
blob - 7a6b79b5230c062a11cb89eb3f93c4ad0487879b
blob + b222099a21c3181c15644db2c9d38acbbb92139e
--- tests/test_missing_obj_finder.py
+++ tests/test_missing_obj_finder.py
from dulwich.object_store import MemoryObjectStore, MissingObjectFinder
from dulwich.objects import Blob
+from dulwich.tests.utils import build_commit_graph, make_object, make_tag
from . import TestCase
-from .utils import build_commit_graph, make_object, make_tag
class MissingObjectFinderTest(TestCase):
blob - b9d78e8aa227c57b0163118e3a4690b9e562d896
blob + ac4cb9edb926f58bc7371bd5c5db25fc9f6bd2cc
--- tests/test_object_store.py
+++ tests/test_object_store.py
import tempfile
from contextlib import closing
from io import BytesIO
-from unittest import skipUnless
from dulwich.errors import NotTreeError
from dulwich.index import commit_tree
ObjectStoreGraphWalker,
OverlayObjectStore,
commit_tree_changes,
- iter_tree_contents,
- peel_sha,
read_packs_file,
tree_lookup_path,
)
sha_to_hex,
)
from dulwich.pack import REF_DELTA, write_pack_objects
-from dulwich.protocol import DEPTH_INFINITE
+from dulwich.tests.test_object_store import ObjectStoreTests, PackBasedObjectStoreTests
+from dulwich.tests.utils import build_pack, make_object
from . import TestCase
-from .utils import build_pack, make_object, make_tag
-try:
- from unittest.mock import patch
-except ImportError:
- patch = None # type: ignore
-
-
testobject = make_object(Blob, data=b"yummy data")
-class ObjectStoreTests:
- def test_determine_wants_all(self):
- self.assertEqual(
- [b"1" * 40],
- self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40}),
- )
-
- def test_determine_wants_all_zero(self):
- self.assertEqual(
- [], self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40})
- )
-
- @skipUnless(patch, "Required mock.patch")
- def test_determine_wants_all_depth(self):
- self.store.add_object(testobject)
- refs = {b"refs/heads/foo": testobject.id}
- with patch.object(self.store, "_get_depth", return_value=1) as m:
- self.assertEqual([], self.store.determine_wants_all(refs, depth=0))
- self.assertEqual(
- [testobject.id],
- self.store.determine_wants_all(refs, depth=DEPTH_INFINITE),
- )
- m.assert_not_called()
-
- self.assertEqual([], self.store.determine_wants_all(refs, depth=1))
- m.assert_called_with(testobject.id)
- self.assertEqual(
- [testobject.id], self.store.determine_wants_all(refs, depth=2)
- )
-
- def test_get_depth(self):
- self.assertEqual(0, self.store._get_depth(testobject.id))
-
- self.store.add_object(testobject)
- self.assertEqual(
- 1, self.store._get_depth(testobject.id, get_parents=lambda x: [])
- )
-
- parent = make_object(Blob, data=b"parent data")
- self.store.add_object(parent)
- self.assertEqual(
- 2,
- self.store._get_depth(
- testobject.id,
- get_parents=lambda x: [parent.id] if x == testobject else [],
- ),
- )
-
- def test_iter(self):
- self.assertEqual([], list(self.store))
-
- def test_get_nonexistant(self):
- self.assertRaises(KeyError, lambda: self.store[b"a" * 40])
-
- def test_contains_nonexistant(self):
- self.assertNotIn(b"a" * 40, self.store)
-
- def test_add_objects_empty(self):
- self.store.add_objects([])
-
- def test_add_commit(self):
- # TODO: Argh, no way to construct Git commit objects without
- # access to a serialized form.
- self.store.add_objects([])
-
- def test_store_resilience(self):
- """Test if updating an existing stored object doesn't erase the
- object from the store.
- """
- test_object = make_object(Blob, data=b"data")
-
- self.store.add_object(test_object)
- test_object_id = test_object.id
- test_object.data = test_object.data + b"update"
- stored_test_object = self.store[test_object_id]
-
- self.assertNotEqual(test_object.id, stored_test_object.id)
- self.assertEqual(stored_test_object.id, test_object_id)
-
- def test_add_object(self):
- self.store.add_object(testobject)
- self.assertEqual({testobject.id}, set(self.store))
- self.assertIn(testobject.id, self.store)
- r = self.store[testobject.id]
- self.assertEqual(r, testobject)
-
- def test_add_objects(self):
- data = [(testobject, "mypath")]
- self.store.add_objects(data)
- self.assertEqual({testobject.id}, set(self.store))
- self.assertIn(testobject.id, self.store)
- r = self.store[testobject.id]
- self.assertEqual(r, testobject)
-
- def test_tree_changes(self):
- blob_a1 = make_object(Blob, data=b"a1")
- blob_a2 = make_object(Blob, data=b"a2")
- blob_b = make_object(Blob, data=b"b")
- for blob in [blob_a1, blob_a2, blob_b]:
- self.store.add_object(blob)
-
- blobs_1 = [(b"a", blob_a1.id, 0o100644), (b"b", blob_b.id, 0o100644)]
- tree1_id = commit_tree(self.store, blobs_1)
- blobs_2 = [(b"a", blob_a2.id, 0o100644), (b"b", blob_b.id, 0o100644)]
- tree2_id = commit_tree(self.store, blobs_2)
- change_a = (
- (b"a", b"a"),
- (0o100644, 0o100644),
- (blob_a1.id, blob_a2.id),
- )
- self.assertEqual([change_a], list(self.store.tree_changes(tree1_id, tree2_id)))
- self.assertEqual(
- [
- change_a,
- ((b"b", b"b"), (0o100644, 0o100644), (blob_b.id, blob_b.id)),
- ],
- list(self.store.tree_changes(tree1_id, tree2_id, want_unchanged=True)),
- )
-
- def test_iter_tree_contents(self):
- blob_a = make_object(Blob, data=b"a")
- blob_b = make_object(Blob, data=b"b")
- blob_c = make_object(Blob, data=b"c")
- for blob in [blob_a, blob_b, blob_c]:
- self.store.add_object(blob)
-
- blobs = [
- (b"a", blob_a.id, 0o100644),
- (b"ad/b", blob_b.id, 0o100644),
- (b"ad/bd/c", blob_c.id, 0o100755),
- (b"ad/c", blob_c.id, 0o100644),
- (b"c", blob_c.id, 0o100644),
- ]
- tree_id = commit_tree(self.store, blobs)
- self.assertEqual(
- [TreeEntry(p, m, h) for (p, h, m) in blobs],
- list(iter_tree_contents(self.store, tree_id)),
- )
- self.assertEqual([], list(iter_tree_contents(self.store, None)))
-
- def test_iter_tree_contents_include_trees(self):
- blob_a = make_object(Blob, data=b"a")
- blob_b = make_object(Blob, data=b"b")
- blob_c = make_object(Blob, data=b"c")
- for blob in [blob_a, blob_b, blob_c]:
- self.store.add_object(blob)
-
- blobs = [
- (b"a", blob_a.id, 0o100644),
- (b"ad/b", blob_b.id, 0o100644),
- (b"ad/bd/c", blob_c.id, 0o100755),
- ]
- tree_id = commit_tree(self.store, blobs)
- tree = self.store[tree_id]
- tree_ad = self.store[tree[b"ad"][1]]
- tree_bd = self.store[tree_ad[b"bd"][1]]
-
- expected = [
- TreeEntry(b"", 0o040000, tree_id),
- TreeEntry(b"a", 0o100644, blob_a.id),
- TreeEntry(b"ad", 0o040000, tree_ad.id),
- TreeEntry(b"ad/b", 0o100644, blob_b.id),
- TreeEntry(b"ad/bd", 0o040000, tree_bd.id),
- TreeEntry(b"ad/bd/c", 0o100755, blob_c.id),
- ]
- actual = iter_tree_contents(self.store, tree_id, include_trees=True)
- self.assertEqual(expected, list(actual))
-
- def make_tag(self, name, obj):
- tag = make_tag(obj, name=name)
- self.store.add_object(tag)
- return tag
-
- def test_peel_sha(self):
- self.store.add_object(testobject)
- tag1 = self.make_tag(b"1", testobject)
- tag2 = self.make_tag(b"2", testobject)
- tag3 = self.make_tag(b"3", testobject)
- for obj in [testobject, tag1, tag2, tag3]:
- self.assertEqual((obj, testobject), peel_sha(self.store, obj.id))
-
- def test_get_raw(self):
- self.store.add_object(testobject)
- self.assertEqual(
- (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id)
- )
-
- def test_close(self):
- # For now, just check that close doesn't barf.
- self.store.add_object(testobject)
- self.store.close()
-
-
class OverlayObjectStoreTests(ObjectStoreTests, TestCase):
def setUp(self):
TestCase.setUp(self)
o.add_thin_pack(f.read, None)
-class PackBasedObjectStoreTests(ObjectStoreTests):
- def tearDown(self):
- for pack in self.store.packs:
- pack.close()
-
- def test_empty_packs(self):
- self.assertEqual([], list(self.store.packs))
-
- def test_pack_loose_objects(self):
- b1 = make_object(Blob, data=b"yummy data")
- self.store.add_object(b1)
- b2 = make_object(Blob, data=b"more yummy data")
- self.store.add_object(b2)
- b3 = make_object(Blob, data=b"even more yummy data")
- b4 = make_object(Blob, data=b"and more yummy data")
- self.store.add_objects([(b3, None), (b4, None)])
- self.assertEqual({b1.id, b2.id, b3.id, b4.id}, set(self.store))
- self.assertEqual(1, len(self.store.packs))
- self.assertEqual(2, self.store.pack_loose_objects())
- self.assertNotEqual([], list(self.store.packs))
- self.assertEqual(0, self.store.pack_loose_objects())
-
- def test_repack(self):
- b1 = make_object(Blob, data=b"yummy data")
- self.store.add_object(b1)
- b2 = make_object(Blob, data=b"more yummy data")
- self.store.add_object(b2)
- b3 = make_object(Blob, data=b"even more yummy data")
- b4 = make_object(Blob, data=b"and more yummy data")
- self.store.add_objects([(b3, None), (b4, None)])
- b5 = make_object(Blob, data=b"and more data")
- b6 = make_object(Blob, data=b"and some more data")
- self.store.add_objects([(b5, None), (b6, None)])
- self.assertEqual({b1.id, b2.id, b3.id, b4.id, b5.id, b6.id}, set(self.store))
- self.assertEqual(2, len(self.store.packs))
- self.assertEqual(6, self.store.repack())
- self.assertEqual(1, len(self.store.packs))
- self.assertEqual(0, self.store.pack_loose_objects())
-
- def test_repack_existing(self):
- b1 = make_object(Blob, data=b"yummy data")
- self.store.add_object(b1)
- b2 = make_object(Blob, data=b"more yummy data")
- self.store.add_object(b2)
- self.store.add_objects([(b1, None), (b2, None)])
- self.store.add_objects([(b2, None)])
- self.assertEqual({b1.id, b2.id}, set(self.store))
- self.assertEqual(2, len(self.store.packs))
- self.assertEqual(2, self.store.repack())
- self.assertEqual(1, len(self.store.packs))
- self.assertEqual(0, self.store.pack_loose_objects())
-
- self.assertEqual({b1.id, b2.id}, set(self.store))
- self.assertEqual(1, len(self.store.packs))
- self.assertEqual(2, self.store.repack())
- self.assertEqual(1, len(self.store.packs))
- self.assertEqual(0, self.store.pack_loose_objects())
-
-
class DiskObjectStoreTests(PackBasedObjectStoreTests, TestCase):
def setUp(self):
TestCase.setUp(self)
blob - 9c4874bf5f054d0c4232b6c25748f973daed45c6
blob + 0d907f1e1b9529f39144093d2d8e354aae42278e
--- tests/test_objects.py
+++ tests/test_objects.py
sha_to_hex,
sorted_tree_items,
)
+from dulwich.tests.utils import (
+ ext_functest_builder,
+ functest_builder,
+ make_commit,
+ make_object,
+)
from . import TestCase
-from .utils import ext_functest_builder, functest_builder, make_commit, make_object
a_sha = b"6f670c0fb53f9463760b7295fbb814e965fb20c8"
b_sha = b"2969be3e8ee1c0222396a5611407e4769f14e54b"
blob - 185c1fe9b84465db4ab3418148e562c98e85716c
blob + d21e3eee40eeb55d544e83d8ae55c8775eb10d33
--- tests/test_objectspec.py
+++ tests/test_objectspec.py
parse_tree,
)
from dulwich.repo import MemoryRepo
+from dulwich.tests.utils import build_commit_graph
from . import TestCase
-from .utils import build_commit_graph
class ParseObjectTests(TestCase):
blob - c796657042e5c56de556eed051de27fafc1be7e2
blob + 609ce6db00e1b3bf81dbc7d10139a803753e48a6
--- tests/test_pack.py
+++ tests/test_pack.py
write_pack_index_v2,
write_pack_object,
)
+from dulwich.tests.utils import build_pack, make_object
from . import TestCase
-from .utils import build_pack, make_object
pack1_sha = b"bc63ddad95e7321ee734ea11a7a62d314e0d7481"
blob - 7a1c26f38ecff24e70b2549428e85f2e02d464da
blob + 95220a6c4b673d084ac5e66bc81f53b3820dfbf6
--- tests/test_porcelain.py
+++ tests/test_porcelain.py
from dulwich.porcelain import CheckoutError
from dulwich.repo import NoIndexPresent, Repo
from dulwich.server import DictBackend
+from dulwich.tests.utils import build_commit_graph, make_commit, make_object
from dulwich.web import make_server, make_wsgi_chain
from . import TestCase
-from .utils import build_commit_graph, make_commit, make_object
try:
import gpg
blob - 31e3e37254f6b8267c0129b2abd21c4b81ddec7e
blob + 572be9d8043de99402ee1aca3561a3c9dd412a1c
--- tests/test_refs.py
+++ tests/test_refs.py
write_packed_refs,
)
from dulwich.repo import Repo
+from dulwich.tests.utils import open_repo, tear_down_repo
from . import SkipTest, TestCase
-from .utils import open_repo, tear_down_repo
class CheckRefFormatTests(TestCase):
blob - 80605d1b70ff8bbce62d274533238b05e3b62a82
blob + a063c45760ce8294805392e458913e61f34800cf
--- tests/test_repository.py
+++ tests/test_repository.py
UnsupportedVersion,
check_user_identity,
)
+from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
from . import TestCase, skipIf
-from .utils import open_repo, setup_warning_catcher, tear_down_repo
missing_sha = b"b91fa4d900e17e99b433218e988c4eb4a3e9a097"
blob - 2fe005ccd8948db3b6714924a37e34384bbee91e
blob + 7ecb7122f7ff8076cf7cb9a42f9f6fa888e2b4fe
--- tests/test_server.py
+++ tests/test_server.py
serve_command,
update_server_info,
)
+from dulwich.tests.utils import make_commit, make_tag
from . import TestCase
-from .utils import make_commit, make_tag
ONE = b"1" * 40
TWO = b"2" * 40
blob - e1814e94f13685e284387f640e55734c82e3c2c0
blob + c22667fb7967f4d22b4aad12faf6943fc6ff8cf3
--- tests/test_utils.py
+++ tests/test_utils.py
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob
+from dulwich.tests.utils import build_commit_graph, make_object
from . import TestCase
-from .utils import build_commit_graph, make_object
class BuildCommitGraphTest(TestCase):
blob - 79650c423517555b700212e11291946743e18e13
blob + f803b668fe45b13662e08c054559695caf701dc4
--- tests/test_walk.py
+++ tests/test_walk.py
from dulwich.errors import MissingCommitError
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob, Commit
+from dulwich.tests.utils import F, build_commit_graph, make_object, make_tag
from dulwich.walk import ORDER_TOPO, WalkEntry, Walker, _topo_reorder
from . import TestCase
-from .utils import F, build_commit_graph, make_object, make_tag
class TestWalkEntry:
blob - d3ea62d6030b4d807c5d1c44f677f0c58c309c12
blob + 968511b3c5f6aece5807b9d9a580e482b03668d5
--- tests/test_web.py
+++ tests/test_web.py
from dulwich.objects import Blob
from dulwich.repo import BaseRepo, MemoryRepo
from dulwich.server import DictBackend
+from dulwich.tests.utils import make_object, make_tag
from dulwich.web import (
HTTP_ERROR,
HTTP_FORBIDDEN,
)
from . import TestCase
-from .utils import make_object, make_tag
class MinimalistWSGIInputStream:
blob - 13fe5a4542c3d02b3dcf7fd70194a01a39c6f7fd (mode 644)
blob + /dev/null
--- tests/utils.py
+++ /dev/null
-# utils.py -- Test utilities for Dulwich.
-# Copyright (C) 2010 Google, Inc.
-#
-# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
-# General Public License as public by the Free Software Foundation; version 2.0
-# or (at your option) any later version. You can redistribute it and/or
-# modify it under the terms of either of these two licenses.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# You should have received a copy of the licenses; if not, see
-# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
-# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
-# License, Version 2.0.
-#
-
-"""Utility functions common to Dulwich tests."""
-
-import datetime
-import os
-import shutil
-import tempfile
-import time
-import types
-import warnings
-
-from dulwich.index import commit_tree
-from dulwich.objects import Commit, FixedSha, Tag, object_class
-from dulwich.pack import (
- DELTA_TYPES,
- OFS_DELTA,
- REF_DELTA,
- SHA1Writer,
- create_delta,
- obj_sha,
- write_pack_header,
- write_pack_object,
-)
-from dulwich.repo import Repo
-
-from . import SkipTest
-
-# Plain files are very frequently used in tests, so let the mode be very short.
-F = 0o100644 # Shorthand mode for Files.
-
-
-def open_repo(name, temp_dir=None):
- """Open a copy of a repo in a temporary directory.
-
- Use this function for accessing repos in dulwich/tests/data/repos to avoid
- accidentally or intentionally modifying those repos in place. Use
- tear_down_repo to delete any temp files created.
-
- Args:
- name: The name of the repository, relative to
- dulwich/tests/data/repos
- temp_dir: temporary directory to initialize to. If not provided, a
- temporary directory will be created.
- Returns: An initialized Repo object that lives in a temporary directory.
- """
- if temp_dir is None:
- temp_dir = tempfile.mkdtemp()
- repo_dir = os.path.join(
- os.path.dirname(__file__), "..", "testdata", "repos", name
- )
- temp_repo_dir = os.path.join(temp_dir, name)
- shutil.copytree(repo_dir, temp_repo_dir, symlinks=True)
- return Repo(temp_repo_dir)
-
-
-def tear_down_repo(repo):
- """Tear down a test repository."""
- repo.close()
- temp_dir = os.path.dirname(repo.path.rstrip(os.sep))
- shutil.rmtree(temp_dir)
-
-
-def make_object(cls, **attrs):
- """Make an object for testing and assign some members.
-
- This method creates a new subclass to allow arbitrary attribute
- reassignment, which is not otherwise possible with objects having
- __slots__.
-
- Args:
- attrs: dict of attributes to set on the new object.
- Returns: A newly initialized object of type cls.
- """
-
- class TestObject(cls):
- """Class that inherits from the given class, but without __slots__.
-
- Note that classes with __slots__ can't have arbitrary attributes
- monkey-patched in, so this is a class that is exactly the same only
- with a __dict__ instead of __slots__.
- """
-
- TestObject.__name__ = "TestObject_" + cls.__name__
-
- obj = TestObject()
- for name, value in attrs.items():
- if name == "id":
- # id property is read-only, so we overwrite sha instead.
- sha = FixedSha(value)
- obj.sha = lambda: sha
- else:
- setattr(obj, name, value)
- return obj
-
-
-def make_commit(**attrs):
- """Make a Commit object with a default set of members.
-
- Args:
- attrs: dict of attributes to overwrite from the default values.
- Returns: A newly initialized Commit object.
- """
- default_time = 1262304000 # 2010-01-01 00:00:00
- all_attrs = {
- "author": b"Test Author <test@nodomain.com>",
- "author_time": default_time,
- "author_timezone": 0,
- "committer": b"Test Committer <test@nodomain.com>",
- "commit_time": default_time,
- "commit_timezone": 0,
- "message": b"Test message.",
- "parents": [],
- "tree": b"0" * 40,
- }
- all_attrs.update(attrs)
- return make_object(Commit, **all_attrs)
-
-
-def make_tag(target, **attrs):
- """Make a Tag object with a default set of values.
-
- Args:
- target: object to be tagged (Commit, Blob, Tree, etc)
- attrs: dict of attributes to overwrite from the default values.
- Returns: A newly initialized Tag object.
- """
- target_id = target.id
- target_type = object_class(target.type_name)
- default_time = int(time.mktime(datetime.datetime(2010, 1, 1).timetuple()))
- all_attrs = {
- "tagger": b"Test Author <test@nodomain.com>",
- "tag_time": default_time,
- "tag_timezone": 0,
- "message": b"Test message.",
- "object": (target_type, target_id),
- "name": b"Test Tag",
- }
- all_attrs.update(attrs)
- return make_object(Tag, **all_attrs)
-
-
-def functest_builder(method, func):
- """Generate a test method that tests the given function."""
-
- def do_test(self):
- method(self, func)
-
- return do_test
-
-
-def ext_functest_builder(method, func):
- """Generate a test method that tests the given extension function.
-
- This is intended to generate test methods that test both a pure-Python
- version and an extension version using common test code. The extension test
- will raise SkipTest if the extension is not found.
-
- Sample usage:
-
- class MyTest(TestCase);
- def _do_some_test(self, func_impl):
- self.assertEqual('foo', func_impl())
-
- test_foo = functest_builder(_do_some_test, foo_py)
- test_foo_extension = ext_functest_builder(_do_some_test, _foo_c)
-
- Args:
- method: The method to run. It must must two parameters, self and the
- function implementation to test.
- func: The function implementation to pass to method.
- """
-
- def do_test(self):
- if not isinstance(func, types.BuiltinFunctionType):
- raise SkipTest("%s extension not found" % func)
- method(self, func)
-
- return do_test
-
-
-def build_pack(f, objects_spec, store=None):
- """Write test pack data from a concise spec.
-
- Args:
- f: A file-like object to write the pack to.
- objects_spec: A list of (type_num, obj). For non-delta types, obj
- is the string of that object's data.
- For delta types, obj is a tuple of (base, data), where:
-
- * base can be either an index in objects_spec of the base for that
- * delta; or for a ref delta, a SHA, in which case the resulting pack
- * will be thin and the base will be an external ref.
- * data is a string of the full, non-deltified data for that object.
-
- Note that offsets/refs and deltas are computed within this function.
- store: An optional ObjectStore for looking up external refs.
- Returns: A list of tuples in the order specified by objects_spec:
- (offset, type num, data, sha, CRC32)
- """
- sf = SHA1Writer(f)
- num_objects = len(objects_spec)
- write_pack_header(sf.write, num_objects)
-
- full_objects = {}
- offsets = {}
- crc32s = {}
-
- while len(full_objects) < num_objects:
- for i, (type_num, data) in enumerate(objects_spec):
- if type_num not in DELTA_TYPES:
- full_objects[i] = (type_num, data, obj_sha(type_num, [data]))
- continue
- base, data = data
- if isinstance(base, int):
- if base not in full_objects:
- continue
- base_type_num, _, _ = full_objects[base]
- else:
- base_type_num, _ = store.get_raw(base)
- full_objects[i] = (
- base_type_num,
- data,
- obj_sha(base_type_num, [data]),
- )
-
- for i, (type_num, obj) in enumerate(objects_spec):
- offset = f.tell()
- if type_num == OFS_DELTA:
- base_index, data = obj
- base = offset - offsets[base_index]
- _, base_data, _ = full_objects[base_index]
- obj = (base, list(create_delta(base_data, data)))
- elif type_num == REF_DELTA:
- base_ref, data = obj
- if isinstance(base_ref, int):
- _, base_data, base = full_objects[base_ref]
- else:
- base_type_num, base_data = store.get_raw(base_ref)
- base = obj_sha(base_type_num, base_data)
- obj = (base, list(create_delta(base_data, data)))
-
- crc32 = write_pack_object(sf.write, type_num, obj)
- offsets[i] = offset
- crc32s[i] = crc32
-
- expected = []
- for i in range(num_objects):
- type_num, data, sha = full_objects[i]
- assert len(sha) == 20
- expected.append((offsets[i], type_num, data, sha, crc32s[i]))
-
- sf.write_sha()
- f.seek(0)
- return expected
-
-
-def build_commit_graph(object_store, commit_spec, trees=None, attrs=None):
- """Build a commit graph from a concise specification.
-
- Sample usage:
- >>> c1, c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]])
- >>> store[store[c3].parents[0]] == c1
- True
- >>> store[store[c3].parents[1]] == c2
- True
-
- If not otherwise specified, commits will refer to the empty tree and have
- commit times increasing in the same order as the commit spec.
-
- Args:
- object_store: An ObjectStore to commit objects to.
- commit_spec: An iterable of iterables of ints defining the commit
- graph. Each entry defines one commit, and entries must be in
- topological order. The first element of each entry is a commit number,
- and the remaining elements are its parents. The commit numbers are only
- meaningful for the call to make_commits; since real commit objects are
- created, they will get created with real, opaque SHAs.
- trees: An optional dict of commit number -> tree spec for building
- trees for commits. The tree spec is an iterable of (path, blob, mode)
- or (path, blob) entries; if mode is omitted, it defaults to the normal
- file mode (0100644).
- attrs: A dict of commit number -> (dict of attribute -> value) for
- assigning additional values to the commits.
- Returns: The list of commit objects created.
-
- Raises:
- ValueError: If an undefined commit identifier is listed as a parent.
- """
- if trees is None:
- trees = {}
- if attrs is None:
- attrs = {}
- commit_time = 0
- nums = {}
- commits = []
-
- for commit in commit_spec:
- commit_num = commit[0]
- try:
- parent_ids = [nums[pn] for pn in commit[1:]]
- except KeyError as exc:
- (missing_parent,) = exc.args
- raise ValueError("Unknown parent %i" % missing_parent) from exc
-
- blobs = []
- for entry in trees.get(commit_num, []):
- if len(entry) == 2:
- path, blob = entry
- entry = (path, blob, F)
- path, blob, mode = entry
- blobs.append((path, blob.id, mode))
- object_store.add_object(blob)
- tree_id = commit_tree(object_store, blobs)
-
- commit_attrs = {
- "message": ("Commit %i" % commit_num).encode("ascii"),
- "parents": parent_ids,
- "tree": tree_id,
- "commit_time": commit_time,
- }
- commit_attrs.update(attrs.get(commit_num, {}))
- commit_obj = make_commit(**commit_attrs)
-
- # By default, increment the time by a lot. Out-of-order commits should
- # be closer together than this because their main cause is clock skew.
- commit_time = commit_attrs["commit_time"] + 100
- nums[commit_num] = commit_obj.id
- object_store.add_object(commit_obj)
- commits.append(commit_obj)
-
- return commits
-
-
-def setup_warning_catcher():
- """Wrap warnings.showwarning with code that records warnings."""
- caught_warnings = []
- original_showwarning = warnings.showwarning
-
- def custom_showwarning(*args, **kwargs):
- caught_warnings.append(args[0])
-
- warnings.showwarning = custom_showwarning
-
- def restore_showwarning():
- warnings.showwarning = original_showwarning
-
- return caught_warnings, restore_showwarning