commit - cd0990259901351400deda93b529ec601cfa52e8
commit + e3ca1cb5cee7841ce6b1918c91e83120abbc8996
blob - 112e0cd238044a9aaee44fcdfb0c38888a751c0c
blob + f40324e037007282fdd2fe5735e7eb062f23b14b
--- Makefile
+++ Makefile
fix:
ruff check --fix .
+reformat:
+ ruff format .
+
.PHONY: codespell
codespell:
blob - 4bf310db173ef1c0b2e50aad8ff786d771595bd3
blob + 2862fff53091d3144524c96a8e6acbf4ecc82580
--- docs/conf.py
+++ docs/conf.py
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
-sys.path.insert(0, os.path.abspath('..'))
+sys.path.insert(0, os.path.abspath(".."))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__))))
-dulwich = __import__('dulwich')
+dulwich = __import__("dulwich")
# -- General configuration ----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
- 'sphinx.ext.autodoc',
- 'sphinx.ext.ifconfig',
- 'sphinx.ext.intersphinx',
- 'sphinx.ext.napoleon',
+ "sphinx.ext.autodoc",
+ "sphinx.ext.ifconfig",
+ "sphinx.ext.intersphinx",
+ "sphinx.ext.napoleon",
]
autoclass_content = "both"
# Add any paths that contain templates here, relative to this directory.
-templates_path = ['templates']
+templates_path = ["templates"]
# The suffix of source filenames.
-source_suffix = '.txt'
+source_suffix = ".txt"
# The encoding of source files.
# source_encoding = 'utf-8'
# The master toctree document.
-master_doc = 'index'
+master_doc = "index"
# General information about the project.
-project = 'dulwich'
-copyright = '2011-2023 Jelmer Vernooij'
+project = "dulwich"
+copyright = "2011-2023 Jelmer Vernooij"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
-version = '.'.join(map(str, dulwich.__version__[:2]))
+version = ".".join(map(str, dulwich.__version__[:2]))
# The full version, including alpha/beta/rc tags.
-release = '.'.join(map(str, dulwich.__version__))
+release = ".".join(map(str, dulwich.__version__))
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
-exclude_trees = ['build']
+exclude_trees = ["build"]
# The reST default role (used for this markup: `text`) to use for all
# documents.
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
+pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
# modindex_common_prefix = []
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
# html_theme = 'default'
-html_theme = 'agogo'
+html_theme = "agogo"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
-html_theme_path = ['theme']
+html_theme_path = ["theme"]
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
# html_file_suffix = ''
# Output file base name for HTML help builder.
-htmlhelp_basename = 'dulwichdoc'
+htmlhelp_basename = "dulwichdoc"
# -- Options for LaTeX output ------------------------------------------------
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
- ('index', 'dulwich.tex', 'dulwich Documentation',
- 'Jelmer Vernooij', 'manual'),
+ ("index", "dulwich.tex", "dulwich Documentation", "Jelmer Vernooij", "manual"),
]
# The name of an image file (relative to this directory) to place at the top of
# Add mappings
intersphinx_mapping = {
- 'urllib3': ('http://urllib3.readthedocs.org/en/latest', None),
- 'python': ('http://docs.python.org/3', None),
+ "urllib3": ("http://urllib3.readthedocs.org/en/latest", None),
+ "python": ("http://docs.python.org/3", None),
}
blob - fe74350b230cbf3cb5395c73050f31e543db7c14
blob + c3a2700e23044e0f99b5c29e1d6cd4262a7a2c9b
--- dulwich/archive.py
+++ dulwich/archive.py
info = tarfile.TarInfo()
# tarfile only works with ascii.
- info.name = entry_abspath.decode('utf-8', 'surrogateescape')
+ info.name = entry_abspath.decode("utf-8", "surrogateescape")
info.size = blob.raw_length()
info.mode = entry.mode
info.mtime = mtime
blob - 3683feab13b54b208fbc236c900b5345d4f6a8c8
blob + c44e5c471543269feea66f60a5c4f82d6bcc0add
--- dulwich/bundle.py
+++ dulwich/bundle.py
class Bundle:
-
version: Optional[int] = None
capabilities: Dict[str, str] = {}
pack_data: Union[PackData, Sequence[bytes]] = []
def __repr__(self) -> str:
- return (f"<{type(self).__name__}(version={self.version}, "
- f"capabilities={self.capabilities}, "
- f"prerequisites={self.prerequisites}, "
- f"references={self.references})>")
+ return (
+ f"<{type(self).__name__}(version={self.version}, "
+ f"capabilities={self.capabilities}, "
+ f"prerequisites={self.prerequisites}, "
+ f"references={self.references})>"
+ )
def __eq__(self, other):
if not isinstance(other, type(self)):
if value is not None:
f.write(b"=" + value.encode("utf-8"))
f.write(b"\n")
- for (obj_id, comment) in bundle.prerequisites:
+ for obj_id, comment in bundle.prerequisites:
f.write(b"-%s %s\n" % (obj_id, comment.encode("utf-8")))
for ref, obj_id in bundle.references.items():
f.write(b"%s %s\n" % (obj_id, ref))
f.write(b"\n")
- write_pack_data(f.write, num_records=len(bundle.pack_data), records=bundle.pack_data.iter_unpacked())
+ write_pack_data(
+ f.write,
+ num_records=len(bundle.pack_data),
+ records=bundle.pack_data.iter_unpacked(),
+ )
blob - b2e501398c2ac0e8c70e3c3da42c09761d60cac7
blob + 0dc94918575b94d7107bdd2fe76f4927bd9805d9
--- dulwich/cli.py
+++ dulwich/cli.py
type=str,
help="Retrieve archive from specified remote repo",
)
- parser.add_argument('committish', type=str, nargs='?')
+ parser.add_argument("committish", type=str, nargs="?")
args = parser.parse_args(args)
if args.remote:
client, path = get_transport_and_path(args.remote)
)
else:
porcelain.archive(
- ".", args.committish, outstream=sys.stdout.buffer,
- errstream=sys.stderr
+ ".", args.committish, outstream=sys.stdout.buffer, errstream=sys.stderr
)
class cmd_fetch_pack(Command):
def run(self, argv):
parser = argparse.ArgumentParser()
- parser.add_argument('--all', action='store_true')
- parser.add_argument('location', nargs='?', type=str)
+ parser.add_argument("--all", action="store_true")
+ parser.add_argument("location", nargs="?", type=str)
args = parser.parse_args(argv)
client, path = get_transport_and_path(args.location)
r = Repo(".")
def run(self, args):
opts, args = getopt(args, "", [])
opts = dict(opts)
- for (obj, msg) in porcelain.fsck("."):
+ for obj, msg in porcelain.fsck("."):
print(f"{obj}: {msg}")
r = Repo(".")
if args == []:
- commit_id = b'HEAD'
+ commit_id = b"HEAD"
else:
commit_id = args[0]
commit = parse_commit(r, commit_id)
parent_commit = r[commit.parents[0]]
porcelain.diff_tree(
- r, parent_commit.tree, commit.tree, outstream=sys.stdout.buffer)
+ r, parent_commit.tree, commit.tree, outstream=sys.stdout.buffer
+ )
class cmd_dump_pack(Command):
"--depth", dest="depth", type=int, help="Depth at which to fetch"
)
parser.add_option(
- "-b", "--branch", dest="branch", type=str,
- help=("Check out branch instead of branch pointed to by remote "
- "HEAD"))
+ "-b",
+ "--branch",
+ dest="branch",
+ type=str,
+ help=("Check out branch instead of branch pointed to by remote " "HEAD"),
+ )
options, args = parser.parse_args(args)
if args == []:
target = None
try:
- porcelain.clone(source, target, bare=options.bare, depth=options.depth,
- branch=options.branch)
+ porcelain.clone(
+ source,
+ target,
+ bare=options.bare,
+ depth=options.depth,
+ branch=options.branch,
+ )
except GitProtocolError as e:
print("%s" % e)
class cmd_pack_refs(Command):
def run(self, argv):
parser = argparse.ArgumentParser()
- parser.add_argument('--all', action='store_true')
+ parser.add_argument("--all", action="store_true")
# ignored, we never prune
- parser.add_argument('--no-prune', action='store_true')
+ parser.add_argument("--no-prune", action="store_true")
args = parser.parse_args(argv)
class cmd_show(Command):
def run(self, argv):
parser = argparse.ArgumentParser()
- parser.add_argument('objectish', type=str, nargs='*')
+ parser.add_argument("objectish", type=str, nargs="*")
args = parser.parse_args(argv)
porcelain.show(".", args.objectish or None)
idxf = open(basename + ".idx", "wb")
close = [packf, idxf]
porcelain.pack_objects(
- ".",
- object_ids,
- packf,
- idxf,
- deltify=deltify,
- reuse_deltas=reuse_deltas)
+ ".", object_ids, packf, idxf, deltify=deltify, reuse_deltas=reuse_deltas
+ )
for f in close:
f.close()
class cmd_push(Command):
-
def run(self, argv):
parser = argparse.ArgumentParser()
- parser.add_argument('-f', '--force', action='store_true', help='Force')
- parser.add_argument('to_location', type=str)
- parser.add_argument('refspec', type=str, nargs='*')
+ parser.add_argument("-f", "--force", action="store_true", help="Force")
+ parser.add_argument("to_location", type=str)
+ parser.add_argument("refspec", type=str, nargs="*")
args = parser.parse_args(argv)
try:
- porcelain.push('.', args.to_location, args.refspec or None, force=args.force)
+ porcelain.push(
+ ".", args.to_location, args.refspec or None, force=args.force
+ )
except porcelain.DivergedBranches:
- sys.stderr.write('Diverged branches; specify --force to override')
+ sys.stderr.write("Diverged branches; specify --force to override")
return 1
class SuperCommand(Command):
-
subcommands: Dict[str, Type[Command]] = {}
default_command: Optional[Type[Command]] = None
class cmd_remote(SuperCommand):
-
subcommands = {
"add": cmd_remote_add,
}
parser = argparse.ArgumentParser()
parser.parse_args(argv)
for path, sha in porcelain.submodule_list("."):
- sys.stdout.write(f' {sha} {path}\n')
+ sys.stdout.write(f" {sha} {path}\n")
class cmd_submodule_init(Command):
class cmd_submodule(SuperCommand):
-
subcommands = {
"init": cmd_submodule_init,
}
class cmd_stash(SuperCommand):
-
subcommands = {
"list": cmd_stash_list,
"pop": cmd_stash_pop,
blob - 8f373accb91528669d473c2efca813656e687647
blob + 80f0787ac2fa96941a04fe4a5016200784df452c
--- dulwich/client.py
+++ dulwich/client.py
"viewvalues",
]
- def __init__(self, refs, symrefs, agent, new_shallow=None, new_unshallow=None) -> None:
+ def __init__(
+ self, refs, symrefs, agent, new_shallow=None, new_unshallow=None
+ ) -> None:
self.refs = refs
self.symrefs = symrefs
self.agent = agent
class _v1ReceivePackHeader:
-
def __init__(self, capabilities, old_refs, new_refs) -> None:
self.want: List[bytes] = []
self.have: List[bytes] = []
if old_sha1 != new_sha1:
logger.debug(
- 'Sending updated ref %r: %r -> %r',
- refname, old_sha1, new_sha1)
+ "Sending updated ref %r: %r -> %r", refname, old_sha1, new_sha1
+ )
if self.sent_capabilities:
yield old_sha1 + b" " + new_sha1 + b" " + refname
else:
yield channel, pkt[1:]
-def _handle_upload_pack_head(
- proto, capabilities, graph_walker, wants, can_read, depth
-):
+def _handle_upload_pack_head(proto, capabilities, graph_walker, wants, can_read, depth):
"""Handle the head of a 'git-upload-pack' request.
Args:
"""
assert isinstance(wants, list) and isinstance(wants[0], bytes)
proto.write_pkt_line(
- COMMAND_WANT
- + b" "
- + wants[0]
- + b" "
- + b" ".join(sorted(capabilities))
- + b"\n"
+ COMMAND_WANT + b" " + wants[0] + b" " + b" ".join(sorted(capabilities)) + b"\n"
)
for want in wants[1:]:
proto.write_pkt_line(COMMAND_WANT + b" " + want + b"\n")
elif chan == SIDE_BAND_CHANNEL_PROGRESS:
progress(data)
else:
- raise AssertionError(
- "Invalid sideband channel %d" % chan)
+ raise AssertionError("Invalid sideband channel %d" % chan)
else:
while True:
data = proto.read(rbufsize)
"""
raise NotImplementedError(cls.from_parsedurl)
- def send_pack(self, path, update_refs, generate_pack_data: Callable[[Set[bytes], Set[bytes], bool], Tuple[int, Iterator[UnpackedObject]]], progress=None):
+ def send_pack(
+ self,
+ path,
+ update_refs,
+ generate_pack_data: Callable[
+ [Set[bytes], Set[bytes], bool], Tuple[int, Iterator[UnpackedObject]]
+ ],
+ progress=None,
+ ):
"""Upload a pack to a remote repository.
Args:
"""
raise NotImplementedError(self.send_pack)
- def clone(self, path, target_path, mkdir: bool = True, bare=False, origin="origin",
- checkout=None, branch=None, progress=None, depth=None):
+ def clone(
+ self,
+ path,
+ target_path,
+ mkdir: bool = True,
+ bare=False,
+ origin="origin",
+ checkout=None,
+ branch=None,
+ progress=None,
+ depth=None,
+ ):
"""Clone a repository."""
from .refs import _set_default_branch, _set_head, _set_origin_head
# TODO(jelmer): abstract method for get_location?
if isinstance(self, (LocalGitClient, SubprocessGitClient)):
- encoded_path = path.encode('utf-8')
+ encoded_path = path.encode("utf-8")
else:
- encoded_path = self.get_url(path).encode('utf-8')
+ encoded_path = self.get_url(path).encode("utf-8")
assert target is not None
target_config = target.get_config()
- target_config.set((b"remote", origin.encode('utf-8')), b"url", encoded_path)
+ target_config.set((b"remote", origin.encode("utf-8")), b"url", encoded_path)
target_config.set(
- (b"remote", origin.encode('utf-8')),
+ (b"remote", origin.encode("utf-8")),
b"fetch",
- b"+refs/heads/*:refs/remotes/" + origin.encode('utf-8') + b"/*",
+ b"+refs/heads/*:refs/remotes/" + origin.encode("utf-8") + b"/*",
)
target_config.write_to_path()
ref_message = b"clone: from " + encoded_path
result = self.fetch(path, target, progress=progress, depth=depth)
- _import_remote_refs(
- target.refs, origin, result.refs, message=ref_message)
+ _import_remote_refs(target.refs, origin, result.refs, message=ref_message)
origin_head = result.symrefs.get(b"HEAD")
- origin_sha = result.refs.get(b'HEAD')
+ origin_sha = result.refs.get(b"HEAD")
if origin_sha and not origin_head:
# set detached HEAD
target.refs[b"HEAD"] = origin_sha
head = origin_sha
else:
- _set_origin_head(target.refs, origin.encode('utf-8'), origin_head)
+ _set_origin_head(target.refs, origin.encode("utf-8"), origin_head)
head_ref = _set_default_branch(
- target.refs, origin.encode('utf-8'), origin_head, branch, ref_message
+ target.refs,
+ origin.encode("utf-8"),
+ origin_head,
+ branch,
+ ref_message,
)
# Update target head
target.close()
if mkdir:
import shutil
+
shutil.rmtree(target_path)
raise
return target
Callable[[Dict[bytes, bytes], Optional[int]], List[bytes]]
] = None,
progress: Optional[Callable[[bytes], None]] = None,
- depth: Optional[int] = None
+ depth: Optional[int] = None,
) -> FetchPackResult:
"""Fetch into a target repository.
determine_wants = target.object_store.determine_wants_all
if CAPABILITY_THIN_PACK in self._fetch_capabilities:
from tempfile import SpooledTemporaryFile
+
f: IO[bytes] = SpooledTemporaryFile(
- max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-',
- dir=getattr(target.object_store, 'path', None))
+ max_size=PACK_SPOOL_FILE_MAX_SIZE,
+ prefix="incoming-",
+ dir=getattr(target.object_store, "path", None),
+ )
def commit():
if f.tell():
elif chan == SIDE_BAND_CHANNEL_PROGRESS:
progress(data)
else:
- raise AssertionError(
- "Invalid sideband channel %d" % chan)
+ raise AssertionError("Invalid sideband channel %d" % chan)
else:
if CAPABILITY_REPORT_STATUS in capabilities:
assert self._report_status_parser
ref_status = None
return SendPackResult(old_refs, agent=agent, ref_status=ref_status)
- header_handler = _v1ReceivePackHeader(negotiated_capabilities, old_refs, new_refs)
+ header_handler = _v1ReceivePackHeader(
+ negotiated_capabilities, old_refs, new_refs
+ )
for pkt in header_handler:
proto.write_pkt_line(pkt)
)
if self._should_send_pack(new_refs):
- for chunk in PackChunkGenerator(pack_data_count, pack_data, progress=progress):
+ for chunk in PackChunkGenerator(
+ pack_data_count, pack_data, progress=progress
+ ):
proto.write(chunk)
ref_status = self._handle_receive_pack_tail(
)
s = None
err = OSError("no address found for %s" % self._host)
- for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
+ for family, socktype, proto, canonname, sockaddr in sockaddrs:
s = socket.socket(family, socktype, proto)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
class LocalGitClient(GitClient):
"""Git Client that just uses a local Repo."""
- def __init__(self, thin_packs=True, report_activity=None,
- config: Optional[Config] = None) -> None:
+ def __init__(
+ self, thin_packs=True, report_activity=None, config: Optional[Config] = None
+ ) -> None:
"""Create a new LocalGitClient instance.
Args:
@classmethod
def _open_repo(cls, path):
-
if not isinstance(path, str):
path = os.fsdecode(path)
return closing(Repo(path))
# Note that the client still expects a 0-object pack in most cases.
if object_ids is None:
return FetchPackResult(None, symrefs, agent)
- write_pack_from_container(pack_data, r.object_store, object_ids, other_haves=other_haves)
+ write_pack_from_container(
+ pack_data, r.object_store, object_ids, other_haves=other_haves
+ )
return FetchPackResult(r.get_refs(), symrefs, agent)
def get_refs(self, path):
key_filename=None,
ssh_command=None,
):
-
if password is not None:
raise NotImplementedError(
"Setting password not supported by SubprocessSSHVendor."
if ssh_command:
import shlex
- args = shlex.split(
- ssh_command, posix=(sys.platform != 'win32')) + ["-x"]
+
+ args = shlex.split(ssh_command, posix=(sys.platform != "win32")) + ["-x"]
else:
args = ["ssh", "-x"]
key_filename=None,
ssh_command=None,
):
-
if ssh_command:
import shlex
- args = shlex.split(
- ssh_command, posix=(sys.platform != 'win32')) + ["-ssh"]
+
+ args = shlex.split(ssh_command, posix=(sys.platform != "win32")) + ["-ssh"]
elif sys.platform == "win32":
args = ["plink.exe", "-ssh"]
else:
password=None,
key_filename=None,
ssh_command=None,
- **kwargs
+ **kwargs,
) -> None:
self.host = host
self.port = port
host=parsedurl.hostname,
port=parsedurl.port,
username=parsedurl.username,
- **kwargs
+ **kwargs,
)
def _get_cmd_path(self, cmd):
return "git/dulwich/%s" % ".".join([str(x) for x in dulwich.__version__])
-def default_urllib3_manager( # noqa: C901
- config, pool_manager_cls=None, proxy_manager_cls=None, base_url=None, **override_kwargs
+def default_urllib3_manager( # noqa: C901
+ config,
+ pool_manager_cls=None,
+ proxy_manager_cls=None,
+ base_url=None,
+ **override_kwargs,
) -> Union["urllib3.ProxyManager", "urllib3.PoolManager"]:
"""Return urllib3 connection pool manager.
if proxy_server:
if check_for_proxy_bypass(base_url):
proxy_server = None
-
+
if config is not None:
if proxy_server is None:
try:
headers = {"User-agent": user_agent}
kwargs = {
- "ca_certs" : ca_certs,
+ "ca_certs": ca_certs,
}
if ssl_verify is True:
kwargs["cert_reqs"] = "CERT_REQUIRED"
except ValueError:
hostname_ip = None
- no_proxy_values = no_proxy_str.split(',')
+ no_proxy_values = no_proxy_str.split(",")
for no_proxy_value in no_proxy_values:
no_proxy_value = no_proxy_value.strip()
if no_proxy_value:
no_proxy_value = no_proxy_value.lower()
- no_proxy_value = no_proxy_value.lstrip('.') # ignore leading dots
+ no_proxy_value = no_proxy_value.lstrip(
+ "."
+ ) # ignore leading dots
if hostname_ip:
# check if no_proxy_value is a ip network
try:
- no_proxy_value_network = ipaddress.ip_network(no_proxy_value, strict=False)
+ no_proxy_value_network = ipaddress.ip_network(
+ no_proxy_value, strict=False
+ )
except ValueError:
no_proxy_value_network = None
if no_proxy_value_network:
# if hostname is a ip address and no_proxy_value is a ip network -> check if ip address is part of network
if hostname_ip in no_proxy_value_network:
return True
-
- if no_proxy_value == '*':
+
+ if no_proxy_value == "*":
# '*' is special case for always bypass proxy
return True
if hostname == no_proxy_value:
return True
- no_proxy_value = '.' + no_proxy_value # add a dot to only match complete domains
+ no_proxy_value = (
+ "." + no_proxy_value
+ ) # add a dot to only match complete domains
if hostname.endswith(no_proxy_value):
return True
return False
base_url = urljoin(url, resp.redirect_location[: -len(tail)])
try:
- self.dumb = (
- resp.content_type is None
- or not resp.content_type.startswith("application/x-git-"))
+ self.dumb = resp.content_type is None or not resp.content_type.startswith(
+ "application/x-git-"
+ )
if not self.dumb:
proto = Protocol(read, None)
# The first line should mention the service
[pkt] = list(proto.read_pkt_seq())
except ValueError as exc:
raise GitProtocolError(
- "unexpected number of packets received") from exc
+ "unexpected number of packets received"
+ ) from exc
if pkt.rstrip(b"\n") != (b"# service=" + service):
raise GitProtocolError(
"unexpected first line %r from smart server" % pkt
if isinstance(data, bytes):
headers["Content-Length"] = str(len(data))
resp, read = self._http_request(url, headers, data)
- if resp.content_type.split(';')[0] != result_content_type:
+ if resp.content_type.split(";")[0] != result_content_type:
raise GitProtocolError(
"Invalid content-type from server: %s" % resp.content_type
)
raise NotImplementedError(self.fetch_pack)
def body_generator():
- header_handler = _v1ReceivePackHeader(negotiated_capabilities, old_refs, new_refs)
+ header_handler = _v1ReceivePackHeader(
+ negotiated_capabilities, old_refs, new_refs
+ )
for pkt in header_handler:
yield pkt_line(pkt)
pack_data_count, pack_data = generate_pack_data(
if self._should_send_pack(new_refs):
yield from PackChunkGenerator(pack_data_count, pack_data)
- resp, read = self._smart_request(
- "git-receive-pack", url, data=body_generator()
- )
+ resp, read = self._smart_request("git-receive-pack", url, data=body_generator())
try:
resp_proto = Protocol(read, None)
ref_status = self._handle_receive_pack_tail(
resp_proto = Protocol(read, None)
if new_shallow is None and new_unshallow is None:
(new_shallow, new_unshallow) = _read_shallow_updates(
- resp_proto.read_pkt_seq())
+ resp_proto.read_pkt_seq()
+ )
_handle_upload_pack_tail(
resp_proto,
negotiated_capabilities,
config=None,
username=None,
password=None,
- **kwargs
+ **kwargs,
) -> None:
self._username = username
self._password = password
self.config = config
- super().__init__(
- base_url=base_url, dumb=dumb, **kwargs)
+ super().__init__(base_url=base_url, dumb=dumb, **kwargs)
def _get_url(self, path):
if not isinstance(path, str):
def _http_request(self, url, headers=None, data=None):
import urllib3.exceptions
+
req_headers = self.pool_manager.headers.copy()
if headers is not None:
req_headers.update(headers)
try:
if data is None:
resp = self.pool_manager.request(
- "GET", url, headers=req_headers, preload_content=False)
+ "GET", url, headers=req_headers, preload_content=False
+ )
else:
resp = self.pool_manager.request(
"POST", url, headers=req_headers, body=data, preload_content=False
def get_transport_and_path_from_url(
- url: str, config: Optional[Config] = None,
- operation: Optional[str] = None, **kwargs) -> Tuple[GitClient, str]:
+ url: str, config: Optional[Config] = None, operation: Optional[str] = None, **kwargs
+) -> Tuple[GitClient, str]:
"""Obtain a git client from a URL.
Args:
url = apply_instead_of(config, url, push=(operation == "push"))
return _get_transport_and_path_from_url(
- url, config=config, operation=operation, **kwargs)
+ url, config=config, operation=operation, **kwargs
+ )
def _get_transport_and_path_from_url(url, config, operation, **kwargs):
location: str,
config: Optional[Config] = None,
operation: Optional[str] = None,
- **kwargs
+ **kwargs,
) -> Tuple[GitClient, str]:
"""Obtain a git client from a URL.
# First, try to parse it as a URL
try:
return _get_transport_and_path_from_url(
- location, config=config, operation=operation, **kwargs)
+ location, config=config, operation=operation, **kwargs
+ )
except ValueError:
pass
blob - 459222434fbf0bfce2e2a1c3a9f51b6d375db6bd
blob + b9dd9b4ca4d19f4931dcc3e00c44cffd80fe57b0
--- dulwich/cloud/gcs.py
+++ dulwich/cloud/gcs.py
class GcsObjectStore(BucketBasedObjectStore):
-
- def __init__(self, bucket, subpath='') -> None:
+ def __init__(self, bucket, subpath="") -> None:
super().__init__()
self.bucket = bucket
self.subpath = subpath
return f"{type(self).__name__}({self.bucket!r}, subpath={self.subpath!r})"
def _remove_pack(self, name):
- self.bucket.delete_blobs([
- posixpath.join(self.subpath, name) + '.' + ext
- for ext in ['pack', 'idx']])
+ self.bucket.delete_blobs(
+ [posixpath.join(self.subpath, name) + "." + ext for ext in ["pack", "idx"]]
+ )
def _iter_pack_names(self):
packs = {}
name, ext = posixpath.splitext(posixpath.basename(blob.name))
packs.setdefault(name, set()).add(ext)
for name, exts in packs.items():
- if exts == {'.pack', '.idx'}:
+ if exts == {".pack", ".idx"}:
yield name
def _load_pack_data(self, name):
- b = self.bucket.blob(posixpath.join(self.subpath, name + '.pack'))
+ b = self.bucket.blob(posixpath.join(self.subpath, name + ".pack"))
f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
b.download_to_file(f)
f.seek(0)
- return PackData(name + '.pack', f)
+ return PackData(name + ".pack", f)
def _load_pack_index(self, name):
- b = self.bucket.blob(posixpath.join(self.subpath, name + '.idx'))
+ b = self.bucket.blob(posixpath.join(self.subpath, name + ".idx"))
f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
b.download_to_file(f)
f.seek(0)
- return load_pack_index_file(name + '.idx', f)
+ return load_pack_index_file(name + ".idx", f)
def _get_pack(self, name):
return Pack.from_lazy_objects(
- lambda: self._load_pack_data(name),
- lambda: self._load_pack_index(name))
+ lambda: self._load_pack_data(name), lambda: self._load_pack_index(name)
+ )
def _upload_pack(self, basename, pack_file, index_file):
- idxblob = self.bucket.blob(posixpath.join(self.subpath, basename + '.idx'))
- datablob = self.bucket.blob(posixpath.join(self.subpath, basename + '.pack'))
+ idxblob = self.bucket.blob(posixpath.join(self.subpath, basename + ".idx"))
+ datablob = self.bucket.blob(posixpath.join(self.subpath, basename + ".pack"))
idxblob.upload_from_file(index_file)
datablob.upload_from_file(pack_file)
blob - 2bd10877cd808773a2d5a1605d932e55fd39faa9
blob + ce5f03f5c094a63703bd78ac07d78083752dccd2
--- dulwich/config.py
+++ dulwich/config.py
class CaseInsensitiveOrderedMultiDict(MutableMapping):
-
def __init__(self) -> None:
self._real: List[Any] = []
self._keyed: Dict[Any, Any] = {}
@classmethod
def make(cls, dict_in=None):
-
if isinstance(dict_in, cls):
return dict_in
raise ValueError("not a valid boolean string: %r" % value)
def set(
- self,
- section: SectionLike,
- name: NameLike,
- value: Union[ValueLike, bool]
+ self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
) -> None:
"""Set a configuration value.
values: Union[
MutableMapping[Section, MutableMapping[Name, Value]], None
] = None,
- encoding: Union[str, None] = None
+ encoding: Union[str, None] = None,
) -> None:
"""Create a new ConfigDict."""
if encoding is None:
def __getitem__(self, key: Section) -> MutableMapping[Name, Value]:
return self._values.__getitem__(key)
- def __setitem__(
- self,
- key: Section,
- value: MutableMapping[Name, Value]
- ) -> None:
+ def __setitem__(self, key: Section, value: MutableMapping[Name, Value]) -> None:
return self._values.__setitem__(key, value)
def __delitem__(self, key: Section) -> None:
return (parts[0], None, parts[1])
def _check_section_and_name(
- self,
- section: SectionLike,
- name: NameLike
+ self, section: SectionLike, name: NameLike
) -> Tuple[Section, Name]:
if not isinstance(section, tuple):
section = (section,)
return checked_section, name
- def get_multivar(
- self,
- section: SectionLike,
- name: NameLike
- ) -> Iterator[Value]:
+ def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
section, name = self._check_section_and_name(section, name)
if len(section) > 1:
self._values.setdefault(section)[name] = value
def items( # type: ignore[override]
- self,
- section: Section
+ self, section: Section
) -> Iterator[Tuple[Name, Value]]:
return self._values.get(section).items()
continue
if c == ord(b'"'):
in_quotes = not in_quotes
- if c == ord(b'\\'):
+ if c == ord(b"\\"):
escaped = True
- if c == ord(b']') and not in_quotes:
+ if c == ord(b"]") and not in_quotes:
last = i
break
else:
raise ValueError("expected trailing ]")
pts = line[1:last].split(b" ", 1)
- line = line[last + 1:]
+ line = line[last + 1 :]
section: Section
if len(pts) == 2:
if pts[1][:1] != b'"' or pts[1][-1:] != b'"':
values: Union[
MutableMapping[Section, MutableMapping[Name, Value]], None
] = None,
- encoding: Union[str, None] = None
+ encoding: Union[str, None] = None,
) -> None:
super().__init__(values=values, encoding=encoding)
self.path: Optional[str] = None
setting = None
continuation = None
for lineno, line in enumerate(f.readlines()):
- if lineno == 0 and line.startswith(b'\xef\xbb\xbf'):
+ if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
line = line[3:]
line = line.lstrip()
if setting is None:
"CurrentVersion\\Uninstall\\Git_is1"
)
else:
- subkey = (
- "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\"
- "Uninstall\\Git_is1"
- )
+ subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\" "Uninstall\\Git_is1"
for key in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE): # type: ignore
with suppress(OSError):
pass
def set(
- self,
- section: SectionLike,
- name: NameLike,
- value: Union[ValueLike, bool]
+ self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
) -> None:
if self.writable is None:
raise NotImplementedError(self.set)
def iter_instead_of(config: Config, push: bool = False) -> Iterable[Tuple[str, str]]:
"""Iterate over insteadOf / pushInsteadOf values."""
for section in config.sections():
- if section[0] != b'url':
+ if section[0] != b"url":
continue
replacement = section[1]
try:
pass
for needle in needles:
assert isinstance(needle, bytes)
- yield needle.decode('utf-8'), replacement.decode('utf-8')
+ yield needle.decode("utf-8"), replacement.decode("utf-8")
def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
continue
if len(longest_needle) < len(needle):
longest_needle = needle
- updated_url = replacement + orig_url[len(needle):]
+ updated_url = replacement + orig_url[len(needle) :]
return updated_url
blob - 563b5982ed6cd2cd68b01cc1bc429a49df3b44b9
blob + 6b44eea3e32abc09e79ec6e38b048b56a3218052
--- dulwich/contrib/diffstat.py
+++ dulwich/contrib/diffstat.py
# only needs to detect git style diffs as this is for
# use with dulwich
-_git_header_name = re.compile(br"diff --git a/(.*) b/(.*)")
+_git_header_name = re.compile(rb"diff --git a/(.*) b/(.*)")
_GIT_HEADER_START = b"diff --git a/"
_GIT_BINARY_START = b"Binary file"
# properly interface with diffstat routine
-def _parse_patch(lines: List[bytes]) -> Tuple[List[bytes], List[bool], List[Tuple[int, int]]]:
+def _parse_patch(
+ lines: List[bytes]
+) -> Tuple[List[bytes], List[bool], List[Tuple[int, int]]]:
"""Parse a git style diff or patch to generate diff stats.
Args:
blob - 7008a819e16b041146a52dfcdcf1420132799791
blob + f3189a3c04aef30ef342851ab68cf441b1a6170b
--- dulwich/contrib/paramiko_vendor.py
+++ dulwich/contrib/paramiko_vendor.py
@property
def stderr(self):
- return self.channel.makefile_stderr('rb')
+ return self.channel.makefile_stderr("rb")
def can_read(self):
return self.channel.recv_ready()
password=None,
pkey=None,
key_filename=None,
- **kwargs
+ **kwargs,
):
-
client = paramiko.SSHClient()
connection_kwargs = {"hostname": host}
blob - 8bd4374066d07d2aff8249bf1b186a192fa4d7a2
blob + bd3e0767dfd307c8e79fa80683da9b4c2143f857
--- dulwich/contrib/requests_vendor.py
+++ dulwich/contrib/requests_vendor.py
class RequestsHttpGitClient(AbstractHttpGitClient):
def __init__(
- self,
- base_url,
- dumb=None,
- config=None,
- username=None,
- password=None,
- **kwargs
+ self, base_url, dumb=None, config=None, username=None, password=None, **kwargs
) -> None:
self._username = username
self._password = password
if username is not None:
self.session.auth = (username, password)
- super().__init__(
- base_url=base_url, dumb=dumb, **kwargs)
+ super().__init__(base_url=base_url, dumb=dumb, **kwargs)
def _http_request(self, url, headers=None, data=None, allow_compression=False):
req_headers = self.session.headers.copy()
session.verify = ssl_verify
if proxy_server:
- session.proxies.update({
- "http": proxy_server,
- "https": proxy_server
- })
+ session.proxies.update({"http": proxy_server, "https": proxy_server})
return session
blob - 813a91661506a2f57232eb1b0560d56bd36fd988
blob + a6f3b4739777292eba0a3967712e44a7fffe7039
--- dulwich/contrib/swift.py
+++ dulwich/contrib/swift.py
try:
confpath = os.environ["DULWICH_SWIFT_CFG"]
except KeyError as exc:
- raise Exception(
- "You need to specify a configuration file") from exc
+ raise Exception("You need to specify a configuration file") from exc
else:
confpath = path
if not os.path.isfile(confpath):
}
if len(sys.argv) < 2:
- print("Usage: {} <{}> [OPTIONS...]".format(sys.argv[0], "|".join(commands.keys())))
+ print(
+ "Usage: {} <{}> [OPTIONS...]".format(sys.argv[0], "|".join(commands.keys()))
+ )
sys.exit(1)
cmd = sys.argv[1]
blob - b7e58287978da42d654c6b1241630f506fe4784a
blob + 496987e20be3c5d236fe6e6c92b4bd371b0b99e7
--- dulwich/contrib/test_paramiko_vendor.py
+++ dulwich/contrib/test_paramiko_vendor.py
class Server(paramiko.ServerInterface):
"""http://docs.paramiko.org/en/2.4/api/server.html."""
+
def __init__(self, commands, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.commands = commands
return "password,publickey"
-USER = 'testuser'
-PASSWORD = 'test'
+USER = "testuser"
+PASSWORD = "test"
SERVER_KEY = """\
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAy/L1sSYAzxsMprtNXW4u/1jGXXkQmQ2xtmKVlR+RlIL3a1BH
@skipIf(not has_paramiko, "paramiko is not installed")
class ParamikoSSHVendorTests(TestCase):
-
def setUp(self):
import paramiko.transport
socket.setdefaulttimeout(10)
self.addCleanup(socket.setdefaulttimeout, None)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.bind(('127.0.0.1', 0))
+ self.socket.bind(("127.0.0.1", 0))
self.socket.listen(5)
self.addCleanup(self.socket.close)
self.port = self.socket.getsockname()[1]
self.transport.start_server(server=server)
def test_run_command_password(self):
- vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False,)
+ vendor = ParamikoSSHVendor(
+ allow_agent=False,
+ look_for_keys=False,
+ )
vendor.run_command(
- '127.0.0.1', 'test_run_command_password',
- username=USER, port=self.port, password=PASSWORD)
+ "127.0.0.1",
+ "test_run_command_password",
+ username=USER,
+ port=self.port,
+ password=PASSWORD,
+ )
- self.assertIn(b'test_run_command_password', self.commands)
+ self.assertIn(b"test_run_command_password", self.commands)
def test_run_command_with_privkey(self):
key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
- vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False,)
+ vendor = ParamikoSSHVendor(
+ allow_agent=False,
+ look_for_keys=False,
+ )
vendor.run_command(
- '127.0.0.1', 'test_run_command_with_privkey',
- username=USER, port=self.port, pkey=key)
+ "127.0.0.1",
+ "test_run_command_with_privkey",
+ username=USER,
+ port=self.port,
+ pkey=key,
+ )
- self.assertIn(b'test_run_command_with_privkey', self.commands)
+ self.assertIn(b"test_run_command_with_privkey", self.commands)
def test_run_command_data_transfer(self):
- vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False,)
+ vendor = ParamikoSSHVendor(
+ allow_agent=False,
+ look_for_keys=False,
+ )
con = vendor.run_command(
- '127.0.0.1', 'test_run_command_data_transfer',
- username=USER, port=self.port, password=PASSWORD)
+ "127.0.0.1",
+ "test_run_command_data_transfer",
+ username=USER,
+ port=self.port,
+ password=PASSWORD,
+ )
- self.assertIn(b'test_run_command_data_transfer', self.commands)
+ self.assertIn(b"test_run_command_data_transfer", self.commands)
channel = self.transport.accept(5)
- channel.send(b'stdout\n')
- channel.send_stderr(b'stderr\n')
+ channel.send(b"stdout\n")
+ channel.send_stderr(b"stderr\n")
channel.close()
# Fixme: it's return false
# self.assertTrue(con.can_read())
- self.assertEqual(b'stdout\n', con.read(4096))
+ self.assertEqual(b"stdout\n", con.read(4096))
# Fixme: it's return empty string
# self.assertEqual(b'stderr\n', con.read_stderr(4096))
blob - 17ec8331d355b773a407f89f6a911b1bf723a234
blob + ff1352138d49a02f0b6895c0e0e453a7a9a01119
--- dulwich/contrib/test_swift.py
+++ dulwich/contrib/test_swift.py
def test_get_container_objects(self):
with patch(
"geventhttpclient.HTTPClient.request",
- lambda *args: Response(
- content=json.dumps(({"name": "a"}, {"name": "b"}))
- ),
+ lambda *args: Response(content=json.dumps(({"name": "a"}, {"name": "b"}))),
):
self.assertEqual(len(self.conn.get_container_objects()), 2)
blob - dc8e1bb38bdbae2daa71f7cd4eb0a62137018813
blob + 23960d0ee1e2a5bf0575ca5f1ae3cd883197ef9f
--- dulwich/diff_tree.py
+++ dulwich/diff_tree.py
for t in parent_tree_ids
]
num_parents = len(parent_tree_ids)
- changes_by_path: Dict[str, List[Optional[TreeChange]]] = defaultdict(lambda: [None] * num_parents)
+ changes_by_path: Dict[str, List[Optional[TreeChange]]] = defaultdict(
+ lambda: [None] * num_parents
+ )
# Organize by path.
for i, parent_changes in enumerate(all_parent_changes):
self._prune(add_paths, delete_paths)
def _should_find_content_renames(self):
- return len(self._adds) * len(self._deletes) <= self._max_files ** 2
+ return len(self._adds) * len(self._deletes) <= self._max_files**2
def _rename_type(self, check_paths, delete, add):
if check_paths and delete.old.path == add.new.path:
blob - 505d021200e2d4c0db55646e5dc4af5d26e3cb34
blob + 841d45552cad56a433c0d5833d42ee19008abe1f
--- dulwich/errors.py
+++ dulwich/errors.py
)
)
else:
- super().__init__(
- "The remote server unexpectedly closed the connection."
- )
+ super().__init__("The remote server unexpectedly closed the connection.")
self.stderr_lines = stderr_lines
def __eq__(self, other):
command = "flush-pkt"
else:
command = "command %s" % command
- super().__init__(
- "Protocol got unexpected %s" % command
- )
+ super().__init__("Protocol got unexpected %s" % command)
class FileFormatException(Exception):
blob - d15644714fa18bff1d2082c4f663bba67a9248c0
blob + d38dbcc9d1861328232c06866e9958c185877f6b
--- dulwich/file.py
+++ dulwich/file.py
self.abort()
def __del__(self) -> None:
- if not getattr(self, '_closed', True):
- warnings.warn('unclosed %r' % self, ResourceWarning, stacklevel=2)
+ if not getattr(self, "_closed", True):
+ warnings.warn("unclosed %r" % self, ResourceWarning, stacklevel=2)
self.abort()
def __enter__(self):
blob - 5db7ebdf83d217fa6d1d44b0d32e7971536e31f9
blob + 758bc97156ee9c180ecdd44356496dbefecc662f
--- dulwich/graph.py
+++ dulwich/graph.py
return None
def iter(self):
- for (pr, cmt) in self.pq:
+ for pr, cmt in self.pq:
yield (-pr, cmt)
pflags = cstates.get(pcmt, 0)
# if this parent was already visited with no new ancestry/flag information
# do not add it to the working list again
- if ((pflags & cflags) == cflags):
+ if (pflags & cflags) == cflags:
continue
pdt = lookup_stamp(pcmt)
if pdt < min_stamp:
blob - 42c5f98595401d3fb86439db8f1243dd5fadf28c
blob + 39ef19c14e8ef88f8feebe829f948bd438423796
--- dulwich/greenthreads.py
+++ dulwich/greenthreads.py
self.object_store = object_store
p = pool.Pool(size=concurrency)
- have_commits, have_tags = _split_commits_and_tags(object_store, haves, ignore_unknown=True, pool=p)
- want_commits, want_tags = _split_commits_and_tags(object_store, wants, ignore_unknown=False, pool=p)
- all_ancestors: FrozenSet[ObjectID] = frozenset(_collect_ancestors(object_store, have_commits)[0])
+ have_commits, have_tags = _split_commits_and_tags(
+ object_store, haves, ignore_unknown=True, pool=p
+ )
+ want_commits, want_tags = _split_commits_and_tags(
+ object_store, wants, ignore_unknown=False, pool=p
+ )
+ all_ancestors: FrozenSet[ObjectID] = frozenset(
+ _collect_ancestors(object_store, have_commits)[0]
+ )
missing_commits, common_commits = _collect_ancestors(
object_store, want_commits, all_ancestors
)
self.sha_done.add(t)
missing_tags = want_tags.difference(have_tags)
wants = missing_commits.union(missing_tags)
- self.objects_to_send: Set[Tuple[ObjectID, Optional[bytes], Optional[int], bool]] = {(w, None, 0, False) for w in wants}
+ self.objects_to_send: Set[
+ Tuple[ObjectID, Optional[bytes], Optional[int], bool]
+ ] = {(w, None, 0, False) for w in wants}
if progress is None:
self.progress = lambda x: None
else:
blob - 0d813ab98c56fcdf9572ebc54c665b6954f6e955
blob + cd543710d1165ab0a2fe6bab6235c452be7e23ba
--- dulwich/hooks.py
+++ dulwich/hooks.py
try:
ret = subprocess.call(
- [os.path.relpath(self.filepath, self.cwd)] + list(args),
- cwd=self.cwd)
+ [os.path.relpath(self.filepath, self.cwd)] + list(args), cwd=self.cwd
+ )
if ret != 0:
if self.post_exec_callback is not None:
self.post_exec_callback(0, *args)
if (p.returncode != 0) or err_data:
err_fmt = b"post-receive exit code: %d\n" + b"stdout:\n%s\nstderr:\n%s"
err_msg = err_fmt % (p.returncode, out_data, err_data)
- raise HookError(err_msg.decode('utf-8', 'backslashreplace'))
+ raise HookError(err_msg.decode("utf-8", "backslashreplace"))
return out_data
except OSError as err:
raise HookError(repr(err)) from err
blob - 8c0a5bd2fa9a507b28c2059c160f1e7b2ff3e1e7
blob + 2d4ca7e33fb67231eb63921c9928edfbbede5fa0
--- dulwich/ignore.py
+++ dulwich/ignore.py
class IgnoreFilter:
- def __init__(self, patterns: Iterable[bytes], ignorecase: bool = False, path=None) -> None:
+ def __init__(
+ self, patterns: Iterable[bytes], ignorecase: bool = False, path=None
+ ) -> None:
self._patterns: List[Pattern] = []
self._ignorecase = ignorecase
self._path = path
blob - e247941cfab9829e6e1a36cb9915c55e6761994c
blob + 739f5c4f6fba0aa77d91442fb5514a67807dc941
--- dulwich/index.py
+++ dulwich/index.py
this: Optional[IndexEntry]
other: Optional[IndexEntry]
- def __init__(self, ancestor: Optional[IndexEntry] = None,
- this: Optional[IndexEntry] = None,
- other: Optional[IndexEntry] = None) -> None:
+ def __init__(
+ self,
+ ancestor: Optional[IndexEntry] = None,
+ this: Optional[IndexEntry] = None,
+ other: Optional[IndexEntry] = None,
+ ) -> None:
self.ancestor = ancestor
self.this = this
self.other = other
) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
if flags & FLAG_EXTENDED:
if version < 3:
- raise AssertionError(
- 'extended flag set in index with version < 3')
- (extended_flags, ) = struct.unpack(">H", f.read(2))
+ raise AssertionError("extended flag set in index with version < 3")
+ (extended_flags,) = struct.unpack(">H", f.read(2))
else:
extended_flags = 0
name = f.read(flags & FLAG_NAMEMASK)
if entry.extended_flags:
flags |= FLAG_EXTENDED
if flags & FLAG_EXTENDED and version is not None and version < 3:
- raise AssertionError('unable to use extended flags in version < 3')
+ raise AssertionError("unable to use extended flags in version < 3")
f.write(
struct.pack(
b">LLLLLL20sH",
return ret
-def write_index(f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None):
+def write_index(
+ f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None
+):
"""Write an index file.
Args:
value = entries[key]
if isinstance(value, ConflictedIndexEntry):
if value.ancestor is not None:
- entries_list.append(value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR))
+ entries_list.append(
+ value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
+ )
if value.this is not None:
- entries_list.append(value.this.serialize(key, Stage.MERGE_CONFLICT_THIS))
+ entries_list.append(
+ value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
+ )
if value.other is not None:
- entries_list.append(value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER))
+ entries_list.append(
+ value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
+ )
else:
entries_list.append(value.serialize(key, Stage.NORMAL))
write_index(f, entries_list, version=version)
"""Remove all contents from this index."""
self._byname = {}
- def __setitem__(self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]) -> None:
+ def __setitem__(
+ self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
+ ) -> None:
assert isinstance(name, bytes)
self._byname[name] = value
def __delitem__(self, name: bytes) -> None:
del self._byname[name]
- def iteritems(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
+ def iteritems(
+ self
+ ) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
return iter(self._byname.items())
def items(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
yield from self._byname.keys()
def changes_from_tree(
- self, object_store, tree: ObjectID, want_unchanged: bool = False):
+ self, object_store, tree: ObjectID, want_unchanged: bool = False
+ ):
"""Find the differences between the contents of this index and a tree.
Args:
other_names = set(names)
if tree is not None:
- for (name, mode, sha) in iter_tree_contents(object_store, tree):
+ for name, mode, sha in iter_tree_contents(object_store, tree):
try:
(other_sha, other_mode) = lookup_entry(name)
except KeyError:
def index_entry_from_stat(
- stat_val, hex_sha: bytes, mode: Optional[int] = None,
+ stat_val,
+ hex_sha: bytes,
+ mode: Optional[int] = None,
):
"""Create a new index entry from a stat value.
)
-if sys.platform == 'win32':
+if sys.platform == "win32":
# On Windows, creating symlinks either requires administrator privileges
# or developer mode. Raise a more helpful error when we're unable to
# create symlinks
# https://github.com/jelmer/dulwich/issues/1005
class WindowsSymlinkPermissionError(PermissionError):
-
def __init__(self, errno, msg, filename) -> None:
super(PermissionError, self).__init__(
- errno, "Unable to create symlink; "
+ errno,
+ "Unable to create symlink; "
"do you have developer mode enabled? %s" % msg,
- filename)
+ filename,
+ )
def symlink(src, dst, target_is_directory=False, *, dir_fd=None):
try:
return os.symlink(
- src, dst, target_is_directory=target_is_directory,
- dir_fd=dir_fd)
+ src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
+ )
except PermissionError as e:
- raise WindowsSymlinkPermissionError(
- e.errno, e.strerror, e.filename) from e
+ raise WindowsSymlinkPermissionError(e.errno, e.strerror, e.filename) from e
else:
symlink = os.symlink
def build_file_from_blob(
- blob: Blob, mode: int, target_path: bytes, *, honor_filemode=True,
- tree_encoding="utf-8", symlink_fn=None
+ blob: Blob,
+ mode: int,
+ target_path: bytes,
+ *,
+ honor_filemode=True,
+ tree_encoding="utf-8",
+ symlink_fn=None,
):
"""Build a file or symlink on disk based on a Git object.
return True
-def validate_path(path: bytes,
- element_validator=validate_path_element_default) -> bool:
+def validate_path(path: bytes, element_validator=validate_path_element_default) -> bool:
"""Default path validator that just checks for .git/."""
parts = path.split(b"/")
for p in parts:
tree_id: bytes,
honor_filemode: bool = True,
validate_path_element=validate_path_element_default,
- symlink_fn=None
+ symlink_fn=None,
):
"""Generate and materialize index from a tree.
obj = object_store[entry.sha]
assert isinstance(obj, Blob)
st = build_file_from_blob(
- obj, entry.mode, full_path,
+ obj,
+ entry.mode,
+ full_path,
honor_filemode=honor_filemode,
symlink_fn=symlink_fn,
)
index.write()
-def blob_from_path_and_mode(fs_path: bytes, mode: int,
- tree_encoding="utf-8"):
+def blob_from_path_and_mode(fs_path: bytes, mode: int, tree_encoding="utf-8"):
"""Create a blob from a path and a stat object.
Args:
def get_unstaged_changes(
- index: Index, root_path: Union[str, bytes],
- filter_blob_callback=None):
+ index: Index, root_path: Union[str, bytes], filter_blob_callback=None
+):
"""Walk through an index and check for differences against working tree.
Args:
def index_entry_from_path(
- path: bytes, object_store: Optional[ObjectContainer] = None
+ path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
"""Create an index from a filesystem path.
def iter_fresh_entries(
- paths: Iterable[bytes], root_path: bytes,
- object_store: Optional[ObjectContainer] = None
+ paths: Iterable[bytes],
+ root_path: bytes,
+ object_store: Optional[ObjectContainer] = None,
) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
"""Iterate over current versions of index entries on disk.
def iter_fresh_objects(
- paths: Iterable[bytes], root_path: bytes, include_deleted=False,
- object_store=None) -> Iterator[
- Tuple[bytes, Optional[bytes], Optional[int]]]:
+ paths: Iterable[bytes], root_path: bytes, include_deleted=False, object_store=None
+) -> Iterator[Tuple[bytes, Optional[bytes], Optional[int]]]:
"""Iterate over versions of objects on disk referenced by index.
Args:
object_store: Optional object store to report new items to
Returns: Iterator over path, sha, mode
"""
- for path, entry in iter_fresh_entries(
- paths, root_path, object_store=object_store):
+ for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
if entry is None:
if include_deleted:
yield path, None, None
Works as a context manager.
"""
+
def __init__(self, path: Union[bytes, str]) -> None:
self._path = path
blob - b995ee140446db273c79f1ef9734cd5f1a6401e5
blob + adfa91bbbac097743b846275f41055a385103d44
--- dulwich/line_ending.py
+++ dulwich/line_ending.py
super().__init__(config_stack, git_attributes)
if tree:
self.existing_paths = {
- name
- for name, _, _ in iter_tree_contents(object_store, tree)
+ name for name, _, _ in iter_tree_contents(object_store, tree)
}
else:
self.existing_paths = set()
blob - c2e7702606ff584c63afb8407477903731d16013
blob + 3be452b3baeea2d1a98685e46cd1528eb994fdaa
--- dulwich/lru_cache.py
+++ dulwich/lru_cache.py
_null_key = object()
-K = TypeVar('K')
-V = TypeVar('V')
+K = TypeVar("K")
+V = TypeVar("V")
class _LRUNode(Generic[K, V]):
_least_recently_used: Optional[_LRUNode[K, V]]
_most_recently_used: Optional[_LRUNode[K, V]]
- def __init__(self, max_cache: int = 100, after_cleanup_count: Optional[int] = None) -> None:
+ def __init__(
+ self, max_cache: int = 100, after_cleanup_count: Optional[int] = None
+ ) -> None:
self._cache: Dict[K, _LRUNode[K, V]] = {}
# The "HEAD" of the lru linked list
self._most_recently_used = None
yield node
node = node_next
- def add(self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None) -> None:
+ def add(
+ self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None
+ ) -> None:
"""Add a new value to the cache.
Also, if the entry is ever removed from the cache, call
_compute_size: Callable[[V], int]
def __init__(
- self, max_size: int = 1024 * 1024, after_cleanup_size: Optional[int] = None,
- compute_size: Optional[Callable[[V], int]] = None
+ self,
+ max_size: int = 1024 * 1024,
+ after_cleanup_size: Optional[int] = None,
+ compute_size: Optional[Callable[[V], int]] = None,
) -> None:
"""Create a new LRUSizeCache.
self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
LRUCache.__init__(self, max_cache=max(int(max_size / 512), 1))
- def add(self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None) -> None:
+ def add(
+ self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None
+ ) -> None:
"""Add a new value to the cache.
Also, if the entry is ever removed from the cache, call
max_cache = max(int(max_size / 512), 1)
self._update_max_cache(max_cache)
- def _update_max_size(self, max_size: int, after_cleanup_size: Optional[int] = None) -> None:
+ def _update_max_size(
+ self, max_size: int, after_cleanup_size: Optional[int] = None
+ ) -> None:
self._max_size = max_size
if after_cleanup_size is None:
self._after_cleanup_size = self._max_size * 8 // 10
blob - 219f5007e1c8ac8cef96170dab85a70c12cc4d15
blob + 9ed9a88b44ed6227e0157170502c1ccd03567e0d
--- dulwich/mailmap.py
+++ dulwich/mailmap.py
def __init__(self, map=None) -> None:
self._table: Dict[Tuple[Optional[str], str], Tuple[str, str]] = {}
if map:
- for (canonical_identity, from_identity) in map:
+ for canonical_identity, from_identity in map:
self.add_entry(canonical_identity, from_identity)
def add_entry(self, canonical_identity, from_identity=None):
blob - 9c5ee3d7ade20fa307873f00832bd1fde03f6231
blob + f7bf664d47639394b6217062fb400795ee1dd6b6
--- dulwich/object_store.py
+++ dulwich/object_store.py
class PackContainer(Protocol):
-
- def add_pack(
- self
- ) -> Tuple[BytesIO, Callable[[], None], Callable[[], None]]:
+ def add_pack(self) -> Tuple[BytesIO, Callable[[], None], Callable[[], None]]:
"""Add a new pack."""
"""Object store interface."""
def determine_wants_all(
- self,
- refs: Dict[Ref, ObjectID],
- depth: Optional[int] = None
+ self, refs: Dict[Ref, ObjectID], depth: Optional[int] = None
) -> List[ObjectID]:
def _want_deepen(sha):
if not depth:
(oldpath, newpath), (oldmode, newmode), (oldsha, newsha)
"""
from .diff_tree import tree_changes
+
for change in tree_changes(
self,
source,
"""
warnings.warn(
"Please use dulwich.object_store.iter_tree_contents",
- DeprecationWarning, stacklevel=2)
+ DeprecationWarning,
+ stacklevel=2,
+ )
return iter_tree_contents(self, tree_id, include_trees=include_trees)
- def iterobjects_subset(self, shas: Iterable[bytes], *, allow_missing: bool = False) -> Iterator[ShaFile]:
+ def iterobjects_subset(
+ self, shas: Iterable[bytes], *, allow_missing: bool = False
+ ) -> Iterator[ShaFile]:
for sha in shas:
try:
yield self[sha]
commit.
Returns: Iterator over (sha, path) pairs.
"""
- warnings.warn(
- 'Please use MissingObjectFinder(store)', DeprecationWarning)
+ warnings.warn("Please use MissingObjectFinder(store)", DeprecationWarning)
finder = MissingObjectFinder(
self,
haves=haves,
return haves
def generate_pack_data(
- self, have, want, shallow=None, progress=None,
- ofs_delta=True
+ self, have, want, shallow=None, progress=None, ofs_delta=True
) -> Tuple[int, Iterator[UnpackedObject]]:
"""Generate pack data objects for a set of wants/haves.
# Note that the pack-specific implementation below is more efficient,
# as it reuses deltas
missing_objects = MissingObjectFinder(
- self, haves=have, wants=want, shallow=shallow, progress=progress)
+ self, haves=have, wants=want, shallow=shallow, progress=progress
+ )
object_ids = list(missing_objects)
return pack_objects_to_data(
- [(self[oid], path) for oid, path in object_ids], ofs_delta=ofs_delta,
- progress=progress)
+ [(self[oid], path) for oid, path in object_ids],
+ ofs_delta=ofs_delta,
+ progress=progress,
+ )
def peel_sha(self, sha):
"""Peel all tags from a SHA.
"""
warnings.warn(
"Please use dulwich.object_store.peel_sha()",
- DeprecationWarning, stacklevel=2)
+ DeprecationWarning,
+ stacklevel=2,
+ )
return peel_sha(self, sha)[1]
def _get_depth(
- self, head, get_parents=lambda commit: commit.parents, max_depth=None,
+ self,
+ head,
+ get_parents=lambda commit: commit.parents,
+ max_depth=None,
):
"""Return the current available depth for the given head.
For commits with multiple parents, the largest possible depth will be
_cls, sha = cmt.object
cmt = self[sha]
queue.extend(
- (parent, depth + 1)
- for parent in get_parents(cmt)
- if parent in self
+ (parent, depth + 1) for parent in get_parents(cmt) if parent in self
)
return current_depth
self._pack_cache: Dict[str, Pack] = {}
self.pack_compression_level = pack_compression_level
- def add_pack(
- self
- ) -> Tuple[BytesIO, Callable[[], None], Callable[[], None]]:
+ def add_pack(self) -> Tuple[BytesIO, Callable[[], None], Callable[[], None]]:
"""Add a new pack to this object store."""
raise NotImplementedError(self.add_pack)
- def add_pack_data(self, count: int, unpacked_objects: Iterator[UnpackedObject], progress=None) -> None:
+ def add_pack_data(
+ self, count: int, unpacked_objects: Iterator[UnpackedObject], progress=None
+ ) -> None:
"""Add pack data to this object store.
Args:
prev_pack.close()
def generate_pack_data(
- self, have, want, shallow=None, progress=None,
- ofs_delta=True
+ self, have, want, shallow=None, progress=None, ofs_delta=True
) -> Tuple[int, Iterator[UnpackedObject]]:
"""Generate pack data objects for a set of wants/haves.
progress: Optional progress reporting method
"""
missing_objects = MissingObjectFinder(
- self, haves=have, wants=want, shallow=shallow, progress=progress)
+ self, haves=have, wants=want, shallow=shallow, progress=progress
+ )
remote_has = missing_objects.get_remote_has()
object_ids = list(missing_objects)
return len(object_ids), generate_unpacked_objects(
object_ids,
progress=progress,
ofs_delta=ofs_delta,
- other_haves=remote_has)
+ other_haves=remote_has,
+ )
def _clear_cached_packs(self):
pack_cache = self._pack_cache
pass
raise KeyError(hexsha)
- def iter_unpacked_subset(self, shas, *, include_comp=False, allow_missing: bool = False, convert_ofs_delta: bool = True) -> Iterator[ShaFile]:
+ def iter_unpacked_subset(
+ self,
+ shas,
+ *,
+ include_comp=False,
+ allow_missing: bool = False,
+ convert_ofs_delta: bool = True,
+ ) -> Iterator[ShaFile]:
todo: Set[bytes] = set(shas)
for p in self._iter_cached_packs():
- for unpacked in p.iter_unpacked_subset(todo, include_comp=include_comp, allow_missing=True, convert_ofs_delta=convert_ofs_delta):
+ for unpacked in p.iter_unpacked_subset(
+ todo,
+ include_comp=include_comp,
+ allow_missing=True,
+ convert_ofs_delta=convert_ofs_delta,
+ ):
yield unpacked
hexsha = sha_to_hex(unpacked.sha())
todo.remove(hexsha)
# Maybe something else has added a pack with the object
# in the mean time?
for p in self._update_pack_cache():
- for unpacked in p.iter_unpacked_subset(todo, include_comp=include_comp, allow_missing=True, convert_ofs_delta=convert_ofs_delta):
+ for unpacked in p.iter_unpacked_subset(
+ todo,
+ include_comp=include_comp,
+ allow_missing=True,
+ convert_ofs_delta=convert_ofs_delta,
+ ):
yield unpacked
hexsha = sha_to_hex(unpacked.sha())
todo.remove(hexsha)
for alternate in self.alternates:
- for unpacked in alternate.iter_unpacked_subset(todo, include_comp=include_comp, allow_missing=True, convert_ofs_delta=convert_ofs_delta):
+ for unpacked in alternate.iter_unpacked_subset(
+ todo,
+ include_comp=include_comp,
+ allow_missing=True,
+ convert_ofs_delta=convert_ofs_delta,
+ ):
yield unpacked
hexsha = sha_to_hex(unpacked.sha())
todo.remove(hexsha)
- def iterobjects_subset(self, shas: Iterable[bytes], *, allow_missing: bool = False) -> Iterator[ShaFile]:
+ def iterobjects_subset(
+ self, shas: Iterable[bytes], *, allow_missing: bool = False
+ ) -> Iterator[ShaFile]:
todo: Set[bytes] = set(shas)
for p in self._iter_cached_packs():
for o in p.iterobjects_subset(todo, allow_missing=True):
elif not allow_missing:
raise KeyError(oid)
- def get_unpacked_object(self, sha1: bytes, *, include_comp: bool = False) -> UnpackedObject:
+ def get_unpacked_object(
+ self, sha1: bytes, *, include_comp: bool = False
+ ) -> UnpackedObject:
"""Obtain the unpacked object.
Args:
raise KeyError(hexsha)
def add_objects(
- self, objects: Sequence[Tuple[ShaFile, Optional[str]]],
- progress: Optional[Callable[[str], None]] = None) -> None:
+ self,
+ objects: Sequence[Tuple[ShaFile, Optional[str]]],
+ progress: Optional[Callable[[str], None]] = None,
+ ) -> None:
"""Add a set of objects to this object store.
Args:
class DiskObjectStore(PackBasedObjectStore):
"""Git-style object store that exists on disk."""
- def __init__(self, path, loose_compression_level=-1, pack_compression_level=-1) -> None:
+ def __init__(
+ self, path, loose_compression_level=-1, pack_compression_level=-1
+ ) -> None:
"""Open an object store.
Args:
loose_compression_level: zlib compression level for loose objects
pack_compression_level: zlib compression level for pack objects
"""
- super().__init__(
- pack_compression_level=pack_compression_level
- )
+ super().__init__(pack_compression_level=pack_compression_level)
self.path = path
self.pack_dir = os.path.join(self.path, PACKDIR)
self._alternates = None
entries = []
for i, entry in enumerate(indexer):
if progress is not None:
- progress(("generating index: %d/%d\r" % (i, num_objects)).encode('ascii'))
+ progress(
+ ("generating index: %d/%d\r" % (i, num_objects)).encode("ascii")
+ )
entries.append(entry)
pack_sha, extra_entries = extend_pack(
- f, indexer.ext_refs(), get_raw=self.get_raw, compression_level=self.pack_compression_level,
- progress=progress)
+ f,
+ indexer.ext_refs(),
+ get_raw=self.get_raw,
+ compression_level=self.pack_compression_level,
+ progress=progress,
+ )
f.flush()
try:
fileno = f.fileno()
if f.tell() > 0:
f.seek(0)
with PackData(path, f) as pd:
- indexer = PackIndexer.for_pack_data(pd, resolve_ext_ref=self.get_raw)
+ indexer = PackIndexer.for_pack_data(
+ pd, resolve_ext_ref=self.get_raw
+ )
return self._complete_pack(f, path, len(pd), indexer)
else:
f.close()
call when the pack is finished.
"""
from tempfile import SpooledTemporaryFile
- f = SpooledTemporaryFile(
- max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
+ f = SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix="incoming-")
+
def commit():
size = f.tell()
if size > 0:
return f, commit, abort
- def add_pack_data(self, count: int, unpacked_objects: Iterator[UnpackedObject], progress=None) -> None:
+ def add_pack_data(
+ self, count: int, unpacked_objects: Iterator[UnpackedObject], progress=None
+ ) -> None:
"""Add pack data to this object store.
Args:
return tree.lookup_path(lookup_obj, path)
-def _collect_filetree_revs(obj_store: ObjectContainer, tree_sha: ObjectID, kset: Set[ObjectID]) -> None:
+def _collect_filetree_revs(
+ obj_store: ObjectContainer, tree_sha: ObjectID, kset: Set[ObjectID]
+) -> None:
"""Collect SHA1s of files and directories for specified tree.
Args:
_collect_filetree_revs(obj_store, sha, kset)
-def _split_commits_and_tags(obj_store: ObjectContainer, lst, *, ignore_unknown=False) -> Tuple[Set[bytes], Set[bytes], Set[bytes]]:
+def _split_commits_and_tags(
+ obj_store: ObjectContainer, lst, *, ignore_unknown=False
+) -> Tuple[Set[bytes], Set[bytes], Set[bytes]]:
"""Split object id list into three lists with commit, tag, and other SHAs.
Commits referenced by tags are included into commits
# all_ancestors is a set of commits that shall not be sent
# (complete repository up to 'haves')
all_ancestors = _collect_ancestors(
- object_store,
- have_commits, shallow=shallow, get_parents=self._get_parents
+ object_store, have_commits, shallow=shallow, get_parents=self._get_parents
)[0]
# all_missing - complete set of commits between haves and wants
# common - commits from all_ancestors we hit into while
# in fact, what we 'want' is commits, tags, and others
# we've found missing
- self.objects_to_send: Set[Tuple[ObjectID, Optional[bytes], Optional[int], bool]] = {
- (w, None, Commit.type_num, False)
- for w in missing_commits}
+ self.objects_to_send: Set[
+ Tuple[ObjectID, Optional[bytes], Optional[int], bool]
+ ] = {(w, None, Commit.type_num, False) for w in missing_commits}
missing_tags = want_tags.difference(have_tags)
self.objects_to_send.update(
- {(w, None, Tag.type_num, False)
- for w in missing_tags})
+ {(w, None, Tag.type_num, False) for w in missing_tags}
+ )
missing_others = want_others.difference(have_others)
- self.objects_to_send.update(
- {(w, None, None, False)
- for w in missing_others})
+ self.objects_to_send.update({(w, None, None, False) for w in missing_others})
if progress is None:
self.progress = lambda x: None
def get_remote_has(self):
return self.remote_has
- def add_todo(self, entries: Iterable[Tuple[ObjectID, Optional[bytes], Optional[int], bool]]):
+ def add_todo(
+ self, entries: Iterable[Tuple[ObjectID, Optional[bytes], Optional[int], bool]]
+ ):
self.objects_to_send.update([e for e in entries if e[0] not in self.sha_done])
def __next__(self) -> Tuple[bytes, Optional[PackHint]]:
while True:
if not self.objects_to_send:
- self.progress(("counting objects: %d, done.\n" % len(self.sha_done)).encode("ascii"))
+ self.progress(
+ ("counting objects: %d, done.\n" % len(self.sha_done)).encode(
+ "ascii"
+ )
+ )
raise StopIteration
(sha, name, type_num, leaf) = self.objects_to_send.pop()
if sha not in self.sha_done:
elif isinstance(o, Tree):
self.add_todo(
[
- (s, n, (Blob.type_num if stat.S_ISREG(m) else Tree.type_num),
- not stat.S_ISDIR(m))
+ (
+ s,
+ n,
+ (Blob.type_num if stat.S_ISREG(m) else Tree.type_num),
+ not stat.S_ISDIR(m),
+ )
for n, m, s in o.iteritems()
if not S_ISGITLINK(m)
]
self.add_todo([(self._tagged[sha], None, None, True)])
self.sha_done.add(sha)
if len(self.sha_done) % 1000 == 0:
- self.progress(("counting objects: %d\r" % len(self.sha_done)).encode("ascii"))
+ self.progress(
+ ("counting objects: %d\r" % len(self.sha_done)).encode("ascii")
+ )
if type_num is None:
pack_hint = None
else:
# TODO(jelmer): Save up the objects and add them using .add_objects
# rather than with individual calls to .add_object.
nested_changes = {}
- for (path, new_mode, new_sha) in changes:
+ for path, new_mode, new_sha in changes:
try:
(dirname, subpath) = path.split(b"/", 1)
except ValueError:
yield o_id
done.add(o_id)
- def iterobjects_subset(self, shas: Iterable[bytes], *, allow_missing: bool = False) -> Iterator[ShaFile]:
+ def iterobjects_subset(
+ self, shas: Iterable[bytes], *, allow_missing: bool = False
+ ) -> Iterator[ShaFile]:
todo = set(shas)
for b in self.bases:
for o in b.iterobjects_subset(todo, allow_missing=True):
if todo and not allow_missing:
raise KeyError(o.id)
- def iter_unpacked_subset(self, shas: Iterable[bytes], *, include_comp=False, allow_missing: bool = False, convert_ofs_delta=True) -> Iterator[ShaFile]:
+ def iter_unpacked_subset(
+ self,
+ shas: Iterable[bytes],
+ *,
+ include_comp=False,
+ allow_missing: bool = False,
+ convert_ofs_delta=True,
+ ) -> Iterator[ShaFile]:
todo = set(shas)
for b in self.bases:
- for o in b.iter_unpacked_subset(todo, include_comp=include_comp, allow_missing=True, convert_ofs_delta=convert_ofs_delta):
+ for o in b.iter_unpacked_subset(
+ todo,
+ include_comp=include_comp,
+ allow_missing=True,
+ convert_ofs_delta=convert_ofs_delta,
+ ):
yield o
todo.remove(o.id)
if todo and not allow_missing:
import tempfile
pf = tempfile.SpooledTemporaryFile(
- max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
+ max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix="incoming-"
+ )
def commit():
if pf.tell() == 0:
pf.seek(0)
p = PackData(pf.name, pf)
entries = p.sorted_entries()
- basename = iter_sha1(entry[0] for entry in entries).decode('ascii')
+ basename = iter_sha1(entry[0] for entry in entries).decode("ascii")
idxf = tempfile.SpooledTemporaryFile(
- max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
+ max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix="incoming-"
+ )
checksum = p.get_stored_checksum()
write_pack_index(idxf, entries, checksum)
idxf.seek(0)
- idx = load_pack_index_file(basename + '.idx', idxf)
+ idx = load_pack_index_file(basename + ".idx", idxf)
for pack in self.packs:
if pack.get_stored_checksum() == p.get_stored_checksum():
p.close()
def iter_tree_contents(
- store: ObjectContainer, tree_id: Optional[ObjectID], *, include_trees: bool = False):
+ store: ObjectContainer, tree_id: Optional[ObjectID], *, include_trees: bool = False
+):
"""Iterate the contents of a tree and all subtrees.
Iteration is depth-first pre-order, as in e.g. os.walk.
blob - ff5acb5f1c9aa30f1b7df749037340d67de21032
blob + 1b32a34d61ef3d2323db074d59965b2b292c0eae
--- dulwich/objects.py
+++ dulwich/objects.py
identity: Identity string
error_msg: Error message to use in exception
"""
- email_start = identity.find(b'<')
- email_end = identity.find(b'>')
- if not all([
- email_start >= 1,
- identity[email_start - 1] == b' '[0],
- identity.find(b'<', email_start + 1) == -1,
- email_end == len(identity) - 1,
- b'\0' not in identity,
- b'\n' not in identity,
- ]):
+ email_start = identity.find(b"<")
+ email_end = identity.find(b">")
+ if not all(
+ [
+ email_start >= 1,
+ identity[email_start - 1] == b" "[0],
+ identity.find(b"<", email_start + 1) == -1,
+ email_end == len(identity) - 1,
+ b"\0" not in identity,
+ b"\n" not in identity,
+ ]
+ ):
raise ObjectFormatException(error_msg)
try:
int(size) # sanity check
except ValueError as exc:
- raise ObjectFormatException(
- "Object size not an integer: %s" % exc) from exc
+ raise ObjectFormatException("Object size not an integer: %s" % exc) from exc
obj_class = object_class(type_name)
if not obj_class:
- raise ObjectFormatException("Not a known type: %s" % type_name.decode('ascii'))
+ raise ObjectFormatException(
+ "Not a known type: %s" % type_name.decode("ascii")
+ )
return obj_class()
def _parse_legacy_object(self, map) -> None:
raise ObjectFormatException("Invalid object header, no \\0")
self.set_raw_string(text[header_end + 1 :])
- def as_legacy_object_chunks(
- self, compression_level: int = -1) -> Iterator[bytes]:
+ def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
"""Return chunks representing the object in the experimental format.
Returns: List of strings
"""Return a string representing this object, fit for display."""
return self.as_raw_string()
- def set_raw_string(
- self, text: bytes, sha: Optional[ObjectID] = None) -> None:
+ def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
"""Set the contents of this object from a serialized string."""
if not isinstance(text, bytes):
raise TypeError("Expected bytes for text, got %r" % text)
self.set_raw_chunks([text], sha)
def set_raw_chunks(
- self, chunks: List[bytes],
- sha: Optional[ObjectID] = None) -> None:
+ self, chunks: List[bytes], sha: Optional[ObjectID] = None
+ ) -> None:
"""Set the contents of this object from a list of chunks."""
self._chunked_text = chunks
self._deserialize(chunks)
@staticmethod
def from_raw_chunks(
- type_num: int, chunks: List[bytes],
- sha: Optional[ObjectID] = None):
+ type_num: int, chunks: List[bytes], sha: Optional[ObjectID] = None
+ ):
"""Creates an object of the indicated type from the raw chunks given.
Args:
"""Create a new copy of this SHA1 object from its raw string."""
obj_class = object_class(self.type_num)
if obj_class is None:
- raise AssertionError('invalid type num %d' % self.type_num)
+ raise AssertionError("invalid type num %d" % self.type_num)
return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)
@property
return ret
-def _parse_message(chunks: Iterable[bytes]) -> Iterator[Union[Tuple[None, None], Tuple[Optional[bytes], bytes]]]:
+def _parse_message(
+ chunks: Iterable[bytes]
+) -> Iterator[Union[Tuple[None, None], Tuple[Optional[bytes], bytes]]]:
"""Parse a message with a list of fields and a body.
Args:
if self._tag_time is None:
headers.append((_TAGGER_HEADER, self._tagger))
else:
- headers.append((_TAGGER_HEADER, format_time_entry(
- self._tagger, self._tag_time,
- (self._tag_timezone, self._tag_timezone_neg_utc))))
+ headers.append(
+ (
+ _TAGGER_HEADER,
+ format_time_entry(
+ self._tagger,
+ self._tag_time,
+ (self._tag_timezone, self._tag_timezone_neg_utc),
+ ),
+ )
+ )
if self.message is None and self._signature is None:
body = None
def sign(self, keyid: Optional[str] = None):
import gpg
+
with gpg.Context(armor=True) as c:
if keyid is not None:
key = c.get_key(keyid)
signature=self._signature,
)
if keyids:
- keys = [
- ctx.get_key(key)
- for key in keyids
- ]
+ keys = [ctx.get_key(key) for key in keyids]
for key in keys:
for subkey in keys:
for sig in result.signatures:
if subkey.can_sign and subkey.fpr == sig.fpr:
return
- raise gpg.errors.MissingSignatures(
- result, keys, results=(data, result)
- )
+ raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
try:
mode = int(mode_text, 8)
except ValueError as exc:
- raise ObjectFormatException(
- "Invalid mode '%s'" % mode_text) from exc
+ raise ObjectFormatException("Invalid mode '%s'" % mode_text) from exc
name_end = text.index(b"\0", mode_end)
name = text[mode_end + 1 : name_end]
count = name_end + 21
if not p:
continue
if mode is not None and S_ISGITLINK(mode):
- raise SubmoduleEncountered(b'/'.join(parts[:i]), sha)
+ raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
obj = lookup_obj(sha)
if not isinstance(obj, Tree):
raise NotTreeError(sha)
def format_time_entry(person, time, timezone_info):
"""Format an event."""
(timezone, timezone_neg_utc) = timezone_info
- return b" ".join([
- person,
- str(time).encode("ascii"),
- format_timezone(timezone, timezone_neg_utc)])
+ return b" ".join(
+ [person, str(time).encode("ascii"), format_timezone(timezone, timezone_neg_utc)]
+ )
def parse_commit(chunks):
Returns: Tuple of (tree, parents, author_info, commit_info,
encoding, mergetag, gpgsig, message, extra)
"""
- warnings.warn('parse_commit will be removed in 0.22', DeprecationWarning)
+ warnings.warn("parse_commit will be removed in 0.22", DeprecationWarning)
parents = []
extra = []
tree = None
def sign(self, keyid: Optional[str] = None):
import gpg
+
with gpg.Context(armor=True) as c:
if keyid is not None:
key = c.get_key(keyid)
signature=self._gpgsig,
)
if keyids:
- keys = [
- ctx.get_key(key)
- for key in keyids
- ]
+ keys = [ctx.get_key(key) for key in keyids]
for key in keys:
for subkey in keys:
for sig in result.signatures:
if subkey.can_sign and subkey.fpr == sig.fpr:
return
- raise gpg.errors.MissingSignatures(
- result, keys, results=(data, result)
- )
+ raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
def _serialize(self):
headers = []
headers.append((_TREE_HEADER, tree_bytes))
for p in self._parents:
headers.append((_PARENT_HEADER, p))
- headers.append((
- _AUTHOR_HEADER,
- format_time_entry(
- self._author, self._author_time,
- (self._author_timezone, self._author_timezone_neg_utc))))
- headers.append((
- _COMMITTER_HEADER,
- format_time_entry(
- self._committer, self._commit_time,
- (self._commit_timezone, self._commit_timezone_neg_utc))))
+ headers.append(
+ (
+ _AUTHOR_HEADER,
+ format_time_entry(
+ self._author,
+ self._author_time,
+ (self._author_timezone, self._author_timezone_neg_utc),
+ ),
+ )
+ )
+ headers.append(
+ (
+ _COMMITTER_HEADER,
+ format_time_entry(
+ self._committer,
+ self._commit_time,
+ (self._commit_timezone, self._commit_timezone_neg_utc),
+ ),
+ )
+ )
if self.encoding:
headers.append((_ENCODING_HEADER, self.encoding))
for mergetag in self.mergetag:
def _get_extra(self):
"""Return extra settings of this commit."""
warnings.warn(
- 'Commit.extra is deprecated. Use Commit._extra instead.',
- DeprecationWarning, stacklevel=2)
+ "Commit.extra is deprecated. Use Commit._extra instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
return self._extra
extra = property(
blob - b3ae58e50cf3d34b011657be81b3f29bfcc134d3
blob + edaa5a43487acf0952dc4fe1a80aa1bf3b6ce890
--- dulwich/objectspec.py
+++ dulwich/objectspec.py
return o
-def parse_ref(container: Union["Repo", "RefsContainer"], refspec: Union[str, bytes]) -> "Ref":
+def parse_ref(
+ container: Union["Repo", "RefsContainer"], refspec: Union[str, bytes]
+) -> "Ref":
"""Parse a string referring to a reference.
Args:
def parse_reftuple(
- lh_container: Union["Repo", "RefsContainer"],
- rh_container: Union["Repo", "RefsContainer"], refspec: Union[str, bytes],
- force: bool = False) -> Tuple[Optional["Ref"], Optional["Ref"], bool]:
+ lh_container: Union["Repo", "RefsContainer"],
+ rh_container: Union["Repo", "RefsContainer"],
+ refspec: Union[str, bytes],
+ force: bool = False,
+) -> Tuple[Optional["Ref"], Optional["Ref"], bool]:
"""Parse a reftuple spec.
Args:
def parse_reftuples(
- lh_container: Union["Repo", "RefsContainer"],
- rh_container: Union["Repo", "RefsContainer"],
- refspecs: Union[bytes, List[bytes]],
- force: bool = False):
+ lh_container: Union["Repo", "RefsContainer"],
+ rh_container: Union["Repo", "RefsContainer"],
+ refspecs: Union[bytes, List[bytes]],
+ force: bool = False,
+):
"""Parse a list of reftuple specs to a list of reftuples.
Args:
return ret
-def parse_commit_range(repo: "Repo", committishs: Union[str, bytes]) -> Iterator["Commit"]:
+def parse_commit_range(
+ repo: "Repo", committishs: Union[str, bytes]
+) -> Iterator["Commit"]:
"""Parse a string referring to a range of commits.
Args:
blob - 03ae5943b86502b28b939379c86832554b05316f
blob + f05e388b95d8b16a436e359d7e40d548d4aa7775
--- dulwich/pack.py
+++ dulwich/pack.py
class UnresolvedDeltas(Exception):
- """"Delta objects could not be resolved."""
+ """ "Delta objects could not be resolved."""
def __init__(self, shas):
self.shas = shas
class ObjectContainer(Protocol):
-
def add_object(self, obj: ShaFile) -> None:
"""Add a single object to this object store."""
def add_objects(
- self, objects: Sequence[Tuple[ShaFile, Optional[str]]],
- progress: Optional[Callable[[str], None]] = None) -> None:
+ self,
+ objects: Sequence[Tuple[ShaFile, Optional[str]]],
+ progress: Optional[Callable[[str], None]] = None,
+ ) -> None:
"""Add a set of objects to this object store.
Args:
class PackedObjectContainer(ObjectContainer):
-
- def get_unpacked_object(self, sha1: bytes, *, include_comp: bool = False) -> "UnpackedObject":
+ def get_unpacked_object(
+ self, sha1: bytes, *, include_comp: bool = False
+ ) -> "UnpackedObject":
"""Get a raw unresolved object."""
raise NotImplementedError(self.get_unpacked_object)
- def iterobjects_subset(self, shas: Iterable[bytes], *, allow_missing: bool = False) -> Iterator[ShaFile]:
+ def iterobjects_subset(
+ self, shas: Iterable[bytes], *, allow_missing: bool = False
+ ) -> Iterator[ShaFile]:
raise NotImplementedError(self.iterobjects_subset)
def iter_unpacked_subset(
- self, shas: Set[bytes], include_comp: bool = False, allow_missing: bool = False,
- convert_ofs_delta: bool = True) -> Iterator["UnpackedObject"]:
+ self,
+ shas: Set[bytes],
+ include_comp: bool = False,
+ allow_missing: bool = False,
+ convert_ofs_delta: bool = True,
+ ) -> Iterator["UnpackedObject"]:
raise NotImplementedError(self.iter_unpacked_subset)
class UnpackedObjectStream:
-
def __iter__(self) -> Iterator["UnpackedObject"]:
raise NotImplementedError(self.__iter__)
raise NotImplementedError(self.__len__)
-def take_msb_bytes(read: Callable[[int], bytes], crc32: Optional[int] = None) -> Tuple[List[int], Optional[int]]:
+def take_msb_bytes(
+ read: Callable[[int], bytes], crc32: Optional[int] = None
+) -> Tuple[List[int], Optional[int]]:
"""Read bytes marked with most significant bit.
Args:
# TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
# methods of this object.
- def __init__(self, pack_type_num, *, delta_base=None, decomp_len=None, crc32=None, sha=None, decomp_chunks=None, offset=None) -> None:
+ def __init__(
+ self,
+ pack_type_num,
+ *,
+ delta_base=None,
+ decomp_len=None,
+ crc32=None,
+ sha=None,
+ decomp_chunks=None,
+ offset=None,
+ ) -> None:
self.offset = offset
self._sha = sha
self.pack_type_num = pack_type_num
def read_zlib_chunks(
- read_some: Callable[[int], bytes],
- unpacked: UnpackedObject, include_comp: bool = False,
- buffer_size: int = _ZLIB_BUFSIZE
+ read_some: Callable[[int], bytes],
+ unpacked: UnpackedObject,
+ include_comp: bool = False,
+ buffer_size: int = _ZLIB_BUFSIZE,
) -> bytes:
"""Read zlib data from a buffer.
raise NotImplementedError(self.get_pack_checksum)
def object_index(self, sha: bytes) -> int:
- warnings.warn('Please use object_offset instead', DeprecationWarning, stacklevel=2)
+ warnings.warn(
+ "Please use object_offset instead", DeprecationWarning, stacklevel=2
+ )
return self.object_offset(sha)
def object_offset(self, sha: bytes) -> int:
def object_sha1(self, index: int) -> bytes:
"""Return the SHA1 corresponding to the index in the pack file."""
- for (name, offset, crc32) in self.iterentries():
+ for name, offset, crc32 in self.iterentries():
if offset == index:
return name
else:
def _unpack_offset(self, i):
offset = self._pack_offset_table_offset + i * 4
offset = unpack_from(">L", self._contents, offset)[0]
- if offset & (2 ** 31):
- offset = self._pack_offset_largetable_offset + (offset & (2 ** 31 - 1)) * 8
+ if offset & (2**31):
+ offset = self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
offset = unpack_from(">Q", self._contents, offset)[0]
return offset
else:
delta_base = None
- unpacked = UnpackedObject(type_num, delta_base=delta_base, decomp_len=size, crc32=crc32)
+ unpacked = UnpackedObject(
+ type_num, delta_base=delta_base, decomp_len=size, crc32=crc32
+ )
unused = read_zlib_chunks(
read_some,
unpacked,
if self._delta_iter:
self._delta_iter.record(unpacked)
if progress is not None:
- progress(("copying pack entries: %d/%d\r" % (i, len(self))).encode('ascii'))
+ progress(
+ ("copying pack entries: %d/%d\r" % (i, len(self))).encode("ascii")
+ )
if progress is not None:
- progress(("copied %d pack entries\n" % i).encode('ascii'))
+ progress(("copied %d pack entries\n" % i).encode("ascii"))
def obj_sha(type, chunks):
for _ in range(self._num_objects):
offset = self._file.tell()
- unpacked, unused = unpack_object(self._file.read, compute_crc32=False, include_comp=include_comp)
+ unpacked, unused = unpack_object(
+ self._file.read, compute_crc32=False, include_comp=include_comp
+ )
unpacked.offset = offset
yield unpacked
# Back up over unused data.
self._file.seek(-len(unused), SEEK_CUR)
- def iterentries(self, progress=None, resolve_ext_ref: Optional[ResolveExtRefFn] = None):
+ def iterentries(
+ self, progress=None, resolve_ext_ref: Optional[ResolveExtRefFn] = None
+ ):
"""Yield entries summarizing the contents of this pack.
Args:
progress(i, num_objects)
yield result
- def sorted_entries(self, progress: Optional[ProgressFn] = None, resolve_ext_ref: Optional[ResolveExtRefFn] = None):
+ def sorted_entries(
+ self,
+ progress: Optional[ProgressFn] = None,
+ resolve_ext_ref: Optional[ResolveExtRefFn] = None,
+ ):
"""Return entries in this pack, sorted by SHA.
Args:
object count
Returns: Iterator of tuples with (sha, offset, crc32)
"""
- return sorted(self.iterentries(
- progress=progress, resolve_ext_ref=resolve_ext_ref))
+ return sorted(
+ self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref)
+ )
def create_index_v1(self, filename, progress=None, resolve_ext_ref=None):
"""Create a version 1 file for this data file.
Returns: Checksum of index file
"""
entries = self.sorted_entries(
- progress=progress, resolve_ext_ref=resolve_ext_ref)
+ progress=progress, resolve_ext_ref=resolve_ext_ref
+ )
with GitFile(filename, "wb") as f:
return write_pack_index_v1(f, entries, self.calculate_checksum())
Returns: Checksum of index file
"""
entries = self.sorted_entries(
- progress=progress, resolve_ext_ref=resolve_ext_ref)
+ progress=progress, resolve_ext_ref=resolve_ext_ref
+ )
with GitFile(filename, "wb") as f:
return write_pack_index_v2(f, entries, self.calculate_checksum())
"""
if version == 1:
return self.create_index_v1(
- filename, progress, resolve_ext_ref=resolve_ext_ref)
+ filename, progress, resolve_ext_ref=resolve_ext_ref
+ )
elif version == 2:
return self.create_index_v2(
- filename, progress, resolve_ext_ref=resolve_ext_ref)
+ filename, progress, resolve_ext_ref=resolve_ext_ref
+ )
else:
raise ValueError("unknown index format %d" % version)
if actual != stored:
raise ChecksumMismatch(stored, actual)
- def get_unpacked_object_at(self, offset: int, *, include_comp: bool = False) -> UnpackedObject:
+ def get_unpacked_object_at(
+ self, offset: int, *, include_comp: bool = False
+ ) -> UnpackedObject:
"""Given offset in the packfile return a UnpackedObject."""
assert offset >= self._header_size
self._file.seek(offset)
return (unpacked.pack_type_num, unpacked._obj())
-T = TypeVar('T')
+T = TypeVar("T")
class DeltaChainIterator(Generic[T]):
@classmethod
def for_pack_subset(
- cls, pack: "Pack", shas: Iterable[bytes], *,
- allow_missing: bool = False, resolve_ext_ref=None):
+ cls,
+ pack: "Pack",
+ shas: Iterable[bytes],
+ *,
+ allow_missing: bool = False,
+ resolve_ext_ref=None,
+ ):
walker = cls(None, resolve_ext_ref=resolve_ext_ref)
walker.set_pack_data(pack.data)
todo = set()
def _result(self, unpacked: UnpackedObject) -> T:
raise NotImplementedError
- def _resolve_object(self, offset: int, obj_type_num: int, base_chunks: List[bytes]) -> UnpackedObject:
+ def _resolve_object(
+ self, offset: int, obj_type_num: int, base_chunks: List[bytes]
+ ) -> UnpackedObject:
self._file.seek(offset)
unpacked, _ = unpack_object(
self._file.read,
Returns: Tuple with offset at which the object was written, and crc32
"""
crc32 = 0
- for chunk in pack_object_chunks(
- type, object, compression_level=compression_level):
+ for chunk in pack_object_chunks(type, object, compression_level=compression_level):
write(chunk)
if sha is not None:
sha.update(chunk)
def write_pack(
- filename,
- objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
- *,
- deltify: Optional[bool] = None,
- delta_window_size: Optional[int] = None,
- compression_level: int = -1):
+ filename,
+ objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
+ *,
+ deltify: Optional[bool] = None,
+ delta_window_size: Optional[int] = None,
+ compression_level: int = -1,
+):
"""Write a new pack data file.
Args:
def write_pack_header(write, num_objects):
"""Write a pack header for the given number of objects."""
- if hasattr(write, 'write'):
+ if hasattr(write, "write"):
write = write.write
warnings.warn(
- 'write_pack_header() now takes a write rather than file argument',
- DeprecationWarning, stacklevel=2)
+ "write_pack_header() now takes a write rather than file argument",
+ DeprecationWarning,
+ stacklevel=2,
+ )
for chunk in pack_header_chunks(num_objects):
write(chunk)
def find_reusable_deltas(
- container: PackedObjectContainer,
- object_ids: Set[bytes],
- *, other_haves: Optional[Set[bytes]] = None, progress=None) -> Iterator[UnpackedObject]:
+ container: PackedObjectContainer,
+ object_ids: Set[bytes],
+ *,
+ other_haves: Optional[Set[bytes]] = None,
+ progress=None,
+) -> Iterator[UnpackedObject]:
if other_haves is None:
other_haves = set()
reused = 0
- for i, unpacked in enumerate(container.iter_unpacked_subset(object_ids, allow_missing=True, convert_ofs_delta=True)):
+ for i, unpacked in enumerate(
+ container.iter_unpacked_subset(
+ object_ids, allow_missing=True, convert_ofs_delta=True
+ )
+ ):
if progress is not None and i % 1000 == 0:
- progress(("checking for reusable deltas: %d/%d\r" % (i, len(object_ids))).encode('utf-8'))
+ progress(
+ ("checking for reusable deltas: %d/%d\r" % (i, len(object_ids))).encode(
+ "utf-8"
+ )
+ )
if unpacked.pack_type_num == REF_DELTA:
hexsha = sha_to_hex(unpacked.delta_base)
if hexsha in object_ids or hexsha in other_haves:
yield unpacked
reused += 1
if progress is not None:
- progress(("found %d deltas to reuse\n" % (reused, )).encode('utf-8'))
+ progress(("found %d deltas to reuse\n" % (reused,)).encode("utf-8"))
def deltify_pack_objects(
- objects: Union[Iterator[bytes], Iterator[Tuple[ShaFile, Optional[bytes]]]],
- *, window_size: Optional[int] = None,
- progress=None) -> Iterator[UnpackedObject]:
+ objects: Union[Iterator[bytes], Iterator[Tuple[ShaFile, Optional[bytes]]]],
+ *,
+ window_size: Optional[int] = None,
+ progress=None,
+) -> Iterator[UnpackedObject]:
"""Generate deltas for pack objects.
Args:
Returns: Iterator over type_num, object id, delta_base, content
delta_base is None for full text entries
"""
+
def objects_with_hints():
for e in objects:
if isinstance(e, ShaFile):
yield (e, (e.type_num, None))
else:
yield (e[0], (e[0].type_num, e[1]))
+
yield from deltas_from_sorted_objects(
sort_objects_for_delta(objects_with_hints()),
window_size=window_size,
- progress=progress)
+ progress=progress,
+ )
-def sort_objects_for_delta(objects: Union[Iterator[ShaFile], Iterator[Tuple[ShaFile, Optional[PackHint]]]]) -> Iterator[ShaFile]:
+def sort_objects_for_delta(
+ objects: Union[Iterator[ShaFile], Iterator[Tuple[ShaFile, Optional[PackHint]]]]
+) -> Iterator[ShaFile]:
magic = []
for entry in objects:
if isinstance(entry, tuple):
return (x[3] for x in magic)
-def deltas_from_sorted_objects(objects, window_size: Optional[int] = None, progress=None):
+def deltas_from_sorted_objects(
+ objects, window_size: Optional[int] = None, progress=None
+):
# TODO(jelmer): Use threads
if window_size is None:
window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE
possible_bases: Deque[Tuple[bytes, int, List[bytes]]] = deque()
for i, o in enumerate(objects):
if progress is not None and i % 1000 == 0:
- progress(("generating deltas: %d\r" % (i, )).encode('utf-8'))
+ progress(("generating deltas: %d\r" % (i,)).encode("utf-8"))
raw = o.as_raw_chunks()
winner = raw
winner_len = sum(map(len, winner))
winner_base = base_id
winner = delta
winner_len = sum(map(len, winner))
- yield UnpackedObject(o.type_num, sha=o.sha().digest(), delta_base=winner_base, decomp_len=winner_len, decomp_chunks=winner)
+ yield UnpackedObject(
+ o.type_num,
+ sha=o.sha().digest(),
+ delta_base=winner_base,
+ decomp_len=winner_len,
+ decomp_chunks=winner,
+ )
possible_bases.appendleft((o.sha().digest(), o.type_num, raw))
while len(possible_bases) > window_size:
possible_bases.pop()
def pack_objects_to_data(
- objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
- *,
- deltify: Optional[bool] = None,
- delta_window_size: Optional[int] = None,
- ofs_delta: bool = True,
- progress=None) -> Tuple[int, Iterator[UnpackedObject]]:
+ objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
+ *,
+ deltify: Optional[bool] = None,
+ delta_window_size: Optional[int] = None,
+ ofs_delta: bool = True,
+ progress=None,
+) -> Tuple[int, Iterator[UnpackedObject]]:
"""Create pack data from objects.
Args:
if deltify:
return (
count,
- deltify_pack_objects(iter(objects), window_size=delta_window_size, progress=progress)) # type: ignore
+ deltify_pack_objects(
+ iter(objects), window_size=delta_window_size, progress=progress
+ ),
+ ) # type: ignore
else:
+
def iter_without_path():
for o in objects:
if isinstance(o, tuple):
yield full_unpacked_object(o[0])
else:
yield full_unpacked_object(o)
- return (
- count,
- iter_without_path()
- )
+ return (count, iter_without_path())
+
def generate_unpacked_objects(
- container: PackedObjectContainer,
- object_ids: Sequence[Tuple[ObjectID, Optional[PackHint]]],
- delta_window_size: Optional[int] = None,
- deltify: Optional[bool] = None,
- reuse_deltas: bool = True,
- ofs_delta: bool = True,
- other_haves: Optional[Set[bytes]] = None,
- progress=None) -> Iterator[UnpackedObject]:
+ container: PackedObjectContainer,
+ object_ids: Sequence[Tuple[ObjectID, Optional[PackHint]]],
+ delta_window_size: Optional[int] = None,
+ deltify: Optional[bool] = None,
+ reuse_deltas: bool = True,
+ ofs_delta: bool = True,
+ other_haves: Optional[Set[bytes]] = None,
+ progress=None,
+) -> Iterator[UnpackedObject]:
"""Create pack data from objects.
Args:
"""
todo = dict(object_ids)
if reuse_deltas:
- for unpack in find_reusable_deltas(container, set(todo), other_haves=other_haves, progress=progress):
+ for unpack in find_reusable_deltas(
+ container, set(todo), other_haves=other_haves, progress=progress
+ ):
del todo[sha_to_hex(unpack.sha())]
yield unpack
if deltify is None:
# slow at the moment.
deltify = False
if deltify:
- objects_to_delta = container.iterobjects_subset(todo.keys(), allow_missing=False)
+ objects_to_delta = container.iterobjects_subset(
+ todo.keys(), allow_missing=False
+ )
yield from deltas_from_sorted_objects(
- sort_objects_for_delta(
- (o, todo[o.id])
- for o in objects_to_delta),
+ sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta),
window_size=delta_window_size,
- progress=progress)
+ progress=progress,
+ )
else:
for oid in todo:
yield full_unpacked_object(container[oid])
def full_unpacked_object(o: ShaFile) -> UnpackedObject:
return UnpackedObject(
- o.type_num, delta_base=None, crc32=None,
+ o.type_num,
+ delta_base=None,
+ crc32=None,
decomp_chunks=o.as_raw_chunks(),
- sha=o.sha().digest())
+ sha=o.sha().digest(),
+ )
def write_pack_from_container(
- write,
- container: PackedObjectContainer,
- object_ids: Sequence[Tuple[ObjectID, Optional[PackHint]]],
- delta_window_size: Optional[int] = None,
- deltify: Optional[bool] = None,
- reuse_deltas: bool = True,
- compression_level: int = -1,
- other_haves: Optional[Set[bytes]] = None
+ write,
+ container: PackedObjectContainer,
+ object_ids: Sequence[Tuple[ObjectID, Optional[PackHint]]],
+ delta_window_size: Optional[int] = None,
+ deltify: Optional[bool] = None,
+ reuse_deltas: bool = True,
+ compression_level: int = -1,
+ other_haves: Optional[Set[bytes]] = None,
):
"""Write a new pack data file.
"""
pack_contents_count = len(object_ids)
pack_contents = generate_unpacked_objects(
- container, object_ids, delta_window_size=delta_window_size,
+ container,
+ object_ids,
+ delta_window_size=delta_window_size,
deltify=deltify,
reuse_deltas=reuse_deltas,
- other_haves=other_haves)
+ other_haves=other_haves,
+ )
return write_pack_data(
write,
def write_pack_objects(
- write,
- objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
- *,
- delta_window_size: Optional[int] = None,
- deltify: Optional[bool] = None,
- compression_level: int = -1
+ write,
+ objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
+ *,
+ delta_window_size: Optional[int] = None,
+ deltify: Optional[bool] = None,
+ compression_level: int = -1,
):
"""Write a new pack data file.
compression_level: the zlib compression level to use
Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
"""
- pack_contents_count, pack_contents = pack_objects_to_data(
- objects, deltify=deltify)
+ pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify)
return write_pack_data(
write,
class PackChunkGenerator:
-
- def __init__(self, num_records=None, records=None, progress=None, compression_level=-1, reuse_compressed=True) -> None:
+ def __init__(
+ self,
+ num_records=None,
+ records=None,
+ progress=None,
+ compression_level=-1,
+ reuse_compressed=True,
+ ) -> None:
self.cs = sha1(b"")
self.entries: Dict[Union[int, bytes], Tuple[int, int]] = {}
self._it = self._pack_data_chunks(
- num_records=num_records, records=records, progress=progress, compression_level=compression_level, reuse_compressed=reuse_compressed)
+ num_records=num_records,
+ records=records,
+ progress=progress,
+ compression_level=compression_level,
+ reuse_compressed=reuse_compressed,
+ )
def sha1digest(self):
return self.cs.digest()
def __iter__(self):
return self._it
- def _pack_data_chunks(self, records: Iterator[UnpackedObject], *, num_records=None, progress=None, compression_level: int = -1, reuse_compressed: bool = True) -> Iterator[bytes]:
+ def _pack_data_chunks(
+ self,
+ records: Iterator[UnpackedObject],
+ *,
+ num_records=None,
+ progress=None,
+ compression_level: int = -1,
+ reuse_compressed: bool = True,
+ ) -> Iterator[bytes]:
"""Iterate pack data file chunks.
Args:
"""
# Write the pack
if num_records is None:
- num_records = len(records) # type: ignore
+ num_records = len(records) # type: ignore
offset = 0
for chunk in pack_header_chunks(num_records):
yield chunk
for i, unpacked in enumerate(records):
type_num = unpacked.pack_type_num
if progress is not None and i % 1000 == 0:
- progress(("writing pack data: %d/%d\r" % (i, num_records)).encode("ascii"))
+ progress(
+ ("writing pack data: %d/%d\r" % (i, num_records)).encode("ascii")
+ )
raw: Union[List[bytes], Tuple[int, List[bytes]], Tuple[bytes, List[bytes]]]
if unpacked.delta_base is not None:
try:
if unpacked.comp_chunks is not None and reuse_compressed:
chunks = unpacked.comp_chunks
else:
- chunks = pack_object_chunks(type_num, raw, compression_level=compression_level)
+ chunks = pack_object_chunks(
+ type_num, raw, compression_level=compression_level
+ )
crc32 = 0
object_size = 0
for chunk in chunks:
offset += object_size
if actual_num_records != num_records:
raise AssertionError(
- 'actual records written differs: %d != %d' % (
- actual_num_records, num_records))
+ "actual records written differs: %d != %d"
+ % (actual_num_records, num_records)
+ )
yield self.cs.digest()
-def write_pack_data(write, records: Iterator[UnpackedObject], *, num_records=None, progress=None, compression_level=-1):
+def write_pack_data(
+ write,
+ records: Iterator[UnpackedObject],
+ *,
+ num_records=None,
+ progress=None,
+ compression_level=-1,
+):
"""Write a new pack data file.
Args:
Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
"""
chunk_generator = PackChunkGenerator(
- num_records=num_records, records=records, progress=progress,
- compression_level=compression_level)
+ num_records=num_records,
+ records=records,
+ progress=progress,
+ compression_level=compression_level,
+ )
for chunk in chunk_generator:
write(chunk)
return chunk_generator.entries, chunk_generator.sha1digest()
"""
f = SHA1Writer(f)
fan_out_table = defaultdict(lambda: 0)
- for (name, offset, entry_checksum) in entries:
+ for name, offset, entry_checksum in entries:
fan_out_table[ord(name[:1])] += 1
# Fan-out table
for i in range(0x100):
f.write(struct.pack(">L", fan_out_table[i]))
fan_out_table[i + 1] += fan_out_table[i]
- for (name, offset, entry_checksum) in entries:
+ for name, offset, entry_checksum in entries:
if not (offset <= 0xFFFFFFFF):
raise TypeError("pack format 1 only supports offsets < 2Gb")
f.write(struct.pack(">L20s", offset, name))
target_buf: Target buffer
"""
if isinstance(base_buf, list):
- base_buf = b''.join(base_buf)
+ base_buf = b"".join(base_buf)
if isinstance(target_buf, list):
- target_buf = b''.join(target_buf)
+ target_buf = b"".join(target_buf)
assert isinstance(base_buf, bytes)
assert isinstance(target_buf, bytes)
# write delta header
o = j1
while s > 127:
yield bytes([127])
- yield memoryview(target_buf)[o:o + 127]
+ yield memoryview(target_buf)[o : o + 127]
s -= 127
o += 127
yield bytes([s])
- yield memoryview(target_buf)[o:o + s]
+ yield memoryview(target_buf)[o : o + s]
def apply_delta(src_buf, delta):
def write_pack_index_v2(
- f, entries: Iterable[PackIndexEntry], pack_checksum: bytes) -> bytes:
+ f, entries: Iterable[PackIndexEntry], pack_checksum: bytes
+) -> bytes:
"""Write a new pack index file.
Args:
f.write(b"\377tOc") # Magic!
f.write(struct.pack(">L", 2))
fan_out_table: Dict[int, int] = defaultdict(lambda: 0)
- for (name, offset, entry_checksum) in entries:
+ for name, offset, entry_checksum in entries:
fan_out_table[ord(name[:1])] += 1
# Fan-out table
largetable: List[int] = []
for i in range(0x100):
f.write(struct.pack(b">L", fan_out_table[i]))
fan_out_table[i + 1] += fan_out_table[i]
- for (name, offset, entry_checksum) in entries:
+ for name, offset, entry_checksum in entries:
f.write(name)
- for (name, offset, entry_checksum) in entries:
+ for name, offset, entry_checksum in entries:
f.write(struct.pack(b">L", entry_checksum))
- for (name, offset, entry_checksum) in entries:
- if offset < 2 ** 31:
+ for name, offset, entry_checksum in entries:
+ if offset < 2**31:
f.write(struct.pack(b">L", offset))
else:
- f.write(struct.pack(b">L", 2 ** 31 + len(largetable)))
+ f.write(struct.pack(b">L", 2**31 + len(largetable)))
largetable.append(offset)
for offset in largetable:
f.write(struct.pack(b">Q", offset))
_data: Optional[PackData]
_idx: Optional[PackIndex]
- def __init__(self, basename, resolve_ext_ref: Optional[ResolveExtRefFn] = None) -> None:
+ def __init__(
+ self, basename, resolve_ext_ref: Optional[ResolveExtRefFn] = None
+ ) -> None:
self._basename = basename
self._data = None
self._idx = None
def check_length_and_checksum(self) -> None:
"""Sanity check the length and checksum of the pack index and data."""
- assert len(self.index) == len(self.data), f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
+ assert len(self.index) == len(
+ self.data
+ ), f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
idx_stored_checksum = self.index.get_pack_checksum()
data_stored_checksum = self.data.get_stored_checksum()
if idx_stored_checksum != data_stored_checksum:
PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref)
)
- def iterobjects_subset(self, shas: Iterable[ObjectID], *, allow_missing: bool = False) -> Iterator[ShaFile]:
+ def iterobjects_subset(
+ self, shas: Iterable[ObjectID], *, allow_missing: bool = False
+ ) -> Iterator[ShaFile]:
return (
uo
- for uo in
- PackInflater.for_pack_subset(
- self, shas, allow_missing=allow_missing,
- resolve_ext_ref=self.resolve_ext_ref)
- if uo.id in shas)
+ for uo in PackInflater.for_pack_subset(
+ self,
+ shas,
+ allow_missing=allow_missing,
+ resolve_ext_ref=self.resolve_ext_ref,
+ )
+ if uo.id in shas
+ )
- def iter_unpacked_subset(self, shas: Iterable[ObjectID], *, include_comp: bool = False, allow_missing: bool = False, convert_ofs_delta: bool = False) -> Iterator[UnpackedObject]:
+ def iter_unpacked_subset(
+ self,
+ shas: Iterable[ObjectID],
+ *,
+ include_comp: bool = False,
+ allow_missing: bool = False,
+ convert_ofs_delta: bool = False,
+ ) -> Iterator[UnpackedObject]:
ofs_pending: Dict[int, List[UnpackedObject]] = defaultdict(list)
ofs: Dict[bytes, int] = {}
todo = set(shas)
raise UnresolvedDeltas(todo)
def iter_unpacked(self, include_comp=False):
- ofs_to_entries = {ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries()}
+ ofs_to_entries = {
+ ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries()
+ }
for unpacked in self.data.iter_unpacked(include_comp=include_comp):
(sha, crc32) = ofs_to_entries[unpacked.offset]
unpacked._sha = sha
raise KeyError(sha)
return offset, type, obj
- def resolve_object(self, offset: int, type: int, obj, get_ref=None) -> Tuple[int, Iterable[bytes]]:
+ def resolve_object(
+ self, offset: int, type: int, obj, get_ref=None
+ ) -> Tuple[int, Iterable[bytes]]:
"""Resolve an object, possibly resolving deltas when necessary.
Returns: Tuple with object type and contents.
self.data._offset_cache[prev_offset] = base_type, chunks
return base_type, chunks
- def entries(self, progress: Optional[ProgressFn] = None) -> Iterator[PackIndexEntry]:
+ def entries(
+ self, progress: Optional[ProgressFn] = None
+ ) -> Iterator[PackIndexEntry]:
"""Yield entries summarizing the contents of this pack.
Args:
Returns: iterator of tuples with (sha, offset, crc32)
"""
return self.data.iterentries(
- progress=progress, resolve_ext_ref=self.resolve_ext_ref)
+ progress=progress, resolve_ext_ref=self.resolve_ext_ref
+ )
- def sorted_entries(self, progress: Optional[ProgressFn] = None) -> Iterator[PackIndexEntry]:
+ def sorted_entries(
+ self, progress: Optional[ProgressFn] = None
+ ) -> Iterator[PackIndexEntry]:
"""Return entries in this pack, sorted by SHA.
Args:
Returns: Iterator of tuples with (sha, offset, crc32)
"""
return self.data.sorted_entries(
- progress=progress, resolve_ext_ref=self.resolve_ext_ref)
+ progress=progress, resolve_ext_ref=self.resolve_ext_ref
+ )
- def get_unpacked_object(self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True) -> UnpackedObject:
+ def get_unpacked_object(
+ self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True
+ ) -> UnpackedObject:
"""Get the unpacked object for a sha.
Args:
return unpacked
-def extend_pack(f: BinaryIO, object_ids: Set[ObjectID], get_raw, *, compression_level=-1, progress=None) -> Tuple[bytes, List]:
+def extend_pack(
+ f: BinaryIO,
+ object_ids: Set[ObjectID],
+ get_raw,
+ *,
+ compression_level=-1,
+ progress=None,
+) -> Tuple[bytes, List]:
"""Extend a pack file with more objects.
The caller should make sure that object_ids does not contain any objects
# Complete the pack.
for i, object_id in enumerate(object_ids):
if progress is not None:
- progress(("writing extra base objects: %d/%d\r" % (i, len(object_ids))).encode("ascii"))
+ progress(
+ ("writing extra base objects: %d/%d\r" % (i, len(object_ids))).encode(
+ "ascii"
+ )
+ )
assert len(object_id) == 20
type_num, data = get_raw(object_id)
offset = f.tell()
blob - 9cfadd86cef71a9ec512158f9f5be548bf5628a9
blob + 753188906ff9e17b2cd845c9d35804f665078955
--- dulwich/patch.py
+++ dulwich/patch.py
started = True
fromdate = f"\t{fromfiledate}" if fromfiledate else ""
todate = f"\t{tofiledate}" if tofiledate else ""
- yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(output_encoding)
- yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(output_encoding)
+ yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
+ output_encoding
+ )
+ yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
+ output_encoding
+ )
first, last = group[0], group[-1]
file1_range = _format_range_unified(first[1], last[2])
file2_range = _format_range_unified(first[3], last[4])
- yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(
- output_encoding
- )
+ yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)
for tag, i1, i2, j1, j2 in group:
if tag == "equal":
blob - 5a68f58dc586f1fc082ee0a7a3adcb66c77718d4
blob + b15ef08e6b469721c2ed3551c5ee0dc96d28be13
--- dulwich/porcelain.py
+++ dulwich/porcelain.py
# RFC 2822
import email.utils
+
rfc_2822 = email.utils.parsedate_tz(tz_str)
if rfc_2822:
return rfc_2822[9]
# Supported offsets:
# sHHMM, sHH:MM, sHH
- iso_8601_pattern = re.compile("[0-9] ?([+-])([0-9]{2})(?::(?=[0-9]{2}))?([0-9]{2})?$")
+ iso_8601_pattern = re.compile(
+ "[0-9] ?([+-])([0-9]{2})(?::(?=[0-9]{2}))?([0-9]{2})?$"
+ )
match = re.search(iso_8601_pattern, tz_str)
total_secs = 0
if match:
depth: Optional[int] = None,
branch: Optional[Union[str, bytes]] = None,
config: Optional[Config] = None,
- **kwargs
+ **kwargs,
):
"""Clone a local or remote git repository.
mkdir = not os.path.exists(target)
- (client, path) = get_transport_and_path(
- source, config=config, **kwargs)
+ (client, path) = get_transport_and_path(source, config=config, **kwargs)
return client.clone(
path,
def _canonical_part(url: str) -> str:
- name = url.rsplit('/', 1)[-1]
- if name.endswith('.git'):
+ name = url.rsplit("/", 1)[-1]
+ if name.endswith(".git"):
name = name[:-4]
return name
"""
with open_repo_closing(repo) as r:
config = r.get_config()
- gitmodules_path = os.path.join(r.path, '.gitmodules')
+ gitmodules_path = os.path.join(r.path, ".gitmodules")
for path, url, name in read_submodules(gitmodules_path):
- config.set((b'submodule', name), b'active', True)
- config.set((b'submodule', name), b'url', url)
+ config.set((b"submodule", name), b"active", True)
+ config.set((b"submodule", name), b"url", url)
config.write_to_path()
repo: Path to repository
"""
from .submodule import iter_cached_submodules
+
with open_repo_closing(repo) as r:
for path, sha in iter_cached_submodules(r.object_store, r[r.head()].tree):
yield path, sha.decode(DEFAULT_ENCODING)
tag_time=None,
tag_timezone=None,
sign=False,
- encoding=DEFAULT_ENCODING
+ encoding=DEFAULT_ENCODING,
):
"""Creates a tag in git via dulwich calls.
outstream=default_bytes_out_stream,
errstream=default_bytes_err_stream,
force=False,
- **kwargs
+ **kwargs,
):
"""Remote push with dulwich via dulwich.client.
selected_refs.extend(parse_reftuples(r.refs, refs, refspecs, force=force))
new_refs = {}
# TODO: Handle selected_refs == {None: None}
- for (lh, rh, force_ref) in selected_refs:
+ for lh, rh, force_ref in selected_refs:
if lh is None:
new_refs[rh] = ZERO_SHA
remote_changed_refs[rh] = None
try:
localsha = r.refs[lh]
except KeyError as exc:
- raise Error(
- "No valid ref %s in local repository" % lh
- ) from exc
+ raise Error("No valid ref %s in local repository" % lh) from exc
if not force_ref and rh in refs:
check_diverged(r, refs[rh], localsha)
new_refs[rh] = localsha
errstream=default_bytes_err_stream,
fast_forward=True,
force=False,
- **kwargs
+ **kwargs,
):
"""Pull from remote via dulwich.client.
fetch_result = client.fetch(
path, r, progress=errstream.write, determine_wants=determine_wants
)
- for (lh, rh, force_ref) in selected_refs:
+ for lh, rh, force_ref in selected_refs:
if not force_ref and rh in r.refs:
try:
check_diverged(r, r.refs.follow(rh)[1], fetch_result.refs[lh])
if fast_forward:
raise
else:
- raise NotImplementedError(
- "merge is not yet supported") from exc
+ raise NotImplementedError("merge is not yet supported") from exc
r.refs[rh] = fetch_result.refs[lh]
if selected_refs:
r[b"HEAD"] = fetch_result.refs[selected_refs[0][1]]
prune=False,
prune_tags=False,
force=False,
- **kwargs
+ **kwargs,
):
"""Fetch objects from a remote server.
r.object_store.pack_loose_objects()
-def pack_objects(repo, object_ids, packf, idxf, delta_window_size=None, deltify=None, reuse_deltas=True):
+def pack_objects(
+ repo,
+ object_ids,
+ packf,
+ idxf,
+ delta_window_size=None,
+ deltify=None,
+ reuse_deltas=True,
+):
"""Pack objects into a file.
Args:
"""
def list_tree(store, treeid, base):
- for (name, mode, sha) in store[treeid].iteritems():
+ for name, mode, sha in store[treeid].iteritems():
if base:
name = posixpath.join(base, name)
if name_only:
r.refs.set_symbolic_ref(b"HEAD", to_set)
-def reset_file(repo, file_path: str, target: bytes = b'HEAD',
- symlink_fn=None):
+def reset_file(repo, file_path: str, target: bytes = b"HEAD", symlink_fn=None):
"""Reset the file to specific commit or branch.
Args:
def _update_head_during_checkout_branch(repo, target):
checkout_target = None
- if target == b'HEAD': # Do not update head while trying to checkout to HEAD.
+ if target == b"HEAD": # Do not update head while trying to checkout to HEAD.
pass
elif target in repo.refs.keys(base=LOCAL_BRANCH_PREFIX):
update_head(repo, target)
if config.has_section(section):
checkout_target = target.replace(name + b"/", b"")
try:
- branch_create(repo, checkout_target, (LOCAL_REMOTE_PREFIX + target).decode())
+ branch_create(
+ repo, checkout_target, (LOCAL_REMOTE_PREFIX + target).decode()
+ )
except Error:
pass
update_head(repo, LOCAL_BRANCH_PREFIX + checkout_target)
_update_head_during_checkout_branch(repo, target)
else:
status_report = status(repo)
- changes = list(set(status_report[0]['add'] + status_report[0]['delete'] + status_report[0]['modify'] + status_report[1]))
+ changes = list(
+ set(
+ status_report[0]["add"]
+ + status_report[0]["delete"]
+ + status_report[0]["modify"]
+ + status_report[1]
+ )
+ )
index = 0
while index < len(changes):
change = changes[index]
target_tree.lookup_path(repo.object_store.__getitem__, change)
index += 1
except KeyError:
- raise CheckoutError('Your local changes to the following files would be overwritten by checkout: ' + change.decode())
+ raise CheckoutError(
+ "Your local changes to the following files would be overwritten by checkout: "
+ + change.decode()
+ )
except KeyError:
changes.pop(index)
blob - 8d200544123fdf9835cc72436021e16a512a3fe0
blob + 336dbcceacf6208209201cef919325d4a860da32
--- dulwich/protocol.py
+++ dulwich/protocol.py
def __init__(
self, recv, write, close=None, report_activity=None, rbufsize=_RBUFSIZE
) -> None:
- super().__init__(
- self.read, write, close=close, report_activity=report_activity
- )
+ super().__init__(self.read, write, close=close, report_activity=report_activity)
self._recv = recv
self._rbuf = BytesIO()
self._rbufsize = rbufsize
if capabilities is None:
return sha + b" " + ref + b"\n"
else:
- return (
- sha + b" " + ref + b"\0"
- + format_capability_line(capabilities)
- + b"\n")
+ return sha + b" " + ref + b"\0" + format_capability_line(capabilities) + b"\n"
def format_shallow_line(sha):
blob - 84547adb75956ea918f37f1e42c849f8ac19422c
blob + 42ddcfe02a9c1455fbb6d1746d102cee9ea91854
--- dulwich/refs.py
+++ dulwich/refs.py
"""
raise NotImplementedError(self.set_if_equals)
- def add_if_new(self, name, ref, committer=None, timestamp=None,
- timezone=None, message=None):
+ def add_if_new(
+ self, name, ref, committer=None, timestamp=None, timezone=None, message=None
+ ):
"""Add a new reference only if it does not already exist.
Args:
except ValueError:
break
- if parent == b'refs':
+ if parent == b"refs":
break
parent_filename = self.refpath(parent)
try:
"""Generate info refs."""
# TODO: Avoid recursive import :(
from .object_store import peel_sha
+
for name, sha in sorted(refs.items()):
# get_refs() includes HEAD as a special case, but we don't want to
# advertise it
def strip_peeled_refs(refs):
"""Remove all peeled refs."""
return {
- ref: sha
- for (ref, sha) in refs.items()
- if not ref.endswith(PEELED_TAG_SUFFIX)
+ ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
}
def _set_default_branch(
- refs: RefsContainer, origin: bytes, origin_head: bytes, branch: bytes,
- ref_message: Optional[bytes]) -> bytes:
+ refs: RefsContainer,
+ origin: bytes,
+ origin_head: bytes,
+ branch: bytes,
+ ref_message: Optional[bytes],
+) -> bytes:
"""Set the default branch."""
origin_base = b"refs/remotes/" + origin + b"/"
if branch:
origin_ref = origin_base + branch
if origin_ref in refs:
local_ref = LOCAL_BRANCH_PREFIX + branch
- refs.add_if_new(
- local_ref, refs[origin_ref], ref_message
- )
+ refs.add_if_new(local_ref, refs[origin_ref], ref_message)
head_ref = local_ref
elif LOCAL_TAG_PREFIX + branch in refs:
head_ref = LOCAL_TAG_PREFIX + branch
else:
- raise ValueError(
- "%r is not a valid branch or tag" % os.fsencode(branch)
- )
+ raise ValueError("%r is not a valid branch or tag" % os.fsencode(branch))
elif origin_head:
head_ref = origin_head
if origin_head.startswith(LOCAL_BRANCH_PREFIX):
else:
origin_ref = origin_head
try:
- refs.add_if_new(
- head_ref, refs[origin_ref], ref_message
- )
+ refs.add_if_new(head_ref, refs[origin_ref], ref_message)
except KeyError:
pass
else:
- raise ValueError('neither origin_head nor branch are provided')
+ raise ValueError("neither origin_head nor branch are provided")
return head_ref
_cls, obj = head.object
head = obj.get_object(obj).id
del refs[HEADREF]
- refs.set_if_equals(
- HEADREF, None, head, message=ref_message
- )
+ refs.set_if_equals(HEADREF, None, head, message=ref_message)
else:
# set HEAD to specific branch
try:
for (n, v) in stripped_refs.items()
if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
}
- refs_container.import_refs(LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags)
+ refs_container.import_refs(
+ LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
+ )
def serialize_refs(store, refs):
# TODO: Avoid recursive import :(
from .object_store import peel_sha
+
ret = {}
for ref, sha in refs.items():
try:
unpeeled, peeled = peel_sha(store, sha)
except KeyError:
warnings.warn(
- "ref {} points at non-present sha {}".format(ref.decode("utf-8", "replace"), sha.decode("ascii")),
+ "ref {} points at non-present sha {}".format(
+ ref.decode("utf-8", "replace"), sha.decode("ascii")
+ ),
UserWarning,
)
continue
blob - 4543e33d01f6e8fa70182d30d94ca9fc43bb80ca
blob + b5c5e6ca10902a87bff57566b0e882290b39d546
--- dulwich/repo.py
+++ dulwich/repo.py
def _get_default_identity() -> Tuple[str, str]:
import socket
- for name in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'):
+ for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
username = os.environ.get(name)
if username:
break
except KeyError:
fullname = None
else:
- if getattr(entry, 'gecos', None):
+ if getattr(entry, "gecos", None):
fullname = entry.pw_gecos.split(",")[0]
else:
fullname = None
raise InvalidUserIdentity(identity) from exc
if b">" not in snd:
raise InvalidUserIdentity(identity)
- if b'\0' in identity or b'\n' in identity:
+ if b"\0" in identity or b"\n" in identity:
raise InvalidUserIdentity(identity)
remote_has = missing_objects.get_remote_has()
object_ids = list(missing_objects)
return len(object_ids), generate_unpacked_objects(
- self.object_store, object_ids, progress=progress,
- other_haves=remote_has)
+ self.object_store, object_ids, progress=progress, other_haves=remote_has
+ )
def find_missing_objects(
self,
raise TypeError("determine_wants() did not return a list")
shallows: FrozenSet[ObjectID] = getattr(graph_walker, "shallow", frozenset())
- unshallows: FrozenSet[ObjectID] = getattr(graph_walker, "unshallow", frozenset())
+ unshallows: FrozenSet[ObjectID] = getattr(
+ graph_walker, "unshallow", frozenset()
+ )
if wants == []:
# TODO(dborowitz): find a way to short-circuit that doesn't change
return None
class DummyMissingObjectFinder:
-
def get_remote_has(self):
return None
shallow=self.get_shallow(),
progress=progress,
get_tagged=get_tagged,
- get_parents=get_parents)
+ get_parents=get_parents,
+ )
- def generate_pack_data(self, have: List[ObjectID], want: List[ObjectID],
- progress: Optional[Callable[[str], None]] = None,
- ofs_delta: Optional[bool] = None):
+ def generate_pack_data(
+ self,
+ have: List[ObjectID],
+ want: List[ObjectID],
+ progress: Optional[Callable[[str], None]] = None,
+ ofs_delta: Optional[bool] = None,
+ ):
"""Generate pack data objects for a set of wants/haves.
Args:
)
def get_graph_walker(
- self,
- heads: Optional[List[ObjectID]] = None) -> ObjectStoreGraphWalker:
+ self, heads: Optional[List[ObjectID]] = None
+ ) -> ObjectStoreGraphWalker:
"""Retrieve a graph walker.
A graph walker is used by a remote repository (or proxy)
elif cls is Tag:
raise NotTagError(ret)
else:
- raise Exception(
- f"Type invalid: {ret.type_name!r} != {cls.type_name!r}"
- )
+ raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
return ret
def get_object(self, sha: bytes) -> ShaFile:
shallows=self.get_shallow(),
)
- def get_parents(self, sha: bytes,
- commit: Optional[Commit] = None) -> List[bytes]:
+ def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> List[bytes]:
"""Retrieve the parents of a specific commit.
If the specific commit is a graftpoint, the graft parents
local_config = self.get_config()
backends: List[ConfigFile] = [local_config]
- if local_config.get_boolean((b"extensions", ), b"worktreeconfig", False):
+ if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
backends.append(self.get_worktree_config())
backends += StackedConfig.default_backends()
if new_unshallow:
shallow.difference_update(new_unshallow)
if shallow:
- self._put_named_file(
- "shallow", b"".join([sha + b"\n" for sha in shallow])
- )
+ self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
else:
self._del_named_file("shallow")
return cached
return peel_sha(self.object_store, self.refs[ref])[1].id
- def get_walker(self, include: Optional[List[bytes]] = None,
- *args, **kwargs):
+ def get_walker(self, include: Optional[List[bytes]] = None, *args, **kwargs):
"""Obtain a walker for this repository.
Args:
else:
raise ValueError(name)
- def _get_user_identity(self, config: "StackedConfig",
- kind: Optional[str] = None) -> bytes:
+ def _get_user_identity(
+ self, config: "StackedConfig", kind: Optional[str] = None
+ ) -> bytes:
"""Determine the identity to use for new commits."""
warnings.warn(
"use get_user_identity() rather than Repo._get_user_identity",
- DeprecationWarning)
+ DeprecationWarning,
+ )
return get_user_identity(config)
def _add_graftpoints(self, updated_graftpoints: Dict[bytes, List[bytes]]):
self,
root: str,
object_store: Optional[PackBasedObjectStore] = None,
- bare: Optional[bool] = None
+ bare: Optional[bool] = None,
) -> None:
hidden_path = os.path.join(root, CONTROLDIR)
if bare is None:
- if (os.path.isfile(hidden_path)
- or os.path.isdir(os.path.join(hidden_path, OBJECTDIR))):
+ if os.path.isfile(hidden_path) or os.path.isdir(
+ os.path.join(hidden_path, OBJECTDIR)
+ ):
bare = False
- elif (os.path.isdir(os.path.join(root, OBJECTDIR))
- and os.path.isdir(os.path.join(root, REFSDIR))):
+ elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
+ os.path.join(root, REFSDIR)
+ ):
bare = True
else:
raise NotGitRepository(
self.path = root
config = self.get_config()
try:
- repository_format_version = config.get(
- "core",
- "repositoryformatversion"
- )
+ repository_format_version = config.get("core", "repositoryformatversion")
format_version = (
0
if repository_format_version is None
if format_version not in (0, 1):
raise UnsupportedVersion(format_version)
- for extension, _value in config.items((b"extensions", )):
- if extension not in (b'worktreeconfig', ):
+ for extension, _value in config.items((b"extensions",)):
+ if extension not in (b"worktreeconfig",):
raise UnsupportedExtension(extension)
if object_store is None:
# missing index file, which is treated as empty.
return not self.bare
- def stage(self, fs_paths: Union[str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]]) -> None:
+ def stage(
+ self,
+ fs_paths: Union[
+ str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
+ ],
+ ) -> None:
"""Stage a set of paths.
Args:
index = self.open_index()
try:
- tree_id = self[b'HEAD'].tree
+ tree_id = self[b"HEAD"].tree
except KeyError:
# no head mean no commit in the repo
for fs_path in fs_paths:
try:
tree = self.object_store[tree_id]
assert isinstance(tree, Tree)
- tree_entry = tree.lookup_path(
- self.object_store.__getitem__, tree_path)
+ tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
except KeyError:
# if tree_entry didn't exist, this file was being added, so
# remove index entry
pass
index_entry = IndexEntry(
- ctime=(self[b'HEAD'].commit_time, 0),
- mtime=(self[b'HEAD'].commit_time, 0),
+ ctime=(self[b"HEAD"].commit_time, 0),
+ mtime=(self[b"HEAD"].commit_time, 0),
dev=st.st_dev if st else 0,
ino=st.st_ino if st else 0,
mode=tree_entry[0],
except BaseException:
if mkdir:
import shutil
+
shutil.rmtree(target_path)
raise
return target
if config.get_boolean(b"core", b"symlinks", True):
symlink_fn = symlink
else:
+
def symlink_fn(source, target): # type: ignore
- with open(target, 'w' + ('b' if isinstance(source, bytes) else '')) as f:
+ with open(
+ target, "w" + ("b" if isinstance(source, bytes) else "")
+ ) as f:
f.write(source)
+
return build_index_from_tree(
self.path,
self.index_path(),
tree,
honor_filemode=honor_filemode,
validate_path_element=validate_path_element,
- symlink_fn=symlink_fn
+ symlink_fn=symlink_fn,
)
def get_worktree_config(self) -> "ConfigFile":
from .config import ConfigFile
+
path = os.path.join(self.commondir(), "config.worktree")
try:
return ConfigFile.from_path(path)
@classmethod
def _init_maybe_bare(
- cls, path, controldir, bare, object_store=None, config=None,
- default_branch=None, symlinks: Optional[bool] = None):
+ cls,
+ path,
+ controldir,
+ bare,
+ object_store=None,
+ config=None,
+ default_branch=None,
+ symlinks: Optional[bool] = None,
+ ):
for d in BASE_DIRECTORIES:
os.mkdir(os.path.join(controldir, *d))
if object_store is None:
if default_branch is None:
if config is None:
from .config import StackedConfig
+
config = StackedConfig.default()
try:
default_branch = config.get("init", "defaultBranch")
return ret
@classmethod
- def init(cls, path: str, *, mkdir: bool = False, config=None, default_branch=None, symlinks: Optional[bool] = None) -> "Repo":
+ def init(
+ cls,
+ path: str,
+ *,
+ mkdir: bool = False,
+ config=None,
+ default_branch=None,
+ symlinks: Optional[bool] = None,
+ ) -> "Repo":
"""Create a new repository.
Args:
os.mkdir(controldir)
_set_filesystem_hidden(controldir)
return cls._init_maybe_bare(
- path, controldir, False, config=config,
+ path,
+ controldir,
+ False,
+ config=config,
default_branch=default_branch,
- symlinks=symlinks)
+ symlinks=symlinks,
+ )
@classmethod
def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False):
return r
@classmethod
- def init_bare(cls, path, *, mkdir=False, object_store=None, config=None, default_branch=None):
+ def init_bare(
+ cls, path, *, mkdir=False, object_store=None, config=None, default_branch=None
+ ):
"""Create a new bare repository.
``path`` should already exist and be an empty directory.
"""
if mkdir:
os.mkdir(path)
- return cls._init_maybe_bare(path, path, True, object_store=object_store, config=config, default_branch=default_branch)
+ return cls._init_maybe_bare(
+ path,
+ path,
+ True,
+ object_store=object_store,
+ config=config,
+ default_branch=default_branch,
+ )
create = init_bare
blob - 015948122c917e22694a4f535a82431e6f57609a
blob + 7a8c0ad512ce8cebd76f55b379a3cbcea9870b65
--- dulwich/server.py
+++ dulwich/server.py
"""
return None
- def find_missing_objects(self, determine_wants, graph_walker, progress, get_tagged=None):
+ def find_missing_objects(
+ self, determine_wants, graph_walker, progress, get_tagged=None
+ ):
"""Yield the objects required for a list of commits.
Args:
class UploadPackHandler(PackHandler):
"""Protocol handler for uploading a pack to the client."""
- def __init__(self, backend, args, proto, stateless_rpc=False, advertise_refs=False) -> None:
- super().__init__(
- backend, proto, stateless_rpc=stateless_rpc
- )
+ def __init__(
+ self, backend, args, proto, stateless_rpc=False, advertise_refs=False
+ ) -> None:
+ super().__init__(backend, proto, stateless_rpc=stateless_rpc)
self.repo = backend.open_repository(args[0])
self._graph_walker = None
self.advertise_refs = advertise_refs
# The provided haves are processed, and it is safe to send side-
# band data now.
if not self.has_capability(CAPABILITY_NO_PROGRESS):
- self.progress = partial(self.proto.write_sideband, SIDE_BAND_CHANNEL_PROGRESS)
+ self.progress = partial(
+ self.proto.write_sideband, SIDE_BAND_CHANNEL_PROGRESS
+ )
- self.write_pack_data = partial(self.proto.write_sideband, SIDE_BAND_CHANNEL_DATA)
+ self.write_pack_data = partial(
+ self.proto.write_sideband, SIDE_BAND_CHANNEL_DATA
+ )
else:
self.write_pack_data = self.proto.write
("counting objects: %d, done.\n" % len(object_ids)).encode("ascii")
)
- write_pack_from_container(self.write_pack_data, self.repo.object_store, object_ids)
+ write_pack_from_container(
+ self.write_pack_data, self.repo.object_store, object_ids
+ )
# we are done
self.proto.write_pkt_line(None)
any calls to next() or ack() are made.
"""
- def __init__(self, handler, object_store: ObjectContainer, get_peeled, get_symrefs) -> None:
+ def __init__(
+ self, handler, object_store: ObjectContainer, get_peeled, get_symrefs
+ ) -> None:
self.handler = handler
self.store: ObjectContainer = object_store
self.get_peeled = get_peeled
# logic.
continue
if i == 0:
- logger.info(
- "Sending capabilities: %s", self.handler.capabilities())
- line = format_ref_line(
- ref, sha,
+ logger.info("Sending capabilities: %s", self.handler.capabilities())
+ line = format_ref_line(
+ ref,
+ sha,
self.handler.capabilities()
- + symref_capabilities(symrefs.items()))
+ + symref_capabilities(symrefs.items()),
+ )
else:
line = format_ref_line(ref, sha)
self.proto.write_pkt_line(line)
if peeled_sha != sha:
self.proto.write_pkt_line(
- format_ref_line(ref + PEELED_TAG_SUFFIX, peeled_sha))
+ format_ref_line(ref + PEELED_TAG_SUFFIX, peeled_sha)
+ )
# i'm done..
self.proto.write_pkt_line(None)
class ReceivePackHandler(PackHandler):
"""Protocol handler for downloading a pack from the client."""
- def __init__(self, backend, args, proto, stateless_rpc=False, advertise_refs=False) -> None:
- super().__init__(
- backend, proto, stateless_rpc=stateless_rpc
- )
+ def __init__(
+ self, backend, args, proto, stateless_rpc=False, advertise_refs=False
+ ) -> None:
+ super().__init__(backend, proto, stateless_rpc=stateless_rpc)
self.repo = backend.open_repository(args[0])
self.advertise_refs = advertise_refs
if output:
self.proto.write_sideband(SIDE_BAND_CHANNEL_PROGRESS, output)
except HookError as err:
- self.proto.write_sideband(SIDE_BAND_CHANNEL_FATAL, str(err).encode('utf-8'))
+ self.proto.write_sideband(SIDE_BAND_CHANNEL_FATAL, str(err).encode("utf-8"))
def handle(self) -> None:
if self.advertise_refs or not self.stateless_rpc:
if not refs:
refs = [(CAPABILITIES_REF, ZERO_SHA)]
- logger.info(
- "Sending capabilities: %s", self.capabilities())
+ logger.info("Sending capabilities: %s", self.capabilities())
self.proto.write_pkt_line(
format_ref_line(
- refs[0][0], refs[0][1],
- self.capabilities() + symref_capabilities(symrefs)))
+ refs[0][0],
+ refs[0][1],
+ self.capabilities() + symref_capabilities(symrefs),
+ )
+ )
for i in range(1, len(refs)):
ref = refs[i]
self.proto.write_pkt_line(format_ref_line(ref[0], ref[1]))
class TCPGitServer(socketserver.TCPServer):
-
allow_reuse_address = True
serve = socketserver.TCPServer.serve_forever
blob - 0bb6f9fb9a9e6cca276f5c8d0976b01ca184708d
blob + 51624253a8b8bf8740c1a4cf15fb8b962c3039a3
--- dulwich/stash.py
+++ dulwich/stash.py
@property
def _reflog_path(self):
- return os.path.join(
- self._repo.commondir(), "logs", os.fsdecode(self._ref)
- )
+ return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))
def stashes(self):
try:
message=b"Index stash",
merge_heads=[self._repo.head()],
no_verify=True,
- **commit_kwargs
+ **commit_kwargs,
)
# Then, the working tree one.
message=message,
merge_heads=[index_commit_id],
no_verify=True,
- **commit_kwargs
+ **commit_kwargs,
)
return cid
blob - fbe316f8d01d624f34fe5a93c371c7a01d282066
blob + 08968b8fb90f758ae4bf19bedab290e406df5e65
--- dulwich/tests/__init__.py
+++ dulwich/tests/__init__.py
"""Tests for Dulwich."""
__all__ = [
- 'SkipTest',
- 'TestCase',
- 'BlackboxTestCase',
- 'skipIf',
- 'expectedFailure',
+ "SkipTest",
+ "TestCase",
+ "BlackboxTestCase",
+ "skipIf",
+ "expectedFailure",
]
import doctest
package="dulwich.tests",
setUp=setup,
tearDown=teardown,
- *tutorial_files
+ *tutorial_files,
)
blob - 5fdef39db1a6dcd0183d680f87a14f220d2054e6
blob + 201d2163706eebbe5b04038bf47a7a16026ff2b3
--- dulwich/tests/compat/test_client.py
+++ dulwich/tests/compat/test_client.py
def test_send_remove_branch(self):
# This test fails intermittently on my machine, probably due to some sort
# of race condition. Probably also related to #1015
- self.skipTest('skip flaky test; see #1015')
+ self.skipTest("skip flaky test; see #1015")
class TestSSHVendor:
if self.command.lower() == "post":
if nbytes > 0:
data = self.rfile.read(nbytes)
- elif self.headers.get('transfer-encoding') == 'chunked':
+ elif self.headers.get("transfer-encoding") == "chunked":
chunks = []
while True:
line = self.rfile.readline()
chunks.append(chunk[:-2])
if length == 0:
break
- data = b''.join(chunks)
+ data = b"".join(chunks)
env["CONTENT_LENGTH"] = str(len(data))
else:
raise AssertionError
class HTTPGitServer(http.server.HTTPServer):
-
allow_reuse_address = True
def __init__(self, server_address, root_path) -> None:
class DulwichHttpClientTest(CompatTestCase, DulwichClientTestBase):
-
min_git_version = (1, 7, 0, 2)
def setUp(self):
blob - 1bd9f55e7b0af1050019d1a825cdbe97be93c32e
blob + 61731bb430e770a1fecbded55c4bc997d3da0c94
--- dulwich/tests/compat/test_pack.py
+++ dulwich/tests/compat/test_pack.py
orig_blob = orig_pack[a_sha]
new_blob = Blob()
new_blob.data = orig_blob.data + b"x"
- all_to_pack = [(o, None) for o in orig_pack.iterobjects()] + [(new_blob, None)]
+ all_to_pack = [(o, None) for o in orig_pack.iterobjects()] + [
+ (new_blob, None)
+ ]
pack_path = os.path.join(self._tempdir, "pack_with_deltas")
write_pack(pack_path, all_to_pack, deltify=True)
output = run_git_or_fail(["verify-pack", "-v", pack_path])
with self.get_pack(pack1_sha) as orig_pack:
orig_blob = orig_pack[a_sha]
new_blob = Blob()
- new_blob.data = orig_blob.data + (b"x" * 2 ** 20)
+ new_blob.data = orig_blob.data + (b"x" * 2**20)
new_blob_2 = Blob()
new_blob_2.data = new_blob.data + b"y"
all_to_pack = list(orig_pack.pack_tuples()) + [
raise SkipTest("skipping slow, large test")
with self.get_pack(pack1_sha) as orig_pack:
new_blob = Blob()
- new_blob.data = "big blob" + ("x" * 2 ** 25)
+ new_blob.data = "big blob" + ("x" * 2**25)
new_blob_2 = Blob()
new_blob_2.data = new_blob.data + "y"
all_to_pack = list(orig_pack.pack_tuples()) + [
blob - 97e441612c8c3f05c919408b08913f305ef471dc
blob + 5f81e137166827f3e0c0019e93c17d4972c548c4
--- dulwich/tests/compat/test_porcelain.py
+++ dulwich/tests/compat/test_porcelain.py
from .utils import CompatTestCase, run_git_or_fail
-@skipIf(platform.python_implementation() == "PyPy" or sys.platform == "win32", "gpgme not easily available or supported on Windows and PyPy")
+@skipIf(
+ platform.python_implementation() == "PyPy" or sys.platform == "win32",
+ "gpgme not easily available or supported on Windows and PyPy",
+)
class TagCreateSignTestCase(PorcelainGpgTestCase, CompatTestCase):
def setUp(self):
super().setUp()
)
run_git_or_fail(
- [
- f"--git-dir={self.repo.controldir()}",
- "tag",
- "-v",
- "tryme"
- ],
- env={'GNUPGHOME': os.environ['GNUPGHOME']},
+ [f"--git-dir={self.repo.controldir()}", "tag", "-v", "tryme"],
+ env={"GNUPGHOME": os.environ["GNUPGHOME"]},
)
def test_verify(self):
"verifyme",
],
env={
- 'GNUPGHOME': os.environ['GNUPGHOME'],
- 'GIT_COMMITTER_NAME': 'Joe Example',
- 'GIT_COMMITTER_EMAIL': 'joe@example.com',
+ "GNUPGHOME": os.environ["GNUPGHOME"],
+ "GIT_COMMITTER_NAME": "Joe Example",
+ "GIT_COMMITTER_EMAIL": "joe@example.com",
},
)
tag = self.repo[b"refs/tags/verifyme"]
blob - f1390fdb9fb6a022d655dc1d9aa3dac9987eef39
blob + 4904a33d301b7f74fb5f4a300675e5b0b51ea904
--- dulwich/tests/compat/test_repository.py
+++ dulwich/tests/compat/test_repository.py
# Read the config values in the worktree with the git cli and assert they match
# the dulwich-parsed configs
- output_name = run_git_or_fail(["config", "user.name"], cwd=self._mainworktree_repo.path).decode().rstrip("\n")
- output_email = run_git_or_fail(["config", "user.email"], cwd=self._mainworktree_repo.path).decode().rstrip("\n")
+ output_name = (
+ run_git_or_fail(["config", "user.name"], cwd=self._mainworktree_repo.path)
+ .decode()
+ .rstrip("\n")
+ )
+ output_email = (
+ run_git_or_fail(["config", "user.email"], cwd=self._mainworktree_repo.path)
+ .decode()
+ .rstrip("\n")
+ )
self.assertEqual(test_name, output_name)
self.assertEqual(test_email, output_email)
blob - 378c6f57b12084cf27f1d9fb9aa926b68114cba0
blob + 65afe5a36f31a56acb031dc859a0b33d987d3557
--- dulwich/tests/compat/test_utils.py
+++ dulwich/tests/compat/test_utils.py
def run_git(args, **unused_kwargs):
self.assertEqual(["--version"], args)
- return 0, self._version_str, ''
+ return 0, self._version_str, ""
utils.run_git = run_git
blob - 3e1c451f6c6d3ff59d3c20b83ffa6301894b86fa
blob + 0cc8137be5b7a202dc0ce99ebe1e1dffc9a8777f
--- dulwich/tests/compat/utils.py
+++ dulwich/tests/compat/utils.py
_VERSION_LEN = 4
_REPOS_DATA_DIR = os.path.abspath(
os.path.join(
- os.path.dirname(__file__), os.pardir, os.pardir, os.pardir,
- "testdata", "repos")
+ os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, "testdata", "repos"
+ )
)
"""
found_version = git_version(git_path=git_path)
if found_version is None:
- raise SkipTest(
- f"Test requires git >= {required_version}, but c git not found"
- )
+ raise SkipTest(f"Test requires git >= {required_version}, but c git not found")
if len(required_version) > _VERSION_LEN:
raise ValueError(
def run_git(
- args, git_path=_DEFAULT_GIT, input=None, capture_stdout=False,
- capture_stderr=False, **popen_kwargs
+ args,
+ git_path=_DEFAULT_GIT,
+ input=None,
+ capture_stdout=False,
+ capture_stderr=False,
+ **popen_kwargs,
):
"""Run a git command.
if "stderr" not in popen_kwargs:
popen_kwargs["stderr"] = subprocess.STDOUT
returncode, stdout, stderr = run_git(
- args, git_path=git_path, input=input, capture_stdout=True,
- capture_stderr=True, **popen_kwargs
+ args,
+ git_path=git_path,
+ input=input,
+ capture_stdout=True,
+ capture_stderr=True,
+ **popen_kwargs,
)
if returncode != 0:
raise AssertionError(
- "git with args %r failed with %d: stdout=%r stderr=%r" % (args, returncode, stdout, stderr)
+ "git with args %r failed with %d: stdout=%r stderr=%r"
+ % (args, returncode, stdout, stderr)
)
return stdout
blob - 5f6ef31fd0805b2606be48b30ae33bde00126025
blob + bdcd62b39957efcb05719706ae555097d3f370f6
--- dulwich/tests/test_client.py
+++ dulwich/tests/test_client.py
b"0000"
)
self.rin.seek(0)
- ret = self.client.fetch_pack(b"bla", lambda heads, **kwargs: [], None, None, None)
+ ret = self.client.fetch_pack(
+ b"bla", lambda heads, **kwargs: [], None, None, None
+ )
self.assertEqual(
{b"HEAD": b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"}, ret.refs
)
result_repo = c.clone(s.path, target, mkdir=False)
self.addCleanup(result_repo.close)
expected = dict(s.get_refs())
- expected[b'refs/remotes/origin/HEAD'] = expected[b'HEAD']
- expected[b'refs/remotes/origin/master'] = expected[b'refs/heads/master']
+ expected[b"refs/remotes/origin/HEAD"] = expected[b"HEAD"]
+ expected[b"refs/remotes/origin/master"] = expected[b"refs/heads/master"]
self.assertEqual(expected, result_repo.get_refs())
def test_fetch_empty(self):
self.assertEqual(c._password, None)
basic_auth = c.pool_manager.headers["authorization"]
- auth_string = username.encode('ascii') + b":"
+ auth_string = username.encode("ascii") + b":"
b64_credentials = base64.b64encode(auth_string)
expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}"
self.assertEqual(basic_auth, expected_basic_auth)
def __init__(self) -> None:
self.headers: Dict[str, str] = {}
- def request(self, method, url, fields=None, headers=None, redirect=True, preload_content=True):
+ def request(
+ self,
+ method,
+ url,
+ fields=None,
+ headers=None,
+ redirect=True,
+ preload_content=True,
+ ):
base_url = url[: -len(tail)]
redirect_base_url = test_data[base_url]["location"]
redirect_url = redirect_base_url + tail
def __init__(self) -> None:
self.headers: Dict[str, str] = {}
- def request(self, method, url, fields=None, headers=None, redirect=True, preload_content=True):
+ def request(
+ self,
+ method,
+ url,
+ fields=None,
+ headers=None,
+ redirect=True,
+ preload_content=True,
+ ):
return HTTPResponse(
headers={
"Content-Type": "application/x-git-upload-pack-result; charset=utf-8"
config = ConfigDict()
self.overrideEnv("http_proxy", "http://myproxy:8080")
- self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::/64,192.168.0.0/24,ample.com")
+ self.overrideEnv(
+ "no_proxy", "xyz,abc.def.gh,ff80:1::/64,192.168.0.0/24,ample.com"
+ )
base_url = "http://192.168.0.10/path/port"
manager = default_urllib3_manager(config=config, base_url=base_url)
self.assertNotIsInstance(manager, urllib3.ProxyManager)
config = ConfigDict()
self.overrideEnv("http_proxy", "http://myproxy:8080")
- self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.0/24,ff80:1::/64,ample.com")
+ self.overrideEnv(
+ "no_proxy", "xyz,abc.def.gh,192.168.0.0/24,ff80:1::/64,ample.com"
+ )
base_url = "http://[ff80:1::affe]/path/port"
manager = default_urllib3_manager(config=config, base_url=base_url)
self.assertNotIsInstance(manager, urllib3.ProxyManager)
blob - a0504779241d7815df318aab87762de1fe8b12c8
blob + 7857bb134cbe6f5e9951c618169b06c470fd53e5
--- dulwich/tests/test_config.py
+++ dulwich/tests/test_config.py
def test_from_file_multiple(self):
cf = self.from_file(b"[core]\nfoo = bar\nfoo = blah\n")
self.assertEqual([b"bar", b"blah"], list(cf.get_multivar((b"core",), b"foo")))
- self.assertEqual([], list(cf.get_multivar((b"core", ), b"blah")))
+ self.assertEqual([], list(cf.get_multivar((b"core",), b"blah")))
def test_from_file_utf8_bom(self):
text = "[core]\nfoo = b\u00e4r\n".encode("utf-8-sig")
cf = self.from_file(
b"[alias]\r\n"
b"c = '!f() { \\\r\n"
- b" printf '[git commit -m \\\"%s\\\"]\\n' \\\"$*\\\" && \\\r\n"
- b" git commit -m \\\"$*\\\"; \\\r\n"
- b" }; f'\r\n")
- self.assertEqual(list(cf.sections()), [(b'alias', )])
+ b' printf \'[git commit -m \\"%s\\"]\\n\' \\"$*\\" && \\\r\n'
+ b' git commit -m \\"$*\\"; \\\r\n'
+ b" }; f'\r\n"
+ )
+ self.assertEqual(list(cf.sections()), [(b"alias",)])
self.assertEqual(
- b'\'!f() { printf \'[git commit -m "%s"]\n\' '
- b'"$*" && git commit -m "$*"',
- cf.get((b"alias", ), b"c"))
+ b"'!f() { printf '[git commit -m \"%s\"]\n' " b'"$*" && git commit -m "$*"',
+ cf.get((b"alias",), b"c"),
+ )
def test_quoted(self):
cf = self.from_file(
def test_none(self):
config = ConfigDict()
self.assertEqual(
- 'https://example.com/', apply_instead_of(config, 'https://example.com/'))
+ "https://example.com/", apply_instead_of(config, "https://example.com/")
+ )
def test_apply(self):
config = ConfigDict()
- config.set(
- ('url', 'https://samba.org/'), 'insteadOf', 'https://example.com/')
+ config.set(("url", "https://samba.org/"), "insteadOf", "https://example.com/")
self.assertEqual(
- 'https://samba.org/',
- apply_instead_of(config, 'https://example.com/'))
+ "https://samba.org/", apply_instead_of(config, "https://example.com/")
+ )
def test_apply_multiple(self):
config = ConfigDict()
- config.set(
- ('url', 'https://samba.org/'), 'insteadOf', 'https://blah.com/')
- config.set(
- ('url', 'https://samba.org/'), 'insteadOf', 'https://example.com/')
+ config.set(("url", "https://samba.org/"), "insteadOf", "https://blah.com/")
+ config.set(("url", "https://samba.org/"), "insteadOf", "https://example.com/")
self.assertEqual(
- [b'https://blah.com/', b'https://example.com/'],
- list(config.get_multivar(('url', 'https://samba.org/'), 'insteadOf')))
+ [b"https://blah.com/", b"https://example.com/"],
+ list(config.get_multivar(("url", "https://samba.org/"), "insteadOf")),
+ )
self.assertEqual(
- 'https://samba.org/',
- apply_instead_of(config, 'https://example.com/'))
+ "https://samba.org/", apply_instead_of(config, "https://example.com/")
+ )
blob - fb183bafd71cfb6e237ee7d064e74238c704fad5
blob + 3f06bef8b180b19b246a0e42b444bf011ba63514
--- dulwich/tests/test_credentials.py
+++ dulwich/tests/test_credentials.py
class TestCredentialHelpersUtils(TestCase):
-
def test_match_urls(self):
url = urlparse("https://github.com/jelmer/dulwich/")
url_1 = urlparse("https://github.com/jelmer/dulwich")
config.set(b"credential", b"helper", "bar")
self.assertEqual(
- list(urlmatch_credential_sections(config, "https://github.com")), [
+ list(urlmatch_credential_sections(config, "https://github.com")),
+ [
(b"credential", b"https://github.com"),
(b"credential",),
- ])
+ ],
+ )
self.assertEqual(
- list(urlmatch_credential_sections(config, "https://git.sr.ht")), [
+ list(urlmatch_credential_sections(config, "https://git.sr.ht")),
+ [
(b"credential", b"git.sr.ht"),
(b"credential",),
- ])
+ ],
+ )
self.assertEqual(
- list(urlmatch_credential_sections(config, "missing_url")), [
- (b"credential",)])
+ list(urlmatch_credential_sections(config, "missing_url")),
+ [(b"credential",)],
+ )
blob - a78b2c868b700a98341ab2287377a6fd3bc0937a
blob + 34517cc5404ddc4254187f2573f817a1083256a7
--- dulwich/tests/test_hooks.py
+++ dulwich/tests/test_hooks.py
hook.execute()
def test_hook_commit_msg(self):
-
repo_dir = os.path.join(tempfile.mkdtemp())
os.mkdir(os.path.join(repo_dir, "hooks"))
self.addCleanup(shutil.rmtree, repo_dir)
hook.execute(b"empty commit")
def test_hook_post_commit(self):
-
(fd, path) = tempfile.mkstemp()
os.close(fd)
blob - 13d073596b41169abe203882b9714d3101852daa
blob + f9b6b6025149a40b7b5a0c012bd192d6f458ebbf
--- dulwich/tests/test_ignore.py
+++ dulwich/tests/test_ignore.py
class TranslateTests(TestCase):
def test_translate(self):
- for (pattern, regex) in TRANSLATE_TESTS:
+ for pattern, regex in TRANSLATE_TESTS:
if re.escape(b"/") == b"/":
# Slash is no longer escaped in Python3.7, so undo the escaping
# in the expected return value..
class MatchPatternTests(TestCase):
def test_matches(self):
- for (path, pattern) in POSITIVE_MATCH_TESTS:
+ for path, pattern in POSITIVE_MATCH_TESTS:
self.assertTrue(
match_pattern(path, pattern),
f"path: {path!r}, pattern: {pattern!r}",
)
def test_no_matches(self):
- for (path, pattern) in NEGATIVE_MATCH_TESTS:
+ for path, pattern in NEGATIVE_MATCH_TESTS:
self.assertFalse(
match_pattern(path, pattern),
f"path: {path!r}, pattern: {pattern!r}",
self.addCleanup(shutil.rmtree, tmp_dir)
repo = Repo.init(tmp_dir)
- with open(os.path.join(repo.path, '.gitignore'), 'wb') as f:
- f.write(b'/*\n')
- f.write(b'!/foo\n')
-
- os.mkdir(os.path.join(repo.path, 'foo'))
- with open(os.path.join(repo.path, 'foo', '.gitignore'), 'wb') as f:
- f.write(b'/bar\n')
+ with open(os.path.join(repo.path, ".gitignore"), "wb") as f:
+ f.write(b"/*\n")
+ f.write(b"!/foo\n")
- with open(os.path.join(repo.path, 'foo', 'bar'), 'wb') as f:
- f.write(b'IGNORED')
+ os.mkdir(os.path.join(repo.path, "foo"))
+ with open(os.path.join(repo.path, "foo", ".gitignore"), "wb") as f:
+ f.write(b"/bar\n")
+ with open(os.path.join(repo.path, "foo", "bar"), "wb") as f:
+ f.write(b"IGNORED")
+
m = IgnoreFilterManager.from_repo(repo)
- self.assertTrue(m.is_ignored('foo/bar'))
+ self.assertTrue(m.is_ignored("foo/bar"))
def test_load_ignore_ignorecase(self):
tmp_dir = tempfile.mkdtemp()
blob - afd599bd53dce72edf45bd703a051615de1fe6e1
blob + b9a72b5dbaa40dc9f92cff3a3bde3282260d8de1
--- dulwich/tests/test_index.py
+++ dulwich/tests/test_index.py
class IndexTestCase(TestCase):
-
datadir = os.path.join(os.path.dirname(__file__), "../../testdata/indexes")
def get_simple_index(self, name):
0,
b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
0,
- 0)
+ 0,
+ )
)
]
filename = os.path.join(self.tempdir, "test-simple-write-index")
class ReadIndexDictTests(IndexTestCase):
-
def setUp(self):
IndexTestCase.setUp(self)
self.tempdir = tempfile.mkdtemp()
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Populate repo
filea = Blob.from_string(b"file a")
filee = Blob.from_string(b"d")
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Populate repo
filea = Blob.from_string(b"file a")
fileb = Blob.from_string(b"file b")
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Populate repo
filed = Blob.from_string(b"file d")
filee = Blob.from_string(b"d")
repo_dir_bytes = os.fsencode(repo_dir)
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Populate repo
file = Blob.from_string(b"foo")
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Commit a dummy file then modify it
foo1_fullpath = os.path.join(repo_dir, "foo1")
with open(foo1_fullpath, "wb") as f:
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Commit a dummy file then remove it
foo1_fullpath = os.path.join(repo_dir, "foo1")
with open(foo1_fullpath, "wb") as f:
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Commit a dummy file then modify it
foo1_fullpath = os.path.join(repo_dir, "foo1")
with open(foo1_fullpath, "wb") as f:
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
with Repo.init(repo_dir) as repo:
-
# Commit a dummy file then modify it
foo1_fullpath = os.path.join(repo_dir, "foo1")
with open(foo1_fullpath, "wb") as f:
blob - fdd2a55a0f842f7f81a2f73742d296c41bde9493
blob + f11b205b14eb90d4c9e1673760281ea09a9c7089
--- dulwich/tests/test_missing_obj_finder.py
+++ dulwich/tests/test_missing_obj_finder.py
def assertMissingMatch(self, haves, wants, expected):
for sha, path in MissingObjectFinder(self.store, haves, wants, shallow=set()):
self.assertIn(
- sha,
- expected,
- f"({sha},{path}) erroneously reported as missing"
+ sha, expected, f"({sha},{path}) erroneously reported as missing"
)
expected.remove(sha)
haves = [self.cmt(1).id]
wants = [self.cmt(3).id, bogus_sha]
self.assertRaises(
- KeyError, MissingObjectFinder, self.store, haves, wants, shallow=set())
+ KeyError, MissingObjectFinder, self.store, haves, wants, shallow=set()
+ )
def test_no_changes(self):
self.assertMissingMatch([self.cmt(3).id], [self.cmt(3).id], [])
blob - a2278b513402fd434a79127f2f007b64dcf9377d
blob + 0dc9bea66147f5b6cd3e5a7041d19069a08a1330
--- dulwich/tests/test_object_store.py
+++ dulwich/tests/test_object_store.py
self.store.add_object(testobject)
refs = {b"refs/heads/foo": testobject.id}
with patch.object(self.store, "_get_depth", return_value=1) as m:
+ self.assertEqual([], self.store.determine_wants_all(refs, depth=0))
self.assertEqual(
- [], self.store.determine_wants_all(refs, depth=0)
- )
- self.assertEqual(
[testobject.id],
self.store.determine_wants_all(refs, depth=DEPTH_INFINITE),
)
m.assert_not_called()
- self.assertEqual(
- [], self.store.determine_wants_all(refs, depth=1)
- )
+ self.assertEqual([], self.store.determine_wants_all(refs, depth=1))
m.assert_called_with(testobject.id)
self.assertEqual(
[testobject.id], self.store.determine_wants_all(refs, depth=2)
)
def test_get_depth(self):
- self.assertEqual(
- 0, self.store._get_depth(testobject.id)
- )
+ self.assertEqual(0, self.store._get_depth(testobject.id))
self.store.add_object(testobject)
self.assertEqual(
def test_lookup_submodule(self):
tree_lookup_path(self.get_object, self.tree_id, b"d")[1]
self.assertRaises(
- SubmoduleEncountered, tree_lookup_path, self.get_object,
- self.tree_id, b"d/a")
+ SubmoduleEncountered,
+ tree_lookup_path,
+ self.get_object,
+ self.tree_id,
+ b"d/a",
+ )
def test_lookup_nonexistent(self):
self.assertRaises(
blob - 511c54c082e6ddb79ac4993f626f75d017f7bd42
blob + 7fe1bec88c3d3601b5fdb7ec7d68fb04518c249f
--- dulwich/tests/test_objects.py
+++ dulwich/tests/test_objects.py
)
check_identity(b" <dborowitz@google.com>", "failed to check good identity")
self.assertRaises(
- ObjectFormatException, check_identity, b'<dborowitz@google.com>', 'no space before email'
+ ObjectFormatException,
+ check_identity,
+ b"<dborowitz@google.com>",
+ "no space before email",
)
self.assertRaises(
ObjectFormatException, check_identity, b"Dave Borowitz", "no email"
self.assertRaises(
ObjectFormatException,
check_identity,
- b'Dave<Borowitz <dborowitz@google.com>',
- 'reserved byte in name',
+ b"Dave<Borowitz <dborowitz@google.com>",
+ "reserved byte in name",
)
self.assertRaises(
ObjectFormatException,
check_identity,
- b'Dave>Borowitz <dborowitz@google.com>',
- 'reserved byte in name',
+ b"Dave>Borowitz <dborowitz@google.com>",
+ "reserved byte in name",
)
self.assertRaises(
ObjectFormatException,
check_identity,
- b'Dave\0Borowitz <dborowitz@google.com>',
- 'null byte',
+ b"Dave\0Borowitz <dborowitz@google.com>",
+ "null byte",
)
self.assertRaises(
ObjectFormatException,
check_identity,
- b'Dave\nBorowitz <dborowitz@google.com>',
- 'newline byte',
+ b"Dave\nBorowitz <dborowitz@google.com>",
+ "newline byte",
)
blob - eca8007a6213ccd4c508f7a27f2028c8bf01cfb2
blob + 10015c4ecff99fe88734db60b27417f87f809f6a
--- dulwich/tests/test_objectspec.py
+++ dulwich/tests/test_objectspec.py
def test_from_ref(self):
r = MemoryRepo()
c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
- r.refs[b'refs/heads/foo'] = c1.id
- self.assertEqual(r[c1.tree], parse_tree(r, b'foo'))
+ r.refs[b"refs/heads/foo"] = c1.id
+ self.assertEqual(r[c1.tree], parse_tree(r, b"foo"))
blob - ca87f2a95b81272d6e2204d45976e4c965655482
blob + 394a0eb25956c0b7cfa5648906b7ddadba842df8
--- dulwich/tests/test_pack.py
+++ dulwich/tests/test_pack.py
self.tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tempdir)
- datadir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../testdata/packs"))
+ datadir = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), "../../testdata/packs")
+ )
def get_pack_index(self, sha):
"""Returns a PackIndex from the datadir with the given sha."""
class TestPackDeltas(TestCase):
-
test_string1 = b"The answer was flailing in the wind"
test_string2 = b"The answer was falling down the pipe"
test_string3 = b"zzzzz"
def _test_roundtrip(self, base, target):
self.assertEqual(
- target,
- b"".join(apply_delta(base, list(create_delta(base, target))))
+ target, b"".join(apply_delta(base, list(create_delta(base, target))))
)
def test_nochange(self):
actual = list(p.iter_unpacked())
self.assertEqual(
[
- UnpackedObject(offset=12, pack_type_num=1, decomp_chunks=[commit_data], crc32=None),
- UnpackedObject(offset=138, pack_type_num=2, decomp_chunks=[tree_data], crc32=None),
- UnpackedObject(offset=178, pack_type_num=3, decomp_chunks=[b"test 1\n"], crc32=None),
+ UnpackedObject(
+ offset=12,
+ pack_type_num=1,
+ decomp_chunks=[commit_data],
+ crc32=None,
+ ),
+ UnpackedObject(
+ offset=138,
+ pack_type_num=2,
+ decomp_chunks=[tree_data],
+ crc32=None,
+ ),
+ UnpackedObject(
+ offset=178,
+ pack_type_num=3,
+ decomp_chunks=[b"test 1\n"],
+ crc32=None,
+ ),
],
actual,
)
bad_data = PackData("", file=bad_file)
bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
self.assertRaises(AssertionError, lambda: bad_pack.data)
- self.assertRaises(
- AssertionError, bad_pack.check_length_and_checksum
- )
+ self.assertRaises(AssertionError, bad_pack.check_length_and_checksum)
def test_checksum_mismatch(self):
with self.get_pack_data(pack1_sha) as data:
bad_data = PackData("", file=bad_file)
bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
- self.assertRaises(
- ChecksumMismatch, bad_pack.check_length_and_checksum
- )
+ self.assertRaises(ChecksumMismatch, bad_pack.check_length_and_checksum)
def test_iterobjects_2(self):
with self.get_pack(pack1_sha) as p:
with self.make_pack(True) as pack:
with PackData(pack._data_path) as data:
data.create_index(
- self.pack_prefix + ".idx", resolve_ext_ref=pack.resolve_ext_ref)
+ self.pack_prefix + ".idx", resolve_ext_ref=pack.resolve_ext_ref
+ )
del self.store[self.blobs[b"bar"].id]
expected = UnpackedObject(
7,
delta_base=b"\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c",
- decomp_chunks=[b'\x03\x07\x90\x03\x041234'],
+ decomp_chunks=[b"\x03\x07\x90\x03\x041234"],
)
expected.offset = 12
got = p.get_unpacked_object(self.blobs[b"foo1234"].id)
expected = UnpackedObject(
7,
delta_base=b"\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c",
- decomp_chunks=[b'\x03\x07\x90\x03\x041234'],
+ decomp_chunks=[b"\x03\x07\x90\x03\x041234"],
)
expected.offset = 12
got = p.get_unpacked_object(self.blobs[b"foo1234"].id)
offset = f.tell()
sha_a = sha1(b"foo")
sha_b = sha_a.copy()
- write_pack_object(f.write, Blob.type_num, b"blob", sha=sha_a, compression_level=6)
+ write_pack_object(
+ f.write, Blob.type_num, b"blob", sha=sha_a, compression_level=6
+ )
self.assertNotEqual(sha_a.digest(), sha_b.digest())
sha_b.update(f.getvalue()[offset:])
self.assertEqual(sha_a.digest(), sha_b.digest())
entry2_sha = hex_to_sha("e98f071751bd77f59967bfa671cd2caebdccc9a2")
entries = [
(entry1_sha, 0xF2972D0830529B87, 24),
- (entry2_sha, (~0xF2972D0830529B87) & (2 ** 64 - 1), 92),
+ (entry2_sha, (~0xF2972D0830529B87) & (2**64 - 1), 92),
]
if not self._supports_large:
self.assertRaises(
class ReadZlibTests(TestCase):
-
decomp = (
b"tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n"
b"parent None\n"
def setUp(self):
super().setUp()
self.read = BytesIO(self.comp + self.extra).read
- self.unpacked = UnpackedObject(Tree.type_num, decomp_len=len(self.decomp), crc32=0)
+ self.unpacked = UnpackedObject(
+ Tree.type_num, decomp_len=len(self.decomp), crc32=0
+ )
def test_decompress_size(self):
good_decomp_len = len(self.decomp)
def test_single(self):
b = Blob.from_string(b"foo")
self.assertEqual(
- [UnpackedObject(b.type_num, sha=b.sha().digest(), delta_base=None, decomp_chunks=b.as_raw_chunks())],
+ [
+ UnpackedObject(
+ b.type_num,
+ sha=b.sha().digest(),
+ delta_base=None,
+ decomp_chunks=b.as_raw_chunks(),
+ )
+ ],
list(deltify_pack_objects([(b, b"")])),
)
delta = list(create_delta(b1.as_raw_chunks(), b2.as_raw_chunks()))
self.assertEqual(
[
- UnpackedObject(b1.type_num, sha=b1.sha().digest(), delta_base=None, decomp_chunks=b1.as_raw_chunks()),
- UnpackedObject(b2.type_num, sha=b2.sha().digest(), delta_base=b1.sha().digest(), decomp_chunks=delta),
+ UnpackedObject(
+ b1.type_num,
+ sha=b1.sha().digest(),
+ delta_base=None,
+ decomp_chunks=b1.as_raw_chunks(),
+ ),
+ UnpackedObject(
+ b2.type_num,
+ sha=b2.sha().digest(),
+ delta_base=b1.sha().digest(),
+ decomp_chunks=delta,
+ ),
],
list(deltify_pack_objects([(b1, b""), (b2, b"")])),
)
unpacked_delta.delta_base,
)
delta = create_delta(b"blob", b"blob1")
- self.assertEqual(b''.join(delta), b"".join(unpacked_delta.decomp_chunks))
+ self.assertEqual(b"".join(delta), b"".join(unpacked_delta.decomp_chunks))
self.assertEqual(entries[1][4], unpacked_delta.crc32)
def test_read_objects_buffered(self):
class TestPackIterator(DeltaChainIterator):
-
_compute_crc32 = True
def __init__(self, *args, **kwargs) -> None:
"Attempted to re-inflate offset %i" % offset
)
self._unpacked_offsets.add(offset)
- return super()._resolve_object(
- offset, pack_type_num, base_chunks
- )
+ return super()._resolve_object(offset, pack_type_num, base_chunks)
class DeltaChainIteratorTests(TestCase):
"""Wrapper around store.get_raw that doesn't allow repeat lookups."""
hex_sha = sha_to_hex(bin_sha)
self.assertNotIn(
- hex_sha,
- self.fetched,
- "Attempted to re-fetch object %s" % hex_sha
+ hex_sha, self.fetched, "Attempted to re-fetch object %s" % hex_sha
)
self.fetched.add(hex_sha)
return self.store.get_raw(hex_sha)
assert data
index = MemoryPackIndex.for_pack(data)
pack = Pack.from_objects(data, index)
- return TestPackIterator.for_pack_subset(pack, subset, resolve_ext_ref=resolve_ext_ref)
+ return TestPackIterator.for_pack_subset(
+ pack, subset, resolve_ext_ref=resolve_ext_ref
+ )
def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
expected = [entries[i] for i in expected_indexes]
f.seek(0)
self.assertEntriesMatch([], entries, self.make_pack_iter_subset(f, []))
f.seek(0)
- self.assertEntriesMatch([1, 0], entries, self.make_pack_iter_subset(f, [entries[0][3], entries[1][3]]))
+ self.assertEntriesMatch(
+ [1, 0],
+ entries,
+ self.make_pack_iter_subset(f, [entries[0][3], entries[1][3]]),
+ )
f.seek(0)
self.assertEntriesMatch(
- [1, 0], entries, self.make_pack_iter_subset(f, [sha_to_hex(entries[0][3]), sha_to_hex(entries[1][3])]))
+ [1, 0],
+ entries,
+ self.make_pack_iter_subset(
+ f, [sha_to_hex(entries[0][3]), sha_to_hex(entries[1][3])]
+ ),
+ )
def test_ofs_deltas(self):
f = BytesIO()
self.assertEntriesMatch([0, 2, 1], entries, self.make_pack_iter(f))
f.seek(0)
self.assertEntriesMatch(
- [0, 2, 1], entries,
- self.make_pack_iter_subset(f, [entries[1][3], entries[2][3]]))
+ [0, 2, 1],
+ entries,
+ self.make_pack_iter_subset(f, [entries[1][3], entries[2][3]]),
+ )
def test_ofs_deltas_chain(self):
f = BytesIO()
(OFS_DELTA, (0, b"blob1")),
(OFS_DELTA, (1, b"blob3")),
(OFS_DELTA, (0, b"bob")),
- ])
+ ],
+ )
# Delta resolution changed to DFS
self.assertEntriesMatch([0, 4, 2, 1, 3], entries, self.make_pack_iter(f))
blob - c6d4ec20444ff9b8737a70fbfb5ab67efbea10e9
blob + adda941b55b816290239709b5465b0cab4526f15
--- dulwich/tests/test_porcelain.py
+++ dulwich/tests/test_porcelain.py
# (e.g. the gpg-agent socket having been deleted). See
# https://github.com/jelmer/dulwich/issues/1000
self.addCleanup(shutil.rmtree, self.gpg_dir, ignore_errors=True)
- self.overrideEnv('GNUPGHOME', self.gpg_dir)
+ self.overrideEnv("GNUPGHOME", self.gpg_dir)
def import_default_key(self):
subprocess.run(
self.assertEqual(commit._commit_timezone, local_timezone)
-@skipIf(platform.python_implementation() == "PyPy" or sys.platform == "win32", "gpgme not easily available or supported on Windows and PyPy")
+@skipIf(
+ platform.python_implementation() == "PyPy" or sys.platform == "win32",
+ "gpgme not easily available or supported on Windows and PyPy",
+)
class CommitSignTests(PorcelainGpgTestCase):
-
def test_default_key(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]]
class TimezoneTests(PorcelainTestCase):
-
def put_envs(self, value):
self.overrideEnv("GIT_AUTHOR_DATE", value)
self.overrideEnv("GIT_COMMITTER_DATE", value)
self.put_envs("0 +0500")
self.overrideEnv("GIT_AUTHOR_DATE", None)
self.overrideEnv("GIT_COMMITTER_DATE", None)
- self.assertTupleEqual((local_timezone, local_timezone), porcelain.get_user_timezones())
+ self.assertTupleEqual(
+ (local_timezone, local_timezone), porcelain.get_user_timezones()
+ )
class CleanTests(PorcelainTestCase):
self.assertEqual(c1.id, target_repo.refs[b"refs/heads/else"])
self.assertEqual(c1.id, target_repo.refs[b"HEAD"])
self.assertEqual(
- {b"HEAD": b"refs/heads/else", b"refs/remotes/origin/HEAD": b"refs/remotes/origin/else"},
+ {
+ b"HEAD": b"refs/heads/else",
+ b"refs/remotes/origin/HEAD": b"refs/remotes/origin/else",
+ },
target_repo.refs.get_symrefs(),
)
)
-@skipIf(platform.python_implementation() == "PyPy" or sys.platform == "win32", "gpgme not easily available or supported on Windows and PyPy")
+@skipIf(
+ platform.python_implementation() == "PyPy" or sys.platform == "win32",
+ "gpgme not easily available or supported on Windows and PyPy",
+)
class TagCreateSignTests(PorcelainGpgTestCase):
-
def test_default_key(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]]
self.assertEqual(b"foo <foo@bar.com>", tag.tagger)
self.assertEqual(b"bar\n", tag.message)
self.assertRecentTimestamp(tag.tag_time)
- tag = self.repo[b'refs/tags/tryme']
+ tag = self.repo[b"refs/tags/tryme"]
# GPG Signatures aren't deterministic, so we can't do a static assertion.
tag.verify()
tag.verify(keyids=[PorcelainGpgTestCase.DEFAULT_KEY_ID])
self.assertEqual(b"foo <foo@bar.com>", tag.tagger)
self.assertEqual(b"bar\n", tag.message)
self.assertRecentTimestamp(tag.tag_time)
- tag = self.repo[b'refs/tags/tryme']
+ tag = self.repo[b"refs/tags/tryme"]
# GPG Signatures aren't deterministic, so we can't do a static assertion.
tag.verify()
class TagCreateTests(PorcelainTestCase):
-
def test_annotated(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]]
class ResetFileTests(PorcelainTestCase):
-
def test_reset_modify_file_to_commit(self):
- file = 'foo'
+ file = "foo"
full_path = os.path.join(self.repo.path, file)
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self.repo, paths=[full_path])
sha = porcelain.commit(
self.repo,
committer=b"Jane <jane@example.com>",
author=b"John <john@example.com>",
)
- with open(full_path, 'a') as f:
- f.write('something new')
+ with open(full_path, "a") as f:
+ f.write("something new")
porcelain.reset_file(self.repo, file, target=sha)
with open(full_path) as f:
- self.assertEqual('hello', f.read())
+ self.assertEqual("hello", f.read())
def test_reset_remove_file_to_commit(self):
- file = 'foo'
+ file = "foo"
full_path = os.path.join(self.repo.path, file)
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self.repo, paths=[full_path])
sha = porcelain.commit(
self.repo,
porcelain.reset_file(self.repo, file, target=sha)
with open(full_path) as f:
- self.assertEqual('hello', f.read())
+ self.assertEqual("hello", f.read())
def test_resetfile_with_dir(self):
- os.mkdir(os.path.join(self.repo.path, 'new_dir'))
- full_path = os.path.join(self.repo.path, 'new_dir', 'foo')
+ os.mkdir(os.path.join(self.repo.path, "new_dir"))
+ full_path = os.path.join(self.repo.path, "new_dir", "foo")
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self.repo, paths=[full_path])
sha = porcelain.commit(
self.repo,
committer=b"Jane <jane@example.com>",
author=b"John <john@example.com>",
)
- with open(full_path, 'a') as f:
- f.write('something new')
+ with open(full_path, "a") as f:
+ f.write("something new")
porcelain.commit(
self.repo,
message=b"unitest 2",
committer=b"Jane <jane@example.com>",
author=b"John <john@example.com>",
)
- porcelain.reset_file(self.repo, os.path.join('new_dir', 'foo'), target=sha)
+ porcelain.reset_file(self.repo, os.path.join("new_dir", "foo"), target=sha)
with open(full_path) as f:
- self.assertEqual('hello', f.read())
+ self.assertEqual("hello", f.read())
def _commit_file_with_content(repo, filename, content):
file_path = os.path.join(repo.path, filename)
- with open(file_path, 'w') as f:
+ with open(file_path, "w") as f:
f.write(content)
porcelain.add(repo, paths=[file_path])
sha = porcelain.commit(
class CheckoutTests(PorcelainTestCase):
-
def setUp(self):
super().setUp()
- self._sha, self._foo_path = _commit_file_with_content(self.repo, 'foo', 'hello\n')
- porcelain.branch_create(self.repo, 'uni')
+ self._sha, self._foo_path = _commit_file_with_content(
+ self.repo, "foo", "hello\n"
+ )
+ porcelain.branch_create(self.repo, "uni")
def test_checkout_to_existing_branch(self):
self.assertEqual(b"master", porcelain.active_branch(self.repo))
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
self.assertEqual(b"uni", porcelain.active_branch(self.repo))
def test_checkout_to_non_existing_branch(self):
self.assertEqual(b"master", porcelain.active_branch(self.repo))
with self.assertRaises(KeyError):
- porcelain.checkout_branch(self.repo, b'bob')
+ porcelain.checkout_branch(self.repo, b"bob")
self.assertEqual(b"master", porcelain.active_branch(self.repo))
def test_checkout_to_branch_with_modified_files(self):
- with open(self._foo_path, 'a') as f:
- f.write('new message\n')
+ with open(self._foo_path, "a") as f:
+ f.write("new message\n")
porcelain.add(self.repo, paths=[self._foo_path])
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': [b"foo"]}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": [b"foo"]}, [], []], status
+ )
# Both branches have file 'foo' checkout should be fine.
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
self.assertEqual(b"uni", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': [b"foo"]}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": [b"foo"]}, [], []], status
+ )
def test_checkout_with_deleted_files(self):
- porcelain.remove(self.repo.path, [os.path.join(self.repo.path, 'foo')])
+ porcelain.remove(self.repo.path, [os.path.join(self.repo.path, "foo")])
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [b'foo'], 'modify': []}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [b"foo"], "modify": []}, [], []], status
+ )
# Both branches have file 'foo' checkout should be fine.
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
self.assertEqual(b"uni", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [b"foo"], 'modify': []}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [b"foo"], "modify": []}, [], []], status
+ )
def test_checkout_to_branch_with_added_files(self):
- file_path = os.path.join(self.repo.path, 'bar')
+ file_path = os.path.join(self.repo.path, "bar")
- with open(file_path, 'w') as f:
- f.write('bar content\n')
+ with open(file_path, "w") as f:
+ f.write("bar content\n")
porcelain.add(self.repo, paths=[file_path])
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [b'bar'], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual(
+ [{"add": [b"bar"], "delete": [], "modify": []}, [], []], status
+ )
# Both branches have file 'foo' checkout should be fine.
- porcelain.checkout_branch(self.repo, b'uni')
- self.assertEqual(b'uni', porcelain.active_branch(self.repo))
+ porcelain.checkout_branch(self.repo, b"uni")
+ self.assertEqual(b"uni", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [b'bar'], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual(
+ [{"add": [b"bar"], "delete": [], "modify": []}, [], []], status
+ )
def test_checkout_to_branch_with_modified_file_not_present(self):
# Commit a new file that the other branch doesn't have.
- _, nee_path = _commit_file_with_content(self.repo, 'nee', 'Good content\n')
+ _, nee_path = _commit_file_with_content(self.repo, "nee", "Good content\n")
# Modify the file the other branch doesn't have.
- with open(nee_path, 'a') as f:
- f.write('bar content\n')
+ with open(nee_path, "a") as f:
+ f.write("bar content\n")
porcelain.add(self.repo, paths=[nee_path])
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': [b'nee']}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": [b"nee"]}, [], []], status
+ )
# 'uni' branch doesn't have 'nee' and it has been modified, should result in the checkout being aborted.
with self.assertRaises(CheckoutError):
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
- self.assertEqual(b'master', porcelain.active_branch(self.repo))
+ self.assertEqual(b"master", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': [b'nee']}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": [b"nee"]}, [], []], status
+ )
def test_checkout_to_branch_with_modified_file_not_present_forced(self):
# Commit a new file that the other branch doesn't have.
- _, nee_path = _commit_file_with_content(self.repo, 'nee', 'Good content\n')
+ _, nee_path = _commit_file_with_content(self.repo, "nee", "Good content\n")
# Modify the file the other branch doesn't have.
- with open(nee_path, 'a') as f:
- f.write('bar content\n')
+ with open(nee_path, "a") as f:
+ f.write("bar content\n")
porcelain.add(self.repo, paths=[nee_path])
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': [b'nee']}, [], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": [b"nee"]}, [], []], status
+ )
# 'uni' branch doesn't have 'nee' and it has been modified, but we force to reset the entire index.
- porcelain.checkout_branch(self.repo, b'uni', force=True)
+ porcelain.checkout_branch(self.repo, b"uni", force=True)
- self.assertEqual(b'uni', porcelain.active_branch(self.repo))
+ self.assertEqual(b"uni", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
def test_checkout_to_branch_with_unstaged_files(self):
# Edit `foo`.
- with open(self._foo_path, 'a') as f:
- f.write('new message')
+ with open(self._foo_path, "a") as f:
+ f.write("new message")
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
-
- porcelain.checkout_branch(self.repo, b'uni')
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
+ )
+ porcelain.checkout_branch(self.repo, b"uni")
+
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
+ )
def test_checkout_to_branch_with_untracked_files(self):
- with open(os.path.join(self.repo.path, 'neu'), 'a') as f:
- f.write('new message\n')
+ with open(os.path.join(self.repo.path, "neu"), "a") as f:
+ f.write("new message\n")
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['neu']], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["neu"]], status)
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['neu']], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["neu"]], status)
def test_checkout_to_branch_with_new_files(self):
- porcelain.checkout_branch(self.repo, b'uni')
- sub_directory = os.path.join(self.repo.path, 'sub1')
+ porcelain.checkout_branch(self.repo, b"uni")
+ sub_directory = os.path.join(self.repo.path, "sub1")
os.mkdir(sub_directory)
for index in range(5):
- _commit_file_with_content(self.repo, 'new_file_' + str(index + 1), "Some content\n")
- _commit_file_with_content(self.repo, os.path.join('sub1', 'new_file_' + str(index + 10)), "Good content\n")
+ _commit_file_with_content(
+ self.repo, "new_file_" + str(index + 1), "Some content\n"
+ )
+ _commit_file_with_content(
+ self.repo,
+ os.path.join("sub1", "new_file_" + str(index + 10)),
+ "Good content\n",
+ )
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
- porcelain.checkout_branch(self.repo, b'master')
+ porcelain.checkout_branch(self.repo, b"master")
self.assertEqual(b"master", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
self.assertEqual(b"uni", porcelain.active_branch(self.repo))
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
def test_checkout_to_branch_with_file_in_sub_directory(self):
- sub_directory = os.path.join(self.repo.path, 'sub1', 'sub2')
+ sub_directory = os.path.join(self.repo.path, "sub1", "sub2")
os.makedirs(sub_directory)
- sub_directory_file = os.path.join(sub_directory, 'neu')
- with open(sub_directory_file, 'w') as f:
- f.write('new message\n')
+ sub_directory_file = os.path.join(sub_directory, "neu")
+ with open(sub_directory_file, "w") as f:
+ f.write("new message\n")
porcelain.add(self.repo, paths=[sub_directory_file])
porcelain.commit(
author=b"John <john@example.com>",
)
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
self.assertTrue(os.path.isdir(sub_directory))
self.assertTrue(os.path.isdir(os.path.dirname(sub_directory)))
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
self.assertFalse(os.path.isdir(sub_directory))
self.assertFalse(os.path.isdir(os.path.dirname(sub_directory)))
- porcelain.checkout_branch(self.repo, b'master')
+ porcelain.checkout_branch(self.repo, b"master")
self.assertTrue(os.path.isdir(sub_directory))
self.assertTrue(os.path.isdir(os.path.dirname(sub_directory)))
def test_checkout_to_branch_with_multiple_files_in_sub_directory(self):
- sub_directory = os.path.join(self.repo.path, 'sub1', 'sub2')
+ sub_directory = os.path.join(self.repo.path, "sub1", "sub2")
os.makedirs(sub_directory)
- sub_directory_file_1 = os.path.join(sub_directory, 'neu')
- with open(sub_directory_file_1, 'w') as f:
- f.write('new message\n')
+ sub_directory_file_1 = os.path.join(sub_directory, "neu")
+ with open(sub_directory_file_1, "w") as f:
+ f.write("new message\n")
- sub_directory_file_2 = os.path.join(sub_directory, 'gus')
- with open(sub_directory_file_2, 'w') as f:
- f.write('alternative message\n')
+ sub_directory_file_2 = os.path.join(sub_directory, "gus")
+ with open(sub_directory_file_2, "w") as f:
+ f.write("alternative message\n")
porcelain.add(self.repo, paths=[sub_directory_file_1, sub_directory_file_2])
porcelain.commit(
author=b"John <john@example.com>",
)
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
self.assertTrue(os.path.isdir(sub_directory))
self.assertTrue(os.path.isdir(os.path.dirname(sub_directory)))
- porcelain.checkout_branch(self.repo, b'uni')
+ porcelain.checkout_branch(self.repo, b"uni")
status = list(porcelain.status(self.repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
self.assertFalse(os.path.isdir(sub_directory))
self.assertFalse(os.path.isdir(os.path.dirname(sub_directory)))
def _commit_something_wrong(self):
- with open(self._foo_path, 'a') as f:
- f.write('something wrong')
+ with open(self._foo_path, "a") as f:
+ f.write("something wrong")
porcelain.add(self.repo, paths=[self._foo_path])
return porcelain.commit(
def test_checkout_remote_branch_then_master_then_remote_branch_again(self):
target_repo = self._checkout_remote_branch()
self.assertEqual(b"foo", porcelain.active_branch(target_repo))
- _commit_file_with_content(target_repo, 'bar', 'something\n')
- self.assertTrue(os.path.isfile(os.path.join(target_repo.path, 'bar')))
+ _commit_file_with_content(target_repo, "bar", "something\n")
+ self.assertTrue(os.path.isfile(os.path.join(target_repo.path, "bar")))
porcelain.checkout_branch(target_repo, b"master")
self.assertEqual(b"master", porcelain.active_branch(target_repo))
- self.assertFalse(os.path.isfile(os.path.join(target_repo.path, 'bar')))
+ self.assertFalse(os.path.isfile(os.path.join(target_repo.path, "bar")))
porcelain.checkout_branch(target_repo, b"origin/foo")
self.assertEqual(b"foo", porcelain.active_branch(target_repo))
- self.assertTrue(os.path.isfile(os.path.join(target_repo.path, 'bar')))
+ self.assertTrue(os.path.isfile(os.path.join(target_repo.path, "bar")))
target_repo.close()
class SubmoduleTests(PorcelainTestCase):
-
def test_empty(self):
porcelain.commit(
repo=self.repo.path,
def test_add(self):
porcelain.submodule_add(self.repo, "../bar.git", "bar")
- with open('%s/.gitmodules' % self.repo.path) as f:
- self.assertEqual("""\
+ with open("%s/.gitmodules" % self.repo.path) as f:
+ self.assertEqual(
+ """\
[submodule "bar"]
\turl = ../bar.git
\tpath = bar
-""", f.read())
+""",
+ f.read(),
+ )
def test_init(self):
porcelain.submodule_add(self.repo, "../bar.git", "bar")
porcelain.add(repo=self.repo.path, paths=[file_path])
results = porcelain.status(self.repo)
- self.assertDictEqual({"add": [b"crlf-new"], "delete": [], "modify": []}, results.staged)
+ self.assertDictEqual(
+ {"add": [b"crlf-new"], "delete": [], "modify": []}, results.staged
+ )
self.assertListEqual(results.unstaged, [])
self.assertListEqual(results.untracked, [])
),
)
self.assertEqual(
- {os.path.join('nested', 'ignored'),
- os.path.join('nested', 'with'),
- os.path.join('nested', 'manager')},
+ {
+ os.path.join("nested", "ignored"),
+ os.path.join("nested", "with"),
+ os.path.join("nested", "manager"),
+ },
set(
porcelain.get_untracked_paths(
self.repo.path,
self.repo.path,
self.repo.open_index(),
)
- )
+ ),
)
self.assertEqual(
{".gitignore", "notignored"},
self.repo.open_index(),
exclude_ignored=True,
)
- )
+ ),
)
def test_get_untracked_paths_invalid_untracked_files(self):
def test_get_untracked_paths_normal(self):
with self.assertRaises(NotImplementedError):
- _, _, _ = porcelain.status(
- repo=self.repo.path, untracked_files="normal"
- )
+ _, _, _ = porcelain.status(repo=self.repo.path, untracked_files="normal")
+
# TODO(jelmer): Add test for dulwich.porcelain.daemon
[c1] = build_commit_graph(self.repo.object_store, [[1]])
self.repo[b"HEAD"] = c1.id
porcelain.branch_create(self.repo, b"foo")
- self.assertEqual(
- {b"master", b"foo"}, set(porcelain.branch_list(self.repo))
- )
+ self.assertEqual({b"master", b"foo"}, set(porcelain.branch_list(self.repo)))
class BranchCreateTests(PorcelainTestCase):
[c1] = build_commit_graph(self.repo.object_store, [[1]])
self.repo[b"HEAD"] = c1.id
porcelain.branch_create(self.repo, b"foo")
- self.assertEqual(
- {b"master", b"foo"}, set(porcelain.branch_list(self.repo))
- )
+ self.assertEqual({b"master", b"foo"}, set(porcelain.branch_list(self.repo)))
class BranchDeleteTests(PorcelainTestCase):
class FindUniqueAbbrevTests(PorcelainTestCase):
-
def test_simple(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]]
)
self.repo.refs[b"HEAD"] = c3.id
self.assertEqual(
- c1.id.decode('ascii')[:7],
- porcelain.find_unique_abbrev(self.repo.object_store, c1.id))
+ c1.id.decode("ascii")[:7],
+ porcelain.find_unique_abbrev(self.repo.object_store, c1.id),
+ )
class PackRefsTests(PorcelainTestCase):
class ServerTests(PorcelainTestCase):
@contextlib.contextmanager
def _serving(self):
- with make_server('localhost', 0, self.app) as server:
+ with make_server("localhost", 0, self.app) as server:
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
self.app = make_wsgi_chain(backend)
def test_pull(self):
- c1, = build_commit_graph(self.served_repo.object_store, [[1]])
+ (c1,) = build_commit_graph(self.served_repo.object_store, [[1]])
self.served_repo.refs[b"refs/heads/master"] = c1.id
with self._serving() as url:
porcelain.pull(self.repo, url, "master")
def test_push(self):
- c1, = build_commit_graph(self.repo.object_store, [[1]])
+ (c1,) = build_commit_graph(self.repo.object_store, [[1]])
self.repo.refs[b"refs/heads/master"] = c1.id
with self._serving() as url:
blob - 44f5616b78a7288d83f96ba9a3b7d333491a0c92
blob + 68478c575214674154a778e092de853cb2e7b937
--- dulwich/tests/test_refs.py
+++ dulwich/tests/test_refs.py
def test_delete_refs_container(self):
# We shouldn't delete the refs directory
- self._refs[b'refs/heads/blah'] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
+ self._refs[b"refs/heads/blah"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
for ref in self._refs.allkeys():
del self._refs[ref]
- self.assertTrue(os.path.exists(os.path.join(self._refs.path, b'refs')))
+ self.assertTrue(os.path.exists(os.path.join(self._refs.path, b"refs")))
def test_setitem_packed(self):
with open(os.path.join(self._refs.path, b"packed-refs"), "w") as f:
def test_set_overwrite_loop(self):
self.assertRaises(SymrefLoop, self._refs.follow, b"refs/heads/loop")
- self._refs[b'refs/heads/loop'] = (
- b"42d06bd4b77fed026b154d16493e5deab78f02ec")
+ self._refs[b"refs/heads/loop"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
self.assertEqual(
- ([b'refs/heads/loop'], b'42d06bd4b77fed026b154d16493e5deab78f02ec'),
- self._refs.follow(b"refs/heads/loop"))
+ ([b"refs/heads/loop"], b"42d06bd4b77fed026b154d16493e5deab78f02ec"),
+ self._refs.follow(b"refs/heads/loop"),
+ )
def test_delitem(self):
RefsContainerTests.test_delitem(self)
class StripPeeledRefsTests(TestCase):
-
all_refs = {
b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
blob - c19fcc1da91f92c7d962e83576465863b11716d9
blob + ba4211bc950dbf00a8d425f9151a9b60b7552dd1
--- dulwich/tests/test_repository.py
+++ dulwich/tests/test_repository.py
self.assertIn(barestr, config_text, "%r" % config_text)
if isinstance(repo, Repo):
- expected_mode = '0o100644' if expect_filemode else '0o100666'
+ expected_mode = "0o100644" if expect_filemode else "0o100666"
expected = {
- 'HEAD': expected_mode,
- 'config': expected_mode,
- 'description': expected_mode,
+ "HEAD": expected_mode,
+ "config": expected_mode,
+ "description": expected_mode,
}
actual = {
- f[len(repo._controldir) + 1:]: oct(os.stat(f).st_mode)
- for f in glob.glob(os.path.join(repo._controldir, '*'))
+ f[len(repo._controldir) + 1 :]: oct(os.stat(f).st_mode)
+ for f in glob.glob(os.path.join(repo._controldir, "*"))
if os.path.isfile(f)
}
def test_clone_no_head(self):
temp_dir = self.mkdtemp()
self.addCleanup(shutil.rmtree, temp_dir)
- repo_dir = os.path.join(os.path.dirname(__file__), "..", "..", "testdata", "repos")
+ repo_dir = os.path.join(
+ os.path.dirname(__file__), "..", "..", "testdata", "repos"
+ )
dest_dir = os.path.join(temp_dir, "a.git")
shutil.copytree(os.path.join(repo_dir, "a.git"), dest_dir, symlinks=True)
r = Repo(dest_dir)
r.clone(tmp_dir, mkdir=False, bare=True)
def test_reset_index_symlink_enabled(self):
- if sys.platform == 'win32':
+ if sys.platform == "win32":
self.skipTest("symlinks are not supported on Windows")
tmp_dir = self.mkdtemp()
self.addCleanup(shutil.rmtree, tmp_dir)
t = o.clone(os.path.join(tmp_dir, "t"), symlinks=True)
o.close()
- bar_path = os.path.join(tmp_dir, 't', 'bar')
- if sys.platform == 'win32':
+ bar_path = os.path.join(tmp_dir, "t", "bar")
+ if sys.platform == "win32":
with open(bar_path) as f:
- self.assertEqual('foo', f.read())
+ self.assertEqual("foo", f.read())
else:
- self.assertEqual('foo', os.readlink(bar_path))
+ self.assertEqual("foo", os.readlink(bar_path))
t.close()
def test_reset_index_symlink_disabled(self):
o.do_commit(b"add symlink")
t = o.clone(os.path.join(tmp_dir, "t"), symlinks=False)
- with open(os.path.join(tmp_dir, "t", 'bar')) as f:
- self.assertEqual('foo', f.read())
+ with open(os.path.join(tmp_dir, "t", "bar")) as f:
+ self.assertEqual("foo", f.read())
t.close()
r.stage(['foo'])
""".format(
executable=sys.executable,
- path=[os.path.join(os.path.dirname(__file__), '..', '..')] + sys.path)
+ path=[os.path.join(os.path.dirname(__file__), "..", "..")] + sys.path,
+ )
repo_dir = os.path.join(self.mkdtemp())
self.addCleanup(shutil.rmtree, repo_dir)
r = Repo.init(repo_dir)
self.addCleanup(r.close)
- with open(os.path.join(repo_dir, 'blah'), 'w') as f:
- f.write('blah')
+ with open(os.path.join(repo_dir, "blah"), "w") as f:
+ f.write("blah")
- r.stage(['blah'])
+ r.stage(["blah"])
pre_commit = os.path.join(r.controldir(), "hooks", "pre-commit")
self.assertEqual([], r[commit_sha].parents)
tree = r[r[commit_sha].tree]
- self.assertEqual({b'blah', b'foo'}, set(tree))
+ self.assertEqual({b"blah", b"foo"}, set(tree))
def test_shell_hook_post_commit(self):
if os.name != "posix":
{b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
self._repo.get_shallow(),
)
- self._repo.update_shallow(
- None, [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"]
- )
+ self._repo.update_shallow(None, [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"])
self.assertEqual(set(), self._repo.get_shallow())
self.assertEqual(
False,
r = self._repo
c = r.get_config()
c.set(("core",), "repositoryformatversion", "1")
- c.set(("extensions", ), "worktreeconfig", True)
+ c.set(("extensions",), "worktreeconfig", True)
c.write_to_path()
c = r.get_worktree_config()
c.set(("user",), "repositoryformatversion", "1")
c.set((b"user",), b"name", b"Jelmer")
c.write_to_path()
cs = r.get_config_stack()
- self.assertEqual(cs.get(("user", ), "name"), b"Jelmer")
+ self.assertEqual(cs.get(("user",), "name"), b"Jelmer")
def test_repositoryformatversion_1_extension(self):
r = self._repo
c = r.get_config()
c.set(("core",), "repositoryformatversion", "1")
- c.set(("extensions", ), "unknownextension", True)
+ c.set(("extensions",), "unknownextension", True)
c.write_to_path()
self.assertRaises(UnsupportedExtension, Repo, self._repo_dir)
def test_stage_submodule(self):
r = self._repo
s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
- s.do_commit(b'message')
+ s.do_commit(b"message")
r.stage(["sub"])
self.assertEqual([b"a", b"sub"], list(r.open_index()))
def test_unstage_midify_file_with_dir(self):
- os.mkdir(os.path.join(self._repo.path, 'new_dir'))
- full_path = os.path.join(self._repo.path, 'new_dir', 'foo')
+ os.mkdir(os.path.join(self._repo.path, "new_dir"))
+ full_path = os.path.join(self._repo.path, "new_dir", "foo")
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self._repo, paths=[full_path])
porcelain.commit(
self._repo,
committer=b"Jane <jane@example.com>",
author=b"John <john@example.com>",
)
- with open(full_path, 'a') as f:
- f.write('something new')
- self._repo.unstage(['new_dir/foo'])
+ with open(full_path, "a") as f:
+ f.write("something new")
+ self._repo.unstage(["new_dir/foo"])
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'new_dir/foo'], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": []}, [b"new_dir/foo"], []], status
+ )
def test_unstage_while_no_commit(self):
- file = 'foo'
+ file = "foo"
full_path = os.path.join(self._repo.path, file)
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self._repo, paths=[full_path])
self._repo.unstage([file])
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['foo']], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
def test_unstage_add_file(self):
- file = 'foo'
+ file = "foo"
full_path = os.path.join(self._repo.path, file)
porcelain.commit(
self._repo,
committer=b"Jane <jane@example.com>",
author=b"John <john@example.com>",
)
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self._repo, paths=[full_path])
self._repo.unstage([file])
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['foo']], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
def test_unstage_modify_file(self):
- file = 'foo'
+ file = "foo"
full_path = os.path.join(self._repo.path, file)
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self._repo, paths=[full_path])
porcelain.commit(
self._repo,
committer=b"Jane <jane@example.com>",
author=b"John <john@example.com>",
)
- with open(full_path, 'a') as f:
- f.write('broken')
+ with open(full_path, "a") as f:
+ f.write("broken")
porcelain.add(self._repo, paths=[full_path])
self._repo.unstage([file])
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
+ )
def test_unstage_remove_file(self):
- file = 'foo'
+ file = "foo"
full_path = os.path.join(self._repo.path, file)
- with open(full_path, 'w') as f:
- f.write('hello')
+ with open(full_path, "w") as f:
+ f.write("hello")
porcelain.add(self._repo, paths=[full_path])
porcelain.commit(
self._repo,
os.remove(full_path)
self._repo.unstage([file])
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+ self.assertEqual(
+ [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
+ )
def test_reset_index(self):
r = self._repo
- with open(os.path.join(r.path, 'a'), 'wb') as f:
- f.write(b'changed')
- with open(os.path.join(r.path, 'b'), 'wb') as f:
- f.write(b'added')
- r.stage(['a', 'b'])
+ with open(os.path.join(r.path, "a"), "wb") as f:
+ f.write(b"changed")
+ with open(os.path.join(r.path, "b"), "wb") as f:
+ f.write(b"added")
+ r.stage(["a", "b"])
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [b'b'], 'delete': [], 'modify': [b'a']}, [], []], status)
+ self.assertEqual(
+ [{"add": [b"b"], "delete": [], "modify": [b"a"]}, [], []], status
+ )
r.reset_index()
status = list(porcelain.status(self._repo))
- self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['b']], status)
+ self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["b"]], status)
@skipIf(
sys.platform in ("win32", "darwin"),
InvalidUserIdentity, check_user_identity, b"Fullname >order<>"
)
self.assertRaises(
- InvalidUserIdentity, check_user_identity, b'Contains\0null byte <>'
+ InvalidUserIdentity, check_user_identity, b"Contains\0null byte <>"
)
self.assertRaises(
- InvalidUserIdentity, check_user_identity, b'Contains\nnewline byte <>'
+ InvalidUserIdentity, check_user_identity, b"Contains\nnewline byte <>"
)
blob - d1d51bcbcacbe2ec1d6a085fe69fbd211de50901
blob + 939c5cd4186d0202b419563faf69ec289ca48a0e
--- dulwich/tests/test_server.py
+++ dulwich/tests/test_server.py
def test_linear(self):
c1, c2, c3 = self.make_linear_commits(3)
- self.assertEqual(
- ({c3.id}, set()), _find_shallow(self._store, [c3.id], 1)
- )
+ self.assertEqual(({c3.id}, set()), _find_shallow(self._store, [c3.id], 1))
self.assertEqual(
({c2.id}, {c3.id}),
_find_shallow(self._store, [c3.id], 2),
class SingleAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
-
impl_cls = SingleAckGraphWalkerImpl
def test_single_ack(self):
class MultiAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
-
impl_cls = MultiAckGraphWalkerImpl
def test_multi_ack(self):
class MultiAckDetailedGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
-
impl_cls = MultiAckDetailedGraphWalkerImpl
def test_multi_ack(self):
blob - b7466e8014b67e0b0cd7ebcb212280e1649cb64a
blob + 5791100d746a75bdecc12ab8ee485010f41a8d35
--- dulwich/tests/test_web.py
+++ dulwich/tests/test_web.py
class HTTPGitRequestTestCase(WebTestCase):
-
# This class tests the contents of the actual cache headers
_req_class = HTTPGitRequest
blob - 33c80969ef287c3a179f1519de409ed973d911ea
blob + ca9ccb5f5583008276f734539a9c8a6ae2a87368
--- dulwich/tests/utils.py
+++ dulwich/tests/utils.py
"""
if temp_dir is None:
temp_dir = tempfile.mkdtemp()
- repo_dir = os.path.join(os.path.dirname(__file__), "..", "..", "testdata", "repos", name)
+ repo_dir = os.path.join(
+ os.path.dirname(__file__), "..", "..", "testdata", "repos", name
+ )
temp_repo_dir = os.path.join(temp_dir, name)
shutil.copytree(repo_dir, temp_repo_dir, symlinks=True)
return Repo(temp_repo_dir)
blob - 795eb4f07bdce6958d2c9ae2cc2dc4b726ba5b8f
blob + c0db19e401531d2cfbda7f60c89e66456dea1e2b
--- dulwich/walk.py
+++ dulwich/walk.py
store,
include: List[bytes],
exclude: Optional[List[bytes]] = None,
- order: str = 'date',
+ order: str = "date",
reverse: bool = False,
max_entries: Optional[int] = None,
paths: Optional[List[bytes]] = None,
blob - a9c0e77d5d67bde52902b3ed7e5459d37771e638
blob + 493c6499ddacbf0246e40b4247b29f4e9bef6672
--- dulwich/web.py
+++ dulwich/web.py
class ChunkReader:
-
def __init__(self, f) -> None:
self._iter = _chunk_iter(f)
self._buffer: List[bytes] = []
self._buffer.append(next(self._iter))
except StopIteration:
break
- f = b''.join(self._buffer)
+ f = b"".join(self._buffer)
ret = f[:n]
self._buffer = [f[n:]]
return ret
return
req.nocache()
write = req.respond(HTTP_OK, "application/x-%s-result" % service)
- if req.environ.get('HTTP_TRANSFER_ENCODING') == 'chunked':
+ if req.environ.get("HTTP_TRANSFER_ENCODING") == "chunked":
read = ChunkReader(req.environ["wsgi.input"]).read
else:
read = req.environ["wsgi.input"].read
environ: the WSGI environment for the request.
"""
- def __init__(self, environ, start_response, dumb: bool = False, handlers=None) -> None:
+ def __init__(
+ self, environ, start_response, dumb: bool = False, handlers=None
+ ) -> None:
self.environ = environ
self.dumb = dumb
self.handlers = handlers
("POST", re.compile("/git-receive-pack$")): handle_service_request,
}
- def __init__(self, backend, dumb: bool = False, handlers=None, fallback_app=None) -> None:
+ def __init__(
+ self, backend, dumb: bool = False, handlers=None, fallback_app=None
+ ) -> None:
self.backend = backend
self.dumb = dumb
self.handlers = dict(DEFAULT_HANDLERS)
def __call__(self, environ, start_response):
import gzip
+
if environ.get("HTTP_CONTENT_ENCODING", "") == "gzip":
environ["wsgi.input"] = gzip.GzipFile(
filename=None, fileobj=environ["wsgi.input"], mode="rb"
blob - a074e1b958b0ce7a920c531cca45ac3dd6c441f0
blob + 681bb39fb57fdb006f62067816f704563d5f3104
--- examples/diff.py
+++ examples/diff.py
commit = r[commit_id]
parent_commit = r[commit.parents[0]]
-outstream = getattr(sys.stdout, 'buffer', sys.stdout)
+outstream = getattr(sys.stdout, "buffer", sys.stdout)
write_tree_diff(outstream, r.object_store, parent_commit.tree, commit.tree)
blob - 26c3d2104cfa1ec34357b8c6d62e0299a81215ec
blob + beb3a9a73de0f8200dce55f41f7653fa67388cf3
--- examples/gcs.py
+++ examples/gcs.py
from dulwich.repo import Repo
client = storage.Client()
-bucket = client.get_bucket('mybucket')
+bucket = client.get_bucket("mybucket")
-gcs_object_store = GcsObjectStore(bucket, 'path')
+gcs_object_store = GcsObjectStore(bucket, "path")
r = Repo.init_bare(tempfile.mkdtemp(), object_store=gcs_object_store)
blob - eb024cf433c206cc8657bf4f0962488cd75fe112
blob + d7f0d5cc633f3e1d3a5d7e1513cb8f6219b80cc6
--- examples/latest_change.py
+++ examples/latest_change.py
r = Repo(".")
-path = sys.argv[1].encode('utf-8')
+path = sys.argv[1].encode("utf-8")
w = r.get_walker(paths=[path], max_entries=1)
try:
except StopIteration:
print("No file %s anywhere in history." % sys.argv[1])
else:
- print("{} was last changed by {} at {} (commit {})".format(
- sys.argv[1], c.author, time.ctime(c.author_time), c.id))
+ print(
+ "{} was last changed by {} at {} (commit {})".format(
+ sys.argv[1], c.author, time.ctime(c.author_time), c.id
+ )
+ )
blob - 3a04dcab40392b838f5a2bb3c5bb8e26a8d45af1
blob + ca44bb5b3f8ec4b251d55244b676f6ba6206d84c
--- examples/memoryrepo.py
+++ examples/memoryrepo.py
from dulwich.repo import MemoryRepo
local_repo = MemoryRepo()
-local_repo.refs.set_symbolic_ref(b'HEAD', b'refs/heads/master')
+local_repo.refs.set_symbolic_ref(b"HEAD", b"refs/heads/master")
fetch_result = porcelain.fetch(local_repo, sys.argv[1])
-local_repo.refs[b'refs/heads/master'] = fetch_result.refs[b'refs/heads/master']
+local_repo.refs[b"refs/heads/master"] = fetch_result.refs[b"refs/heads/master"]
print(local_repo.refs.as_dict())
-last_tree = local_repo[local_repo[b'HEAD'].tree]
-new_blob = Blob.from_string(b'Some contents')
+last_tree = local_repo[local_repo[b"HEAD"].tree]
+new_blob = Blob.from_string(b"Some contents")
local_repo.object_store.add_object(new_blob)
-last_tree.add(b'test', stat.S_IFREG, new_blob.id)
+last_tree.add(b"test", stat.S_IFREG, new_blob.id)
local_repo.object_store.add_object(last_tree)
local_repo.do_commit(
- message=b'Add a file called \'test\'',
- ref=b'refs/heads/master',
- tree=last_tree.id)
+ message=b"Add a file called 'test'", ref=b"refs/heads/master", tree=last_tree.id
+)
-porcelain.push(local_repo, sys.argv[1], 'master')
+porcelain.push(local_repo, sys.argv[1], "master")
blob - 0a9f32fe9e2db11c7703fef9899283557b3f2b3a
blob + 2e3000b903aa178a80163f74878729e5e6735d8f
--- examples/rename-branch.py
+++ examples/rename-branch.py
from dulwich.pack import pack_objects_to_data
parser = argparse.ArgumentParser()
-parser.add_argument('url', type=str)
-parser.add_argument('old_ref', type=str)
-parser.add_argument('new_ref', type=str)
+parser.add_argument("url", type=str)
+parser.add_argument("old_ref", type=str)
+parser.add_argument("new_ref", type=str)
args = parser.parse_args()
client, path = get_transport_and_path_from_url(args.url)
def update_refs(refs):
- sha = refs[args.old_ref.encode('utf-8')]
- return {
- args.old_ref.encode('utf-8'): ZERO_SHA,
- args.new_ref.encode('utf-8'): sha}
+ sha = refs[args.old_ref.encode("utf-8")]
+ return {args.old_ref.encode("utf-8"): ZERO_SHA, args.new_ref.encode("utf-8"): sha}
client.send_pack(path, update_refs, generate_pack_data)
blob - 00e492bdb2dbed49a37a46ffaf37ed7e5d0465e4
blob + 2b97143d029a31d3c97991e1626ba1091b5aa863
--- setup.py
+++ setup.py
from setuptools import Extension, setup
-if sys.platform == 'darwin' and os.path.exists('/usr/bin/xcodebuild'):
+if sys.platform == "darwin" and os.path.exists("/usr/bin/xcodebuild"):
# XCode 4.0 dropped support for ppc architecture, which is hardcoded in
# distutils.sysconfig
import subprocess
+
p = subprocess.Popen(
- ['/usr/bin/xcodebuild', '-version'], stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, env={})
+ ["/usr/bin/xcodebuild", "-version"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env={},
+ )
out, err = p.communicate()
for line in out.splitlines():
line = line.decode("utf8")
# Also parse only first digit, because 3.2.1 can't be parsed nicely
- if (line.startswith('Xcode')
- and int(line.split()[1].split('.')[0]) >= 4):
- os.environ['ARCHFLAGS'] = ''
+ if line.startswith("Xcode") and int(line.split()[1].split(".")[0]) >= 4:
+ os.environ["ARCHFLAGS"] = ""
-tests_require = ['fastimport']
+tests_require = ["fastimport"]
-if '__pypy__' not in sys.modules and sys.platform != 'win32':
- tests_require.extend([
- 'gevent', 'geventhttpclient', 'setuptools>=17.1'])
+if "__pypy__" not in sys.modules and sys.platform != "win32":
+ tests_require.extend(["gevent", "geventhttpclient", "setuptools>=17.1"])
-optional = os.environ.get('CIBUILDWHEEL', '0') != '1'
+optional = os.environ.get("CIBUILDWHEEL", "0") != "1"
ext_modules = [
- Extension('dulwich._objects', ['dulwich/_objects.c'],
- optional=optional),
- Extension('dulwich._pack', ['dulwich/_pack.c'],
- optional=optional),
- Extension('dulwich._diff_tree', ['dulwich/_diff_tree.c'],
- optional=optional),
+ Extension("dulwich._objects", ["dulwich/_objects.c"], optional=optional),
+ Extension("dulwich._pack", ["dulwich/_pack.c"], optional=optional),
+ Extension("dulwich._diff_tree", ["dulwich/_diff_tree.c"], optional=optional),
]
# Ideally, setuptools would just provide a way to do this
-if '--pure' in sys.argv:
- sys.argv.remove('--pure')
+if "--pure" in sys.argv:
+ sys.argv.remove("--pure")
ext_modules = []
-setup(package_data={'': ['../docs/tutorial/*.txt', 'py.typed']},
- ext_modules=ext_modules,
- tests_require=tests_require)
+setup(
+ package_data={"": ["../docs/tutorial/*.txt", "py.typed"]},
+ ext_modules=ext_modules,
+ tests_require=tests_require,
+)