commit - 1997f839a022252a607d6dff033bdd52f62fff41
commit + ecaf366aabafc87df8b53596ef5d28a30a6d5fac
blob - 3a33d3f4cc7d716df3bc38350fad8a8a3c7bc882
blob + a4e92383472be403bb4344a4cf219a4aae08427f
--- dulwich/index.py
+++ dulwich/index.py
Iterable,
Iterator,
Tuple,
+ Union,
)
if TYPE_CHECKING:
Tree,
hex_to_sha,
sha_to_hex,
+ ObjectID,
)
from dulwich.pack import (
SHA1Reader,
DEFAULT_VERSION = 2
-def pathsplit(path):
+def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
"""Split a /-delimited path into a directory part and a basename.
Args:
))
-def write_cache_entry(f, name, entry, version):
+def write_cache_entry(f, name: bytes, entry: IndexEntry, version: int) -> None:
"""Write an index entry to a file.
Args:
yield read_cache_entry(f, version)
-def read_index_dict(f):
+def read_index_dict(f) -> Dict[bytes, IndexEntry]:
"""Read an index file and return it as a dictionary.
Args:
class Index(object):
"""A Git Index file."""
- def __init__(self, filename):
+ def __init__(self, filename: Union[bytes, str]):
"""Open an index file.
Args:
"""Remove all contents from this index."""
self._byname = {}
- def __setitem__(self, name, x):
+ def __setitem__(self, name: bytes, x: IndexEntry):
assert isinstance(name, bytes)
assert len(x) == len(IndexEntry._fields)
# Remove the old entry if any
self._byname[name] = IndexEntry(*x)
- def __delitem__(self, name):
+ def __delitem__(self, name: bytes):
assert isinstance(name, bytes)
del self._byname[name]
- def iteritems(self):
+ def iteritems(self) -> Iterator[Tuple[bytes, IndexEntry]]:
return self._byname.items()
- def items(self):
+ def items(self) -> Iterator[Tuple[bytes, IndexEntry]]:
return self._byname.items()
- def update(self, entries):
+ def update(self, entries: Dict[bytes, IndexEntry]):
for name, value in entries.items():
self[name] = value
- def changes_from_tree(self, object_store, tree, want_unchanged=False):
+ def changes_from_tree(
+ self, object_store, tree: ObjectID, want_unchanged: bool = False):
"""Find the differences between the contents of this index and a tree.
Args:
def build_file_from_blob(
- blob, mode, target_path, *, honor_filemode=True, tree_encoding="utf-8",
- symlink_fn=None
+ blob: Blob, mode: int, target_path: bytes, *, honor_filemode=True,
+ tree_encoding="utf-8", symlink_fn=None
):
"""Build a file or symlink on disk based on a Git object.
INVALID_DOTNAMES = (b".git", b".", b"..", b"")
-def validate_path_element_default(element):
+def validate_path_element_default(element: bytes) -> bool:
return element.lower() not in INVALID_DOTNAMES
-def validate_path_element_ntfs(element):
+def validate_path_element_ntfs(element: bytes) -> bool:
stripped = element.rstrip(b". ").lower()
if stripped in INVALID_DOTNAMES:
return False
return True
-def validate_path(path, element_validator=validate_path_element_default):
+def validate_path(path: bytes,
+ element_validator=validate_path_element_default) -> bool:
"""Default path validator that just checks for .git/."""
parts = path.split(b"/")
for p in parts:
def build_index_from_tree(
- root_path,
- index_path,
- object_store,
- tree_id,
- honor_filemode=True,
+ root_path: Union[str, bytes],
+ index_path: Union[str, bytes],
+ object_store: "BaseObjectStore",
+ tree_id: bytes,
+ honor_filemode: bool = True,
validate_path_element=validate_path_element_default,
symlink_fn=None
):
index.write()
-def blob_from_path_and_mode(fs_path, mode, tree_encoding="utf-8"):
+def blob_from_path_and_mode(fs_path: bytes, mode: int,
+ tree_encoding="utf-8"):
"""Create a blob from a path and a stat object.
Args:
return blob
-def blob_from_path_and_stat(fs_path, st, tree_encoding="utf-8"):
+def blob_from_path_and_stat(fs_path: bytes, st, tree_encoding="utf-8"):
"""Create a blob from a path and a stat object.
Args:
return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
-def read_submodule_head(path):
+def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
"""Read the head commit of a submodule.
Args:
return None
-def _has_directory_changed(tree_path, entry):
+def _has_directory_changed(tree_path: bytes, entry):
"""Check if a directory has changed after getting an error.
When handling an error trying to create a blob from a path, call this
return False
-def get_unstaged_changes(index: Index, root_path, filter_blob_callback=None):
+def get_unstaged_changes(
+ index: Index, root_path: Union[str, bytes],
+ filter_blob_callback=None):
"""Walk through an index and check for differences against working tree.
Args:
os_sep_bytes = os.sep.encode("ascii")
-def _tree_to_fs_path(root_path, tree_path: bytes):
+def _tree_to_fs_path(root_path: bytes, tree_path: bytes):
"""Convert a git tree path to a file system path.
Args:
return os.path.join(root_path, sep_corrected_path)
-def _fs_to_tree_path(fs_path):
+def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
"""Convert a file system path to a git tree path.
Args:
return tree_path
-def index_entry_from_directory(st, path):
+def index_entry_from_directory(st, path: bytes) -> Optional[IndexEntry]:
if os.path.exists(os.path.join(path, b".git")):
head = read_submodule_head(path)
if head is None:
return None
-def index_entry_from_path(path, object_store=None):
+def index_entry_from_path(
+ path: bytes, object_store: Optional["BaseObjectStore"] = None
+) -> Optional[IndexEntry]:
"""Create an index from a filesystem path.
This returns an index value for files, symlinks
def iter_fresh_entries(
- paths, root_path, object_store: Optional["BaseObjectStore"] = None
-):
+ paths: Iterable[bytes], root_path: bytes,
+ object_store: Optional["BaseObjectStore"] = None
+) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
"""Iterate over current versions of index entries on disk.
Args:
yield path, entry
-def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=None):
+def iter_fresh_objects(
+ paths: Iterable[bytes], root_path: bytes, include_deleted=False,
+ object_store=None) -> Iterator[
+ Tuple[bytes, Optional[bytes], Optional[int]]]:
"""Iterate over versions of objects on disk referenced by index.
Args:
object_store: Optional object store to report new items to
Returns: Iterator over path, sha, mode
"""
- for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
+ for path, entry in iter_fresh_entries(
+ paths, root_path, object_store=object_store):
if entry is None:
if include_deleted:
yield path, None, None
yield path, entry.sha, cleanup_mode(entry.mode)
-def refresh_index(index, root_path):
+def refresh_index(index: Index, root_path: bytes):
"""Refresh the contents of an index.
This is the equivalent to running 'git commit -a'.
root_path: Root filesystem path
"""
for path, entry in iter_fresh_entries(index, root_path):
- index[path] = path
+ if entry:
+ index[path] = entry
class locked_index(object):
Works as a context manager.
"""
- def __init__(self, path):
+ def __init__(self, path: Union[bytes, str]):
self._path = path
def __enter__(self):
blob - 2a397a75c225d04b76d94183dc20829ddd6d475d
blob + 2b03c242ca820116b43f5a2e16229cdb980b79a6
--- dulwich/porcelain.py
+++ dulwich/porcelain.py
tag_time=None,
tag_timezone=None,
sign=False,
+ encoding=DEFAULT_ENCODING
):
"""Creates a tag in git via dulwich calls:
# TODO(jelmer): Don't use repo private method.
author = r._get_user_identity(r.get_config_stack())
tag_obj.tagger = author
- tag_obj.message = message + "\n".encode()
+ tag_obj.message = message + "\n".encode(encoding)
tag_obj.name = tag
tag_obj.object = (type(object), object.id)
if tag_time is None:
target: branch or commit or b'HEAD' to reset
"""
tree = parse_tree(repo, treeish=target)
- file_path = _fs_to_tree_path(file_path)
+ tree_path = _fs_to_tree_path(file_path)
- file_entry = tree.lookup_path(repo.object_store.__getitem__, file_path)
- full_path = os.path.join(repo.path.encode(), file_path)
+ file_entry = tree.lookup_path(repo.object_store.__getitem__, tree_path)
+ full_path = os.path.join(os.fsencode(repo.path), tree_path)
blob = repo.object_store[file_entry[1]]
mode = file_entry[0]
build_file_from_blob(blob, mode, full_path, symlink_fn=symlink_fn)
blob - 89439b4b22e7fee40a30893f00779eca2eb6a6db
blob + f3b5f7c86ba0b198dba68955cee7bf9ede03dd68
--- dulwich/repo.py
+++ dulwich/repo.py
raise
return target
- def reset_index(self, tree: Optional[Tree] = None):
+ def reset_index(self, tree: Optional[bytes] = None):
"""Reset the index back to a specific tree.
Args: