commit - eb409c082b7fd173c9633b481c67e96331083062
commit + 2ea06096079b0d6641dc41e5c598fb835607e54e
blob - 3d71e24bed4cdbbd1dd1e17fc3703ac4e227d678
blob + 0bf0e1f3ab41a828ded9c8b28fe5df0c454dfb69
--- NEWS
+++ NEWS
* On Windows, provide a hint about developer mode
when creating symlinks fails due to a permission
error. (Jelmer Vernooij, #1005)
+
+ * Add new ``ObjectID`` type in ``dulwich.objects``,
+ currently just an alias for ``bytes``.
+ (Jelmer Vernooij)
* Support repository format version 1.
(Jelmer Vernooij, #1056)
blob - fc26dd2fc6304034d507e3b89d70067e2395a6ce
blob + 492a10ac8b8884ab7096db6c2f3b9277255eb05f
--- dulwich/objects.py
+++ dulwich/objects.py
Iterable,
Union,
Type,
+ Iterator,
+ List,
)
import zlib
from hashlib import sha1
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
+ObjectID = bytes
+
+
class EmptyFileException(FileFormatException):
"""An unexpectedly empty file was encountered."""
def object_header(num_type: int, length: int) -> bytes:
"""Return an object header for the given numeric type and text length."""
- return object_class(num_type).type_name + b" " + str(length).encode("ascii") + b"\0"
+ cls = object_class(num_type)
+ if cls is None:
+ raise AssertionError("unsupported class type num: %d" % num_type)
+ return cls.type_name + b" " + str(length).encode("ascii") + b"\0"
def serializable_property(name: str, docstring: Optional[str] = None):
return property(get, set, doc=docstring)
-def object_class(type):
+def object_class(type: Union[bytes, int]) -> Optional[Type["ShaFile"]]:
"""Get the object class corresponding to the given type.
Args:
raise ObjectFormatException("%s %s" % (error_msg, hex))
-def check_identity(identity, error_msg):
+def check_identity(identity: bytes, error_msg: str) -> None:
"""Check if the specified identity is valid.
This will raise an exception if the identity is not valid.
__slots__ = ("_chunked_text", "_sha", "_needs_serialization")
- type_name = None # type: bytes
- type_num = None # type: int
+ _needs_serialization: bool
+ type_name: bytes
+ type_num: int
+ _chunked_text: Optional[List[bytes]]
@staticmethod
- def _parse_legacy_object_header(magic, f):
+ def _parse_legacy_object_header(magic, f) -> "ShaFile":
"""Parse a legacy object, creating it but not reading the file."""
bufsize = 1024
decomp = zlib.decompressobj()
"Object size not an integer: %s" % exc) from exc
obj_class = object_class(type_name)
if not obj_class:
- raise ObjectFormatException("Not a known type: %s" % type_name)
+ raise ObjectFormatException("Not a known type: %s" % type_name.decode('ascii'))
return obj_class()
- def _parse_legacy_object(self, map):
+ def _parse_legacy_object(self, map) -> None:
"""Parse a legacy object, setting the raw string."""
text = _decompress(map)
header_end = text.find(b"\0")
raise ObjectFormatException("Invalid object header, no \\0")
self.set_raw_string(text[header_end + 1 :])
- def as_legacy_object_chunks(self, compression_level=-1):
+ def as_legacy_object_chunks(
+ self, compression_level: int = -1) -> Iterator[bytes]:
"""Return chunks representing the object in the experimental format.
Returns: List of strings
yield compobj.compress(chunk)
yield compobj.flush()
- def as_legacy_object(self, compression_level=-1):
+ def as_legacy_object(self, compression_level: int = -1) -> bytes:
"""Return string representing the object in the experimental format."""
return b"".join(
self.as_legacy_object_chunks(compression_level=compression_level)
)
- def as_raw_chunks(self):
+ def as_raw_chunks(self) -> List[bytes]:
"""Return chunks with serialization of the object.
Returns: List of strings, not necessarily one per line
self._sha = None
self._chunked_text = self._serialize()
self._needs_serialization = False
- return self._chunked_text
+ return self._chunked_text # type: ignore
- def as_raw_string(self):
+ def as_raw_string(self) -> bytes:
"""Return raw string with serialization of the object.
Returns: String object
"""
return b"".join(self.as_raw_chunks())
- def __bytes__(self):
+ def __bytes__(self) -> bytes:
"""Return raw string serialization of this object."""
return self.as_raw_string()
"""Return unique hash for this object."""
return hash(self.id)
- def as_pretty_string(self):
+ def as_pretty_string(self) -> bytes:
"""Return a string representing this object, fit for display."""
return self.as_raw_string()
- def set_raw_string(self, text, sha=None):
+ def set_raw_string(
+ self, text: bytes, sha: Optional[ObjectID] = None) -> None:
"""Set the contents of this object from a serialized string."""
if not isinstance(text, bytes):
raise TypeError("Expected bytes for text, got %r" % text)
self.set_raw_chunks([text], sha)
- def set_raw_chunks(self, chunks, sha=None):
+ def set_raw_chunks(
+ self, chunks: List[bytes],
+ sha: Optional[ObjectID] = None) -> None:
"""Set the contents of this object from a list of chunks."""
self._chunked_text = chunks
self._deserialize(chunks)
if sha is None:
self._sha = None
else:
- self._sha = FixedSha(sha)
+ self._sha = FixedSha(sha) # type: ignore
self._needs_serialization = False
@staticmethod
raise ObjectFormatException("Not a known type %d" % num_type)
return obj_class()
- def _parse_object(self, map):
+ def _parse_object(self, map) -> None:
"""Parse a new style object, setting self._text."""
# skip type and size; type must have already been determined, and
# we trust zlib to fail if it's otherwise corrupted
self.set_raw_string(_decompress(raw))
@classmethod
- def _is_legacy_object(cls, magic):
+ def _is_legacy_object(cls, magic: bytes) -> bool:
b0 = ord(magic[0:1])
b1 = ord(magic[1:2])
word = (b0 << 8) + b1
return obj
@staticmethod
- def from_raw_chunks(type_num, chunks, sha=None):
+ def from_raw_chunks(
+ type_num: int, chunks: List[bytes],
+ sha: Optional[ObjectID] = None):
"""Creates an object of the indicated type from the raw chunks given.
Args:
chunks: An iterable of the raw uncompressed contents.
sha: Optional known sha for the object
"""
- obj = object_class(type_num)()
+ cls = object_class(type_num)
+ if cls is None:
+ raise AssertionError("unsupported class type num: %d" % type_num)
+ obj = cls()
obj.set_raw_chunks(chunks, sha)
return obj
if getattr(self, member, None) is None:
raise ObjectFormatException(error_msg)
- def check(self):
+ def check(self) -> None:
"""Check this object for internal consistency.
Raises:
raise ChecksumMismatch(new_sha, old_sha)
def _header(self):
- return object_header(self.type, self.raw_length())
+ return object_header(self.type_num, self.raw_length())
- def raw_length(self):
+ def raw_length(self) -> int:
"""Returns the length of the raw string of this object."""
ret = 0
for chunk in self.as_raw_chunks():
def copy(self):
"""Create a new copy of this SHA1 object from its raw string"""
- obj_class = object_class(self.get_type())
- return obj_class.from_raw_string(self.get_type(), self.as_raw_string(), self.id)
+ obj_class = object_class(self.type_num)
+ return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)
@property
def id(self):
"""The hex SHA of this object."""
return self.sha().hexdigest().encode("ascii")
- def get_type(self):
- """Return the type number for this object class."""
- return self.type_num
-
- def set_type(self, type):
- """Set the type number for this object class."""
- self.type_num = type
-
- # DEPRECATED: use type_num or type_name as needed.
- type = property(get_type, set_type)
-
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.id)
"""
super(Blob, self).check()
- def splitlines(self):
+ def splitlines(self) -> List[bytes]:
"""Return list of lines in this blob.
This preserves the original line endings.
return ret
-def _parse_message(chunks):
+def _parse_message(chunks: Iterable[bytes]):
"""Parse a message with a list of fields and a body.
Args:
"""
f = BytesIO(b"".join(chunks))
k = None
- v = ""
+ v = b""
eof = False
def _strip_last_newline(value):
Tag,
)
-_TYPE_MAP = {} # type: Dict[Union[bytes, int], Type[ShaFile]]
+_TYPE_MAP: Dict[Union[bytes, int], Type[ShaFile]] = {}
for cls in OBJECT_CLASSES:
_TYPE_MAP[cls.type_name] = cls
blob - 45a0f38f40453ac36b72f30f375448a68a0bc57d
blob + 5a38f71770b56e3f191b5c3d79cb731fbf473130
--- dulwich/walk.py
+++ dulwich/walk.py
import collections
import heapq
from itertools import chain
+from typing import List, Tuple, Set
from dulwich.diff_tree import (
RENAME_CHANGE_TYPES,
MissingCommitError,
)
from dulwich.objects import (
+ Commit,
Tag,
+ ObjectID,
)
ORDER_DATE = "date"
class _CommitTimeQueue(object):
"""Priority queue of WalkEntry objects by commit time."""
- def __init__(self, walker):
+ def __init__(self, walker: "Walker"):
self._walker = walker
self._store = walker.store
self._get_parents = walker.get_parents
self._excluded = walker.excluded
- self._pq = []
- self._pq_set = set()
- self._seen = set()
- self._done = set()
+ self._pq: List[Tuple[int, Commit]] = []
+ self._pq_set: Set[ObjectID] = set()
+ self._seen: Set[ObjectID] = set()
+ self._done: Set[ObjectID] = set()
self._min_time = walker.since
self._last = None
self._extra_commits_left = _MAX_EXTRA_COMMITS
for commit_id in chain(walker.include, walker.excluded):
self._push(commit_id)
- def _push(self, object_id):
+ def _push(self, object_id: bytes):
try:
obj = self._store[object_id]
except KeyError as exc: