commit - ea8fcb760c4e8f2607d9a69832bab0d713bd9da3
commit + f6faf9e1875080da47f5622b6c07da0177adf014
blob - 49cd8d18d723d61d507793ee9f47c0a85ec3b0f3
blob + 4bf310db173ef1c0b2e50aad8ff786d771595bd3
--- docs/conf.py
+++ docs/conf.py
-# -*- coding: utf-8 -*-
#
# dulwich documentation build configuration file, created by
# sphinx-quickstart on Thu Feb 18 23:18:28 2010.
master_doc = 'index'
# General information about the project.
-project = u'dulwich'
-copyright = u'2011-2019 Jelmer Vernooij'
+project = 'dulwich'
+copyright = '2011-2023 Jelmer Vernooij'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
- ('index', 'dulwich.tex', u'dulwich Documentation',
+ ('index', 'dulwich.tex', 'dulwich Documentation',
'Jelmer Vernooij', 'manual'),
]
blob - decae7641aff6974e16a3087985bb9a1e283deea
blob + 375e394ca9ad09b2a6f624f61034d780d79fcaab
--- dulwich/archive.py
+++ dulwich/archive.py
from contextlib import closing
-class ChunkedBytesIO(object):
+class ChunkedBytesIO:
"""Turn a list of bytestrings into a file-like object.
This is similar to creating a `BytesIO` from a concatenation of the
for entry in tree.iteritems():
entry_abspath = posixpath.join(root, entry.path)
if stat.S_ISDIR(entry.mode):
- for _ in _walk_tree(store, store[entry.sha], entry_abspath):
- yield _
+ yield from _walk_tree(store, store[entry.sha], entry_abspath)
else:
yield (entry_abspath, entry)
blob - 3ed4e3db95680c313070f72180c9603a33f39682
blob + 7e9e94897cc6f45f0764ffe94fcb74c9b57b57d9
--- dulwich/bundle.py
+++ dulwich/bundle.py
from .pack import PackData, write_pack_data
-class Bundle(object):
+class Bundle:
version: Optional[int] = None
blob - 8d55278b92546fdb117ecd66631fc7c618abf50c
blob + 1f2e12fdc323b7f4c0176729bef23e3def6ca4ed
--- dulwich/cli.py
+++ dulwich/cli.py
pdb.set_trace()
-class Command(object):
+class Command:
"""A Dulwich subcommand."""
def run(self, args):
opts, args = getopt(args, "", [])
opts = dict(opts)
for (obj, msg) in porcelain.fsck("."):
- print("%s: %s" % (obj, msg))
+ print("{}: {}".format(obj, msg))
class cmd_log(Command):
try:
print("\t%s" % x[name])
except KeyError as k:
- print("\t%s: Unable to resolve base %s" % (name, k))
+ print("\t{}: Unable to resolve base {}".format(name, k))
except ApplyDeltaError as e:
- print("\t%s: Unable to apply delta: %r" % (name, e))
+ print("\t{}: Unable to apply delta: {!r}".format(name, e))
class cmd_dump_index(Command):
for kind, names in status.staged.items():
for name in names:
sys.stdout.write(
- "\t%s: %s\n" % (kind, name.decode(sys.getfilesystemencoding()))
+ "\t{}: {}\n".format(kind, name.decode(sys.getfilesystemencoding()))
)
sys.stdout.write("\n")
if status.unstaged:
sys.exit(1)
refs = porcelain.ls_remote(args[0])
for ref in sorted(refs):
- sys.stdout.write("%s\t%s\n" % (ref, refs[ref]))
+ sys.stdout.write("{}\t{}\n".format(ref, refs[ref]))
class cmd_ls_tree(Command):
parser = argparse.ArgumentParser()
parser.parse_args(argv)
for path, sha in porcelain.submodule_list("."):
- sys.stdout.write(' %s %s\n' % (sha, path))
+ sys.stdout.write(' {} {}\n'.format(sha, path))
class cmd_submodule_init(Command):
blob - 23321789d0abed1c829fdc9deb8353fc1ea4609c
blob + 6757a856597c670df24c96956f0d6caf3ccc7323
--- dulwich/client.py
+++ dulwich/client.py
] + COMMON_CAPABILITIES
-class ReportStatusParser(object):
+class ReportStatusParser:
"""Handle status as reported by servers with 'report-status' capability."""
def __init__(self):
refs[ref] = sha
if len(refs) == 0:
- return {}, set([])
+ return {}, set()
if refs == {CAPABILITIES_REF: ZERO_SHA}:
refs = {}
return refs, set(server_capabilities)
-class FetchPackResult(object):
+class FetchPackResult:
"""Result of a fetch-pack operation.
Attributes:
if name in type(self)._FORWARDED_ATTRS:
self._warn_deprecated()
return getattr(self.refs, name)
- return super(FetchPackResult, self).__getattribute__(name)
+ return super().__getattribute__(name)
def __repr__(self):
- return "%s(%r, %r, %r)" % (
+ return "{}({!r}, {!r}, {!r})".format(
self.__class__.__name__,
self.refs,
self.symrefs,
)
-class SendPackResult(object):
+class SendPackResult:
"""Result of a upload-pack operation.
Attributes:
if name in type(self)._FORWARDED_ATTRS:
self._warn_deprecated()
return getattr(self.refs, name)
- return super(SendPackResult, self).__getattribute__(name)
+ return super().__getattribute__(name)
def __repr__(self):
- return "%s(%r, %r)" % (self.__class__.__name__, self.refs, self.agent)
+ return "{}({!r}, {!r})".format(self.__class__.__name__, self.refs, self.agent)
def _read_shallow_updates(pkt_seq):
return (new_shallow, new_unshallow)
-class _v1ReceivePackHeader(object):
+class _v1ReceivePackHeader:
def __init__(self, capabilities, old_refs, new_refs):
self.want = []
old_sha1 = old_refs.get(refname, ZERO_SHA)
if not isinstance(old_sha1, bytes):
raise TypeError(
- "old sha1 for %s is not a bytestring: %r" % (refname, old_sha1)
+ "old sha1 for {!r} is not a bytestring: {!r}".format(refname, old_sha1)
)
new_sha1 = new_refs.get(refname, ZERO_SHA)
if not isinstance(new_sha1, bytes):
raise TypeError(
- "old sha1 for %s is not a bytestring %r" % (refname, new_sha1)
+ "old sha1 for {!r} is not a bytestring {!r}".format(refname, new_sha1)
)
if old_sha1 != new_sha1:
# TODO(durin42): this doesn't correctly degrade if the server doesn't
# support some capabilities. This should work properly with servers
# that don't support multi_ack.
-class GitClient(object):
+class GitClient:
"""Git smart server client."""
def __init__(
def __init__(self, path_encoding=DEFAULT_ENCODING, **kwargs):
self._remote_path_encoding = path_encoding
- super(TraditionalGitClient, self).__init__(**kwargs)
+ super().__init__(**kwargs)
async def _connect(self, cmd, path):
"""Create a connection to the server.
port = TCP_GIT_PORT
self._host = host
self._port = port
- super(TCPGitClient, self).__init__(**kwargs)
+ super().__init__(**kwargs)
@classmethod
def from_parsedurl(cls, parsedurl, **kwargs):
try:
s.connect(sockaddr)
break
- except socket.error as e:
+ except OSError as e:
err = e
if s is not None:
s.close()
return proto, lambda: _fileno_can_read(s), None
-class SubprocessWrapper(object):
+class SubprocessWrapper:
"""A socket-like object that talks to a subprocess via pipes."""
def __init__(self, proc):
old_sha1 = old_refs.get(refname, ZERO_SHA)
if new_sha1 != ZERO_SHA:
if not target.refs.set_if_equals(refname, old_sha1, new_sha1):
- msg = "unable to set %s to %s" % (refname, new_sha1)
+ msg = "unable to set {} to {}".format(refname, new_sha1)
progress(msg)
ref_status[refname] = msg
else:
default_local_git_client_cls = LocalGitClient
-class SSHVendor(object):
+class SSHVendor:
"""A client side SSH implementation."""
def run_command(
"""Refusing to connect to strange SSH hostname."""
def __init__(self, hostname):
- super(StrangeHostname, self).__init__(hostname)
+ super().__init__(hostname)
class SubprocessSSHVendor(SSHVendor):
args.extend(["-i", str(key_filename)])
if username:
- host = "%s@%s" % (username, host)
+ host = "{}@{}".format(username, host)
if host.startswith("-"):
raise StrangeHostname(hostname=host)
args.append(host)
args.extend(["-i", str(key_filename)])
if username:
- host = "%s@%s" % (username, host)
+ host = "{}@{}".format(username, host)
if host.startswith("-"):
raise StrangeHostname(hostname=host)
args.append(host)
self.ssh_command = ssh_command or os.environ.get(
"GIT_SSH_COMMAND", os.environ.get("GIT_SSH")
)
- super(SSHGitClient, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.alternative_paths = {}
if vendor is not None:
self.ssh_vendor = vendor
kwargs["username"] = urlunquote(username)
netloc = parsedurl.hostname
if parsedurl.port:
- netloc = "%s:%s" % (netloc, parsedurl.port)
+ netloc = "{}:{}".format(netloc, parsedurl.port)
if parsedurl.username:
- netloc = "%s@%s" % (parsedurl.username, netloc)
+ netloc = "{}@{}".format(parsedurl.username, netloc)
parsedurl = parsedurl._replace(netloc=netloc)
return cls(urlunparse(parsedurl), **kwargs)
def __repr__(self):
- return "%s(%r, dumb=%r)" % (
+ return "{}({!r}, dumb={!r})".format(
type(self).__name__,
self._base_url,
self.dumb,
self.config = config
- super(Urllib3HttpGitClient, self).__init__(
+ super().__init__(
base_url=base_url, dumb=dumb, **kwargs)
def _get_url(self, path):
blob - eb566efd96711e7e9e9bd607f170b73f6df7c599
blob + d6d1c562b6365070f9fe8bf136c2af40f8b56e28
--- dulwich/cloud/gcs.py
+++ dulwich/cloud/gcs.py
class GcsObjectStore(BucketBasedObjectStore):
def __init__(self, bucket, subpath=''):
- super(GcsObjectStore, self).__init__()
+ super().__init__()
self.bucket = bucket
self.subpath = subpath
def __repr__(self):
- return "%s(%r, subpath=%r)" % (
+ return "{}({!r}, subpath={!r})".format(
type(self).__name__, self.bucket, self.subpath)
def _remove_pack(self, name):
name, ext = posixpath.splitext(posixpath.basename(blob.name))
packs.setdefault(name, set()).add(ext)
for name, exts in packs.items():
- if exts == set(['.pack', '.idx']):
+ if exts == {'.pack', '.idx'}:
yield name
def _load_pack_data(self, name):
blob - 4cea1f6f449651c86de535b29cd5a837f0287fe9
blob + 88b8d106cea7e6c2a6c7833bb158b11b12a15d2a
--- dulwich/config.py
+++ dulwich/config.py
ValueLike = Union[bytes, str]
-class Config(object):
+class Config:
"""A Git configuration."""
def get(self, section: SectionLike, name: NameLike) -> Value:
self._values = CaseInsensitiveOrderedMultiDict.make(values)
def __repr__(self) -> str:
- return "%s(%r)" % (self.__class__.__name__, self._values)
+ return "{}({!r})".format(self.__class__.__name__, self._values)
def __eq__(self, other: object) -> bool:
return isinstance(other, self.__class__) and other._values == self._values
] = None,
encoding: Union[str, None] = None
) -> None:
- super(ConfigFile, self).__init__(values=values, encoding=encoding)
+ super().__init__(values=values, encoding=encoding)
self.path: Optional[str] = None
@classmethod # noqa: C901
self.writable = writable
def __repr__(self) -> str:
- return "<%s for %r>" % (self.__class__.__name__, self.backends)
+ return "<{} for {!r}>".format(self.__class__.__name__, self.backends)
@classmethod
def default(cls) -> "StackedConfig":
blob - 8293b526d3bdd847c5cbda519677ea427c0554c3
blob + 9246e6219310868139ba04f314a7f89122fd8bfc
--- dulwich/contrib/diffstat.py
+++ dulwich/contrib/diffstat.py
#!/usr/bin/env python
-# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab
# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
import sys
import re
-from typing import Optional
+from typing import Optional, Tuple, List
# only needs to detect git style diffs as this is for
# use with dulwich
# properly interface with diffstat routine
-def _parse_patch(lines):
+def _parse_patch(lines: List[bytes]) -> Tuple[List[bytes], List[bool], List[Tuple[int, int]]]:
"""Parse a git style diff or patch to generate diff stats.
Args:
blob - 541d99b5e48559077b79173832a51595c66cd62f
blob + d5b83939f8873be0f345d16220594b0eacbcf8a4
--- dulwich/contrib/paramiko_vendor.py
+++ dulwich/contrib/paramiko_vendor.py
import paramiko.client
-class _ParamikoWrapper(object):
+class _ParamikoWrapper:
def __init__(self, client, channel):
self.client = client
self.channel = channel
self.channel.close()
-class ParamikoSSHVendor(object):
+class ParamikoSSHVendor:
# http://docs.paramiko.org/en/2.4/api/client.html
def __init__(self, **kwargs):
blob - 5b2734e7ec053960d6c3ceb6f7280d916d7de4c5
blob + 2c966a10b02a29a0939aace3500a3b6391ffe603
--- dulwich/contrib/release_robot.py
+++ dulwich/contrib/release_robot.py
obj = project.get_object(value) # dulwich object from SHA-1
# don't just check if object is "tag" b/c it could be a "commit"
# instead check if "tags" is in the ref-name
- if u"tags" not in key:
+ if "tags" not in key:
# skip ref if not a tag
continue
# strip the leading text from refs to get "tag name"
- _, tag = key.rsplit(u"/", 1)
+ _, tag = key.rsplit("/", 1)
# check if tag object is "commit" or "tag" pointing to a "commit"
try:
commit = obj.object # a tuple (commit class, commit id)
blob - c7f0183a24082ae1b6e78f35444aa1341ce2e187
blob + bb58d75c4c5fb3c566f6650b10e8eab1c9510dc4
--- dulwich/contrib/requests_vendor.py
+++ dulwich/contrib/requests_vendor.py
if username is not None:
self.session.auth = (username, password)
- super(RequestsHttpGitClient, self).__init__(
+ super().__init__(
base_url=base_url, dumb=dumb, **kwargs)
def _http_request(self, url, headers=None, data=None, allow_compression=False):
blob - 4deac806b10efbf3a775eb3af3be8189bcb6f633
blob + d97e2374e3d0cf591f20cb0d0914c7ddbd51315c
--- dulwich/contrib/swift.py
+++ dulwich/contrib/swift.py
pass
-class SwiftConnector(object):
+class SwiftConnector:
"""A Connector to swift that manage authentication and errors catching"""
def __init__(self, root, conf):
)
-class SwiftPackReader(object):
+class SwiftPackReader:
"""A SwiftPackReader that mimic read and sync method
The reader allows to read a specified amount of bytes from
self.buff_length = self.buff_length * 2
offset = self.base_offset
r = min(self.base_offset + self.buff_length, self.pack_length)
- ret = self.scon.get_object(self.filename, range="%s-%s" % (offset, r))
+ ret = self.scon.get_object(self.filename, range="{}-{}".format(offset, r))
self.buff = ret
def read(self, length):
def __init__(self, *args, **kwargs):
self.scon = kwargs["scon"]
del kwargs["scon"]
- super(SwiftPack, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self._pack_info_path = self._basename + ".info"
self._pack_info = None
self._pack_info_load = lambda: load_pack_info(self._pack_info_path, self.scon)
Args:
scon: A `SwiftConnector` instance
"""
- super(SwiftObjectStore, self).__init__()
+ super().__init__()
self.scon = scon
self.root = self.scon.root
self.pack_dir = posixpath.join(OBJECTDIR, PACKDIR)
f = self.scon.get_object(self.filename)
if not f:
f = BytesIO(b"")
- super(SwiftInfoRefsContainer, self).__init__(f)
+ super().__init__(f)
def _load_check_ref(self, name, old_ref):
self._check_refname(name)
}
if len(sys.argv) < 2:
- print("Usage: %s <%s> [OPTIONS...]" % (sys.argv[0], "|".join(commands.keys())))
+ print("Usage: {} <{}> [OPTIONS...]".format(sys.argv[0], "|".join(commands.keys())))
sys.exit(1)
cmd = sys.argv[1]
blob - 712b4f6cdc8b7335a9843183ca47f47084ef94d8
blob + b34ad77de57682ad4bd54e25204ffd0816e4fd66
--- dulwich/contrib/test_paramiko_vendor.py
+++ dulwich/contrib/test_paramiko_vendor.py
class Server(paramiko.ServerInterface):
"""http://docs.paramiko.org/en/2.4/api/server.html"""
def __init__(self, commands, *args, **kwargs):
- super(Server, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self.commands = commands
def check_channel_exec_request(self, channel, command):
def _run(self):
try:
conn, addr = self.socket.accept()
- except socket.error:
+ except OSError:
return False
self.transport = paramiko.Transport(conn)
self.addCleanup(self.transport.close)
blob - a6a5f1d67d82a01991b2376302b34fff40058513
blob + 52b420a942b8dcf7a2b1d26131a6127646af1406
--- dulwich/contrib/test_swift.py
+++ dulwich/contrib/test_swift.py
return lambda root, conf: FakeSwiftConnector(root, conf=conf, store=store)
-class Response(object):
+class Response:
def __init__(self, headers={}, status=200, content=None):
self.headers = headers
self.status_code = status
def create_commits(length=1, marker=b"Default"):
data = []
for i in range(0, length):
- _marker = ("%s_%s" % (marker, i)).encode()
+ _marker = ("{}_{}".format(marker, i)).encode()
blob, tree, tag, cmt = create_commit(data, _marker)
data.extend([blob, tree, tag, cmt])
return data
@skipIf(missing_libs, skipmsg)
-class FakeSwiftConnector(object):
+class FakeSwiftConnector:
def __init__(self, root, conf, store=None):
if store:
self.store = store
@skipIf(missing_libs, skipmsg)
class TestSwiftRepo(TestCase):
def setUp(self):
- super(TestSwiftRepo, self).setUp()
+ super().setUp()
self.conf = swift.load_conf(file=StringIO(config_file % def_config_file))
def test_init(self):
@skipIf(missing_libs, skipmsg)
class TestSwiftInfoRefsContainer(TestCase):
def setUp(self):
- super(TestSwiftInfoRefsContainer, self).setUp()
+ super().setUp()
content = (
b"22effb216e3a82f97da599b8885a6cadb488b4c5\trefs/heads/master\n"
b"cca703b0e1399008b53a1a236d6b4584737649e4\trefs/heads/dev"
@skipIf(missing_libs, skipmsg)
class TestSwiftConnector(TestCase):
def setUp(self):
- super(TestSwiftConnector, self).setUp()
+ super().setUp()
self.conf = swift.load_conf(file=StringIO(config_file % def_config_file))
with patch("geventhttpclient.HTTPClient.request", fake_auth_request_v1):
self.conn = swift.SwiftConnector("fakerepo", conf=self.conf)
with patch(
"geventhttpclient.HTTPClient.request",
lambda *args: Response(
- content=json.dumps((({"name": "a"}, {"name": "b"})))
+ content=json.dumps(({"name": "a"}, {"name": "b"}))
),
):
self.assertEqual(len(self.conn.get_container_objects()), 2)
blob - ba403fe81f12a3f3d057c65320b81dc4f06e70e5
blob + 5156800fe8def62d6cae796f08df3cdc7ee393e5
--- dulwich/diff_tree.py
+++ dulwich/diff_tree.py
source and target tree.
"""
if rename_detector is not None and tree1_id is not None and tree2_id is not None:
- for change in rename_detector.changes_with_renames(
+ yield from rename_detector.changes_with_renames(
tree1_id,
tree2_id,
want_unchanged=want_unchanged,
include_trees=include_trees,
- ):
- yield change
+ )
return
entries = walk_trees(
return (path1, path2)
-class RenameDetector(object):
+class RenameDetector:
"""Object for handling rename detection between two trees."""
def __init__(
blob - 6c6b5326b8048c1397549c54a311731ca0ee024f
blob + 03fb03161da4e4d5cd9da2e45166263a581c4ed4
--- dulwich/errors.py
+++ dulwich/errors.py
if self.extra is None:
Exception.__init__(
self,
- "Checksum mismatch: Expected %s, got %s" % (expected, got),
+ "Checksum mismatch: Expected {}, got {}".format(expected, got),
)
else:
Exception.__init__(
self,
- "Checksum mismatch: Expected %s, got %s; %s" % (expected, got, extra),
+ "Checksum mismatch: Expected {}, got {}; {}".format(expected, got, extra),
)
type_name: str
def __init__(self, sha, *args, **kwargs):
- Exception.__init__(self, "%s is not a %s" % (sha, self.type_name))
+ Exception.__init__(self, "{} is not a {}".format(sha, self.type_name))
class NotCommitError(WrongObjectException):
def __init__(self, *args, **kwargs):
self.ref_status = kwargs.pop("ref_status")
- super(UpdateRefsError, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
class HangupException(GitProtocolError):
def __init__(self, stderr_lines=None):
if stderr_lines:
- super(HangupException, self).__init__(
+ super().__init__(
"\n".join(
[line.decode("utf-8", "surrogateescape") for line in stderr_lines]
)
)
else:
- super(HangupException, self).__init__(
+ super().__init__(
"The remote server unexpectedly closed the connection."
)
self.stderr_lines = stderr_lines
command = "flush-pkt"
else:
command = "command %s" % command
- super(UnexpectedCommandError, self).__init__(
+ super().__init__(
"Protocol got unexpected %s" % command
)
blob - 5b596b9aaeef89cf1c7c160d47e87b186f50e3c3
blob + a182b4c820f25ae8f5dabdd2f3e555117048f43d
--- dulwich/fastexport.py
+++ dulwich/fastexport.py
return (name, email.rstrip(b">"))
-class GitFastExporter(object):
+class GitFastExporter:
"""Generate a fast-export output stream for Git objects."""
def __init__(self, outf, store):
blob - 7616c8cbe22f77c6807578ac006ee872f9f9bb66
blob + d5f02012c16399ed7e624f2a6bdb17f665a24805
--- dulwich/file.py
+++ dulwich/file.py
"""Safe access to git files."""
-import io
import os
import sys
"""
if "a" in mode:
- raise IOError("append mode not supported for Git files")
+ raise OSError("append mode not supported for Git files")
if "+" in mode:
- raise IOError("read/write mode not supported for Git files")
+ raise OSError("read/write mode not supported for Git files")
if "b" not in mode:
- raise IOError("text mode not supported for Git files")
+ raise OSError("text mode not supported for Git files")
if "w" in mode:
return _GitFile(filename, mode, bufsize, mask)
else:
- return io.open(filename, mode, bufsize)
+ return open(filename, mode, bufsize)
class FileLocked(Exception):
def __init__(self, filename, lockfilename):
self.filename = filename
self.lockfilename = lockfilename
- super(FileLocked, self).__init__(filename, lockfilename)
+ super().__init__(filename, lockfilename)
-class _GitFile(object):
+class _GitFile:
"""File that follows the git locking protocol for writes.
All writes to a file foo will be written into foo.lock in the same
released. Typically this will happen in a finally block.
"""
- PROXY_PROPERTIES = set(
- [
- "closed",
- "encoding",
- "errors",
- "mode",
- "name",
- "newlines",
- "softspace",
- ]
- )
+ PROXY_PROPERTIES = {
+ "closed",
+ "encoding",
+ "errors",
+ "mode",
+ "name",
+ "newlines",
+ "softspace",
+ }
PROXY_METHODS = (
"__iter__",
"flush",
blob - 01ab382c340e95cadbb2c353ce63495d295fcb13
blob + e15278e7c7a4c3eec5139427fcf5720b06ea3a4a
--- dulwich/graph.py
+++ dulwich/graph.py
#!/usr/bin/env python
-# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab
# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
#
blob - ec89e8f38e1b75336466ab94633b94e4aad800cf
blob + bf7396d286010e88d61dc0ccf41c79b2168961fc
--- dulwich/greenthreads.py
+++ dulwich/greenthreads.py
self.sha_done.add(t)
missing_tags = want_tags.difference(have_tags)
wants = missing_commits.union(missing_tags)
- self.objects_to_send = set([(w, None, False) for w in wants])
+ self.objects_to_send = {(w, None, False) for w in wants}
if progress is None:
self.progress = lambda x: None
else:
def __init__(self, store, shas, finder, concurrency=1):
self.finder = finder
self.p = pool.Pool(size=concurrency)
- super(GreenThreadsObjectStoreIterator, self).__init__(store, shas)
+ super().__init__(store, shas)
def retrieve(self, args):
sha, path = args
return self.store[sha], path
def __iter__(self):
- for sha, path in self.p.imap_unordered(self.retrieve, self.itershas()):
- yield sha, path
+ yield from self.p.imap_unordered(self.retrieve, self.itershas())
def __len__(self):
if len(self._shas) > 0:
blob - 0b4e39d1e03ab126c96df9cbd7b9b448786f6da7
blob + e4d27648c5863caf186bdb64f40dbd0114b5bdcb
--- dulwich/hooks.py
+++ dulwich/hooks.py
)
-class Hook(object):
+class Hook:
"""Generic hook object."""
def execute(self, *args):
blob - b7b596afc8717926fc74cabaaa9d23d6d7b7b7c7
blob + eed363696651eea552493e2f53a2697e71dd74a7
--- dulwich/ignore.py
+++ dulwich/ignore.py
return Pattern(pattern, ignorecase).match(path)
-class Pattern(object):
+class Pattern:
"""A single ignore pattern."""
def __init__(self, pattern: bytes, ignorecase: bool = False):
)
def __repr__(self) -> str:
- return "%s(%r, %r)" % (
+ return "{}({!r}, {!r})".format(
type(self).__name__,
self.pattern,
self.ignorecase,
return bool(self._re.match(path))
-class IgnoreFilter(object):
+class IgnoreFilter:
def __init__(self, patterns: Iterable[bytes], ignorecase: bool = False, path=None):
self._patterns: List[Pattern] = []
self._ignorecase = ignorecase
def __repr__(self) -> str:
path = getattr(self, "_path", None)
if path is not None:
- return "%s.from_path(%r)" % (type(self).__name__, path)
+ return "{}.from_path({!r})".format(type(self).__name__, path)
else:
return "<%s>" % (type(self).__name__)
-class IgnoreFilterStack(object):
+class IgnoreFilterStack:
"""Check for ignore status in multiple filters."""
def __init__(self, filters):
return get_xdg_config_home_path("git", "ignore")
-class IgnoreFilterManager(object):
+class IgnoreFilterManager:
"""Ignore file manager."""
def __init__(
self._ignorecase = ignorecase
def __repr__(self) -> str:
- return "%s(%s, %r, %r)" % (
+ return "{}({}, {!r}, {!r})".format(
type(self).__name__,
self._top_path,
self._global_filters,
p = os.path.join(self._top_path, path, ".gitignore")
try:
self._path_filters[path] = IgnoreFilter.from_path(p, self._ignorecase)
- except IOError:
+ except OSError:
self._path_filters[path] = None
return self._path_filters[path]
]:
try:
global_filters.append(IgnoreFilter.from_path(os.path.expanduser(p)))
- except IOError:
+ except OSError:
pass
config = repo.get_config_stack()
ignorecase = config.get_boolean((b"core"), (b"ignorecase"), False)
blob - 5ed3b834b91bc1709dc49c7cee2c1511a5a97ec4
blob + d875b4851077035a92c82421f2e8f1fc460ea433
--- dulwich/index.py
+++ dulwich/index.py
(extended_flags, ) = struct.unpack(">H", f.read(2))
else:
extended_flags = 0
- name = f.read((flags & 0x0FFF))
+ name = f.read(flags & 0x0FFF)
# Padding:
if version < 4:
real_size = (f.tell() - beginoffset + 8) & ~7
return ret
-class Index(object):
+class Index:
"""A Git Index file."""
def __init__(self, filename: Union[bytes, str], read=True):
return self._filename
def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, self._filename)
+ return "{}({!r})".format(self.__class__.__name__, self._filename)
def write(self) -> None:
"""Write current contents of index to disk."""
entry = self[path]
return entry.sha, cleanup_mode(entry.mode)
- for (name, mode, sha) in changes_from_tree(
+ yield from changes_from_tree(
self._byname.keys(),
lookup_entry,
object_store,
tree,
want_unchanged=want_unchanged,
- ):
- yield (name, mode, sha)
+ )
def commit(self, object_store):
"""Create a new tree from an index.
index[path] = entry
-class locked_index(object):
+class locked_index:
"""Lock the index while making modifications.
Works as a context manager.
blob - 03b57b26ce5931ce7879ccc7087c025b420d6853
blob + 3a3f4fe9f3ccf9d1d70c0ad43effdf4fd2e6c963
--- dulwich/lfs.py
+++ dulwich/lfs.py
import tempfile
-class LFSStore(object):
+class LFSStore:
"""Stores objects on disk, indexed by SHA256."""
def __init__(self, path):
blob - 1f931b63f20787ec9a40a6d28cbce860623d2a84
blob + 7060795636f2266217fae6be5c69bffcdd7387de
--- dulwich/line_ending.py
+++ dulwich/line_ending.py
return None
-class BlobNormalizer(object):
+class BlobNormalizer:
"""An object to store computation result of which filter to apply based
on configuration, gitattributes, path and operation (checkin or checkout)
"""
blob - 876f37d6e370f96ce361e275cdecf09b56420920
blob + bc9f914b78a2db4e086ec3a444f4b070ada77045
--- dulwich/lru_cache.py
+++ dulwich/lru_cache.py
prev_key = None
else:
prev_key = self.prev.key
- return "%s(%r n:%r p:%r)" % (
+ return "{}({!r} n:{!r} p:{!r})".format(
self.__class__.__name__,
self.key,
self.next_key,
blob - 230e1e6f94b34a3ed774092d76042389b95e04c8
blob + c91ec50bc4665c110b78570173db02d625841947
--- dulwich/mailmap.py
+++ dulwich/mailmap.py
yield parsed_canonical_identity, parsed_from_identity
-class Mailmap(object):
+class Mailmap:
"""Class for accessing a mailmap file."""
def __init__(self, map=None):
blob - b55d35b31e4034d4e152f4cc27adb383e41b9908
blob + 9e2eb72057ef6f2f61542ab5530dcbf6cca4fabd
--- dulwich/object_store.py
+++ dulwich/object_store.py
PACK_MODE = 0o444 if sys.platform != "win32" else 0o644
-class BaseObjectStore(object):
+class BaseObjectStore:
"""Object store interface."""
def determine_wants_all(
def _iter_alternate_objects(self):
"""Iterate over the SHAs of all the objects in alternate stores."""
for alternate in self.alternates:
- for alternate_object in alternate:
- yield alternate_object
+ yield from alternate
def _iter_loose_objects(self):
"""Iterate over the SHAs of all loose objects."""
self._update_pack_cache()
for pack in self._iter_cached_packs():
try:
- for sha in pack:
- yield sha
+ yield from pack
except PackFileDisappeared:
pass
- for sha in self._iter_loose_objects():
- yield sha
- for sha in self._iter_alternate_objects():
- yield sha
+ yield from self._iter_loose_objects()
+ yield from self._iter_alternate_objects()
def contains_loose(self, sha):
"""Check if a particular object is present by SHA1 and is loose.
sha = name
hexsha = None
else:
- raise AssertionError("Invalid object name %r" % (name,))
+ raise AssertionError("Invalid object name {!r}".format(name))
for pack in self._iter_cached_packs():
try:
return pack.get_raw(sha)
loose_compression_level: zlib compression level for loose objects
pack_compression_level: zlib compression level for pack objects
"""
- super(DiskObjectStore, self).__init__(
+ super().__init__(
pack_compression_level=pack_compression_level
)
self.path = path
self.pack_compression_level = pack_compression_level
def __repr__(self):
- return "<%s(%r)>" % (self.__class__.__name__, self.path)
+ return "<{}({!r})>".format(self.__class__.__name__, self.path)
@classmethod
def from_config(cls, path, config):
"""Object store that keeps all objects in memory."""
def __init__(self):
- super(MemoryObjectStore, self).__init__()
+ super().__init__()
self._data = {}
self.pack_compression_level = -1
elif len(sha) == 20:
return sha_to_hex(sha)
else:
- raise ValueError("Invalid sha %r" % (sha,))
+ raise ValueError("Invalid sha {!r}".format(sha))
def contains_loose(self, sha):
"""Check if a particular object is present by SHA1 and is loose."""
commit()
-class ObjectIterator(object):
+class ObjectIterator:
"""Interface for iterating over objects."""
def iterobjects(self):
return (commits, tags, others)
-class MissingObjectFinder(object):
+class MissingObjectFinder:
"""Find the objects missing from another object store.
Args:
wants = missing_commits.union(missing_tags)
wants = wants.union(missing_others)
- self.objects_to_send = set([(w, None, False) for w in wants])
+ self.objects_to_send = {(w, None, False) for w in wants}
if progress is None:
self.progress = lambda x: None
__next__ = next
-class ObjectStoreGraphWalker(object):
+class ObjectStoreGraphWalker:
"""Graph walker that finds what commits are missing from an object store.
Attributes:
"""Ack that a revision and its ancestors are present in the source."""
if len(sha) != 40:
raise ValueError("unexpected sha %r received" % sha)
- ancestors = set([sha])
+ ancestors = {sha}
# stop if we run out of heads to remove
while self.heads:
blob - 01aca1d17386f26a08bf24c0a01b1164d0336182
blob + 06b12ee7bcbf16b49fe1766c76d73659dcaf185d
--- dulwich/objects.py
+++ dulwich/objects.py
ObjectFormatException: Raised when the string is not valid
"""
if not valid_hexsha(hex):
- raise ObjectFormatException("%s %s" % (error_msg, hex))
+ raise ObjectFormatException("{} {}".format(error_msg, hex))
def check_identity(identity: bytes, error_msg: str) -> None:
return b" ".join(items) + b"\n"
-class FixedSha(object):
+class FixedSha:
"""SHA object that behaves like hashlib's but is given a fixed value."""
__slots__ = ("_hexsha", "_sha")
return self._hexsha.decode("ascii")
-class ShaFile(object):
+class ShaFile:
"""A git SHA file."""
__slots__ = ("_chunked_text", "_sha", "_needs_serialization")
return self.sha().hexdigest().encode("ascii")
def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.id)
+ return "<{} {}>".format(self.__class__.__name__, self.id)
def __ne__(self, other):
"""Check whether this object does not match the other."""
_chunked_text: List[bytes]
def __init__(self):
- super(Blob, self).__init__()
+ super().__init__()
self._chunked_text = []
self._needs_serialization = False
Raises:
ObjectFormatException: if the object is malformed in some way
"""
- super(Blob, self).check()
+ super().check()
def splitlines(self) -> List[bytes]:
"""Return list of lines in this blob.
_tagger: Optional[bytes]
def __init__(self):
- super(Tag, self).__init__()
+ super().__init__()
self._tagger = None
self._tag_time = None
self._tag_timezone = None
Raises:
ObjectFormatException: if the object is malformed in some way
"""
- super(Tag, self).check()
+ super().check()
assert self._chunked_text is not None
self._check_has_member("_object_sha", "missing object sha")
self._check_has_member("_object_class", "missing object type")
kind = "tree"
else:
kind = "blob"
- return "%04o %s %s\t%s\n" % (
+ return "{:04o} {} {}\t{}\n".format(
mode,
kind,
hexsha.decode("ascii"),
__slots__ = "_entries"
def __init__(self):
- super(Tree, self).__init__()
+ super().__init__()
self._entries = {}
@classmethod
# TODO: list comprehension is for efficiency in the common (small)
# case; if memory efficiency in the large case is a concern, use a
# genexp.
- self._entries = dict([(n, (m, s)) for n, m, s in parsed_entries])
+ self._entries = {n: (m, s) for n, m, s in parsed_entries}
def check(self):
"""Check this object for internal consistency.
Raises:
ObjectFormatException: if the object is malformed in some way
"""
- super(Tree, self).check()
+ super().check()
assert self._chunked_text is not None
last = None
allowed_modes = (
)
def __init__(self):
- super(Commit, self).__init__()
+ super().__init__()
self._parents = []
self._encoding = None
self._mergetag = []
Raises:
ObjectFormatException: if the object is malformed in some way
"""
- super(Commit, self).check()
+ super().check()
assert self._chunked_text is not None
self._check_has_member("_tree", "missing tree")
self._check_has_member("_author", "missing author")
chunks[-1] = chunks[-1][:-2]
for k, v in self.extra:
if b"\n" in k or b"\n" in v:
- raise AssertionError("newline in extra data: %r -> %r" % (k, v))
+ raise AssertionError("newline in extra data: {!r} -> {!r}".format(k, v))
chunks.append(git_line(k, v))
if self.gpgsig:
sig_chunks = self.gpgsig.split(b"\n")
blob - 51f92d8a4371fe11142b4c2932f48eeca079080c
blob + b7fa2e517b6e28b089f0f530952f7051d7a5152d
--- dulwich/pack.py
+++ dulwich/pack.py
self.obj = obj
-class UnpackedObject(object):
+class UnpackedObject:
"""Class encapsulating an object unpacked from a pack file.
These objects should only be created from within unpack_object. Most
return not (self == other)
def __repr__(self):
- data = ["%s=%r" % (s, getattr(self, s)) for s in self.__slots__]
- return "%s(%s)" % (self.__class__.__name__, ", ".join(data))
+ data = ["{}={!r}".format(s, getattr(self, s)) for s in self.__slots__]
+ return "{}({})".format(self.__class__.__name__, ", ".join(data))
_ZLIB_BUFSIZE = 4096
return None
-class PackIndex(object):
+class PackIndex:
"""An index in to a packfile.
Given a sha id of an object a pack index can tell you the location in the
):
return False
- return super(FilePackIndex, self).__eq__(other)
+ return super().__eq__(other)
def close(self):
self._file.close()
"""Version 1 Pack Index file."""
def __init__(self, filename, file=None, contents=None, size=None):
- super(PackIndex1, self).__init__(filename, file, contents, size)
+ super().__init__(filename, file, contents, size)
self.version = 1
self._fan_out_table = self._read_fan_out_table(0)
"""Version 2 Pack Index file."""
def __init__(self, filename, file=None, contents=None, size=None):
- super(PackIndex2, self).__init__(filename, file, contents, size)
+ super().__init__(filename, file, contents, size)
if self._contents[:4] != b"\377tOc":
raise AssertionError("Not a v2 pack index file")
(self.version,) = unpack_from(b">L", self._contents, 4)
return chunks_length(obj)
-class PackStreamReader(object):
+class PackStreamReader:
"""Class to read a pack stream.
The pack is read from a ReceivableProtocol using read() or recv() as
appropriate.
"""
- def __init__(self, read_all, read_some=None, zlib_bufsize=_ZLIB_BUFSIZE):
+ def __init__(self, read_all, read_some=None, zlib_bufsize=_ZLIB_BUFSIZE) -> None:
self.read_all = read_all
if read_some is None:
self.read_some = read_all
delta_iter: Optional DeltaChainIterator to record deltas as we
read them.
"""
- super(PackStreamCopier, self).__init__(read_all, read_some=read_some)
+ super().__init__(read_all, read_some=read_some)
self.outfile = outfile
self._delta_iter = delta_iter
def _read(self, read, size):
"""Read data from the read callback and write it to the file."""
- data = super(PackStreamCopier, self)._read(read, size)
+ data = super()._read(read, size)
self.outfile.write(data)
return data
return sha
-class PackData(object):
+class PackData:
"""The data contained in a packfile.
Pack files can be accessed both sequentially for exploding a pack, and
return (unpacked.pack_type_num, unpacked._obj())
-class DeltaChainIterator(object):
+class DeltaChainIterator:
"""Abstract iterator over pack data based on delta chains.
Each object in the pack is guaranteed to be inflated exactly once,
return unpacked.sha_file()
-class SHA1Reader(object):
+class SHA1Reader:
"""Wrapper for file-like object that remembers the SHA1 of its data."""
def __init__(self, f):
return self.f.tell()
-class SHA1Writer(object):
+class SHA1Writer:
"""Wrapper for file-like object that remembers the SHA1 of its data."""
def __init__(self, f):
write(chunk)
-def deltify_pack_objects(objects, window_size=None, reuse_pack=None):
+def deltify_pack_objects(objects, window_size: Optional[int] = None, reuse_pack=None):
"""Generate deltas for pack objects.
Args:
)
-class PackChunkGenerator(object):
+class PackChunkGenerator:
def __init__(self, num_records=None, records=None, progress=None, compression_level=-1):
self.cs = sha1(b"")
write_pack_index = write_pack_index_v2
-class _PackTupleIterable(object):
+class _PackTupleIterable:
"""Helper for Pack.pack_tuples."""
def __init__(self, iterobjects, length):
return ((o, None) for o in self._iterobjects())
-class Pack(object):
+class Pack:
"""A Git pack object."""
_data_load: Optional[Callable[[], PackData]]
return len(self.index)
def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, self._basename)
+ return "{}({!r})".format(self.__class__.__name__, self._basename)
def __iter__(self):
"""Iterate over all the sha1s of the objects in this pack."""
blob - 0637ec8304e7e118a4e87418e7322c88c7a4c608
blob + bcae70e32166efc77b675b9fa5167f8c376101f1
--- dulwich/patch.py
+++ dulwich/patch.py
beginning = start + 1 # lines start numbering with one
length = stop - start
if length == 1:
- return "{}".format(beginning)
+ return f"{beginning}"
if not length:
beginning -= 1 # empty ranges begin at line just before the range
- return "{},{}".format(beginning, length)
+ return f"{beginning},{length}"
def unified_diff(
for group in SequenceMatcher(None, a, b).get_grouped_opcodes(n):
if not started:
started = True
- fromdate = "\t{}".format(fromfiledate) if fromfiledate else ""
- todate = "\t{}".format(tofiledate) if tofiledate else ""
+ fromdate = f"\t{fromfiledate}" if fromfiledate else ""
+ todate = f"\t{tofiledate}" if tofiledate else ""
yield "--- {}{}{}".format(
fromfile.decode(tree_encoding), fromdate, lineterm
).encode(output_encoding)
first, last = group[0], group[-1]
file1_range = _format_range_unified(first[1], last[2])
file2_range = _format_range_unified(first[3], last[4])
- yield "@@ -{} +{} @@{}".format(file1_range, file2_range, lineterm).encode(
+ yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(
output_encoding
)
blob - c92da4164d347bfdb0030f1173e44e71f9a08f72
blob + 1bccd43efce625263be54fbeea89023a04055fa8
--- dulwich/porcelain.py
+++ dulwich/porcelain.py
"""Porcelain-based error. """
def __init__(self, msg):
- super(Error, self).__init__(msg)
+ super().__init__(msg)
class RemoteExists(Error):
else:
try:
blob = blob_from_path_and_stat(full_path, st)
- except IOError:
+ except OSError:
pass
else:
try:
for key, value in refs.items():
key = key.decode()
obj = r.get_object(value)
- if u"tags" not in key:
+ if "tags" not in key:
continue
- _, tag = key.rsplit(u"/", 1)
+ _, tag = key.rsplit("/", 1)
try:
commit = obj.object
# If there are no tags, return the current commit
if len(sorted_tags) == 0:
- return "g{}".format(find_unique_abbrev(r.object_store, r[r.head()].id))
+ return f"g{find_unique_abbrev(r.object_store, r[r.head()].id)}"
# We're now 0 commits from the top
commit_count = 0
blob - a02ef21cc9362c6204905ac421cb6e0230f5fb11
blob + 2013a0b14437c3e7d8e8e27adef3b40793520923
--- dulwich/protocol.py
+++ dulwich/protocol.py
from os import (
SEEK_END,
)
-import socket
import dulwich
from dulwich.errors import (
return ("%04x" % (len(data) + 4)).encode("ascii") + data
-class Protocol(object):
+class Protocol:
"""Class for interacting with a remote git process over the wire.
Parts of the git wire protocol use 'pkt-lines' to communicate. A pkt-line
pkt_contents = read(size - 4)
except ConnectionResetError as exc:
raise HangupException() from exc
- except socket.error as exc:
+ except OSError as exc:
raise GitProtocolError(exc) from exc
else:
if len(pkt_contents) + 4 != size:
self.write(line)
if self.report_activity:
self.report_activity(len(line), "write")
- except socket.error as exc:
+ except OSError as exc:
raise GitProtocolError(exc) from exc
def write_sideband(self, channel, blob):
def __init__(
self, recv, write, close=None, report_activity=None, rbufsize=_RBUFSIZE
):
- super(ReceivableProtocol, self).__init__(
+ super().__init__(
self.read, write, close=close, report_activity=report_activity
)
self._recv = recv
return SINGLE_ACK
-class BufferedPktLineWriter(object):
+class BufferedPktLineWriter:
"""Writer that wraps its data in pkt-lines and has an independent buffer.
Consecutive calls to write() wrap the data in a pkt-line and then buffers
self._wbuf = BytesIO()
-class PktLineParser(object):
+class PktLineParser:
"""Packet line parser that hands completed packets off to a callback."""
def __init__(self, handle_pkt):
blob - 700563e646ddfa0bd5d5571f8bb87c9736956871
blob + 850747722e33b4d032f087e7c1c18f0d22a6ff78
--- dulwich/refs.py
+++ dulwich/refs.py
return True
-class RefsContainer(object):
+class RefsContainer:
"""A container for refs."""
def __init__(self, logger=None):
"""
def __init__(self, refs, logger=None):
- super(DictRefsContainer, self).__init__(logger=logger)
+ super().__init__(logger=logger)
self._refs = refs
self._peeled = {}
self._watchers = set()
"""Refs container that reads refs from disk."""
def __init__(self, path, worktree_path=None, logger=None):
- super(DiskRefsContainer, self).__init__(logger=logger)
+ super().__init__(logger=logger)
if getattr(path, "encode", None) is not None:
path = os.fsencode(path)
self.path = path
self._peeled_refs = None
def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, self.path)
+ return "{}({!r})".format(self.__class__.__name__, self.path)
def subkeys(self, base):
subkeys = set()
if orig_ref != old_ref:
f.abort()
return False
- except (OSError, IOError):
+ except OSError:
f.abort()
raise
try:
f.write(new_ref + b"\n")
- except (OSError, IOError):
+ except OSError:
f.abort()
raise
self._log(
return False
try:
f.write(ref + b"\n")
- except (OSError, IOError):
+ except OSError:
f.abort()
raise
else:
blob - 3c52cdda0e587dadeddc9ee3c1c8fd84a908e307
blob + f8ccfeedd838706b5d182b6eb7b18de95abe58f3
--- dulwich/repo.py
+++ dulwich/repo.py
fullname = username
email = os.environ.get("EMAIL")
if email is None:
- email = "{}@{}".format(username, socket.gethostname())
+ email = f"{username}@{socket.gethostname()}"
return (fullname, email) # type: ignore
# Could implement other platform specific filesystem hiding here
-class ParentsProvider(object):
+class ParentsProvider:
def __init__(self, store, grafts={}, shallows=[]):
self.store = store
self.grafts = grafts
return commit.parents
-class BaseRepo(object):
+class BaseRepo:
"""Base class for a git repository.
This base class is meant to be used for Repository implementations that e.g.
raise NotTagError(ret)
else:
raise Exception(
- "Type invalid: %r != %r" % (ret.type_name, cls.type_name)
+ "Type invalid: {!r} != {!r}".format(ret.type_name, cls.type_name)
)
return ret
self.bare = bare
if bare is False:
if os.path.isfile(hidden_path):
- with open(hidden_path, "r") as f:
+ with open(hidden_path) as f:
path = read_gitfile(f)
self._controldir = os.path.join(root, path)
else:
blob - 49eff1b4af2cd4a6f9814cb5d36b9ae6b05aaf5a
blob + 43b23fe1012a34267186a6935608f68b40cfd746
--- dulwich/server.py
+++ dulwich/server.py
logger = log_utils.getLogger(__name__)
-class Backend(object):
+class Backend:
"""A backend for the Git smart server implementation."""
def open_repository(self, path):
raise NotImplementedError(self.open_repository)
-class BackendRepo(object):
+class BackendRepo:
"""Repository abstraction used by the Git server.
The methods required here are a subset of those provided by
"""Simple backend looking up Git repositories in the local file system."""
def __init__(self, root=os.sep):
- super(FileSystemBackend, self).__init__()
+ super().__init__()
self.root = (os.path.abspath(root) + os.sep).replace(os.sep * 2, os.sep)
def open_repository(self, path):
normcase_abspath = os.path.normcase(abspath)
normcase_root = os.path.normcase(self.root)
if not normcase_abspath.startswith(normcase_root):
- raise NotGitRepository("Path %r not inside root %r" % (path, self.root))
+ raise NotGitRepository("Path {!r} not inside root {!r}".format(path, self.root))
return Repo(abspath)
-class Handler(object):
+class Handler:
"""Smart protocol command handler base class."""
def __init__(self, backend, proto, stateless_rpc=False):
"""Protocol handler for packs."""
def __init__(self, backend, proto, stateless_rpc=False):
- super(PackHandler, self).__init__(backend, proto, stateless_rpc)
+ super().__init__(backend, proto, stateless_rpc)
self._client_capabilities = None
# Flags needed for the no-done capability
self._done_received = False
"""Protocol handler for uploading a pack to the client."""
def __init__(self, backend, args, proto, stateless_rpc=False, advertise_refs=False):
- super(UploadPackHandler, self).__init__(
+ super().__init__(
backend, proto, stateless_rpc=stateless_rpc
)
self.repo = backend.open_repository(args[0])
def _want_satisfied(store, haves, want, earliest):
o = store[want]
pending = collections.deque([o])
- known = set([want])
+ known = {want}
while pending:
commit = pending.popleft()
if commit.id in haves:
return True
-class _ProtocolGraphWalker(object):
+class _ProtocolGraphWalker:
"""A graph walker that knows the git protocol.
As a graph walker, this class implements ack(), next(), and reset(). It
_GRAPH_WALKER_COMMANDS = (COMMAND_HAVE, COMMAND_DONE, None)
-class SingleAckGraphWalkerImpl(object):
+class SingleAckGraphWalkerImpl:
"""Graph walker implementation that speaks the single-ack protocol."""
def __init__(self, walker):
return True
-class MultiAckGraphWalkerImpl(object):
+class MultiAckGraphWalkerImpl:
"""Graph walker implementation that speaks the multi-ack protocol."""
def __init__(self, walker):
return True
-class MultiAckDetailedGraphWalkerImpl(object):
+class MultiAckDetailedGraphWalkerImpl:
"""Graph walker implementation speaking the multi-ack-detailed protocol."""
def __init__(self, walker):
"""Protocol handler for downloading a pack from the client."""
def __init__(self, backend, args, proto, stateless_rpc=False, advertise_refs=False):
- super(ReceivePackHandler, self).__init__(
+ super().__init__(
backend, proto, stateless_rpc=stateless_rpc
)
self.repo = backend.open_repository(args[0])
class UploadArchiveHandler(Handler):
def __init__(self, backend, args, proto, stateless_rpc=False):
- super(UploadArchiveHandler, self).__init__(backend, proto, stateless_rpc)
+ super().__init__(backend, proto, stateless_rpc)
self.repo = backend.open_repository(args[0])
def handle(self):
blob - 34f410223c0cf68899b239aca8870388b8e11546
blob + 2b0c3697e479f8115e023ae3e58bbf6014660bad
--- dulwich/stash.py
+++ dulwich/stash.py
"""Stash handling."""
-from __future__ import absolute_import
import os
DEFAULT_STASH_REF = b"refs/stash"
-class Stash(object):
+class Stash:
"""A Git stash.
Note that this doesn't currently update the working tree.
blob - d6a1b3b7524b47aaa8969fdef7f1bf8245370763
blob + 0adbc6049f4b6b1ce414eb14daa963dc75b30697
--- dulwich/tests/__init__.py
+++ dulwich/tests/__init__.py
class TestCase(_TestCase):
def setUp(self):
- super(TestCase, self).setUp()
+ super().setUp()
self.overrideEnv("HOME", "/nonexistent")
self.overrideEnv("GIT_CONFIG_NOSYSTEM", "1")
blob - 7837e1b9cfae640c3deef6d21a3c909c2bcf185d
blob + a88d7ebed8003ef93f22f33ffc1a848f440d7cc3
--- dulwich/tests/compat/server_utils.py
+++ dulwich/tests/compat/server_utils.py
from dulwich.tests.compat.utils import require_git_version
-class _StubRepo(object):
+class _StubRepo:
"""A stub repo that just contains a path to tear down."""
def __init__(self, name):
return shallows
-class ServerTests(object):
+class ServerTests:
"""Base tests for testing servers.
Does not inherit from TestCase so tests are not automatically run.
self._new_repo = self.import_repo("server_new.export")
def url(self, port):
- return "%s://localhost:%s/" % (self.protocol, port)
+ return "{}://localhost:{}/".format(self.protocol, port)
def branch_args(self, branches=None):
if branches is None:
branches = ["master", "branch"]
- return ["%s:%s" % (b, b) for b in branches]
+ return ["{}:{}".format(b, b) for b in branches]
def test_push_to_dulwich(self):
self.import_repos()
blob - e9d5d2d169b195a3546aa33e229474629a402f21
blob + 0c1d038f4acbc78b115600d8a518d6655d5ad441
--- dulwich/tests/compat/test_client.py
+++ dulwich/tests/compat/test_client.py
import ctypes
-class DulwichClientTestBase(object):
+class DulwichClientTestBase:
"""Tests for client/server compatibility."""
def setUp(self):
dest.refs.set_if_equals(r[0], None, r[1])
self.assertEqual(
dest.get_shallow(),
- set(
- [
- b"35e0b59e187dd72a0af294aedffc213eaa4d03ff",
- b"514dc6d3fbfe77361bcaef320c4d21b72bc10be9",
- ]
- ),
+ {
+ b"35e0b59e187dd72a0af294aedffc213eaa4d03ff",
+ b"514dc6d3fbfe77361bcaef320c4d21b72bc10be9",
+ },
)
def test_repeat(self):
try:
os.kill(pid, signal.SIGKILL)
os.unlink(self.pidfile)
- except (OSError, IOError):
+ except OSError:
pass
self.process.wait()
self.process.stdout.close()
self.skipTest('skip flaky test; see #1015')
-class TestSSHVendor(object):
+class TestSSHVendor:
@staticmethod
def run_command(
host,
self.server_name = "localhost"
def get_url(self):
- return "http://%s:%s/" % (self.server_name, self.server_port)
+ return "http://{}:{}/".format(self.server_name, self.server_port)
class DulwichHttpClientTest(CompatTestCase, DulwichClientTestBase):
blob - 6da2f0ff3cbbc0c909a697f0c8f974ffd9f01569
blob + 6f95ea3ac043dac8420bb5f86d2e384986665234
--- dulwich/tests/compat/test_pack.py
+++ dulwich/tests/compat/test_pack.py
def setUp(self):
require_git_version((1, 5, 0))
- super(TestPack, self).setUp()
+ super().setUp()
self._tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self._tempdir)
blob - 2f442af4b80ab292cf21cdd24a30f5821124ebfd
blob + c0b0f72e77584693abc65dd8b9bced34ffaff708
--- dulwich/tests/compat/test_patch.py
+++ dulwich/tests/compat/test_patch.py
class CompatPatchTestCase(CompatTestCase):
def setUp(self):
- super(CompatPatchTestCase, self).setUp()
+ super().setUp()
self.test_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.test_dir)
self.repo_path = os.path.join(self.test_dir, "repo")
blob - e0aed812aed92c29043ca7cb0b9fc0093d3b296e
blob + 62349905117a246f721de71b5905d1f6a7ea3a2b
--- dulwich/tests/compat/test_porcelain.py
+++ dulwich/tests/compat/test_porcelain.py
@skipIf(platform.python_implementation() == "PyPy" or sys.platform == "win32", "gpgme not easily available or supported on Windows and PyPy")
class TagCreateSignTestCase(PorcelainGpgTestCase, CompatTestCase):
def setUp(self):
- super(TagCreateSignTestCase, self).setUp()
+ super().setUp()
def test_sign(self):
# Test that dulwich signatures can be verified by CGit
run_git_or_fail(
[
- "--git-dir={}".format(self.repo.controldir()),
+ f"--git-dir={self.repo.controldir()}",
"tag",
"-v",
"tryme"
run_git_or_fail(
[
- "--git-dir={}".format(self.repo.controldir()),
+ f"--git-dir={self.repo.controldir()}",
"tag",
"-u",
PorcelainGpgTestCase.DEFAULT_KEY_ID,
blob - 8d2459d14b4fbb4cb69b8da4625ee33b1f6854e8
blob + a42e5677fa5f7b8bc75c083fc0caf6a70dd8c8ea
--- dulwich/tests/compat/test_repository.py
+++ dulwich/tests/compat/test_repository.py
"""Tests for git repository compatibility."""
def setUp(self):
- super(ObjectStoreTestCase, self).setUp()
+ super().setUp()
self._repo = self.import_repo("server_new.export")
def _run_git(self, args):
return temp_dir
def setUp(self):
- super(WorkingTreeTestCase, self).setUp()
+ super().setUp()
self._worktree_path = self.create_new_worktree(self._repo.path, "branch")
self._worktree_repo = Repo(self._worktree_path)
self.addCleanup(self._worktree_repo.close)
self._repo = self._worktree_repo
def test_refs(self):
- super(WorkingTreeTestCase, self).test_refs()
+ super().test_refs()
self.assertEqual(
self._mainworktree_repo.refs.allkeys(), self._repo.refs.allkeys()
)
min_git_version = (2, 5, 0)
def setUp(self):
- super(InitNewWorkingDirectoryTestCase, self).setUp()
+ super().setUp()
self._other_worktree = self._repo
worktree_repo_path = tempfile.mkdtemp()
self.addCleanup(rmtree_ro, worktree_repo_path)
blob - 5efa414cdaece60573672bc53a01dc9525ed4cb4
blob + 15b8f42e87f0b28ce0d265eebe670be2b36db02c
--- dulwich/tests/compat/test_server.py
+++ dulwich/tests/compat/test_server.py
min_git_version = (1, 7, 0, 2)
def setUp(self):
- super(GitServerSideBand64kTestCase, self).setUp()
+ super().setUp()
# side-band-64k is broken in the windows client.
# https://github.com/msysgit/git/issues/101
# Fix has landed for the 1.9.3 release.
blob - c8746b5c39bf645169a12a27bf227abbdfc3f5e0
blob + 70cd6617bcd265b21f3ded5024bafa8cf33caad9
--- dulwich/tests/compat/test_utils.py
+++ dulwich/tests/compat/test_utils.py
class GitVersionTests(TestCase):
def setUp(self):
- super(GitVersionTests, self).setUp()
+ super().setUp()
self._orig_run_git = utils.run_git
self._version_str = None # tests can override to set stub version
utils.run_git = run_git
def tearDown(self):
- super(GitVersionTests, self).tearDown()
+ super().tearDown()
utils.run_git = self._orig_run_git
def test_git_version_none(self):
blob - 45c2f0da3b3bd89f3383108c3059a50db6e4f5c9
blob + 258b489c6cfdc9ffc304d26ec9b6fb0a1fa89ea3
--- dulwich/tests/compat/test_web.py
+++ dulwich/tests/compat/test_web.py
def setUp(self):
self.o_uph_cap = patch_capabilities(UploadPackHandler, (b"no-done",))
self.o_rph_cap = patch_capabilities(ReceivePackHandler, (b"no-done",))
- super(SmartWebSideBand64kTestCase, self).setUp()
+ super().setUp()
def tearDown(self):
- super(SmartWebSideBand64kTestCase, self).tearDown()
+ super().tearDown()
UploadPackHandler.capabilities = self.o_uph_cap
ReceivePackHandler.capabilities = self.o_rph_cap
blob - 053a5ab2589c9ccacf060435c66f7c2f4606ef79
blob + 4f4e1374561513a61e65a52f4bb934bd248954f6
--- dulwich/tests/compat/utils.py
+++ dulwich/tests/compat/utils.py
found_version = git_version(git_path=git_path)
if found_version is None:
raise SkipTest(
- "Test requires git >= %s, but c git not found" % (required_version,)
+ "Test requires git >= {}, but c git not found".format(required_version)
)
if len(required_version) > _VERSION_LEN:
required_version = ".".join(map(str, required_version))
found_version = ".".join(map(str, found_version))
raise SkipTest(
- "Test requires git >= %s, found %s" % (required_version, found_version)
+ "Test requires git >= {}, found {}".format(required_version, found_version)
)
return True
except socket.timeout:
pass
- except socket.error as e:
+ except OSError as e:
if getattr(e, "errno", False) and e.errno != errno.ECONNREFUSED:
raise
elif e.args[0] != errno.ECONNREFUSED:
min_git_version: Tuple[int, ...] = (1, 5, 0)
def setUp(self):
- super(CompatTestCase, self).setUp()
+ super().setUp()
require_git_version(self.min_git_version)
def assertObjectStoreEqual(self, store1, store2):
blob - 842a986864fc87beab3e175fd8f26aa18d3d3afc
blob + 936759e01d08454de856aeeddad5ae0a1695756d
--- dulwich/tests/test_archive.py
+++ dulwich/tests/test_archive.py
b1 = Blob.from_string(b"somedata")
store.add_object(b1)
t1 = Tree()
- t1.add("ő".encode('utf-8'), 0o100644, b1.id)
+ t1.add("ő".encode(), 0o100644, b1.id)
store.add_object(t1)
stream = b"".join(tar_stream(store, t1, mtime=0))
tf = tarfile.TarFile(fileobj=BytesIO(stream))
blob - a9c6d7933edc60f8628c74115e4f47a0094e4d43
blob + fc5e022dcd692840f0dadaae729cbe5057aa1bfb
--- dulwich/tests/test_blackbox.py
+++ dulwich/tests/test_blackbox.py
"""Blackbox tests for dul-receive-pack."""
def setUp(self):
- super(GitReceivePackTests, self).setUp()
+ super().setUp()
self.path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.path)
self.repo = Repo.init(self.path)
"""Blackbox tests for dul-upload-pack."""
def setUp(self):
- super(GitUploadPackTests, self).setUp()
+ super().setUp()
self.path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.path)
self.repo = Repo.init(self.path)
blob - 579174f821a78353dbee711bc8a115902e517ba5
blob + 737ea11c93dbafa61f707f0f8844192c68567a36
--- dulwich/tests/test_client.py
+++ dulwich/tests/test_client.py
# TODO(durin42): add unit-level tests of GitClient
class GitClientTests(TestCase):
def setUp(self):
- super(GitClientTests, self).setUp()
+ super().setUp()
self.rout = BytesIO()
self.rin = BytesIO()
self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write)
def test_caps(self):
agent_cap = ("agent=dulwich/%d.%d.%d" % dulwich.__version__).encode("ascii")
self.assertEqual(
- set(
- [
- b"multi_ack",
- b"side-band-64k",
- b"ofs-delta",
- b"thin-pack",
- b"multi_ack_detailed",
- b"shallow",
- agent_cap,
- ]
- ),
+ {
+ b"multi_ack",
+ b"side-band-64k",
+ b"ofs-delta",
+ b"thin-pack",
+ b"multi_ack_detailed",
+ b"shallow",
+ agent_cap,
+ },
set(self.client._fetch_capabilities),
)
self.assertEqual(
- set(
- [
- b"delete-refs",
- b"ofs-delta",
- b"report-status",
- b"side-band-64k",
- agent_cap,
- ]
- ),
+ {
+ b"delete-refs",
+ b"ofs-delta",
+ b"report-status",
+ b"side-band-64k",
+ agent_cap,
+ },
set(self.client._send_capabilities),
)
c, path = get_transport_and_path(remote_url)
-class TestSSHVendor(object):
+class TestSSHVendor:
def __init__(self):
self.host = None
self.command = ""
class SSHGitClientTests(TestCase):
def setUp(self):
- super(SSHGitClientTests, self).setUp()
+ super().setUp()
self.server = TestSSHVendor()
self.real_vendor = client.get_ssh_vendor
self.client = SSHGitClient("git.samba.org")
def tearDown(self):
- super(SSHGitClientTests, self).tearDown()
+ super().tearDown()
client.get_ssh_vendor = self.real_vendor
def test_get_url(self):
self.assertEqual("passwd", c._password)
basic_auth = c.pool_manager.headers["authorization"]
- auth_string = "%s:%s" % ("user", "passwd")
+ auth_string = "{}:{}".format("user", "passwd")
b64_credentials = base64.b64encode(auth_string.encode("latin1"))
expected_basic_auth = "Basic %s" % b64_credentials.decode("latin1")
self.assertEqual(basic_auth, expected_basic_auth)
self.assertEqual(original_password, c._password)
basic_auth = c.pool_manager.headers["authorization"]
- auth_string = "%s:%s" % (original_username, original_password)
+ auth_string = "{}:{}".format(original_username, original_password)
b64_credentials = base64.b64encode(auth_string.encode("latin1"))
expected_basic_auth = "Basic %s" % b64_credentials.decode("latin1")
self.assertEqual(basic_auth, expected_basic_auth)
break
else:
raise AssertionError(
- "Expected warning %r not in %r" % (expected_warning, warnings_list)
+ "Expected warning {!r} not in {!r}".format(expected_warning, warnings_list)
)
args = command.proc.args
break
else:
raise AssertionError(
- "Expected warning %r not in %r" % (expected_warning, warnings_list)
+ "Expected warning {!r} not in {!r}".format(expected_warning, warnings_list)
)
args = command.proc.args
blob - 20cecaad0e5aa7b79de59fb7e4ebbd7154ab1f1f
blob + 511554087060d18932e03ca20914e046ce340197
--- dulwich/tests/test_diff_tree.py
+++ dulwich/tests/test_diff_tree.py
class DiffTestCase(TestCase):
def setUp(self):
- super(DiffTestCase, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.empty_tree = self.commit_tree([])
class TreeChangesTest(DiffTestCase):
def setUp(self):
- super(TreeChangesTest, self).setUp()
+ super().setUp()
self.detector = RenameDetector(self.store)
def assertMergeFails(self, merge_entries, name, mode, sha):
block_cache = {}
self.assertEqual(50, _similarity_score(blob1, blob2, block_cache=block_cache))
- self.assertEqual(set([blob1.id, blob2.id]), set(block_cache))
+ self.assertEqual({blob1.id, blob2.id}, set(block_cache))
def fail_chunks():
self.fail("Unexpected call to as_raw_chunks()")
blob - 1dff9b1c5b87739a77529991bb3cedaf98dd0ed1
blob + e3a26018557297ddfada8e6d196fe5640d71319f
--- dulwich/tests/test_fastexport.py
+++ dulwich/tests/test_fastexport.py
"""Tests for the GitFastExporter tests."""
def setUp(self):
- super(GitFastExporterTests, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.stream = BytesIO()
try:
"""Tests for the GitImportProcessor tests."""
def setUp(self):
- super(GitImportProcessorTests, self).setUp()
+ super().setUp()
self.repo = MemoryRepo()
try:
from dulwich.fastexport import GitImportProcessor
blob - cc6d68a5788bad477de2ea137f64e5234083ddfb
blob + d1067d7ba53b4b2218acf34126ddf6bbe3869154
--- dulwich/tests/test_file.py
+++ dulwich/tests/test_file.py
class FancyRenameTests(TestCase):
def setUp(self):
- super(FancyRenameTests, self).setUp()
+ super().setUp()
self._tempdir = tempfile.mkdtemp()
self.foo = self.path("foo")
self.bar = self.path("bar")
def tearDown(self):
shutil.rmtree(self._tempdir)
- super(FancyRenameTests, self).tearDown()
+ super().tearDown()
def path(self, filename):
return os.path.join(self._tempdir, filename)
class GitFileTests(TestCase):
def setUp(self):
- super(GitFileTests, self).setUp()
+ super().setUp()
self._tempdir = tempfile.mkdtemp()
f = open(self.path("foo"), "wb")
f.write(b"foo contents")
def tearDown(self):
shutil.rmtree(self._tempdir)
- super(GitFileTests, self).tearDown()
+ super().tearDown()
def path(self, filename):
return os.path.join(self._tempdir, filename)
f.abort()
try:
f.close()
- except (IOError, OSError):
+ except OSError:
self.fail()
f = GitFile(foo, "wb")
f.close()
try:
f.abort()
- except (IOError, OSError):
+ except OSError:
self.fail()
def test_abort_close_removed(self):
blob - 92fff2905fa07588701cdf19a04c52b9c542398a
blob + e367e7290d8d2cc0d2345abf3adb63badd69c4c6
--- dulwich/tests/test_grafts.py
+++ dulwich/tests/test_grafts.py
)
-class GraftsInRepositoryBase(object):
+class GraftsInRepositoryBase:
def tearDown(self):
- super(GraftsInRepositoryBase, self).tearDown()
+ super().tearDown()
def get_repo_with_grafts(self, grafts):
r = self._repo
class GraftsInRepoTests(GraftsInRepositoryBase, TestCase):
def setUp(self):
- super(GraftsInRepoTests, self).setUp()
+ super().setUp()
self._repo_dir = os.path.join(tempfile.mkdtemp())
r = self._repo = Repo.init(self._repo_dir)
self.addCleanup(shutil.rmtree, self._repo_dir)
class GraftsInMemoryRepoTests(GraftsInRepositoryBase, TestCase):
def setUp(self):
- super(GraftsInMemoryRepoTests, self).setUp()
+ super().setUp()
r = self._repo = MemoryRepo()
self._shas = []
blob - 5d5d76ee5e6da21f4028c41118c8ad2b4ffb22da
blob + 3fb85ba737cea0c9dadd9fbe30fc2e9b052464f7
--- dulwich/tests/test_graph.py
+++ dulwich/tests/test_graph.py
-# -*- coding: utf-8 -*-
# test_index.py -- Tests for merge
# encoding: utf-8
# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
"1": [],
"0": [],
}
- self.assertEqual(self.run_test(graph, ["4", "5"]), set(["1", "2"]))
+ self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"})
def test_no_common_ancestor(self):
# no common ancestor
"1": ["0"],
"0": [],
}
- self.assertEqual(self.run_test(graph, ["4", "3"]), set([]))
+ self.assertEqual(self.run_test(graph, ["4", "3"]), set())
def test_ancestor(self):
# ancestor
"B": ["A"],
"A": [],
}
- self.assertEqual(self.run_test(graph, ["D", "C"]), set(["C"]))
+ self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"})
def test_direct_parent(self):
# parent
"B": ["A"],
"A": [],
}
- self.assertEqual(self.run_test(graph, ["G", "D"]), set(["D"]))
+ self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"})
def test_another_crossover(self):
# Another cross over
"B": ["A"],
"A": [],
}
- self.assertEqual(self.run_test(graph, ["D", "F"]), set(["E", "C"]))
+ self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"})
def test_three_way_merge_lca(self):
# three way merge commit straight from git docs
}
# assumes a theoretical merge M exists that merges B and C first
# which actually means find the first LCA from either of B OR C with A
- self.assertEqual(self.run_test(graph, ["A", "B", "C"]), set(["1"]))
+ self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"})
def test_octopus(self):
# octopus algorithm test
res = _find_lcas(lookup_parents, cmt, [ca])
next_lcas.extend(res)
lcas = next_lcas[:]
- self.assertEqual(set(lcas), set(["2"]))
+ self.assertEqual(set(lcas), {"2"})
class CanFastForwardTests(TestCase):
blob - 2606aea97affdc0c3080d0cf82b96489da5f485b
blob + 29ff7febf4ee0312c21ad3e3b41c4127c8b225b9
--- dulwich/tests/test_greenthreads.py
+++ dulwich/tests/test_greenthreads.py
@skipIf(not gevent_support, skipmsg)
class TestGreenThreadsObjectStoreIterator(TestCase):
def setUp(self):
- super(TestGreenThreadsObjectStoreIterator, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.cmt_amount = 10
self.objs = init_store(self.store, self.cmt_amount)
@skipIf(not gevent_support, skipmsg)
class TestGreenThreadsMissingObjectFinder(TestCase):
def setUp(self):
- super(TestGreenThreadsMissingObjectFinder, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.cmt_amount = 10
self.objs = init_store(self.store, self.cmt_amount)
blob - 6e416f1d5e43f52d9eeb75674927246b12f99bbf
blob + f1a067b86b952c593193e1efe9a94329d2d81803
--- dulwich/tests/test_hooks.py
+++ dulwich/tests/test_hooks.py
class ShellHookTests(TestCase):
def setUp(self):
- super(ShellHookTests, self).setUp()
+ super().setUp()
if os.name != "posix":
self.skipTest("shell hook tests requires POSIX shell")
self.assertTrue(os.path.exists("/bin/sh"))
blob - b3a6ab135e31ae9c4413e62cb04f0b72db074786
blob + 3dbd0de9a5dfe5162ed4b00a1137bb208ee0eb66
--- dulwich/tests/test_ignore.py
+++ dulwich/tests/test_ignore.py
for (path, pattern) in POSITIVE_MATCH_TESTS:
self.assertTrue(
match_pattern(path, pattern),
- "path: %r, pattern: %r" % (path, pattern),
+ "path: {!r}, pattern: {!r}".format(path, pattern),
)
def test_no_matches(self):
for (path, pattern) in NEGATIVE_MATCH_TESTS:
self.assertFalse(
match_pattern(path, pattern),
- "path: %r, pattern: %r" % (path, pattern),
+ "path: {!r}, pattern: {!r}".format(path, pattern),
)
blob - 920f344f426e676cf6547400a3e9c8e07a612339
blob + 0031d58ff7a6e52c0987df395210d2312ad3ffbe
--- dulwich/tests/test_index.py
+++ dulwich/tests/test_index.py
-# -*- coding: utf-8 -*-
# test_index.py -- Tests for the git index
# encoding: utf-8
# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
class CommitTreeTests(TestCase):
def setUp(self):
- super(CommitTreeTests, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
def test_single_blob(self):
rootid = commit_tree(self.store, blobs)
self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
- self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys()))
+ self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))
def test_nested(self):
blob = Blob()
self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
- self.assertEqual(set([rootid, dirid, blob.id]), set(self.store._data.keys()))
+ self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))
class CleanupModeTests(TestCase):
def assertModeEqual(self, expected, got):
- self.assertEqual(expected, got, "%o != %o" % (expected, got))
+ self.assertEqual(expected, got, "{:o} != {:o}".format(expected, got))
def test_file(self):
self.assertModeEqual(0o100644, cleanup_mode(0o100000))
file = Blob.from_string(b"foo")
tree = Tree()
- latin1_name = u"À".encode("latin1")
+ latin1_name = "À".encode("latin1")
latin1_path = os.path.join(repo_dir_bytes, latin1_name)
- utf8_name = u"À".encode("utf8")
+ utf8_name = "À".encode()
utf8_path = os.path.join(repo_dir_bytes, utf8_name)
tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
class TestTreeFSPathConversion(TestCase):
def test_tree_to_fs_path(self):
- tree_path = u"délwíçh/foo".encode("utf8")
+ tree_path = "délwíçh/foo".encode()
fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
self.assertEqual(
fs_path,
- os.fsencode(os.path.join(u"/prefix/path", u"délwíçh", u"foo")),
+ os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
)
def test_fs_to_tree_path_str(self):
- fs_path = os.path.join(os.path.join(u"délwíçh", u"foo"))
+ fs_path = os.path.join(os.path.join("délwíçh", "foo"))
tree_path = _fs_to_tree_path(fs_path)
- self.assertEqual(tree_path, u"délwíçh/foo".encode("utf-8"))
+ self.assertEqual(tree_path, "délwíçh/foo".encode())
def test_fs_to_tree_path_bytes(self):
- fs_path = os.path.join(os.fsencode(os.path.join(u"délwíçh", u"foo")))
+ fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
tree_path = _fs_to_tree_path(fs_path)
- self.assertEqual(tree_path, u"délwíçh/foo".encode("utf-8"))
+ self.assertEqual(tree_path, "délwíçh/foo".encode())
blob - 5ffb6da31c4dbbffcffe34cb459002c0fd91d8eb
blob + c7b3edf8e6b14d6065ba4d74aea3550efc5eb0f9
--- dulwich/tests/test_lfs.py
+++ dulwich/tests/test_lfs.py
class LFSTests(TestCase):
def setUp(self):
- super(LFSTests, self).setUp()
+ super().setUp()
self.test_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.test_dir)
self.lfs = LFSStore.create(self.test_dir)
blob - 1e1141588c691f7dd2e28af5b0a2847a7993b8bf
blob + b36236b46150cac7eb930005e2f762e400c1f550
--- dulwich/tests/test_line_ending.py
+++ dulwich/tests/test_line_ending.py
-# -*- coding: utf-8 -*-
# test_line_ending.py -- Tests for the line ending functions
# encoding: utf-8
# Copyright (C) 2018-2019 Boris Feld <boris.feld@comet.ml>
blob - 817f4d15a1a24e84f6b267875d821213814bbe29
blob + 15826f71c7ba0777d3d4f25b4a5d89626f9697dd
--- dulwich/tests/test_missing_obj_finder.py
+++ dulwich/tests/test_missing_obj_finder.py
class MissingObjectFinderTest(TestCase):
def setUp(self):
- super(MissingObjectFinderTest, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.commits = []
self.assertIn(
sha,
expected,
- "(%s,%s) erroneously reported as missing" % (sha, path)
+ "({},{}) erroneously reported as missing".format(sha, path)
)
expected.remove(sha)
self.assertEqual(
len(expected),
0,
- "some objects are not reported as missing: %s" % (expected,),
+ "some objects are not reported as missing: {}".format(expected),
)
class MOFLinearRepoTest(MissingObjectFinderTest):
def setUp(self):
- super(MOFLinearRepoTest, self).setUp()
+ super().setUp()
# present in 1, removed in 3
f1_1 = make_object(Blob, data=b"f1")
# present in all revisions, changed in 2 and 3
# 5
def setUp(self):
- super(MOFMergeForkRepoTest, self).setUp()
+ super().setUp()
f1_1 = make_object(Blob, data=b"f1")
f1_2 = make_object(Blob, data=b"f1-2")
f1_4 = make_object(Blob, data=b"f1-4")
class MOFTagsTest(MissingObjectFinderTest):
def setUp(self):
- super(MOFTagsTest, self).setUp()
+ super().setUp()
f1_1 = make_object(Blob, data=b"f1")
commit_spec = [[1]]
trees = {1: [(b"f1", f1_1)]}
blob - b8498f0cc8d8dd81b6b2672cbf917c64a6dcb0f2
blob + be067b5110c14e61c9bd63d26e469b5346930641
--- dulwich/tests/test_object_store.py
+++ dulwich/tests/test_object_store.py
testobject = make_object(Blob, data=b"yummy data")
-class ObjectStoreTests(object):
+class ObjectStoreTests:
def test_determine_wants_all(self):
self.assertEqual(
[b"1" * 40],
def test_add_object(self):
self.store.add_object(testobject)
- self.assertEqual(set([testobject.id]), set(self.store))
+ self.assertEqual({testobject.id}, set(self.store))
self.assertIn(testobject.id, self.store)
r = self.store[testobject.id]
self.assertEqual(r, testobject)
def test_add_objects(self):
data = [(testobject, "mypath")]
self.store.add_objects(data)
- self.assertEqual(set([testobject.id]), set(self.store))
+ self.assertEqual({testobject.id}, set(self.store))
self.assertIn(testobject.id, self.store)
r = self.store[testobject.id]
self.assertEqual(r, testobject)
class ObjectStoreGraphWalkerTests(TestCase):
def get_walker(self, heads, parent_map):
- new_parent_map = dict(
- [(k * 40, [(p * 40) for p in ps]) for (k, ps) in parent_map.items()]
- )
+ new_parent_map = {
+ k * 40: [(p * 40) for p in ps] for (k, ps) in parent_map.items()
+ }
return ObjectStoreGraphWalker(
[x * 40 for x in heads], new_parent_map.__getitem__
)
class CommitTreeChangesTests(TestCase):
def setUp(self):
- super(CommitTreeChangesTests, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.blob_a = make_object(Blob, data=b"a")
self.blob_b = make_object(Blob, data=b"b")
blob - 607e18483789c71050944c44ecd84c5bc9c9ec39
blob + daf344f8f344d1fe3012a6f1c0042f898cdd7674
--- dulwich/tests/test_objects.py
+++ dulwich/tests/test_objects.py
def test_iter(self):
t = Tree()
t[b"foo"] = (0o100644, a_sha)
- self.assertEqual(set([b"foo"]), set(t))
+ self.assertEqual({b"foo"}, set(t))
class TagSerializeTests(TestCase):
def test_check_tag_with_overflow_time(self):
"""Date with overflow should raise an ObjectFormatException when checked"""
- author = "Some Dude <some@dude.org> %s +0000" % (MAX_TIME + 1,)
+ author = "Some Dude <some@dude.org> {} +0000".format(MAX_TIME + 1)
tag = Tag.from_string(self.make_tag_text(tagger=(author.encode())))
with self.assertRaises(ObjectFormatException):
tag.check()
self.assertEqual(b"-0440", format_timezone(int(((-4 * 60) - 40) * 60)))
def test_format_timezone_double_negative(self):
- self.assertEqual(b"--700", format_timezone(int(((7 * 60)) * 60), True))
+ self.assertEqual(b"--700", format_timezone(int((7 * 60) * 60), True))
def test_parse_timezone_pdt_half(self):
self.assertEqual((((-4 * 60) - 40) * 60, False), parse_timezone(b"-0440"))
def test_parse_timezone_double_negative(self):
- self.assertEqual((int(((7 * 60)) * 60), False), parse_timezone(b"+700"))
- self.assertEqual((int(((7 * 60)) * 60), True), parse_timezone(b"--700"))
+ self.assertEqual((int((7 * 60) * 60), False), parse_timezone(b"+700"))
+ self.assertEqual((int((7 * 60) * 60), True), parse_timezone(b"--700"))
class ShaFileCopyTests(TestCase):
blob - d3b2af0baf469539d98a979f863242a8d155b4ce
blob + 15f55785c88833e2ebd8acf9446fb6590f41f147
--- dulwich/tests/test_pack.py
+++ dulwich/tests/test_pack.py
"""Base class for testing packs"""
def setUp(self):
- super(PackTests, self).setUp()
+ super().setUp()
self.tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tempdir)
def test_iter(self):
p = self.get_pack_index(pack1_sha)
- self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
+ self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))
class TestPackDeltas(TestCase):
with self.get_pack_data(pack1_sha) as p:
entries = {(sha_to_hex(s), o, c) for s, o, c in p.iterentries()}
self.assertEqual(
- set(
- [
- (
- b"6f670c0fb53f9463760b7295fbb814e965fb20c8",
- 178,
- 1373561701,
- ),
- (
- b"b2a2766a2879c209ab1176e7e778b81ae422eeaa",
- 138,
- 912998690,
- ),
- (
- b"f18faa16531ac570a3fdc8c7ca16682548dafd12",
- 12,
- 3775879613,
- ),
- ]
- ),
+ {
+ (
+ b"6f670c0fb53f9463760b7295fbb814e965fb20c8",
+ 178,
+ 1373561701,
+ ),
+ (
+ b"b2a2766a2879c209ab1176e7e778b81ae422eeaa",
+ 138,
+ 912998690,
+ ),
+ (
+ b"f18faa16531ac570a3fdc8c7ca16682548dafd12",
+ 12,
+ 3775879613,
+ ),
+ },
entries,
)
def test_iter(self):
with self.get_pack(pack1_sha) as p:
- self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
+ self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))
def test_iterobjects(self):
with self.get_pack(pack1_sha) as p:
- expected = set([p[s] for s in [commit_sha, tree_sha, a_sha]])
+ expected = {p[s] for s in [commit_sha, tree_sha, a_sha]}
self.assertEqual(expected, set(list(p.iterobjects())))
def test_pack_tuples(self):
with self.get_pack(pack1_sha) as p:
tuples = p.pack_tuples()
- expected = set([(p[s], None) for s in [commit_sha, tree_sha, a_sha]])
+ expected = {(p[s], None) for s in [commit_sha, tree_sha, a_sha]}
self.assertEqual(expected, set(list(tuples)))
self.assertEqual(expected, set(list(tuples)))
self.assertEqual(3, len(tuples))
# file should exist
self.assertTrue(os.path.exists(keepfile_name))
- with open(keepfile_name, "r") as f:
+ with open(keepfile_name) as f:
buf = f.read()
self.assertEqual("", buf)
class TestThinPack(PackTests):
def setUp(self):
- super(TestThinPack, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.blobs = {}
for blob in (b"foo", b"bar", b"foo1234", b"bar2468"):
pack_checksum = hex_to_sha("721980e866af9a5f93ad674144e1459b8ba3e7b7")
-class BaseTestPackIndexWriting(object):
+class BaseTestPackIndexWriting:
def assertSucceeds(self, func, *args, **kwargs):
try:
func(*args, **kwargs)
extra = b"nextobject"
def setUp(self):
- super(ReadZlibTests, self).setUp()
+ super().setUp()
self.read = BytesIO(self.comp + self.extra).read
self.unpacked = UnpackedObject(Tree.type_num, None, len(self.decomp), 0)
_compute_crc32 = True
def __init__(self, *args, **kwargs):
- super(TestPackIterator, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self._unpacked_offsets = set()
def _result(self, unpacked):
"Attempted to re-inflate offset %i" % offset
)
self._unpacked_offsets.add(offset)
- return super(TestPackIterator, self)._resolve_object(
+ return super()._resolve_object(
offset, pack_type_num, base_chunks
)
class DeltaChainIteratorTests(TestCase):
def setUp(self):
- super(DeltaChainIteratorTests, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
self.fetched = set()
blob - 8ea4015b547ee7a63119593464f097c8e69f68ae
blob + a4b65d6dc49a5a8876ffdfd0cea07636699eed24
--- dulwich/tests/test_porcelain.py
+++ dulwich/tests/test_porcelain.py
class PorcelainTestCase(TestCase):
def setUp(self):
- super(PorcelainTestCase, self).setUp()
+ super().setUp()
self.test_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.test_dir)
self.repo_path = os.path.join(self.test_dir, "repo")
NON_DEFAULT_KEY_ID = "6A93393F50C5E6ACD3D6FB45B936212EDB4E14C0"
def setUp(self):
- super(PorcelainGpgTestCase, self).setUp()
+ super().setUp()
self.gpg_dir = os.path.join(self.test_dir, "gpg")
os.mkdir(self.gpg_dir, mode=0o700)
# Ignore errors when deleting GNUPGHOME, because of race conditions
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
input=PorcelainGpgTestCase.DEFAULT_KEY,
- universal_newlines=True,
+ text=True,
)
def import_non_default_key(self):
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
input=PorcelainGpgTestCase.NON_DEFAULT_KEY,
- universal_newlines=True,
+ text=True,
)
cwd = os.getcwd()
try:
os.chdir(self.repo.path)
- self.assertEqual(set(["foo", "blah", "adir", ".git"]), set(os.listdir(".")))
+ self.assertEqual({"foo", "blah", "adir", ".git"}, set(os.listdir(".")))
self.assertEqual(
(["foo", os.path.join("adir", "afile")], set()),
porcelain.add(self.repo.path),
],
)
self.assertIn(b"bar", self.repo.open_index())
- self.assertEqual(set(["bar"]), set(added))
- self.assertEqual(set(["foo", os.path.join("subdir", "")]), ignored)
+ self.assertEqual({"bar"}, set(added))
+ self.assertEqual({"foo", os.path.join("subdir", "")}, ignored)
def test_add_file_absolute_path(self):
# Absolute paths are (not yet) supported
f.write('something new')
porcelain.reset_file(self.repo, file, target=sha)
- with open(full_path, 'r') as f:
+ with open(full_path) as f:
self.assertEqual('hello', f.read())
def test_reset_remove_file_to_commit(self):
os.remove(full_path)
porcelain.reset_file(self.repo, file, target=sha)
- with open(full_path, 'r') as f:
+ with open(full_path) as f:
self.assertEqual('hello', f.read())
def test_resetfile_with_dir(self):
author=b"John <john@example.com>",
)
porcelain.reset_file(self.repo, os.path.join('new_dir', 'foo'), target=sha)
- with open(full_path, 'r') as f:
+ with open(full_path) as f:
self.assertEqual('hello', f.read())
def test_add(self):
porcelain.submodule_add(self.repo, "../bar.git", "bar")
- with open('%s/.gitmodules' % self.repo.path, 'r') as f:
+ with open('%s/.gitmodules' % self.repo.path) as f:
self.assertEqual("""\
[submodule "bar"]
\turl = ../bar.git
class PullTests(PorcelainTestCase):
def setUp(self):
- super(PullTests, self).setUp()
+ super().setUp()
# create a file for initial commit
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
os.close(handle)
os.path.join(self.repo.path, "link"),
)
self.assertEqual(
- set(["ignored", "notignored", ".gitignore", "link"]),
+ {"ignored", "notignored", ".gitignore", "link"},
set(
porcelain.get_untracked_paths(
self.repo.path, self.repo.path, self.repo.open_index()
),
)
self.assertEqual(
- set([".gitignore", "notignored", "link"]),
+ {".gitignore", "notignored", "link"},
set(porcelain.status(self.repo).untracked),
)
self.assertEqual(
- set([".gitignore", "notignored", "ignored", "link"]),
+ {".gitignore", "notignored", "ignored", "link"},
set(porcelain.status(self.repo, ignored=True).untracked),
)
f.write("blop\n")
self.assertEqual(
- set([".gitignore", "notignored", os.path.join("nested", "")]),
+ {".gitignore", "notignored", os.path.join("nested", "")},
set(
porcelain.get_untracked_paths(
self.repo.path, self.repo.path, self.repo.open_index()
),
)
self.assertEqual(
- set([".gitignore", "notignored"]),
+ {".gitignore", "notignored"},
set(
porcelain.get_untracked_paths(
self.repo.path,
),
)
self.assertEqual(
- set(["ignored", "with", "manager"]),
+ {"ignored", "with", "manager"},
set(
porcelain.get_untracked_paths(
subrepo.path, subrepo.path, subrepo.open_index()
),
)
self.assertEqual(
- set([os.path.join('nested', 'ignored'),
+ {os.path.join('nested', 'ignored'),
os.path.join('nested', 'with'),
- os.path.join('nested', 'manager')]),
+ os.path.join('nested', 'manager')},
set(
porcelain.get_untracked_paths(
self.repo.path,
f.write("foo")
self.assertEqual(
- set(
- [
- ".gitignore",
- "notignored",
- "ignored",
- os.path.join("subdir", ""),
- ]
- ),
+ {
+ ".gitignore",
+ "notignored",
+ "ignored",
+ os.path.join("subdir", ""),
+ },
set(
porcelain.get_untracked_paths(
self.repo.path,
)
)
self.assertEqual(
- set([".gitignore", "notignored"]),
+ {".gitignore", "notignored"},
set(
porcelain.get_untracked_paths(
self.repo.path,
class BranchListTests(PorcelainTestCase):
def test_standard(self):
- self.assertEqual(set([]), set(porcelain.branch_list(self.repo)))
+ self.assertEqual(set(), set(porcelain.branch_list(self.repo)))
def test_new_branch(self):
[c1] = build_commit_graph(self.repo.object_store, [[1]])
self.repo[b"HEAD"] = c1.id
porcelain.branch_create(self.repo, b"foo")
self.assertEqual(
- set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))
+ {b"master", b"foo"}, set(porcelain.branch_list(self.repo))
)
self.repo[b"HEAD"] = c1.id
porcelain.branch_create(self.repo, b"foo")
self.assertEqual(
- set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))
+ {b"master", b"foo"}, set(porcelain.branch_list(self.repo))
)
class PathToTreeTests(PorcelainTestCase):
def setUp(self):
- super(PathToTreeTests, self).setUp()
+ super().setUp()
self.fp = os.path.join(self.test_dir, "bar")
with open(self.fp, "w") as f:
f.write("something")
blob - 9985fa4a426a27077c863253417ca8a366df979b
blob + 8875154dd0eeae61f8f57bf5b0842a69ae1a404c
--- dulwich/tests/test_protocol.py
+++ dulwich/tests/test_protocol.py
from dulwich.tests import TestCase
-class BaseProtocolTests(object):
+class BaseProtocolTests:
def test_write_pkt_line_none(self):
self.proto.write_pkt_line(None)
self.assertEqual(self.rout.getvalue(), b"0000")
blob - c8bfcaf527b739e303b22c5d332ab56c96dc3171
blob + 3c230950144b42b17a59680c3dfe80e47569fc19
--- dulwich/tests/test_reflog.py
+++ dulwich/tests/test_reflog.py
# test_reflog.py -- tests for reflog.py
-# encoding: utf-8
# Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
blob - ffd4dbaed933234330451a2d516bf6ff76c37fa5
blob + 9dcb864d01fa60485f1f6a66112b22587946e5a0
--- dulwich/tests/test_refs.py
+++ dulwich/tests/test_refs.py
# test_refs.py -- tests for refs.py
-# encoding: utf-8
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
}
-class RefsContainerTests(object):
+class RefsContainerTests:
def test_keys(self):
actual_keys = set(self._refs.keys())
self.assertEqual(set(self._refs.allkeys()), actual_keys)
def test_non_ascii(self):
try:
- encoded_ref = os.fsencode(u"refs/tags/schön")
+ encoded_ref = os.fsencode("refs/tags/schön")
except UnicodeEncodeError as exc:
raise SkipTest(
"filesystem encoding doesn't support special character"
blob - f5372ab12f5e82e084836e3b9d774f4afbf1874e
blob + bb1186550917231d4ed89c0da475d34a5e9dc5bd
--- dulwich/tests/test_repository.py
+++ dulwich/tests/test_repository.py
-# -*- coding: utf-8 -*-
# test_repository.py -- tests for repository.py
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
#
self.assertFilesystemHidden(os.path.join(repo_dir, ".git"))
def test_init_mkdir_unicode(self):
- repo_name = u"\xa7"
+ repo_name = "\xa7"
try:
os.fsencode(repo_name)
except UnicodeEncodeError:
``Repo.fetch_objects()``).
"""
- expected_shas = set([b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e"])
+ expected_shas = {b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e"}
# Source for objects.
r_base = self.open_repo("simple_merge.git")
if os.name != "posix":
self.skipTest("shell hook tests requires POSIX shell")
- pre_commit_contents = """#!%(executable)s
+ pre_commit_contents = """#!{executable}
import sys
-sys.path.extend(%(path)r)
+sys.path.extend({path!r})
from dulwich.repo import Repo
with open('foo', 'w') as f:
r = Repo('.')
r.stage(['foo'])
-""" % {
- 'executable': sys.executable,
- 'path': [os.path.join(os.path.dirname(__file__), '..', '..')] + sys.path}
+""".format(
+ executable=sys.executable,
+ path=[os.path.join(os.path.dirname(__file__), '..', '..')] + sys.path)
repo_dir = os.path.join(self.mkdtemp())
self.addCleanup(shutil.rmtree, repo_dir)
self.assertEqual([], r[commit_sha].parents)
tree = r[r[commit_sha].tree]
- self.assertEqual(set([b'blah', b'foo']), set(tree))
+ self.assertEqual({b'blah', b'foo'}, set(tree))
def test_shell_hook_post_commit(self):
if os.name != "posix":
break
else:
raise AssertionError(
- "Expected warning %r not in %r" % (expected_warning, warnings_list)
+ "Expected warning {!r} not in {!r}".format(expected_warning, warnings_list)
)
self.assertEqual([commit_sha], r[commit_sha2].parents)
return os.path.join(tempfile.mkdtemp(), "test")
def setUp(self):
- super(BuildRepoRootTests, self).setUp()
+ super().setUp()
self._repo_dir = self.get_repo_dir()
os.makedirs(self._repo_dir)
r = self._repo = Repo.init(self._repo_dir)
r = self._repo
repo_path_bytes = os.fsencode(r.path)
encodings = ("utf8", "latin1")
- names = [u"À".encode(encoding) for encoding in encodings]
+ names = ["À".encode(encoding) for encoding in encodings]
for name, encoding in zip(names, encodings):
full_path = os.path.join(repo_path_bytes, name)
with open(full_path, "wb") as f:
blob - e6d4076d0a08b29eceeea5e580a0442aaf1e426d
blob + c48890e38d6ddd98cb8c63122181a435a7554236
--- dulwich/tests/test_server.py
+++ dulwich/tests/test_server.py
SIX = b"6" * 40
-class TestProto(object):
+class TestProto:
def __init__(self):
self._output = []
self._received = {0: [], 1: [], 2: [], 3: []}
class HandlerTestCase(TestCase):
def setUp(self):
- super(HandlerTestCase, self).setUp()
+ super().setUp()
self._handler = TestGenericPackHandler()
def assertSucceeds(self, func, *args, **kwargs):
class UploadPackHandlerTestCase(TestCase):
def setUp(self):
- super(UploadPackHandlerTestCase, self).setUp()
+ super().setUp()
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b"/": self._repo})
self._handler = UploadPackHandler(
class FindShallowTests(TestCase):
def setUp(self):
- super(FindShallowTests, self).setUp()
+ super().setUp()
self._store = MemoryObjectStore()
def make_commit(self, **attrs):
c1, c2, c3 = self.make_linear_commits(3)
self.assertEqual(
- (set([c3.id]), set([])), _find_shallow(self._store, [c3.id], 1)
+ ({c3.id}, set()), _find_shallow(self._store, [c3.id], 1)
)
self.assertEqual(
- (set([c2.id]), set([c3.id])),
+ ({c2.id}, {c3.id}),
_find_shallow(self._store, [c3.id], 2),
)
self.assertEqual(
- (set([c1.id]), set([c2.id, c3.id])),
+ ({c1.id}, {c2.id, c3.id}),
_find_shallow(self._store, [c3.id], 3),
)
self.assertEqual(
- (set([]), set([c1.id, c2.id, c3.id])),
+ (set(), {c1.id, c2.id, c3.id}),
_find_shallow(self._store, [c3.id], 4),
)
heads = [a[1].id, b[1].id, c[1].id]
self.assertEqual(
- (set([a[0].id, b[0].id, c[0].id]), set(heads)),
+ ({a[0].id, b[0].id, c[0].id}, set(heads)),
_find_shallow(self._store, heads, 2),
)
# 1 is shallow along the path from 4, but not along the path from 2.
self.assertEqual(
- (set([c1.id]), set([c1.id, c2.id, c3.id, c4.id])),
+ ({c1.id}, {c1.id, c2.id, c3.id, c4.id}),
_find_shallow(self._store, [c2.id, c4.id], 3),
)
c3 = self.make_commit(parents=[c1.id, c2.id])
self.assertEqual(
- (set([c1.id, c2.id]), set([c3.id])),
+ ({c1.id, c2.id}, {c3.id}),
_find_shallow(self._store, [c3.id], 2),
)
self._store.add_object(tag)
self.assertEqual(
- (set([c1.id]), set([c2.id])),
+ ({c1.id}, {c2.id}),
_find_shallow(self._store, [tag.id], 2),
)
class ReceivePackHandlerTestCase(TestCase):
def setUp(self):
- super(ReceivePackHandlerTestCase, self).setUp()
+ super().setUp()
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b"/": self._repo})
self._handler = ReceivePackHandler(
class ProtocolGraphWalkerEmptyTestCase(TestCase):
def setUp(self):
- super(ProtocolGraphWalkerEmptyTestCase, self).setUp()
+ super().setUp()
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b"/": self._repo})
self._walker = _ProtocolGraphWalker(
class ProtocolGraphWalkerTestCase(TestCase):
def setUp(self):
- super(ProtocolGraphWalkerTestCase, self).setUp()
+ super().setUp()
# Create the following commit tree:
# 3---5
# /
def test_handle_shallow_request_no_client_shallows(self):
self._handle_shallow_request([b"deepen 2\n"], [FOUR, FIVE])
- self.assertEqual(set([TWO, THREE]), self._walker.shallow)
+ self.assertEqual({TWO, THREE}, self._walker.shallow)
self.assertReceived(
[
b"shallow " + TWO,
b"deepen 2\n",
]
self._handle_shallow_request(lines, [FOUR, FIVE])
- self.assertEqual(set([TWO, THREE]), self._walker.shallow)
+ self.assertEqual({TWO, THREE}, self._walker.shallow)
self.assertReceived([])
def test_handle_shallow_request_unshallows(self):
b"deepen 3\n",
]
self._handle_shallow_request(lines, [FOUR, FIVE])
- self.assertEqual(set([ONE]), self._walker.shallow)
+ self.assertEqual({ONE}, self._walker.shallow)
self.assertReceived(
[
b"shallow " + ONE,
)
-class TestProtocolGraphWalker(object):
+class TestProtocolGraphWalker:
def __init__(self):
self.acks = []
self.lines = []
"""Base setup and asserts for AckGraphWalker tests."""
def setUp(self):
- super(AckGraphWalkerImplTestCase, self).setUp()
+ super().setUp()
self._walker = TestProtocolGraphWalker()
self._walker.lines = [
(b"have", TWO),
"""Tests for FileSystemBackend."""
def setUp(self):
- super(FileSystemBackendTests, self).setUp()
+ super().setUp()
self.path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.path)
self.repo = Repo.init(self.path)
"""Tests for serve_command."""
def setUp(self):
- super(ServeCommandTests, self).setUp()
+ super().setUp()
self.backend = DictBackend({})
def serve_command(self, handler_cls, args, inf, outf):
"""Tests for update_server_info."""
def setUp(self):
- super(UpdateServerInfoTests, self).setUp()
+ super().setUp()
self.path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.path)
self.repo = Repo.init(self.path)
blob - 847ef5e2e8699091a40fe2d4219d1ea931be0294
blob + be234c15da49064582c70a29ce2b775d995adecc
--- dulwich/tests/test_utils.py
+++ dulwich/tests/test_utils.py
class BuildCommitGraphTest(TestCase):
def setUp(self):
- super(BuildCommitGraphTest, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
def test_linear(self):
blob - d77ede05fee98fa79060791bd06722203ba7f5da
blob + 30ee3d02236637982d088d02411c308e2d26a996
--- dulwich/tests/test_walk.py
+++ dulwich/tests/test_walk.py
)
-class TestWalkEntry(object):
+class TestWalkEntry:
def __init__(self, commit, changes):
self.commit = commit
self.changes = changes
def __repr__(self):
- return "<TestWalkEntry commit=%s, changes=%r>" % (
+ return "<TestWalkEntry commit={}, changes={!r}>".format(
self.commit.id,
self.changes,
)
class WalkerTest(TestCase):
def setUp(self):
- super(WalkerTest, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
def make_commits(self, commit_spec, **kwargs):
class WalkEntryTest(TestCase):
def setUp(self):
- super(WalkEntryTest, self).setUp()
+ super().setUp()
self.store = MemoryObjectStore()
def make_commits(self, commit_spec, **kwargs):
blob - c710f29af6146e5a8b99cf86d5b1e45f15262f9e
blob + 1b02b1aade5feeb9bcf86c3a8a0595d5ac8223ad
--- dulwich/tests/test_web.py
+++ dulwich/tests/test_web.py
)
-class MinimalistWSGIInputStream(object):
+class MinimalistWSGIInputStream:
"""WSGI input stream with no 'seek()' and 'tell()' methods."""
def __init__(self, data):
_req_class: Type[HTTPGitRequest] = TestHTTPGitRequest
def setUp(self):
- super(WebTestCase, self).setUp()
+ super().setUp()
self._environ = {}
self._req = self._req_class(
self._environ, self._start_response, handlers=self._handlers()
self.assertTrue(f.closed)
def test_send_file_error(self):
- class TestFile(object):
+ class TestFile:
def __init__(self, exc_class):
self.closed = False
self._exc_class = exc_class
mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))
def as_legacy_object_error(self):
- raise IOError
+ raise OSError
self.addCleanup(setattr, Blob, "as_legacy_object", Blob.as_legacy_object)
Blob.as_legacy_object = as_legacy_object_error
self.assertContentTypeEquals("text/plain")
def test_get_info_packs(self):
- class TestPackData(object):
+ class TestPackData:
def __init__(self, sha):
self.filename = "pack-%s.pack" % sha
- class TestPack(object):
+ class TestPack:
def __init__(self, sha):
self.data = TestPackData(sha)
class SmartHandlersTestCase(WebTestCase):
- class _TestUploadPackHandler(object):
+ class _TestUploadPackHandler:
def __init__(
self,
backend,
self._environ["CONTENT_LENGTH"] = content_length
mat = re.search(".*", "/git-upload-pack")
- class Backend(object):
+ class Backend:
def open_repository(self, path):
return None
def test_get_info_refs_unknown(self):
self._environ["QUERY_STRING"] = "service=git-evil-handler"
- class Backend(object):
+ class Backend:
def open_repository(self, url):
return None
self._environ["wsgi.input"] = BytesIO(b"foo")
self._environ["QUERY_STRING"] = "service=git-upload-pack"
- class Backend(object):
+ class Backend:
def open_repository(self, url):
return None
message = "Something not found"
self.assertEqual(message.encode("ascii"), self._req.not_found(message))
self.assertEqual(HTTP_NOT_FOUND, self._status)
- self.assertEqual(set([("Content-Type", "text/plain")]), set(self._headers))
+ self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))
def test_forbidden(self):
self._req.cache_forever() # cache headers should be discarded
message = "Something not found"
self.assertEqual(message.encode("ascii"), self._req.forbidden(message))
self.assertEqual(HTTP_FORBIDDEN, self._status)
- self.assertEqual(set([("Content-Type", "text/plain")]), set(self._headers))
+ self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))
def test_respond_ok(self):
self._req.respond()
headers=[("X-Foo", "foo"), ("X-Bar", "bar")],
)
self.assertEqual(
- set(
- [
- ("X-Foo", "foo"),
- ("X-Bar", "bar"),
- ("Content-Type", "some/type"),
- ("Expires", "Fri, 01 Jan 1980 00:00:00 GMT"),
- ("Pragma", "no-cache"),
- ("Cache-Control", "no-cache, max-age=0, must-revalidate"),
- ]
- ),
+ {
+ ("X-Foo", "foo"),
+ ("X-Bar", "bar"),
+ ("Content-Type", "some/type"),
+ ("Expires", "Fri, 01 Jan 1980 00:00:00 GMT"),
+ ("Pragma", "no-cache"),
+ ("Cache-Control", "no-cache, max-age=0, must-revalidate"),
+ },
set(self._headers),
)
self.assertEqual(402, self._status)
class HTTPGitApplicationTestCase(TestCase):
def setUp(self):
- super(HTTPGitApplicationTestCase, self).setUp()
+ super().setUp()
self._app = HTTPGitApplication("backend")
self._environ = {
example_text = __doc__.encode("ascii")
def setUp(self):
- super(GunzipTestCase, self).setUp()
+ super().setUp()
self._app = GunzipFilter(self._app)
self._environ["HTTP_CONTENT_ENCODING"] = "gzip"
self._environ["REQUEST_METHOD"] = "POST"
blob - cc42a7c05e3f80418a8daf3d07fa99ed794caf50
blob + a80c6ec0c298a1a6551ed6777979566755518e12
--- dulwich/walk.py
+++ dulwich/walk.py
_MAX_EXTRA_COMMITS = 5
-class WalkEntry(object):
+class WalkEntry:
"""Object encapsulating a single result from a walk."""
def __init__(self, walker, commit):
return self._changes[path_prefix]
def __repr__(self):
- return "<WalkEntry commit=%s, changes=%r>" % (
+ return "<WalkEntry commit={}, changes={!r}>".format(
self.commit.id,
self.changes(),
)
-class _CommitTimeQueue(object):
+class _CommitTimeQueue:
"""Priority queue of WalkEntry objects by commit time."""
def __init__(self, walker: "Walker"):
__next__ = next
-class Walker(object):
+class Walker:
"""Object for performing a walk of commits in a store.
Walker objects are initialized with a store and other options and can then
blob - db160cc689a1982369c0ed0080872ae95eaaa7d3
blob + 5582dda5e71df8dcd4ef861e9531734c444d2adc
--- dulwich/web.py
+++ dulwich/web.py
if not data:
break
yield data
- except IOError:
+ except OSError:
yield req.error("Error reading file")
finally:
f.close()
return
try:
data = object_store[sha].as_legacy_object()
- except IOError:
+ except OSError:
yield req.error("Error reading object")
return
req.cache_forever()
req.nocache()
req.respond(HTTP_OK, "text/plain")
logger.info("Emulating dumb info/refs")
- for text in generate_info_refs(repo):
- yield text
+ yield from generate_info_refs(repo)
def get_info_packs(req, backend, mat):
yield chunk[:-2]
-class ChunkReader(object):
+class ChunkReader:
def __init__(self, f):
self._iter = _chunk_iter(f)
return ret
-class _LengthLimitedFile(object):
+class _LengthLimitedFile:
"""Wrapper class to limit the length of reads from a file-like object.
This is used to ensure EOF is read from the wsgi.input object once
handler.handle()
-class HTTPGitRequest(object):
+class HTTPGitRequest:
"""Class encapsulating the state of a single git HTTP request.
Attributes:
self._cache_headers = cache_forever_headers()
-class HTTPGitApplication(object):
+class HTTPGitApplication:
"""Class encapsulating the state of a git WSGI application.
Attributes:
return handler(req, self.backend, mat)
-class GunzipFilter(object):
+class GunzipFilter:
"""WSGI middleware that unzips gzip-encoded requests before
passing on to the underlying application.
"""
try:
environ["wsgi.input"].tell()
wsgi_input = environ["wsgi.input"]
- except (AttributeError, IOError, NotImplementedError):
+ except (AttributeError, OSError, NotImplementedError):
# The gzip implementation in the standard library of Python 2.x
# requires working '.seek()' and '.tell()' methods on the input
# stream. Read the data into a temporary file to work around
return self.app(environ, start_response)
-class LimitedInputFilter(object):
+class LimitedInputFilter:
"""WSGI middleware that limits the input length of a request to that
specified in Content-Length.
"""
blob - 6467a76c48795db98105c5e6d8a0e621c94f79df
blob + 7dd2381cad00493199e748fe8d648cc224bfd064
--- examples/clone.py
+++ examples/clone.py
if len(args) < 2:
- print("usage: %s host:path path" % (args[0], ))
+ print("usage: {} host:path path".format(args[0]))
sys.exit(1)
elif len(args) < 3:
blob - d0c5c4b96251e4b389b0cca338f54869281d09f0
blob + 14b158f2c202736fb013b801c0617f3afc935952
--- examples/latest_change.py
+++ examples/latest_change.py
from dulwich.repo import Repo
if len(sys.argv) < 2:
- print("usage: %s filename" % (sys.argv[0], ))
+ print("usage: {} filename".format(sys.argv[0]))
sys.exit(1)
r = Repo(".")
except StopIteration:
print("No file %s anywhere in history." % sys.argv[1])
else:
- print("%s was last changed by %s at %s (commit %s)" % (
+ print("{} was last changed by {} at {} (commit {})".format(
sys.argv[1], c.author, time.ctime(c.author_time), c.id))
blob - d99d789f798b7300b8ef9a0dbb5e9ba39258f85b
blob + e023b0736e45d0cf0b94ac01d31ef744cc8fc4f4
--- examples/rename-branch.py
+++ examples/rename-branch.py
client.send_pack(path, update_refs, generate_pack_data)
-print("Renamed %s to %s" % (args.old_ref, args.new_ref))
+print("Renamed {} to {}".format(args.old_ref, args.new_ref))
blob - b59a2c93ecb14d94b092ed3b5802e1f4198d7d99
blob + 46b0745c556c34cfbbdc3cacb2bed08c00e402d0
--- setup.py
+++ setup.py
#!/usr/bin/python3
-# encoding: utf-8
# Setup file for dulwich
# Copyright (C) 2008-2022 Jelmer Vernooij <jelmer@jelmer.uk>