commit - 80396a071cd0a74b3ef207fc454132ec60fdfce9
commit + ff64fa9e4418059400fe503f56ff6ccb8a38f498
blob - 4273eb21cd51bec40e1b41bdf655033190bbf969 (mode 644)
blob + /dev/null
--- .flake8
+++ /dev/null
-[flake8]
-extend-ignore = E203, E266, E501, W293, W291, W503
-max-line-length = 88
-max-complexity = 18
-select = B,C,E,F,W,T4,B9
blob - 3e446cbf91cdcde876da75c530189719a92b4fa3
blob + e03168877b55b9eac34f9a9e68326e49d12d341b
--- .github/workflows/pythontest.yml
+++ .github/workflows/pythontest.yml
if: "matrix.os != 'windows-latest' && matrix.python-version != 'pypy3'"
- name: Style checks
run: |
- pip install --upgrade flake8
- python -m flake8
+ pip install --upgrade ruff
+ python -m ruff check .
- name: Typing checks
run: |
pip install --upgrade mypy types-paramiko types-requests
blob - 493f6405273ca7755867e020dc403490e815a358
blob + 2fc0c2979b814029ff6f71f90065aa944cd71ccc
--- CONTRIBUTING.rst
+++ CONTRIBUTING.rst
Coding style
------------
-Where possible, please follow PEP8 with regard to coding style. Run flake8.
+Where possible, please follow PEP8 with regard to coding style. Run ruff.
Furthermore, triple-quotes should always be """, single quotes are ' unless
using " would result in less escaping within the string.
blob - f40324e037007282fdd2fe5735e7eb062f23b14b
blob + 286891c88c8366f79f4e4c09d3d40dc31b305d42
--- Makefile
+++ Makefile
PYTHON = python3
PYFLAKES = $(PYTHON) -m pyflakes
PEP8 = pep8
-FLAKE8 ?= $(PYTHON) -m flake8
+RUFF ?= $(PYTHON) -m ruff
SETUP = $(PYTHON) setup.py
TESTRUNNER ?= unittest
RUNTEST = PYTHONHASHSEED=random PYTHONPATH=$(shell pwd)$(if $(PYTHONPATH),:$(PYTHONPATH),) $(PYTHON) -m $(TESTRUNNER) $(TEST_OPTIONS)
$(PEP8) dulwich
style:
- $(FLAKE8)
+ $(RUFF) check .
before-push: check
git diff origin/master | $(PEP8) --diff
blob - d15a7595a2d56843bb2d4997f328773f7b3b941b
blob + c5147a64afd2d688ad4d444ca40b2e46c8469bd5
--- disperse.conf
+++ disperse.conf
news_file: "NEWS"
timeout_days: 5
tag_name: "dulwich-$VERSION"
-verify_command: "flake8 && make check"
+verify_command: "make check"
update_version {
path: "dulwich/__init__.py"
match: "^__version__ = \((.*)\)$"
blob - c44e5c471543269feea66f60a5c4f82d6bcc0add
blob + 45b7720e0f03114d8d40c30dfd2b2f905945d092
--- dulwich/bundle.py
+++ dulwich/bundle.py
class Bundle:
- version: Optional[int] = None
+ version: Optional[int]
- capabilities: Dict[str, str] = {}
- prerequisites: List[Tuple[bytes, str]] = []
- references: Dict[str, bytes] = {}
- pack_data: Union[PackData, Sequence[bytes]] = []
+ capabilities: Dict[str, str]
+ prerequisites: List[Tuple[bytes, str]]
+ references: Dict[str, bytes]
+ pack_data: Union[PackData, Sequence[bytes]]
def __repr__(self) -> str:
return (
blob - 208c13c4722d31ef9d785a2a3f84031cad768794
blob + f6a3a4db2352736892658658c33f0c6a7ae268ae
--- dulwich/cli.py
+++ dulwich/cli.py
import signal
import sys
from getopt import getopt
-from typing import Dict, Optional, Type
+from typing import ClassVar, Dict, Optional, Type
from dulwich import porcelain
class SuperCommand(Command):
- subcommands: Dict[str, Type[Command]] = {}
- default_command: Optional[Type[Command]] = None
+ subcommands: ClassVar[Dict[str, Type[Command]]] = {}
+ default_command: ClassVar[Optional[Type[Command]]] = None
def run(self, args):
if not args and not self.default_command:
class cmd_remote(SuperCommand):
- subcommands = {
+ subcommands: ClassVar[Dict[str, Type[Command]]] = {
"add": cmd_remote_add,
}
class cmd_submodule(SuperCommand):
- subcommands = {
+ subcommands: ClassVar[Dict[str, Type[Command]]] = {
"init": cmd_submodule_init,
}
class cmd_stash(SuperCommand):
- subcommands = {
+ subcommands: ClassVar[Dict[str, Type[Command]]] = {
"list": cmd_stash_list,
"pop": cmd_stash_pop,
"push": cmd_stash_push,
blob - 80f0787ac2fa96941a04fe4a5016200784df452c
blob + 0011d02259c9d6d075f7be453fb014257758f0a7
--- dulwich/client.py
+++ dulwich/client.py
IO,
TYPE_CHECKING,
Callable,
+ ClassVar,
Dict,
Iterable,
Iterator,
COMMON_CAPABILITIES = [CAPABILITY_OFS_DELTA, CAPABILITY_SIDE_BAND_64K]
-UPLOAD_CAPABILITIES = [
- CAPABILITY_THIN_PACK,
- CAPABILITY_MULTI_ACK,
- CAPABILITY_MULTI_ACK_DETAILED,
- CAPABILITY_SHALLOW,
-] + COMMON_CAPABILITIES
-RECEIVE_CAPABILITIES = [
- CAPABILITY_REPORT_STATUS,
- CAPABILITY_DELETE_REFS,
-] + COMMON_CAPABILITIES
+UPLOAD_CAPABILITIES = [CAPABILITY_THIN_PACK, CAPABILITY_MULTI_ACK, CAPABILITY_MULTI_ACK_DETAILED, CAPABILITY_SHALLOW, *COMMON_CAPABILITIES]
+RECEIVE_CAPABILITIES = [CAPABILITY_REPORT_STATUS, CAPABILITY_DELETE_REFS, *COMMON_CAPABILITIES]
class ReportStatusParser:
agent: User agent string
"""
- _FORWARDED_ATTRS = [
+ _FORWARDED_ATTRS: ClassVar[Set[str]] = {
"clear",
"copy",
"fromkeys",
"viewitems",
"viewkeys",
"viewvalues",
- ]
+ }
def __init__(
self, refs, symrefs, agent, new_shallow=None, new_unshallow=None
failed to update), or None if it was updated successfully
"""
- _FORWARDED_ATTRS = [
+ _FORWARDED_ATTRS: ClassVar[Set[str]] = {
"clear",
"copy",
"fromkeys",
"viewitems",
"viewkeys",
"viewvalues",
- ]
+ }
def __init__(self, refs, agent=None, ref_status=None) -> None:
self.refs = refs
path = path.decode(self._remote_path_encoding)
if self.git_command is None:
git_command = find_git_command()
- argv = git_command + [service.decode("ascii"), path]
+ argv = [*git_command, service.decode("ascii"), path]
p = subprocess.Popen(
argv,
bufsize=0,
if ssh_command:
import shlex
-
- args = shlex.split(ssh_command, posix=(sys.platform != "win32")) + ["-x"]
+ args = [*shlex.split(ssh_command, posix=sys.platform != "win32"), "-x"]
else:
args = ["ssh", "-x"]
args.append(host)
proc = subprocess.Popen(
- args + [command],
+ [*args, command],
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
):
if ssh_command:
import shlex
-
- args = shlex.split(ssh_command, posix=(sys.platform != "win32")) + ["-ssh"]
+ args = [*shlex.split(ssh_command, posix=sys.platform != "win32"), "-ssh"]
elif sys.platform == "win32":
args = ["plink.exe", "-ssh"]
else:
args.append(host)
proc = subprocess.Popen(
- args + [command],
+ [*args, command],
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
return "git/dulwich/%s" % ".".join([str(x) for x in dulwich.__version__])
-def default_urllib3_manager( # noqa: C901
+def default_urllib3_manager(
config,
pool_manager_cls=None,
proxy_manager_cls=None,
raise GitProtocolError(
"unexpected first line %r from smart server" % pkt
)
- return read_pkt_refs(proto.read_pkt_seq()) + (base_url,)
+ return (*read_pkt_refs(proto.read_pkt_seq()), base_url)
else:
return read_info_refs(resp), set(), base_url
finally:
blob - ce5f03f5c094a63703bd78ac07d78083752dccd2
blob + 57af8991566be1982fe87e2daf2457ae057ba10a
--- dulwich/config.py
+++ dulwich/config.py
super().__init__(values=values, encoding=encoding)
self.path: Optional[str] = None
- @classmethod # noqa: C901
- def from_file(cls, f: BinaryIO) -> "ConfigFile": # noqa: C901
+ @classmethod
+ def from_file(cls, f: BinaryIO) -> "ConfigFile":
"""Read configuration from a file-like object."""
ret = cls()
section: Optional[Section] = None
blob - a6f3b4739777292eba0a3967712e44a7fffe7039
blob + 9280bd026273ea3a650443380885f6c3b4513825
--- dulwich/contrib/swift.py
+++ dulwich/contrib/swift.py
auth_ret_json = json.loads(ret.read())
token = auth_ret_json["access"]["token"]["id"]
catalogs = auth_ret_json["access"]["serviceCatalog"]
- object_store = [
+ object_store = next(
o_store for o_store in catalogs if o_store["type"] == "object-store"
- ][0]
+ )
endpoints = object_store["endpoints"]
- endpoint = [endp for endp in endpoints if endp["region"] == self.region_name][0]
+ endpoint = next(endp for endp in endpoints if endp["region"] == self.region_name)
return endpoint[self.endpoint_type], token
def test_root_exists(self):
blob - e37cd6c97e2f37f9873449ea734d45efd2fe2ca9
blob + 1f8756f4cba991159a3e43a954eb29ac1263e271
--- dulwich/contrib/test_release_robot.py
+++ dulwich/contrib/test_release_robot.py
import tempfile
import time
import unittest
+from typing import ClassVar, Dict, List, Optional, Tuple
from dulwich.contrib import release_robot
# Git repo for dulwich project
test_repo = os.path.join(BASEDIR, "dulwich_test_repo.zip")
committer = b"Mark Mikofski <mark.mikofski@sunpowercorp.com>"
- test_tags = [b"v0.1a", b"v0.1"]
- tag_test_data = {
- test_tags[0]: [1484788003, b"3" * 40, None],
- test_tags[1]: [1484788314, b"1" * 40, (1484788401, b"2" * 40)],
+ test_tags: ClassVar[List[bytes]] = [b"v0.1a", b"v0.1"]
+ tag_test_data: ClassVar[Dict[bytes, Tuple[int, bytes, Optional[Tuple[int, bytes]]]]] = {
+ test_tags[0]: (1484788003, b"3" * 40, None),
+ test_tags[1]: (1484788314, b"1" * 40, (1484788401, b"2" * 40)),
}
@classmethod
blob - d38dbcc9d1861328232c06866e9958c185877f6b
blob + 0c4ace4c01624d026bd4e13ca7f4d964c2538d4c
--- dulwich/file.py
+++ dulwich/file.py
import os
import sys
import warnings
+from typing import ClassVar, Set
def ensure_dir_exists(dirname):
released. Typically this will happen in a finally block.
"""
- PROXY_PROPERTIES = {
+ PROXY_PROPERTIES: ClassVar[Set[str]] = {
"closed",
"encoding",
"errors",
"newlines",
"softspace",
}
- PROXY_METHODS = (
+ PROXY_METHODS: ClassVar[Set[str]] = {
"__iter__",
"flush",
"fileno",
"truncate",
"write",
"writelines",
- )
+ }
def __init__(self, filename, mode, bufsize, mask) -> None:
self._filename = filename
blob - cd543710d1165ab0a2fe6bab6235c452be7e23ba
blob + 16a4b49a9d1a051fc643cd84c4d8b012022226ea
--- dulwich/hooks.py
+++ dulwich/hooks.py
try:
ret = subprocess.call(
- [os.path.relpath(self.filepath, self.cwd)] + list(args), cwd=self.cwd
- )
+ [os.path.relpath(self.filepath, self.cwd), *list(args)],
+ cwd=self.cwd)
if ret != 0:
if self.post_exec_callback is not None:
self.post_exec_callback(0, *args)
blob - eb8f845560f16a77068e337c010920993e3c53f1
blob + d7c042179c00dd629c0dbf766719b6d9c628ab69
--- dulwich/pack.py
+++ dulwich/pack.py
try:
from dulwich._pack import ( # type: ignore # noqa: F811
- apply_delta, # type: ignore # noqa: F811
- bisect_find_sha, # type: ignore # noqa: F811
+ apply_delta, # type: ignore
+ bisect_find_sha, # type: ignore
)
except ImportError:
pass
blob - 336dbcceacf6208209201cef919325d4a860da32
blob + 2725795e7fae60c0f16406e40bbdfc23a5b5ee09
--- dulwich/protocol.py
+++ dulwich/protocol.py
CAPABILITY_NO_PROGRESS,
]
KNOWN_UPLOAD_CAPABILITIES = set(
- COMMON_CAPABILITIES
- + [
- CAPABILITY_THIN_PACK,
- CAPABILITY_MULTI_ACK,
- CAPABILITY_MULTI_ACK_DETAILED,
- CAPABILITY_INCLUDE_TAG,
- CAPABILITY_DEEPEN_SINCE,
- CAPABILITY_SYMREF,
- CAPABILITY_SHALLOW,
- CAPABILITY_DEEPEN_NOT,
- CAPABILITY_DEEPEN_RELATIVE,
- CAPABILITY_ALLOW_TIP_SHA1_IN_WANT,
- CAPABILITY_ALLOW_REACHABLE_SHA1_IN_WANT,
- ]
+ [*COMMON_CAPABILITIES, CAPABILITY_THIN_PACK, CAPABILITY_MULTI_ACK, CAPABILITY_MULTI_ACK_DETAILED, CAPABILITY_INCLUDE_TAG, CAPABILITY_DEEPEN_SINCE, CAPABILITY_SYMREF, CAPABILITY_SHALLOW, CAPABILITY_DEEPEN_NOT, CAPABILITY_DEEPEN_RELATIVE, CAPABILITY_ALLOW_TIP_SHA1_IN_WANT, CAPABILITY_ALLOW_REACHABLE_SHA1_IN_WANT]
)
KNOWN_RECEIVE_CAPABILITIES = set(
- COMMON_CAPABILITIES
- + [
- CAPABILITY_REPORT_STATUS,
- CAPABILITY_DELETE_REFS,
- CAPABILITY_QUIET,
- CAPABILITY_ATOMIC,
- ]
+ [*COMMON_CAPABILITIES, CAPABILITY_REPORT_STATUS, CAPABILITY_DELETE_REFS, CAPABILITY_QUIET, CAPABILITY_ATOMIC]
)
DEPTH_INFINITE = 0x7FFFFFFF
blob - b5c5e6ca10902a87bff57566b0e882290b39d546
blob + 745ba78d451386421aea6f116ecccf863a25a012
--- dulwich/repo.py
+++ dulwich/repo.py
else:
parents = []
- for sha in [commit] + parents:
+ for sha in [commit, *parents]:
check_hexsha(sha, "Invalid graftpoint")
grafts[commit] = parents
"""
# Simple validation
for commit, parents in updated_graftpoints.items():
- for sha in [commit] + parents:
+ for sha in [commit, *parents]:
check_hexsha(sha, "Invalid graftpoint")
self._graftpoints.update(updated_graftpoints)
with f:
return [line.strip() for line in f.readlines() if line.strip()]
- def do_commit( # noqa: C901
+ def do_commit(
self,
message: Optional[bytes] = None,
committer: Optional[bytes] = None,
else:
try:
old_head = self.refs[ref]
- c.parents = [old_head] + merge_heads
+ c.parents = [old_head, *merge_heads]
if sign:
c.sign(keyid)
self.object_store.add_object(c)
blob - 08968b8fb90f758ae4bf19bedab290e406df5e65
blob + e9e5ffb92a76fb7f34b19e595d7b8020781057b7
--- dulwich/tests/__init__.py
+++ dulwich/tests/__init__.py
# If Python itself provides an exception, use that
import unittest
+from typing import ClassVar, List
from unittest import SkipTest, expectedFailure, skipIf
-from unittest import TestCase as _TestCase # noqa: F401
+from unittest import TestCase as _TestCase
class TestCase(_TestCase):
"""Blackbox testing."""
# TODO(jelmer): Include more possible binary paths.
- bin_directories = [
+ bin_directories: ClassVar[List[str]] = [
os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "bin")),
"/usr/bin",
"/usr/local/bin",
# expect the user to set up file associations for .py files.
#
# Save us from all that headache and call python with the bin script.
- argv = [sys.executable, self.bin_path(name)] + args
+ argv = [sys.executable, self.bin_path(name), *args]
return subprocess.Popen(
argv,
stdout=subprocess.PIPE,
blob - 216875a2611bad403eb1d78ac39e18c33b8db788
blob + 59a9beeaaf9a6cc08d34484d74257d70d7526106
--- dulwich/tests/compat/server_utils.py
+++ dulwich/tests/compat/server_utils.py
port = self._start_server(self._old_repo)
run_git_or_fail(
- ["push", self.url(port)] + self.branch_args(),
+ ["push", self.url(port), *self.branch_args()],
cwd=self._new_repo.path,
)
self.assertReposEqual(self._old_repo, self._new_repo)
port = self._start_server(self._old_repo)
run_git_or_fail(
- ["push", self.url(port)] + self.branch_args(),
+ ["push", self.url(port), *self.branch_args()],
cwd=self._new_repo.path,
)
self.assertReposEqual(self._old_repo, self._new_repo)
port = self._start_server(self._new_repo)
run_git_or_fail(
- ["fetch", self.url(port)] + self.branch_args(),
+ ["fetch", self.url(port), *self.branch_args()],
cwd=self._old_repo.path,
)
# flush the pack cache so any new packs are picked up
port = self._start_server(self._new_repo)
run_git_or_fail(
- ["fetch", self.url(port)] + self.branch_args(),
+ ["fetch", self.url(port), *self.branch_args()],
cwd=self._old_repo.path,
)
# flush the pack cache so any new packs are picked up
# Fetching at the same depth is a no-op.
run_git_or_fail(
- ["fetch", "--depth=2", self.url(port)] + self.branch_args(),
+ ["fetch", "--depth=2", self.url(port), *self.branch_args()],
cwd=self._stub_repo.path,
)
expected_shallow = [
# Fetching at the same depth is a no-op.
run_git_or_fail(
- ["fetch", "--depth=2", self.url(port)] + self.branch_args(),
+ ["fetch", "--depth=2", self.url(port), *self.branch_args()],
cwd=self._stub_repo.path,
)
# The whole repo only has depth 4, so it should equal server_new.
run_git_or_fail(
- ["fetch", "--depth=4", self.url(port)] + self.branch_args(),
+ ["fetch", "--depth=4", self.url(port), *self.branch_args()],
cwd=self._stub_repo.path,
)
self.assertEqual([], _get_shallow(clone))
blob - 201d2163706eebbe5b04038bf47a7a16026ff2b3
blob + 353813f4c029b63576e11ef29d626feea4370bbd
--- dulwich/tests/compat/test_client.py
+++ dulwich/tests/compat/test_client.py
cmd = cmd.split("-", 1)
path = path.replace("'", "")
p = subprocess.Popen(
- cmd + [path],
+ [*cmd, path],
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
# Let's be quiet, the test suite is noisy enough already
pass
- def run_backend(self): # noqa: C901
+ def run_backend(self):
"""Call out to git http-backend."""
# Based on CGIHTTPServer.CGIHTTPRequestHandler.run_cgi:
# Copyright (c) 2001-2010 Python Software Foundation;
blob - 61731bb430e770a1fecbded55c4bc997d3da0c94
blob + c75b07f90f1117609f1545a3f81b03239c05817d
--- dulwich/tests/compat/test_pack.py
+++ dulwich/tests/compat/test_pack.py
new_blob.data = orig_blob.data + (b"x" * 2**20)
new_blob_2 = Blob()
new_blob_2.data = new_blob.data + b"y"
- all_to_pack = list(orig_pack.pack_tuples()) + [
- (new_blob, None),
- (new_blob_2, None),
- ]
+ all_to_pack = [*list(orig_pack.pack_tuples()), (new_blob, None), (new_blob_2, None)]
pack_path = os.path.join(self._tempdir, "pack_with_deltas")
write_pack(pack_path, all_to_pack, deltify=True)
output = run_git_or_fail(["verify-pack", "-v", pack_path])
new_blob.data = "big blob" + ("x" * 2**25)
new_blob_2 = Blob()
new_blob_2.data = new_blob.data + "y"
- all_to_pack = list(orig_pack.pack_tuples()) + [
- (new_blob, None),
- (new_blob_2, None),
- ]
+ all_to_pack = [*list(orig_pack.pack_tuples()), (new_blob, None), (new_blob_2, None)]
pack_path = os.path.join(self._tempdir, "pack_with_deltas")
write_pack(pack_path, all_to_pack, deltify=True)
output = run_git_or_fail(["verify-pack", "-v", pack_path])
blob - 0cc8137be5b7a202dc0ce99ebe1e1dffc9a8777f
blob + d7c4a7b61b3ca0b8194fd3cb64116be677a9c2aa
--- dulwich/tests/compat/utils.py
+++ dulwich/tests/compat/utils.py
env["LC_ALL"] = env["LANG"] = "C"
env["PATH"] = os.getenv("PATH")
- args = [git_path] + args
+ args = [git_path, *args]
popen_kwargs["stdin"] = subprocess.PIPE
if capture_stdout:
popen_kwargs["stdout"] = subprocess.PIPE
blob - bdcd62b39957efcb05719706ae555097d3f370f6
blob + c92e69fec98216b95cbb8965b278c9eab5cf8939
--- dulwich/tests/test_client.py
+++ dulwich/tests/test_client.py
binary = ["plink.exe", "-ssh"]
else:
binary = ["plink", "-ssh"]
- expected = binary + [
- "-pw",
- "12345",
- "-i",
- "/tmp/id_rsa",
- "host",
- "git-clone-url",
- ]
+ expected = [*binary, "-pw", "12345", "-i", "/tmp/id_rsa", "host", "git-clone-url"]
self.assertListEqual(expected, args[0])
def test_run_command_password(self):
binary = ["plink.exe", "-ssh"]
else:
binary = ["plink", "-ssh"]
- expected = binary + ["-pw", "12345", "host", "git-clone-url"]
+ expected = [*binary, "-pw", "12345", "host", "git-clone-url"]
vendor = PLinkSSHVendor()
binary = ["plink.exe", "-ssh"]
else:
binary = ["plink", "-ssh"]
- expected = binary + [
- "-P",
- "2200",
- "-i",
- "/tmp/id_rsa",
- "user@host",
- "git-clone-url",
- ]
+ expected = [*binary, "-P", "2200", "-i", "/tmp/id_rsa", "user@host", "git-clone-url"]
vendor = PLinkSSHVendor()
command = vendor.run_command(
blob - 394a0eb25956c0b7cfa5648906b7ddadba842df8
blob + 2c211c2b68449de40c1448785dedaa674d62fb0a
--- dulwich/tests/test_pack.py
+++ dulwich/tests/test_pack.py
f = BytesIO()
entries = build_pack(f, objects_spec)
# Delta resolution changed to DFS
- indices = [0] + list(range(100, 0, -1))
+ indices = [0, *list(range(100, 0, -1))]
self.assertEntriesMatch(indices, entries, self.make_pack_iter(f))
def test_ext_ref(self):
blob - adda941b55b816290239709b5465b0cab4526f15
blob + 5dbae9c579b0df52c91baac1572f87152584e5f9
--- dulwich/tests/test_porcelain.py
+++ dulwich/tests/test_porcelain.py
# Get the change in the target repo corresponding to the add
# this will be in the foo branch.
- change = list(
- tree_changes(
+ change = next(iter(tree_changes(
self.repo,
self.repo[b"HEAD"].tree,
self.repo[b"refs/heads/foo"].tree,
- )
- )[0]
+ )))
self.assertEqual(
os.path.basename(fullpath), change.new.path.decode("ascii")
)
blob - 68478c575214674154a778e092de853cb2e7b937
blob + ec3d0904b937590e547f14feecaa61ce2b827af5
--- dulwich/tests/test_refs.py
+++ dulwich/tests/test_refs.py
import sys
import tempfile
from io import BytesIO
+from typing import ClassVar, Dict
from dulwich import errors
from dulwich.tests import SkipTest, TestCase
class StripPeeledRefsTests(TestCase):
- all_refs = {
+ all_refs: ClassVar[Dict[bytes, bytes]] = {
b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
b"refs/tags/2.0.0^{}": b"0749936d0956c661ac8f8d3483774509c165f89e",
}
- non_peeled_refs = {
+ non_peeled_refs: ClassVar[Dict[bytes, bytes]] = {
b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
blob - ba4211bc950dbf00a8d425f9151a9b60b7552dd1
blob + cb55f4adce7f5cf3f8b5dc3e39dafd17475d5e96
--- dulwich/tests/test_repository.py
+++ dulwich/tests/test_repository.py
r.stage(['foo'])
""".format(
executable=sys.executable,
- path=[os.path.join(os.path.dirname(__file__), "..", "..")] + sys.path,
- )
+ path=[os.path.join(os.path.dirname(__file__), "..", ".."), *sys.path])
repo_dir = os.path.join(self.mkdtemp())
self.addCleanup(shutil.rmtree, repo_dir)
blob - 939c5cd4186d0202b419563faf69ec289ca48a0e
blob + f285b10e53535c4a2665c7991c043e96307edbd3
--- dulwich/tests/test_server.py
+++ dulwich/tests/test_server.py
self.assertRaises(IndexError, self._handler.proto.get_received_line, 2)
def test_no_progress(self):
- caps = list(self._handler.required_capabilities()) + [b"no-progress"]
+ caps = [*list(self._handler.required_capabilities()), b"no-progress"]
self._handler.set_client_capabilities(caps)
self._handler.progress(b"first message")
self._handler.progress(b"second message")
self._repo.refs._peeled_refs = peeled
self._repo.refs.add_packed_refs(refs)
- caps = list(self._handler.required_capabilities()) + [b"include-tag"]
+ caps = [*list(self._handler.required_capabilities()), b"include-tag"]
self._handler.set_client_capabilities(caps)
self.assertEqual(
{b"1234" * 10: ONE, b"5678" * 10: TWO},
# TODO: test commit time cutoff
def _handle_shallow_request(self, lines, heads):
- self._walker.proto.set_output(lines + [None])
+ self._walker.proto.set_output([*lines, None])
self._walker._handle_shallow_request(heads)
def assertReceived(self, expected):
def serve_command(self, handler_cls, args, inf, outf):
return serve_command(
handler_cls,
- [b"test"] + args,
+ [b"test", *args],
backend=self.backend,
inf=inf,
outf=outf,
blob - 4c2c24ece78b41caa3271548a9743bf7e617ea59
blob + 6acf80b9a12bef3db12e32ce9a7b96e0fc3966a0
--- dulwich/tests/test_walk.py
+++ dulwich/tests/test_walk.py
# Get the WalkEntry for the commit.
walker = Walker(self.store, c1.id)
- walker_entry = list(walker)[0]
+ walker_entry = next(iter(walker))
changes = walker_entry.changes()
# Compare the changes with the expected values.
# Get the WalkEntry for the commit.
walker = Walker(self.store, c1.id)
- walker_entry = list(walker)[0]
+ walker_entry = next(iter(walker))
changes = walker_entry.changes(path_prefix=b"x")
# Compare the changes with the expected values.
blob - 493c6499ddacbf0246e40b4247b29f4e9bef6672
blob + 19da34ee7280113e6d3bc17841160119b2cb3ffe
--- dulwich/web.py
+++ dulwich/web.py
import sys
import time
from io import BytesIO
-from typing import List, Optional, Tuple
+from typing import Callable, ClassVar, Dict, Iterator, List, Optional, Tuple
from urllib.parse import parse_qs
from wsgiref.simple_server import (
ServerHandler,
from .repo import BaseRepo, NotGitRepository, Repo
from .server import (
DEFAULT_HANDLERS,
+ Backend,
DictBackend,
generate_info_refs,
generate_objects_info_packs,
backend: the Backend object backing this application
"""
- services = {
+ services: ClassVar[Dict[Tuple[str, re.Pattern], Callable[[HTTPGitRequest, Backend, re.Match], Iterator[bytes]]]] = {
("GET", re.compile("/HEAD$")): get_text_file,
("GET", re.compile("/info/refs$")): get_info_refs,
("GET", re.compile("/objects/info/alternates$")): get_text_file,
blob - 49219fa399df5bf87a932a6a12421f0cda0e44d4
blob + 8bd3c7a729cdaa2d58a8e6f43fdc0443d3bbfff0
--- pyproject.toml
+++ pyproject.toml
"F",
"I",
"UP",
+ "RUF",
]
ignore = [
"ANN001",