commit - 2d0bcffed5b73122d233f7aa21e0d8357c4b24c9
commit + e1f37c3e0a98f3296aaa07176ded91a90b4069b4
blob - f6a3a4db2352736892658658c33f0c6a7ae268ae
blob + 9935286f65cded5cf95024cb9e7214ebcb910e4c
--- dulwich/cli.py
+++ dulwich/cli.py
print("Remote refs:")
for item in refs.items():
print("{} -> {}".format(*item))
+
+
+class cmd_for_each_ref(Command):
+ def run(self, args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument("pattern", type=str, nargs="?")
+ args = parser.parse_args(args)
+ for sha, object_type, ref in porcelain.for_each_ref(".", args.pattern):
+ print(f"{sha.decode()} {object_type.decode()}\t{ref.decode()}")
class cmd_fsck(Command):
"dump-index": cmd_dump_index,
"fetch-pack": cmd_fetch_pack,
"fetch": cmd_fetch,
+ "for-each-ref": cmd_for_each_ref,
"fsck": cmd_fsck,
"help": cmd_help,
"init": cmd_init,
blob - aa6ae410a1e128cb3cbe99b9eb4a61a02c57c985
blob + 9eba72d82d7836aa268c93330a4b460c0c16a4f6
--- dulwich/porcelain.py
+++ dulwich/porcelain.py
* describe
* diff-tree
* fetch
+ * for-each-ref
* init
* ls-files
* ls-remote
"""
import datetime
+import fnmatch
import os
import posixpath
import stat
from contextlib import closing, contextmanager
from io import BytesIO, RawIOBase
from pathlib import Path
-from typing import Optional, Tuple, Union
+from typing import Dict, List, Optional, Tuple, Union
from .archive import tar_stream
from .client import get_transport_and_path
prune_tags=prune_tags,
)
return fetch_result
+
+
+def for_each_ref(
+ repo: Union[Repo, str] = ".",
+ pattern: Optional[Union[str, bytes]] = None,
+ **kwargs,
+) -> List[Tuple[bytes, bytes, bytes]]:
+ """Iterate over all refs that match the (optional) pattern.
+
+ Args:
+ repo: Path to the repository
+ pattern: Optional glob (7) patterns to filter the refs with
+ Returns:
+ List of bytes tuples with: (sha, object_type, ref_name)
+ """
+ if kwargs:
+ raise NotImplementedError(f"{''.join(kwargs.keys())}")
+
+ if isinstance(pattern, str):
+ pattern = os.fsencode(pattern)
+
+ with open_repo_closing(repo) as r:
+ refs = r.get_refs()
+
+ if pattern:
+ matching_refs: Dict[bytes, bytes] = {}
+ pattern_parts = pattern.split(b"/")
+ for ref, sha in refs.items():
+ matches = False
+
+ # git for-each-ref uses glob (7) style patterns, but fnmatch
+ # is greedy and also matches slashes, unlike glob.glob.
+ # We have to check parts of the pattern individually.
+ # See https://github.com/python/cpython/issues/72904
+ ref_parts = ref.split(b"/")
+ if len(ref_parts) > len(pattern_parts):
+ continue
+
+ for pat, ref_part in zip(pattern_parts, ref_parts):
+ matches = fnmatch.fnmatchcase(ref_part, pat)
+ if not matches:
+ break
+
+ if matches:
+ matching_refs[ref] = sha
+
+ refs = matching_refs
+
+ ret: List[Tuple[bytes, bytes, bytes]] = [
+ (sha, r.get_object(sha).type_name, ref)
+ for ref, sha in sorted(
+ refs.items(),
+ key=lambda ref_sha: ref_sha[0],
+ )
+ if ref != b"HEAD"
+ ]
+
+ return ret
def ls_remote(remote, config: Optional[Config] = None, **kwargs):
blob - a05ea85564e5f0fed8a0e7f2ef3919c2a7981af6
blob + a0a6c6b0b68b991a1453afa3f3487b27479eb82b
--- dulwich/tests/test_porcelain.py
+++ dulwich/tests/test_porcelain.py
with self._serving() as url:
porcelain.push(self.repo, url, "master")
+
+
+class ForEachTests(PorcelainTestCase):
+ def setUp(self):
+ super().setUp()
+ c1, c2, c3, c4 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2], [4]]
+ )
+ porcelain.tag_create(
+ self.repo.path,
+ b"v0.1",
+ objectish=c1.id,
+ annotated=True,
+ message=b"0.1",
+ )
+ porcelain.tag_create(
+ self.repo.path,
+ b"v1.0",
+ objectish=c2.id,
+ annotated=True,
+ message=b"1.0",
+ )
+ porcelain.tag_create(self.repo.path, b"simple-tag", objectish=c3.id)
+ porcelain.tag_create(
+ self.repo.path,
+ b"v1.1",
+ objectish=c4.id,
+ annotated=True,
+ message=b"1.1",
+ )
+ porcelain.branch_create(
+ self.repo.path, b"feat", objectish=c2.id.decode("ascii")
+ )
+ self.repo.refs[b"HEAD"] = c4.id
+
+ def test_for_each_ref(self):
+ refs = porcelain.for_each_ref(self.repo)
+
+ self.assertEqual(
+ [(object_type, tag) for _, object_type, tag in refs],
+ [
+ (b"commit", b"refs/heads/feat"),
+ (b"commit", b"refs/heads/master"),
+ (b"commit", b"refs/tags/simple-tag"),
+ (b"tag", b"refs/tags/v0.1"),
+ (b"tag", b"refs/tags/v1.0"),
+ (b"tag", b"refs/tags/v1.1"),
+ ],
+ )
+
+ def test_for_each_ref_pattern(self):
+ versions = porcelain.for_each_ref(self.repo, pattern="refs/tags/v*")
+ self.assertEqual(
+ [(object_type, tag) for _, object_type, tag in versions],
+ [
+ (b"tag", b"refs/tags/v0.1"),
+ (b"tag", b"refs/tags/v1.0"),
+ (b"tag", b"refs/tags/v1.1"),
+ ],
+ )
+
+ versions = porcelain.for_each_ref(self.repo, pattern="refs/tags/v1.?")
+ self.assertEqual(
+ [(object_type, tag) for _, object_type, tag in versions],
+ [
+ (b"tag", b"refs/tags/v1.0"),
+ (b"tag", b"refs/tags/v1.1"),
+ ],
+ )