commit 971c04d1b79f92a74c42df10efba38fddf2ad006 from: Stefan Sperling date: Wed Sep 11 11:16:55 2024 UTC make the Git loader load objects in topological order The Git loader will now send objects to storage in topological Git DAG order. There is no support for topological order in the dumb loader yet because the current implementation assumes that a single pack file will be fetched from the server. The tests are passing, except for these 4: TestGitLoader::test_metrics TestGitLoader::test_metrics_filtered TestGitLoader2::test_load_incremental TestGitLoader2::test_load_incremental_from[partial-parent-and-empty-previous] It seems these tests depend on assumptions that are no longer valid once objects get loaded in topological order? The implementation does not yet handle edge cases such as tags pointing at non-commit objects. commit - 0dd744df1d1192d4eeb3fbe38c96cce76bda7376 commit + 971c04d1b79f92a74c42df10efba38fddf2ad006 blob - 4880ce47f67e486e84c40b56791d6693a02c6b1f blob + 2bf0fd14a9839c0e3232ce7352ee471dffdfed17 --- swh/loader/git/base.py +++ swh/loader/git/base.py @@ -100,6 +100,12 @@ class BaseGitLoader(BaseLoader): self.next_log_after = time.monotonic() + LOGGING_INTERVAL def store_data(self) -> None: + # XXX This is a temporary hack to keep the dumb loader working. + if hasattr(self, "store_data_topological") and hasattr(self, "dumb"): + if not self.dumb: + self.store_data_topological() + return + assert self.origin if self.save_data_path: self.save_data() blob - ec17c8b4aa4844707f053b13e05a7619261dd1fa blob + 558384425de018b5bd4cb9814df33623677ae268 --- swh/loader/git/loader.py +++ swh/loader/git/loader.py @@ -3,19 +3,21 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict +import collections from dataclasses import dataclass import datetime import json import logging import os import pickle -from tempfile import SpooledTemporaryFile +import stat +from tempfile import NamedTemporaryFile, SpooledTemporaryFile import time from typing import ( Any, Callable, Dict, + Generator, Iterable, Iterator, List, @@ -28,7 +30,7 @@ from typing import ( import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.objects import Blob, Commit, ShaFile, Tag, Tree -from dulwich.pack import PackData, PackInflater +from dulwich.pack import Pack, PackData, PackInflater, load_pack_index import urllib3.util from swh.core.statsd import Statsd @@ -48,6 +50,7 @@ from swh.model.model import ( RawExtrinsicMetadata, Release, Revision, + SkippedContent, Snapshot, SnapshotBranch, SnapshotTargetType, @@ -239,6 +242,7 @@ class GitLoader(BaseGitLoader): if not verify_certs: self.urllib3_extra_kwargs["cert_reqs"] = "CERT_NONE" self.requests_extra_kwargs["verify"] = False + self.objects_seen: Set[bytes] = set() def fetch_pack_from_origin( self, @@ -500,7 +504,197 @@ class GitLoader(BaseGitLoader): # No more data to fetch return False + + def get_missing_parents(self, commit, pack): + missing_parents = [] + parent_hashes = [] + for p in commit.parents: + phash = hashutil.bytehex_to_hash(p) + if phash not in self.objects_seen: + parent_hashes.append(phash) + for p in self.storage.revision_missing(parent_hashes): + try: + obj = pack[hashutil.hash_to_bytehex(p)] + if not isinstance(obj, Commit): + raise ValueError( + f"Parent {hashutil.hash_to_bytehex(p)} of " + f"{commit} is not a commit object" + ) + logger.debug( + f"Parent {hashutil.hash_to_bytehex(p)} of " + f"{commit} found in pack 
file" + ) + missing_parents.append(p) + except KeyError as e: + raise NotFound( + f"Parent {hashutil.hash_to_bytehex(p)} of " + f"{commit} was found in neither the " + "fetched pack file nor the archive" + ) from e + return missing_parents + + def get_missing_children(self, tree): + subtree_hashes = [] + content_hashes = [] + missing_subtrees = [] + missing_contents = [] + for (name, mode, sha) in tree.iteritems(): + if mode & stat.S_IFDIR: + subtree_hashes.append(hashutil.bytehex_to_hash(sha)) + logging.debug(f"subtree: {name} {sha}") + else: + content_hashes.append(hashutil.bytehex_to_hash(sha)) + logging.debug(f"content: {name} {sha}") + for t in self.storage.directory_missing(subtree_hashes): + if t not in self.objects_seen: + missing_subtrees.append(t) + self.objects_seen.add(t) + for c in self.storage.content_missing_per_sha1_git(content_hashes): + if c not in self.objects_seen: + missing_contents.append(c) + self.objects_seen.add(c) + return (missing_subtrees, missing_contents) + + def walk_commit_tree(self, commit, pack): + dirs_to_load = [] + contents_to_load = {} + + if hashutil.bytehex_to_hash(commit.tree) in self.objects_seen: + return + + missing_dir_hashes = [ + d + for d in self.storage.directory_missing( + [hashutil.bytehex_to_hash(commit.tree)] + ) + ] + # Here we do a tree walk up to the bottom-most sub-trees in this tree + # which have already been loaded. Could this be optimized somehow? + # Thanks to our filtering of "seen" objects, when multiple commits being + # loaded contain the same tree we will only be loading new unique trees. + while missing_dir_hashes: + missing_dir_hash = missing_dir_hashes.pop() + logger.debug(f"missing dir: {hashutil.hash_to_bytehex(missing_dir_hash)!r}") + dirs_to_load.append(missing_dir_hash) + try: + tree = pack[hashutil.hash_to_bytehex(missing_dir_hash)] + if not isinstance(tree, Tree): + raise ValueError(f"Tree {tree} is not a Tree object") + except KeyError as e: + raise NotFound( + f"Tree {hashutil.hash_to_bytehex(missing_dir_hash)!r} " + "was found in neither the fetched pack file nor the archive" + ) from e + logger.debug(f"Tree {tree!r} found in pack file") + (missing_subtrees, missing_contents) = self.get_missing_children(tree) + if missing_subtrees: + logger.debug("missing subtrees:") + for d in missing_subtrees: + logger.debug(f"{hashutil.hash_to_bytehex(d)!r}") + missing_dir_hashes.extend(missing_subtrees) + if missing_contents: + logger.debug("missing contents:") + for c in missing_contents: + logger.debug(f"{hashutil.hash_to_bytehex(c)!r}") + contents_to_load[missing_dir_hash] = missing_contents or [] + # Load the list of missing trees in reverse order, i.e. bottom-up + dirs_to_load.reverse() + for d in dirs_to_load: + # Load the contents required by this subtree + yield from contents_to_load[d] + # Load the subtree itself + yield d + + def walk_commit(self, commit, pack) -> Generator: + # Here we do a history walk to identify all commits between the new + # tip commit and the bottom-most commits which have already been loaded. + # Could this be optimized somehow? + # Thanks to our filtering of "seen" objects, when multiple references point + # at commits in the pack file we will only be loading new unique commits. 
+ parents_to_load = [] + missing_parent_hashes = self.get_missing_parents(commit, pack) + while missing_parent_hashes: + parent_hash = missing_parent_hashes.pop() + parents_to_load.append(parent_hash) + parent = pack[hashutil.hash_to_bytehex(parent_hash)] + missing_parent_hashes.extend(self.get_missing_parents(parent, pack)) + + # Load the parent commit queue in reverse order, tail first. + # This way we load missing bottom-most commits first. + parents_to_load.reverse() + for p in parents_to_load: + parent = pack[hashutil.hash_to_bytehex(p)] + yield from self.walk_commit_tree(parent, pack) + self.objects_seen.add(hashutil.bytehex_to_hash(parent.tree)) + yield parent.sha().digest() + self.objects_seen.add(parent.sha().digest()) + + # Now load the tip commit's tree. + yield from self.walk_commit_tree(commit, pack) + self.objects_seen.add(hashutil.bytehex_to_hash(commit.tree)) + + # All dependencies of this commit have been loaded. Load the commit. + yield commit.sha().digest() + self.objects_seen.add(commit.sha().digest()) + + def walk_tag(self, tag, pack) -> Generator: + # TODO: look up tag in archive if not present in pack + obj = pack[tag.object[1]] + while obj.type_name == b"tag": + obj = pack[obj.object[1]] + if obj.type_name == b"commit": + yield from self.walk_commit(obj, pack) + else: + raise ValueError( + f"tag {obj} points to object {obj.object} of unexpected type {obj.type_name}" + ) + yield tag.sha().digest() + + def walk_ref(self, ref_name, ref_object, pack) -> Generator: + logger.debug(f"Walking ref {ref_name}: {ref_object}") + obj = pack[ref_object] + logger.debug(f"Opened {obj}") + if obj.type_name == b"commit": + yield from self.walk_commit(obj, pack) + elif obj.type_name == b"tag": + yield from self.walk_tag(obj, pack) + else: + raise ValueError(f"object {obj} has an unexpected type") + + def process_data(self) -> bool: + assert self.origin is not None + + # XXX The dumb loader does not support loading objects in topological order yet. 
+ if self.dumb: + return False + + percent_done = 0 + + def pack_index_progress(i, num_objects): + nonlocal percent_done + p = int(i * 100 / num_objects) + if p != percent_done: + logger.debug(f"Indexing pack file: {p}%") + percent_done = p + + if self.pack_size < 12: # No pack file header present + return False + + packdata = PackData.from_file(self.pack_buffer, self.pack_size) + indexfile = NamedTemporaryFile(delete=False) + packdata.create_index( + indexfile.name, + progress=pack_index_progress, + version=2, + resolve_ext_ref=self._resolve_ext_ref, + ) + logger.debug("Indexing pack file: 100%") + packindex = load_pack_index(indexfile.name) + self.pack = Pack.from_objects(packdata, packindex) + self.pack.resolve_ext_ref = self._resolve_ext_ref + return False + def save_data(self) -> None: """Store a pack for archival""" assert isinstance(self.visit_date, datetime.datetime) @@ -523,7 +717,110 @@ class GitLoader(BaseGitLoader): with open(os.path.join(pack_dir, refs_name), "xb") as f: pickle.dump(self.remote_refs, f) + + def store_data_topological(self) -> None: + assert self.origin + if self.save_data_path: + self.save_data() + + self.counts: Dict[str, int] = collections.defaultdict(int) + storage_summary: Dict[str, int] = collections.Counter() + + base_repo = self.repo_representation( + storage=self.storage, + base_snapshots=self.base_snapshots, + incremental=self.incremental, + statsd=self.statsd, + ) + + for ref_name, ref_object in self.remote_refs.items(): + if utils.ignore_branch_name(ref_name): + continue + if ref_object not in self.pack: + if ref_object in base_repo.local_heads: + continue + ref_object_hash = hashutil.bytehex_to_hash(ref_object) + if ref_object_hash not in self.storage.revision_missing( + [ref_object_hash] + ): + continue + raise NotFound( + f"{ref_name!r}: {ref_object!r} was found in neither the " + "fetched pack file nor in local heads nor in the archive" + ) + for o in self.walk_ref(ref_name, ref_object, self.pack): + obj = self.pack[hashutil.hash_to_bytehex(o)] + logger.debug(f"Loading object {obj.id}") + if obj.type_name == b"blob": + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.CONTENT + content = converters.dulwich_blob_to_content( + obj, max_content_size=self.max_content_size + ) + if isinstance(content, Content): + self.counts["content"] += 1 + storage_summary.update(self.storage.content_add([content])) + elif isinstance(content, SkippedContent): + self.counts["skipped_content"] += 1 + storage_summary.update( + self.storage.skipped_content_add([content]) + ) + else: + raise TypeError(f"Unexpected content type: {content}") + elif obj.type_name == b"tree": + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.DIRECTORY + self.counts["directory"] += 1 + directory = converters.dulwich_tree_to_directory(obj) + storage_summary.update(self.storage.directory_add([directory])) + elif obj.type_name == b"commit": + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.REVISION + self.counts["revision"] += 1 + revision = converters.dulwich_commit_to_revision(obj) + storage_summary.update(self.storage.revision_add([revision])) + elif obj.type_name == b"tag": + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.RELEASE + self.counts["release"] += 1 + release = converters.dulwich_tag_to_release(obj) + storage_summary.update(self.storage.release_add([release])) + else: + raise NotFound(f"object {obj} has bad type 
{obj.type}") + snapshot = self.get_snapshot() + self.counts["snapshot"] += 1 + storage_summary.update(self.storage.snapshot_add([snapshot])) + + storage_summary.update(self.flush()) + self.loaded_snapshot_id = snapshot.id + + for (object_type, total) in self.counts.items(): + filtered = total - storage_summary[f"{object_type}:add"] + assert 0 <= filtered <= total, (filtered, total) + + if total == 0: + # No need to send it + continue + + # cannot use self.statsd_average, because this is a weighted average + tags = {"object_type": object_type} + + # unweighted average + self.statsd.histogram( + "filtered_objects_percent", filtered / total, tags=tags + ) + + # average weighted by total + self.statsd.increment("filtered_objects_total_sum", filtered, tags=tags) + self.statsd.increment("filtered_objects_total_count", total, tags=tags) + + self.log.info( + "Fetched %d objects; %d are new", + sum(self.counts.values()), + sum(storage_summary[f"{object_type}:add"] for object_type in self.counts), + ) + def _resolve_ext_ref(self, sha1: bytes) -> Tuple[int, bytes]: """Resolve external references to git objects a pack file might contain by getting associated git manifests from the archive. @@ -722,7 +1019,7 @@ class GitLoader(BaseGitLoader): # known. We can look these objects up in the archive, as they should # have had all their ancestors loaded when the previous snapshot was # loaded. - refs_for_target = defaultdict(list) + refs_for_target = collections.defaultdict(list) for ref_name, target in unknown_objects.items(): refs_for_target[target].append(ref_name) blob - /dev/null blob + 55ada4d3f9130709a154e77e03f1186edac025ab (mode 644) Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo1.bundle differ blob - /dev/null blob + fadda9170d4c7ff008fb82c9f40ac43b036bf605 (mode 644) Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo2.bundle differ blob - /dev/null blob + 2adc27756260fd88ad11853ce3b95f5051f371a8 (mode 644) Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo3.bundle differ blob - /dev/null blob + cda23c3c08d02b02362699fa95271a324e26d750 (mode 644) Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo4.bundle differ blob - b4191278134487036b84dbfa52f06b53008a9dcc blob + d199507f2851280fa67d25572d06aa042f027f5c --- swh/loader/git/tests/test_from_disk.py +++ swh/loader/git/tests/test_from_disk.py @@ -181,6 +181,8 @@ class CommonGitLoaderTests: # (patching won't work self.loader is already instantiated) # Make get_contents fail for some reason self.loader.get_contents = None + # Make store_data_topological fail for some reason + self.loader.store_data_topological = None res = self.loader.load() assert res["status"] == "failed" @@ -248,6 +250,8 @@ class CommonGitLoaderTests: assert res == {"status": "eventful"} assert self.loader.storage.snapshot_get_branches(partial_snapshot.id) + + self.loader.storage.flush() res = self.loader.load() assert res == {"status": "eventful"} blob - 41fa0d5e23df5b567ffb87534e22a349ae122995 blob + bf1d67a55efa310f33e1c25c9aacc78155591e6d --- swh/loader/git/tests/test_loader.py +++ swh/loader/git/tests/test_loader.py @@ -9,6 +9,7 @@ from http.server import HTTPServer, SimpleHTTPRequestH import io import logging import os +import shutil import subprocess import sys from tempfile import SpooledTemporaryFile @@ -29,6 +30,7 @@ import sentry_sdk from swh.loader.git import converters, dumb from swh.loader.git.loader import FetchPackReturn, GitLoader, 
split_lines_and_remainder from swh.loader.git.tests.test_from_disk import SNAPSHOT1, FullGitLoaderTests +from swh.loader.git.utils import HexBytes from swh.loader.tests import ( assert_last_visit_matches, get_stats, @@ -1145,3 +1147,177 @@ def test_loader_too_large_pack_file_for_github_origin( ) def test_split_lines_and_remainder(input, output): assert split_lines_and_remainder(input) == output + +TEST_DATA = os.path.join(os.path.dirname(__file__), "data") + +def test_loader_load_in_toplogical_order( + swh_storage, datadir, tmp_path, mocker, sentry_events +): + """Ensure that objects which are already present in storage will not be + sent to storage again if they reappear in a pack file fetched during a + subsequent visit. + """ + + def prepare_load_from_bundle(bundle_name, index): + bundle = os.path.join(TEST_DATA, "git-repos", f"{bundle_name}{index}.bundle") + repo_path = os.path.join(tmp_path, bundle_name) + shutil.rmtree(repo_path, ignore_errors=True) + git = subprocess.Popen( + ["git", "clone", "--quiet", "--bare", "--mirror", bundle, repo_path], + cwd=TEST_DATA, + ) + (stdout, stderr) = git.communicate() + assert not stderr + return GitLoader(swh_storage, f"file://{repo_path}") + + # The first bundle contains one commit which adds some files + # and directories: alpha, beta, gamma/delta, epsilon/zeta + loader = prepare_load_from_bundle("topological-repo", 1) + assert loader.load() == {"status": "eventful"} + counts = loader.counts + + # This is a routine first visit scenario. + # All objects should have been loaded. + assert counts["content"] == 4 + assert counts["directory"] == 3 + assert counts["revision"] == 1 + assert counts["releases"] == 0 + assert get_stats(swh_storage) == { + "content": 4, + "directory": 3, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 1, + "skipped_content": 0, + "snapshot": 1, + } + + # Ensure that data loaded up to this point shows up in persistent storage + swh_storage.flush() + + # The second bundle contains two commits. + # The first commit is the same as before. The second commit makes changes + # to the files: alpha, beta, gamma/delta, epsilon/zeta + # Cloning from a Git repository via file:// results in a minimal pack file + # being generated that omits objects loaded during the previous visit because + # the Git loader reports local heads which have already been archived. + # In real life, Github may send us cached pack files during fetches which + # contain objects we have already archived. This test needs to mirror that + # situation in order to trigger the behaviour under test. + # For this reason we extract pack file data from the bundle and mock the + # loader's fetch_data() method to read data from this pack file instead + # of running 'git clone'. 
+ def get_packfile_from_bundle(self) -> bool: + bundlepath = os.path.join( + TEST_DATA, + "git-repos", + f"topological-repo{self.get_packfile_bundle_index}.bundle", + ) + logging.debug( + "Protocol used for communication: " + f"get_packfile_from_bundle({os.path.basename(bundlepath)})" + ) + with open(bundlepath, "rb") as bundle: + pack_path = os.path.join( + tmp_path, + "topological-repo", + "objects", + "pack", + f"pack-{self.get_packfile_pack_hash}.pack", + ) + self.pack_buffer = open(pack_path, mode="wb+") + bundle.seek(self.get_packfile_bundle_header_len) # skip bundle header + self.pack_buffer.write(bundle.read()) + self.pack_buffer.flush() + self.pack_size = self.pack_buffer.tell() + self.pack_buffer.seek(0) + self.remote_refs = { + b"refs/heads/main": HexBytes(self.get_packfile_main_ref_hash), + } + self.symbolic_refs = {} + self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()} + self.dumb = False + return False + + repo_path = os.path.join(tmp_path, "topological-repo") + mocker.patch.object(GitLoader, "fetch_data", get_packfile_from_bundle) + loader = GitLoader(swh_storage, f"file://{repo_path}") + loader.get_packfile_bundle_index = 2 + loader.get_packfile_bundle_header_len = 74 + loader.get_packfile_pack_hash = "a6b77e7f1a358b6f17e4395da7ad8f936849cc76" + loader.get_packfile_main_ref_hash = b"dad8b9cc684bceb77bb6404834d05a17e8e98a29" + loader.load() == {"status": "eventful"} + + # Only new objects should have been sent to storage. + counts = loader.counts + assert counts["content"] == 4 + assert counts["directory"] == 3 + assert counts["revision"] == 1 + assert counts["releases"] == 0 + + # Storage should have all objects. + assert get_stats(swh_storage) == { + "content": 8, + "directory": 6, + "origin": 1, + "origin_visit": 2, + "release": 0, + "revision": 2, + "skipped_content": 0, + "snapshot": 2, + } + + # Load another packfile which adds a third commit. + loader = GitLoader(swh_storage, f"file://{repo_path}") + loader.get_packfile_bundle_index = 3 + loader.get_packfile_bundle_header_len = 133 + loader.get_packfile_pack_hash = "cdcca5adfd76c0c5662f9db80b319d780d1baf47" + loader.get_packfile_main_ref_hash = b"6ace83d95e70ef36cd585ed0b99bf1085a2d6075" + loader.load() == {"status": "eventful"} + + # Only new objects should have been sent to storage. + counts = loader.counts + assert counts["content"] == 1 + assert counts["directory"] == 2 + assert counts["revision"] == 1 + assert counts["releases"] == 0 + + # Storage should have all objects. + assert get_stats(swh_storage) == { + "content": 9, + "directory": 8, + "origin": 1, + "origin_visit": 3, + "release": 0, + "revision": 3, + "skipped_content": 0, + "snapshot": 3, + } + + # Load another packfile that adds two more commits, one of which is a merge. + loader = GitLoader(swh_storage, f"file://{repo_path}") + loader.get_packfile_bundle_index = 4 + loader.get_packfile_bundle_header_len = 187 + loader.get_packfile_pack_hash = "fb1f0d696fdb4d2a3d0a8a96cbf25c08b9310b1a" + loader.get_packfile_main_ref_hash = b"a7ce77855ea3a2e6b5eb112fc795a6c030bdf91e" + loader.load() == {"status": "eventful"} + + # Only new objects should have been sent to storage. + counts = loader.counts + assert counts["content"] == 1 + assert counts["directory"] == 3 + assert counts["revision"] == 2 + assert counts["releases"] == 0 + + # Storage should have all objects. 
+ assert get_stats(swh_storage) == { + "content": 10, + "directory": 11, + "origin": 1, + "origin_visit": 4, + "release": 0, + "revision": 5, + "skipped_content": 0, + "snapshot": 4, + }
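
Note on the ordering strategy (illustrative, not part of the patch): the new walk_commit()
and walk_commit_tree() generators emit every object only after everything it references,
skipping objects already present in the archive -- blobs and subtrees before their parent
tree, the root tree before its commit, and parent commits before their children. A minimal
standalone sketch of that parents-before-children ordering over a toy in-memory DAG is
shown below; the names walk_topological, parents, history and already_loaded are
hypothetical and do not appear in swh.loader.git.

    # Illustrative sketch only: a toy model of topological (parents-first) emission,
    # skipping objects loaded during a previous visit.
    from typing import Dict, Iterator, List, Optional, Set

    def walk_topological(
        tip: str,
        parents: Dict[str, List[str]],
        already_loaded: Set[str],
        emitted: Optional[Set[str]] = None,
    ) -> Iterator[str]:
        """Yield commits reachable from `tip`, parents before children,
        skipping anything that has already been loaded."""
        if emitted is None:
            emitted = set()
        if tip in emitted or tip in already_loaded:
            return
        # Post-order traversal: emit all missing ancestors before the commit itself.
        for parent in parents[tip]:
            yield from walk_topological(parent, parents, already_loaded, emitted)
        emitted.add(tip)
        yield tip

    # Toy history: "merge" has two parents; "root" was archived on a previous visit.
    history = {
        "root": [],
        "a": ["root"],
        "b": ["a"],
        "merge": ["b", "a"],
    }
    print(list(walk_topological("merge", history, already_loaded={"root"})))
    # -> ['a', 'b', 'merge']

A post-order walk like this guarantees that no object is sent to storage before its
dependencies, which is the property the patch relies on when it yields tree contents,
then trees, then commits, before finally recording the snapshot.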