commit - 68564db4734e20bb8ae1024885f63df89dcdd13e
commit + 450d6b364c851be52ba139a23c53f234b42c38f5
blob - 7520fdd3d82a7465065473cb468eb7387c103982
blob + 4b13df9eb1147da3efc5e3906ceffad403b82581
--- dulwich/index.py
+++ dulwich/index.py
Dict,
List,
Optional,
- TYPE_CHECKING,
Iterable,
Iterator,
Tuple,
blob - 5bd2a1ffec6b3651861d061e91475898ba89c3f8
blob + d91771e9710ddfbf1e9db3650492dccaaec683c0
--- dulwich/object_store.py
+++ dulwich/object_store.py
pass
raise KeyError(hexsha)
- def get_raw_unresolved(self, sha1: bytes) -> Tuple[int, Union[bytes, None], List[bytes]]:
+ def get_raw_unresolved(self, name: bytes) -> Tuple[int, Union[bytes, None], List[bytes]]:
"""Obtain the unresolved data for an object.
Args:
kset: set to fill with references to files and directories
"""
filetree = obj_store[tree_sha]
+ assert isinstance(filetree, Tree)
for name, mode, sha in filetree.iteritems():
if not S_ISGITLINK(mode) and sha not in kset:
kset.add(sha)
blob - f10062ee7fdd771f4cb32539ed5d48107328e01f
blob + 1fd4feb6ac3adacc2e41fb828b530cd6a13028e5
--- dulwich/pack.py
+++ dulwich/pack.py
import os
import sys
-from typing import Optional, Callable, Tuple, List, Deque, Union, Protocol, Iterable
+from typing import Optional, Callable, Tuple, List, Deque, Union, Protocol, Iterable, Iterator
import warnings
from hashlib import sha1
"""Add a single object to this object store."""
def add_objects(
- self, objects: Iterable[tuple[ShaFile, Optional[str]]],
+ self, objects: Iterable[Tuple[ShaFile, Optional[str]]],
progress: Optional[Callable[[str], None]] = None) -> None:
"""Add a set of objects to this object store.
self._contents, self._size = (contents, size)
@property
- def path(self):
+ def path(self) -> str:
return self._filename
def __eq__(self, other):
return super().__eq__(other)
- def close(self):
+ def close(self) -> None:
self._file.close()
if getattr(self._contents, "close", None) is not None:
self._contents.close()
- def __len__(self):
+ def __len__(self) -> int:
"""Return the number of entries in this pack index."""
return self._fan_out_table[-1]
- def _unpack_entry(self, i):
+ def _unpack_entry(self, i: int) -> Tuple[bytes, int, Optional[int]]:
"""Unpack the i-th entry in the index file.
Returns: Tuple with object name (SHA), offset in pack file and CRC32
"""Unpack the crc32 checksum for the ith object from the index file."""
raise NotImplementedError(self._unpack_crc32_checksum)
- def _itersha(self):
+ def _itersha(self) -> Iterator[bytes]:
for i in range(len(self)):
yield self._unpack_name(i)
- def iterentries(self):
+ def iterentries(self) -> Iterator[Tuple[bytes, int, Optional[int]]]:
"""Iterate over the entries in this pack index.
Returns: iterator over tuples with object name, offset in packfile and
for i in range(len(self)):
yield self._unpack_entry(i)
- def _read_fan_out_table(self, start_offset):
+ def _read_fan_out_table(self, start_offset: int):
ret = []
for i in range(0x100):
fanout_entry = self._contents[
ret.append(struct.unpack(">L", fanout_entry)[0])
return ret
- def check(self):
+ def check(self) -> None:
"""Check that the stored checksum matches the actual checksum."""
actual = self.calculate_checksum()
stored = self.get_stored_checksum()
if actual != stored:
raise ChecksumMismatch(stored, actual)
- def calculate_checksum(self):
+ def calculate_checksum(self) -> bytes:
"""Calculate the SHA1 checksum over this pack index.
Returns: This is a 20-byte binary digest
"""
return sha1(self._contents[:-20]).digest()
- def get_pack_checksum(self):
+ def get_pack_checksum(self) -> bytes:
"""Return the SHA1 checksum stored for the corresponding packfile.
Returns: 20-byte binary digest
"""
return bytes(self._contents[-40:-20])
- def get_stored_checksum(self):
+ def get_stored_checksum(self) -> bytes:
"""Return the SHA1 checksum stored for this index.
Returns: 20-byte binary digest
"""
return bytes(self._contents[-20:])
- def object_index(self, sha):
+ def object_index(self, sha: bytes) -> int:
"""Return the index in to the corresponding packfile for the object.
Given the name of an object it will return the offset that object
class PackIndex1(FilePackIndex):
"""Version 1 Pack Index file."""
- def __init__(self, filename, file=None, contents=None, size=None):
+ def __init__(self, filename: str, file=None, contents=None, size=None):
super().__init__(filename, file, contents, size)
self.version = 1
self._fan_out_table = self._read_fan_out_table(0)
class PackIndex2(FilePackIndex):
"""Version 2 Pack Index file."""
- def __init__(self, filename, file=None, contents=None, size=None):
+ def __init__(self, filename: str, file=None, contents=None, size=None):
super().__init__(filename, file, contents, size)
if self._contents[:4] != b"\377tOc":
raise AssertionError("Not a v2 pack index file")
return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
-def read_pack_header(read):
+def read_pack_header(read) -> Tuple[Optional[int], Optional[int]]:
"""Read the header of a pack file.
Args:
return (version, num_objects)
-def chunks_length(chunks):
+def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int:
if isinstance(chunks, bytes):
return len(chunks)
else:
compute_crc32=False,
include_comp=False,
zlib_bufsize=_ZLIB_BUFSIZE,
-):
+) -> Tuple[UnpackedObject, bytes]:
"""Unpack a Git object.
Args:
def write_pack(
- filename,
- objects,
- *,
- deltify: Optional[bool] = None,
- delta_window_size: Optional[int] = None,
- compression_level: int = -1,
- reuse_pack: Optional[PackedObjectContainer] = None):
-):
+ filename,
+ objects,
+ *,
+ deltify: Optional[bool] = None,
+ delta_window_size: Optional[int] = None,
+ compression_level: int = -1,
+ reuse_pack: Optional[PackedObjectContainer] = None):
"""Write a new pack data file.
Args:
possible_bases.pop()
-def pack_objects_to_data(objects):
+def pack_objects_to_data(
+ objects,
+ delta_window_size: Optional[int] = None,
+ deltify: Optional[bool] = None,
+ reuse_pack: Optional[PackedObjectContainer] = None):
"""Create pack data from objects
Args:
Returns: Tuples with (type_num, hexdigest, delta base, object chunks)
"""
count = len(objects)
- return (
- count,
- (
- (o.type_num, o.sha().digest(), None, o.as_raw_chunks())
- for (o, path) in objects
- ),
- )
+ if deltify is None:
+ # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
+ # slow at the moment.
+ deltify = False
+ if deltify:
+ pack_contents = deltify_pack_objects(
+ objects, window_size=delta_window_size, reuse_pack=reuse_pack)
+ return (count, pack_contents)
+ else:
+ return (
+ count,
+ (
+ (o.type_num, o.sha().digest(), None, o.as_raw_chunks())
+ for (o, path) in objects
+ ),
+ )
def write_pack_objects(
DeprecationWarning, stacklevel=2)
write = write.write
- if deltify is None:
- # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
- # slow at the moment.
- deltify = False
- if deltify:
- pack_contents = deltify_pack_objects(
- objects, window_size=delta_window_size, reuse_pack=reuse_pack)
- pack_contents_count = len(objects)
- else:
- pack_contents_count, pack_contents = pack_objects_to_data(objects)
+ pack_contents_count, pack_contents = pack_objects_to_data(
+ objects, delta_window_size=delta_window_size,
+ deltify=deltify,
+ reuse_pack=reuse_pack)
return write_pack_data(
write,
blob - 00716fba3a6e08948850a60ccaf2e81568ab74e2
blob + 53376d3a5859dbed1a819b3d1f69ec089c1d1b01
--- dulwich/server.py
+++ dulwich/server.py
considered shallow and unshallow according to the arguments. Note that
these sets may overlap if a commit is reachable along multiple paths.
"""
- parents: Dict[bytes, List[bytes]] = {}
+ parents: Dict[bytes, List[bytes]] = {}
def get_parents(sha):
result = parents.get(sha, None)