commit - df1660cb6e65b4a211b8f5c8f547293ec65664d0
commit + 217768b3435b3cd6a99ae9e48fa2aabe76a53be8
blob - 13b839d893f897375de0e9464be3c3b29dadffc0
blob + 343b6117ebe64d95a55a143ce494976a471fdf46
--- NEWS
+++ NEWS
* Support ``init.defaultBranch`` config.
(Jelmer Vernooij)
+
+ * Fix ``ObjectStore.iterobjects_subset()`` when
+ hex shas are passed for objects that live in packs.
+ (Jelmer Vernooij, #1166)
0.21.3 2023-02-17
blob - cdeb4e48504158eddfcbeb91dc1f6a6e24a834b6
blob + 3da419739f842a0eb30c491438864ac228e6df17
--- dulwich/objects.py
+++ dulwich/objects.py
Tuple,
Type,
Union,
+ BinaryIO,
)
import zlib
from collections import namedtuple
self._hexsha = hexsha
self._sha = hex_to_sha(hexsha)
- def digest(self):
+ def digest(self) -> bytes:
"""Return the raw SHA digest."""
return self._sha
- def hexdigest(self):
+ def hexdigest(self) -> str:
"""Return the hex SHA digest."""
return self._hexsha.decode("ascii")
_sha: Union[FixedSha, None, HASH]
@staticmethod
- def _parse_legacy_object_header(magic, f) -> "ShaFile":
+ def _parse_legacy_object_header(magic, f: BinaryIO) -> "ShaFile":
"""Parse a legacy object, creating it but not reading the file."""
bufsize = 1024
decomp = zlib.decompressobj()
if field == _OBJECT_HEADER:
self._object_sha = value
elif field == _TYPE_HEADER:
+ assert isinstance(value, bytes)
obj_class = object_class(value)
if not obj_class:
raise ObjectFormatException("Not a known type: %s" % value)
self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
)
- def verify(self, keyids: Optional[Iterable[str]] = None):
+ def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
"""Verify GPG signature for this tag (if it is signed).
Args:
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
"""Named tuple encapsulating a single tree entry."""
- def in_path(self, path):
+ def in_path(self, path: bytes):
"""Return a copy of this entry with the given path prepended."""
if not isinstance(self.path, bytes):
raise TypeError("Expected bytes for path, got %r" % path)
yield TreeEntry(name, mode, hexsha)
-def key_entry(entry):
+def key_entry(entry) -> bytes:
"""Sort key for tree entry.
Args:
- entry: (name, value) tuplee
+ entry: (name, value) tuple
"""
(name, value) = entry
if stat.S_ISDIR(value[0]):
return entry[0]
-def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8"):
+def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8") -> str:
"""Pretty format tree entry.
Args:
return list(serialize_tree(self.iteritems()))
def as_pretty_string(self):
- text = []
+ text: List[str] = []
for name, mode, hexsha in self.iteritems():
text.append(pretty_format_tree_entry(name, mode, hexsha))
return "".join(text)
blob - 6dd8ba484fd5d35b923e5307556454df49fdcab1
blob + 7bdfd764ef15678186d48e46dc6dc2bc575497f0
--- dulwich/pack.py
+++ dulwich/pack.py
PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref)
)
- def iterobjects_subset(self, shas, *, allow_missing: bool = False) -> Iterator[ShaFile]:
+ def iterobjects_subset(self, shas: Iterable[ObjectID], *, allow_missing: bool = False) -> Iterator[ShaFile]:
return (
uo
for uo in
PackInflater.for_pack_subset(
self, shas, allow_missing=allow_missing,
resolve_ext_ref=self.resolve_ext_ref)
- if uo.sha() in shas)
+ if uo.id in shas)
- def iter_unpacked_subset(self, shas, *, include_comp: bool = False, allow_missing: bool = False, convert_ofs_delta: bool = False) -> Iterator[UnpackedObject]:
+ def iter_unpacked_subset(self, shas: Iterable[ObjectID], *, include_comp: bool = False, allow_missing: bool = False, convert_ofs_delta: bool = False) -> Iterator[UnpackedObject]:
ofs_pending: Dict[int, List[UnpackedObject]] = defaultdict(list)
ofs: Dict[bytes, int] = {}
todo = set(shas)
blob - 2e943609986c7ea6ebc3589588190336f10efdc4
blob + e9387bc69128f2a9e0054fe03a1a1934269f0d7f
--- dulwich/tests/test_pack.py
+++ dulwich/tests/test_pack.py
self.assertIsInstance(objs[tree_sha], Tree)
self.assertIsInstance(objs[commit_sha], Commit)
+ def test_iterobjects_subset(self):
+ with self.get_pack(pack1_sha) as p:
+ objs = {o.id: o for o in p.iterobjects_subset([commit_sha])}
+ self.assertEqual(1, len(objs))
+ self.assertIsInstance(objs[commit_sha], Commit)
+
class TestThinPack(PackTests):
def setUp(self):
super().setUp()
self.assertEntriesMatch([], entries, self.make_pack_iter_subset(f, []))
f.seek(0)
self.assertEntriesMatch([1, 0], entries, self.make_pack_iter_subset(f, [entries[0][3], entries[1][3]]))
+ f.seek(0)
+ self.assertEntriesMatch(
+ [1, 0], entries, self.make_pack_iter_subset(f, [sha_to_hex(entries[0][3]), sha_to_hex(entries[1][3])]))
def test_ofs_deltas(self):
f = BytesIO()