commit - 0dd744df1d1192d4eeb3fbe38c96cce76bda7376
commit + 971c04d1b79f92a74c42df10efba38fddf2ad006
blob - 4880ce47f67e486e84c40b56791d6693a02c6b1f
blob + 2bf0fd14a9839c0e3232ce7352ee471dffdfed17
--- swh/loader/git/base.py
+++ swh/loader/git/base.py
self.next_log_after = time.monotonic() + LOGGING_INTERVAL
def store_data(self) -> None:
+ # XXX This is a temporary hack to keep the dumb loader working.
+ if hasattr(self, "store_data_topological") and hasattr(self, "dumb"):
+ if not self.dumb:
+ self.store_data_topological()
+ return
+
assert self.origin
if self.save_data_path:
self.save_data()
blob - ec17c8b4aa4844707f053b13e05a7619261dd1fa
blob + 558384425de018b5bd4cb9814df33623677ae268
--- swh/loader/git/loader.py
+++ swh/loader/git/loader.py
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections import defaultdict
+import collections
from dataclasses import dataclass
import datetime
import json
import logging
import os
import pickle
-from tempfile import SpooledTemporaryFile
+import stat
+from tempfile import NamedTemporaryFile, SpooledTemporaryFile
import time
from typing import (
Any,
Callable,
Dict,
+ Generator,
Iterable,
Iterator,
List,
import dulwich.client
from dulwich.object_store import ObjectStoreGraphWalker
from dulwich.objects import Blob, Commit, ShaFile, Tag, Tree
-from dulwich.pack import PackData, PackInflater
+from dulwich.pack import Pack, PackData, PackInflater, load_pack_index
import urllib3.util
from swh.core.statsd import Statsd
RawExtrinsicMetadata,
Release,
Revision,
+ SkippedContent,
Snapshot,
SnapshotBranch,
SnapshotTargetType,
if not verify_certs:
self.urllib3_extra_kwargs["cert_reqs"] = "CERT_NONE"
self.requests_extra_kwargs["verify"] = False
+ self.objects_seen: Set[bytes] = set()
def fetch_pack_from_origin(
self,
# No more data to fetch
return False
+
+ def get_missing_parents(self, commit, pack) -> List[bytes]:
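+ """Return the hashes of parents of ``commit`` which are missing from the
+ archive and not yet recorded in ``objects_seen``. Each missing parent must
+ be present in ``pack``; otherwise NotFound is raised."""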
+ missing_parents = []
+ parent_hashes = []
+ for p in commit.parents:
+ phash = hashutil.bytehex_to_hash(p)
+ if phash not in self.objects_seen:
+ parent_hashes.append(phash)
+ for p in self.storage.revision_missing(parent_hashes):
+ try:
+ obj = pack[hashutil.hash_to_bytehex(p)]
+ if not isinstance(obj, Commit):
+ raise ValueError(
+ f"Parent {hashutil.hash_to_bytehex(p)} of "
+ f"{commit} is not a commit object"
+ )
+ logger.debug(
+ f"Parent {hashutil.hash_to_bytehex(p)} of "
+ f"{commit} found in pack file"
+ )
+ missing_parents.append(p)
+ except KeyError as e:
+ raise NotFound(
+ f"Parent {hashutil.hash_to_bytehex(p)} of "
+ f"{commit} was found in neither the "
+ "fetched pack file nor the archive"
+ ) from e
+ return missing_parents
+
+ def get_missing_children(self, tree) -> Tuple[List[bytes], List[bytes]]:
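+ """Return a pair ``(missing_subtrees, missing_contents)`` holding the
+ children of ``tree`` which are missing from the archive and not yet
+ recorded in ``objects_seen``; the returned hashes are added to
+ ``objects_seen``."""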
+ subtree_hashes = []
+ content_hashes = []
+ missing_subtrees = []
+ missing_contents = []
+ for (name, mode, sha) in tree.iteritems():
+ if mode & stat.S_IFDIR:
+ subtree_hashes.append(hashutil.bytehex_to_hash(sha))
+ logger.debug(f"subtree: {name} {sha}")
+ else:
+ content_hashes.append(hashutil.bytehex_to_hash(sha))
+ logger.debug(f"content: {name} {sha}")
+ for t in self.storage.directory_missing(subtree_hashes):
+ if t not in self.objects_seen:
+ missing_subtrees.append(t)
+ self.objects_seen.add(t)
+ for c in self.storage.content_missing_per_sha1_git(content_hashes):
+ if c not in self.objects_seen:
+ missing_contents.append(c)
+ self.objects_seen.add(c)
+ return (missing_subtrees, missing_contents)
+
+ def walk_commit_tree(self, commit, pack) -> Generator:
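+ """Yield the hashes of missing trees and contents below ``commit``'s root
+ tree, bottom-up, so that each tree is yielded after the objects it
+ references."""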
+ dirs_to_load = []
+ contents_to_load = {}
+
+ if hashutil.bytehex_to_hash(commit.tree) in self.objects_seen:
+ return
+
+ missing_dir_hashes = [
+ d
+ for d in self.storage.directory_missing(
+ [hashutil.bytehex_to_hash(commit.tree)]
+ )
+ ]
+ # Here we walk the tree downwards, stopping at sub-trees which have
+ # already been loaded. Could this be optimized somehow?
+ # Thanks to our filtering of "seen" objects, when multiple commits being
+ # loaded contain the same tree we only load each new tree once.
+ while missing_dir_hashes:
+ missing_dir_hash = missing_dir_hashes.pop()
+ logger.debug(f"missing dir: {hashutil.hash_to_bytehex(missing_dir_hash)!r}")
+ dirs_to_load.append(missing_dir_hash)
+ try:
+ tree = pack[hashutil.hash_to_bytehex(missing_dir_hash)]
+ if not isinstance(tree, Tree):
+ raise ValueError(f"Tree {tree} is not a Tree object")
+ except KeyError as e:
+ raise NotFound(
+ f"Tree {hashutil.hash_to_bytehex(missing_dir_hash)!r} "
+ "was found in neither the fetched pack file nor the archive"
+ ) from e
+ logger.debug(f"Tree {tree!r} found in pack file")
+ (missing_subtrees, missing_contents) = self.get_missing_children(tree)
+ if missing_subtrees:
+ logger.debug("missing subtrees:")
+ for d in missing_subtrees:
+ logger.debug(f"{hashutil.hash_to_bytehex(d)!r}")
+ missing_dir_hashes.extend(missing_subtrees)
+ if missing_contents:
+ logger.debug("missing contents:")
+ for c in missing_contents:
+ logger.debug(f"{hashutil.hash_to_bytehex(c)!r}")
+ contents_to_load[missing_dir_hash] = missing_contents or []
+ # Load the list of missing trees in reverse order, i.e. bottom-up
+ dirs_to_load.reverse()
+ for d in dirs_to_load:
+ # Load the contents required by this subtree
+ yield from contents_to_load[d]
+ # Load the subtree itself
+ yield d
+
+ def walk_commit(self, commit, pack) -> Generator:
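+ """Yield the hashes of all missing objects (contents, trees and commits)
+ required to load ``commit``, ending with ``commit`` itself."""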
+ # Here we do a history walk to identify all commits between the new
+ # tip commit and the bottom-most commits which have already been loaded.
+ # Could this be optimized somehow?
+ # Thanks to our filtering of "seen" objects, when multiple references point
+ # at commits in the pack file we will only be loading new unique commits.
+ parents_to_load = []
+ missing_parent_hashes = self.get_missing_parents(commit, pack)
+ while missing_parent_hashes:
+ parent_hash = missing_parent_hashes.pop()
+ parents_to_load.append(parent_hash)
+ parent = pack[hashutil.hash_to_bytehex(parent_hash)]
+ missing_parent_hashes.extend(self.get_missing_parents(parent, pack))
+
+ # Load the parent commit queue in reverse order, tail first.
+ # This way we load missing bottom-most commits first.
+ parents_to_load.reverse()
+ for p in parents_to_load:
+ parent = pack[hashutil.hash_to_bytehex(p)]
+ yield from self.walk_commit_tree(parent, pack)
+ self.objects_seen.add(hashutil.bytehex_to_hash(parent.tree))
+ yield parent.sha().digest()
+ self.objects_seen.add(parent.sha().digest())
+
+ # Now load the tip commit's tree.
+ yield from self.walk_commit_tree(commit, pack)
+ self.objects_seen.add(hashutil.bytehex_to_hash(commit.tree))
+
+ # All dependencies of this commit have been loaded. Load the commit.
+ yield commit.sha().digest()
+ self.objects_seen.add(commit.sha().digest())
+
+ def walk_tag(self, tag, pack) -> Generator:
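+ """Yield the hashes of missing objects reachable from ``tag``, following
+ nested tags down to the tagged commit, ending with ``tag`` itself."""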
+ # TODO: look up tag in archive if not present in pack
+ obj = pack[tag.object[1]]
+ while obj.type_name == b"tag":
+ obj = pack[obj.object[1]]
+ if obj.type_name == b"commit":
+ yield from self.walk_commit(obj, pack)
+ else:
+ raise ValueError(
+ f"tag {tag} points to object {obj} of unexpected type {obj.type_name}"
+ )
+ yield tag.sha().digest()
+
+ def walk_ref(self, ref_name, ref_object, pack) -> Generator:
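+ """Yield the hashes of missing objects reachable from ``ref_object``,
+ dispatching on whether the reference points at a commit or a tag."""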
+ logger.debug(f"Walking ref {ref_name}: {ref_object}")
+ obj = pack[ref_object]
+ logger.debug(f"Opened {obj}")
+ if obj.type_name == b"commit":
+ yield from self.walk_commit(obj, pack)
+ elif obj.type_name == b"tag":
+ yield from self.walk_tag(obj, pack)
+ else:
+ raise ValueError(f"object {obj} has unexpected type {obj.type_name}")
+
+ def process_data(self) -> bool:
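+ """Index the pack file fetched by fetch_data() so that
+ store_data_topological() can look objects up by hash."""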
+ assert self.origin is not None
+
+ # XXX The dumb loader does not support loading objects in topological order yet.
+ if self.dumb:
+ return False
+
+ percent_done = 0
+
+ def pack_index_progress(i, num_objects):
+ nonlocal percent_done
+ p = int(i * 100 / num_objects)
+ if p != percent_done:
+ logger.debug(f"Indexing pack file: {p}%")
+ percent_done = p
+
+ if self.pack_size < 12: # No pack file header present
+ return False
+
+ packdata = PackData.from_file(self.pack_buffer, self.pack_size)
+ indexfile = NamedTemporaryFile(delete=False)
+ packdata.create_index(
+ indexfile.name,
+ progress=pack_index_progress,
+ version=2,
+ resolve_ext_ref=self._resolve_ext_ref,
+ )
+ logger.debug("Indexing pack file: 100%")
+ packindex = load_pack_index(indexfile.name)
+ self.pack = Pack.from_objects(packdata, packindex)
+ self.pack.resolve_ext_ref = self._resolve_ext_ref
+ return False
+
def save_data(self) -> None:
"""Store a pack for archival"""
assert isinstance(self.visit_date, datetime.datetime)
with open(os.path.join(pack_dir, refs_name), "xb") as f:
pickle.dump(self.remote_refs, f)
+
+ def store_data_topological(self) -> None:
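+ """Store the fetched objects by walking each reference in topological
+ order, skipping objects which are already present in the archive even if
+ they reappear in the fetched pack file."""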
+ assert self.origin
+ if self.save_data_path:
+ self.save_data()
+
+ self.counts: Dict[str, int] = collections.defaultdict(int)
+ storage_summary: Dict[str, int] = collections.Counter()
+
+ base_repo = self.repo_representation(
+ storage=self.storage,
+ base_snapshots=self.base_snapshots,
+ incremental=self.incremental,
+ statsd=self.statsd,
+ )
+
+ for ref_name, ref_object in self.remote_refs.items():
+ if utils.ignore_branch_name(ref_name):
+ continue
+ if ref_object not in self.pack:
+ if ref_object in base_repo.local_heads:
+ continue
+ ref_object_hash = hashutil.bytehex_to_hash(ref_object)
+ if ref_object_hash not in self.storage.revision_missing(
+ [ref_object_hash]
+ ):
+ continue
+ raise NotFound(
+ f"{ref_name!r}: {ref_object!r} was found neither in the "
+ "fetched pack file, nor in local heads, nor in the archive"
+ )
+ for o in self.walk_ref(ref_name, ref_object, self.pack):
+ obj = self.pack[hashutil.hash_to_bytehex(o)]
+ logger.debug(f"Loading object {obj.id}")
+ if obj.type_name == b"blob":
+ if obj.id in self.ref_object_types:
+ self.ref_object_types[obj.id] = SnapshotTargetType.CONTENT
+ content = converters.dulwich_blob_to_content(
+ obj, max_content_size=self.max_content_size
+ )
+ if isinstance(content, Content):
+ self.counts["content"] += 1
+ storage_summary.update(self.storage.content_add([content]))
+ elif isinstance(content, SkippedContent):
+ self.counts["skipped_content"] += 1
+ storage_summary.update(
+ self.storage.skipped_content_add([content])
+ )
+ else:
+ raise TypeError(f"Unexpected content type: {content}")
+ elif obj.type_name == b"tree":
+ if obj.id in self.ref_object_types:
+ self.ref_object_types[obj.id] = SnapshotTargetType.DIRECTORY
+ self.counts["directory"] += 1
+ directory = converters.dulwich_tree_to_directory(obj)
+ storage_summary.update(self.storage.directory_add([directory]))
+ elif obj.type_name == b"commit":
+ if obj.id in self.ref_object_types:
+ self.ref_object_types[obj.id] = SnapshotTargetType.REVISION
+ self.counts["revision"] += 1
+ revision = converters.dulwich_commit_to_revision(obj)
+ storage_summary.update(self.storage.revision_add([revision]))
+ elif obj.type_name == b"tag":
+ if obj.id in self.ref_object_types:
+ self.ref_object_types[obj.id] = SnapshotTargetType.RELEASE
+ self.counts["release"] += 1
+ release = converters.dulwich_tag_to_release(obj)
+ storage_summary.update(self.storage.release_add([release]))
+ else:
+ raise ValueError(f"object {obj} has unexpected type {obj.type_name}")
+ snapshot = self.get_snapshot()
+ self.counts["snapshot"] += 1
+ storage_summary.update(self.storage.snapshot_add([snapshot]))
+
+ storage_summary.update(self.flush())
+ self.loaded_snapshot_id = snapshot.id
+
+ for (object_type, total) in self.counts.items():
+ filtered = total - storage_summary[f"{object_type}:add"]
+ assert 0 <= filtered <= total, (filtered, total)
+
+ if total == 0:
+ # No need to send it
+ continue
+
+ # cannot use self.statsd_average, because this is a weighted average
+ tags = {"object_type": object_type}
+
+ # unweighted average
+ self.statsd.histogram(
+ "filtered_objects_percent", filtered / total, tags=tags
+ )
+
+ # average weighted by total
+ self.statsd.increment("filtered_objects_total_sum", filtered, tags=tags)
+ self.statsd.increment("filtered_objects_total_count", total, tags=tags)
+
+ self.log.info(
+ "Fetched %d objects; %d are new",
+ sum(self.counts.values()),
+ sum(storage_summary[f"{object_type}:add"] for object_type in self.counts),
+ )
+
def _resolve_ext_ref(self, sha1: bytes) -> Tuple[int, bytes]:
"""Resolve external references to git objects a pack file might contain
by getting associated git manifests from the archive.
# known. We can look these objects up in the archive, as they should
# have had all their ancestors loaded when the previous snapshot was
# loaded.
- refs_for_target = defaultdict(list)
+ refs_for_target = collections.defaultdict(list)
for ref_name, target in unknown_objects.items():
refs_for_target[target].append(ref_name)
blob - /dev/null
blob + 55ada4d3f9130709a154e77e03f1186edac025ab (mode 644)
Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo1.bundle differ
blob - /dev/null
blob + fadda9170d4c7ff008fb82c9f40ac43b036bf605 (mode 644)
Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo2.bundle differ
blob - /dev/null
blob + 2adc27756260fd88ad11853ce3b95f5051f371a8 (mode 644)
Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo3.bundle differ
blob - /dev/null
blob + cda23c3c08d02b02362699fa95271a324e26d750 (mode 644)
Binary files /dev/null and swh/loader/git/tests/data/git-repos/topological-repo4.bundle differ
blob - b4191278134487036b84dbfa52f06b53008a9dcc
blob + d199507f2851280fa67d25572d06aa042f027f5c
--- swh/loader/git/tests/test_from_disk.py
+++ swh/loader/git/tests/test_from_disk.py
# (patching won't work self.loader is already instantiated)
# Make get_contents fail for some reason
self.loader.get_contents = None
+ # Make store_data_topological fail for some reason
+ self.loader.store_data_topological = None
res = self.loader.load()
assert res["status"] == "failed"
assert res == {"status": "eventful"}
assert self.loader.storage.snapshot_get_branches(partial_snapshot.id)
+
+ self.loader.storage.flush()
res = self.loader.load()
assert res == {"status": "eventful"}
blob - 41fa0d5e23df5b567ffb87534e22a349ae122995
blob + bf1d67a55efa310f33e1c25c9aacc78155591e6d
--- swh/loader/git/tests/test_loader.py
+++ swh/loader/git/tests/test_loader.py
import io
import logging
import os
+import shutil
import subprocess
import sys
from tempfile import SpooledTemporaryFile
from swh.loader.git import converters, dumb
from swh.loader.git.loader import FetchPackReturn, GitLoader, split_lines_and_remainder
from swh.loader.git.tests.test_from_disk import SNAPSHOT1, FullGitLoaderTests
+from swh.loader.git.utils import HexBytes
from swh.loader.tests import (
assert_last_visit_matches,
get_stats,
)
def test_split_lines_and_remainder(input, output):
assert split_lines_and_remainder(input) == output
+
+TEST_DATA = os.path.join(os.path.dirname(__file__), "data")
+
+def test_loader_load_in_topological_order(
+ swh_storage, datadir, tmp_path, mocker, sentry_events
+):
+ """Ensure that objects which are already present in storage will not be
+ sent to storage again if they reappear in a pack file fetched during a
+ subsequent visit.
+ """
+
+ def prepare_load_from_bundle(bundle_name, index):
+ bundle = os.path.join(TEST_DATA, "git-repos", f"{bundle_name}{index}.bundle")
+ repo_path = os.path.join(tmp_path, bundle_name)
+ shutil.rmtree(repo_path, ignore_errors=True)
+ git = subprocess.run(
+ ["git", "clone", "--quiet", "--bare", "--mirror", bundle, repo_path],
+ cwd=TEST_DATA,
+ capture_output=True,
+ check=True,
+ )
+ assert not git.stderr
+ return GitLoader(swh_storage, f"file://{repo_path}")
+
+ # The first bundle contains one commit which adds some files
+ # and directories: alpha, beta, gamma/delta, epsilon/zeta
+ loader = prepare_load_from_bundle("topological-repo", 1)
+ assert loader.load() == {"status": "eventful"}
+ counts = loader.counts
+
+ # This is a routine first visit scenario.
+ # All objects should have been loaded.
+ assert counts["content"] == 4
+ assert counts["directory"] == 3
+ assert counts["revision"] == 1
+ assert counts["release"] == 0
+ assert get_stats(swh_storage) == {
+ "content": 4,
+ "directory": 3,
+ "origin": 1,
+ "origin_visit": 1,
+ "release": 0,
+ "revision": 1,
+ "skipped_content": 0,
+ "snapshot": 1,
+ }
+
+ # Ensure that data loaded up to this point shows up in persistent storage
+ swh_storage.flush()
+
+ # The second bundle contains two commits.
+ # The first commit is the same as before. The second commit makes changes
+ # to the files: alpha, beta, gamma/delta, epsilon/zeta
+ # Cloning from a Git repository via file:// results in a minimal pack file
+ # being generated that omits objects loaded during the previous visit because
+ # the Git loader reports local heads which have already been archived.
+ # In real life, GitHub may send us cached pack files during fetches which
+ # contain objects we have already archived. This test needs to mirror that
+ # situation in order to trigger the behaviour under test.
+ # For this reason we extract pack file data from the bundle and mock the
+ # loader's fetch_data() method to read data from this pack file instead
+ # of running 'git clone'.
+ def get_packfile_from_bundle(self) -> bool:
+ bundlepath = os.path.join(
+ TEST_DATA,
+ "git-repos",
+ f"topological-repo{self.get_packfile_bundle_index}.bundle",
+ )
+ logging.debug(
+ "Protocol used for communication: "
+ f"get_packfile_from_bundle({os.path.basename(bundlepath)})"
+ )
+ with open(bundlepath, "rb") as bundle:
+ pack_path = os.path.join(
+ tmp_path,
+ "topological-repo",
+ "objects",
+ "pack",
+ f"pack-{self.get_packfile_pack_hash}.pack",
+ )
+ self.pack_buffer = open(pack_path, mode="wb+")
+ bundle.seek(self.get_packfile_bundle_header_len) # skip bundle header
+ self.pack_buffer.write(bundle.read())
+ self.pack_buffer.flush()
+ self.pack_size = self.pack_buffer.tell()
+ self.pack_buffer.seek(0)
+ self.remote_refs = {
+ b"refs/heads/main": HexBytes(self.get_packfile_main_ref_hash),
+ }
+ self.symbolic_refs = {}
+ self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+ self.dumb = False
+ return False
+
+ repo_path = os.path.join(tmp_path, "topological-repo")
+ mocker.patch.object(GitLoader, "fetch_data", get_packfile_from_bundle)
+ loader = GitLoader(swh_storage, f"file://{repo_path}")
+ loader.get_packfile_bundle_index = 2
+ loader.get_packfile_bundle_header_len = 74
+ loader.get_packfile_pack_hash = "a6b77e7f1a358b6f17e4395da7ad8f936849cc76"
+ loader.get_packfile_main_ref_hash = b"dad8b9cc684bceb77bb6404834d05a17e8e98a29"
+ assert loader.load() == {"status": "eventful"}
+
+ # Only new objects should have been sent to storage.
+ counts = loader.counts
+ assert counts["content"] == 4
+ assert counts["directory"] == 3
+ assert counts["revision"] == 1
+ assert counts["release"] == 0
+
+ # Storage should have all objects.
+ assert get_stats(swh_storage) == {
+ "content": 8,
+ "directory": 6,
+ "origin": 1,
+ "origin_visit": 2,
+ "release": 0,
+ "revision": 2,
+ "skipped_content": 0,
+ "snapshot": 2,
+ }
+
+ # Load another packfile which adds a third commit.
+ loader = GitLoader(swh_storage, f"file://{repo_path}")
+ loader.get_packfile_bundle_index = 3
+ loader.get_packfile_bundle_header_len = 133
+ loader.get_packfile_pack_hash = "cdcca5adfd76c0c5662f9db80b319d780d1baf47"
+ loader.get_packfile_main_ref_hash = b"6ace83d95e70ef36cd585ed0b99bf1085a2d6075"
+ assert loader.load() == {"status": "eventful"}
+
+ # Only new objects should have been sent to storage.
+ counts = loader.counts
+ assert counts["content"] == 1
+ assert counts["directory"] == 2
+ assert counts["revision"] == 1
+ assert counts["release"] == 0
+
+ # Storage should have all objects.
+ assert get_stats(swh_storage) == {
+ "content": 9,
+ "directory": 8,
+ "origin": 1,
+ "origin_visit": 3,
+ "release": 0,
+ "revision": 3,
+ "skipped_content": 0,
+ "snapshot": 3,
+ }
+
+ # Load another packfile that adds two more commits, one of which is a merge.
+ loader = GitLoader(swh_storage, f"file://{repo_path}")
+ loader.get_packfile_bundle_index = 4
+ loader.get_packfile_bundle_header_len = 187
+ loader.get_packfile_pack_hash = "fb1f0d696fdb4d2a3d0a8a96cbf25c08b9310b1a"
+ loader.get_packfile_main_ref_hash = b"a7ce77855ea3a2e6b5eb112fc795a6c030bdf91e"
+ assert loader.load() == {"status": "eventful"}
+
+ # Only new objects should have been sent to storage.
+ counts = loader.counts
+ assert counts["content"] == 1
+ assert counts["directory"] == 3
+ assert counts["revision"] == 2
+ assert counts["release"] == 0
+
+ # Storage should have all objects.
+ assert get_stats(swh_storage) == {
+ "content": 10,
+ "directory": 11,
+ "origin": 1,
+ "origin_visit": 4,
+ "release": 0,
+ "revision": 5,
+ "skipped_content": 0,
+ "snapshot": 4,
+ }