commit 1abdec0ef115d8c29968fc2eeb22936bff743295 from: Stefan Sperling date: Wed Sep 11 11:16:55 2024 UTC reduce storage requests by inferring object presence via dependencies Query the presence of packed objects in bulk. First look for missing contents. Assuming topological loading order, trees, commits, and tags which depend on these contents will be missing as well. Based on this assumption we can avoid sending queries about such objects to storage. The approach was suggested by olasd, but all potential bugs in here are mine. commit - 39b88888999abc64dd5ea2dd56e48bcc639fab15 commit + 1abdec0ef115d8c29968fc2eeb22936bff743295 blob - c31af09d8a3d859fa7c78e67803100b9d74404ac blob + bad93b27fbcbfcfe219bca5547cabae56b6d75e1 --- swh/loader/git/loader.py +++ swh/loader/git/loader.py @@ -695,6 +695,7 @@ class GitLoader(BaseGitLoader): percent_done = p if self.pack_size < 12: # No pack file header present + logger.debug("Pack file too small") return False packdata = PackData.from_file(self.pack_buffer, self.pack_size) @@ -718,7 +719,8 @@ class GitLoader(BaseGitLoader): self._object_graph = Graph(directed=True) - # Find all commits and corresponding tree roots in the pack file + # Find all tags, commits and corresponding tree roots in the pack file + tags = {} commits = {} commit_edges = [] for ref_name, ref_object_hex in self.remote_refs.items(): @@ -728,16 +730,26 @@ class GitLoader(BaseGitLoader): obj = self.pack[ref_object_hex] except KeyError: continue - logger.debug(f"Opened {obj}") - # Peel tags for now; consider adding them to the graph later + + logger.debug(f"Opened {obj.type_name} {obj}") + while obj.type_name == b"tag": + tag_hash = hashutil.bytehex_to_hash(ref_object_hex) + tagged_object_hex = obj.object[1] + logger.debug(f"Opened tag {obj} pointing at {tagged_object_hex}") try: - ref_object_hex = obj.object[1] - obj = self.pack[ref_object_hex] - logger.debug(f"Opened {obj}") + tagged_obj = self.pack[tagged_object_hex] except KeyError: + logger.debug(f" pack is 
missing: {tagged_object_hex}") obj = None break + else: + tagged_hash = hashutil.bytehex_to_hash(tagged_object_hex) + tags[tag_hash] = tagged_hash + obj = tagged_obj + ref_object_hex = tagged_object_hex + + # TODO: Allow tags pointing at blobs or trees? if obj is None or obj.type_name != b"commit": continue @@ -763,9 +775,11 @@ class GitLoader(BaseGitLoader): attributes["object_type"] = [object_type for x in new_vertices] self._object_graph.add_vertices(new_vertices, attributes=attributes) - # Add commits and root trees to the graph + # Add tags, commits and root trees to the graph + add_vertices(list(tags.keys()), GitObjectType.TAG) add_vertices(list(commits.keys()), GitObjectType.COMMIT) add_vertices(list(commits.values()), GitObjectType.TREE) + self._object_graph.add_edges(zip(tags.keys(), tags.values())) self._object_graph.add_edges(zip(commits.keys(), commits.values())) self._object_graph.add_edges(commit_edges) @@ -865,6 +879,12 @@ class GitLoader(BaseGitLoader): def store_data_topological(self) -> None: assert self.origin + + # No object graph was created if the pack file was empty. 
+ if not hasattr(self, "_object_graph"): + logger.debug("No objects to load") + return + if self.save_data_path: self.save_data() @@ -893,46 +913,175 @@ class GitLoader(BaseGitLoader): f"{ref_name!r}: {ref_object!r} was found in neither the " "fetched pack file nor in local heads nor in the archive" ) - for o in self.walk_ref(ref_name, ref_object, self.pack): - obj = self.pack[hashutil.hash_to_bytehex(o)] - logger.debug(f"Loading object {obj.id}") - if obj.type_name == b"blob": - if obj.id in self.ref_object_types: - self.ref_object_types[obj.id] = SnapshotTargetType.CONTENT - content = converters.dulwich_blob_to_content( - obj, max_content_size=self.max_content_size + + def get_dependencies(object_hash): + vertices = [self._object_graph.vs.find(name=object_hash)] + while len(vertices) > 0: + v = vertices.pop(0) + for s in self._object_graph.successors(v): + vertices.append(s) + yield ( + self._object_graph.vs[s]["name"], + self._object_graph.vs[s]["object_type"], ) - if isinstance(content, Content): - self.counts["content"] += 1 - storage_summary.update(self.storage.content_add([content])) - elif isinstance(content, SkippedContent): - self.counts["skipped_content"] += 1 - storage_summary.update( - self.storage.skipped_content_add([content]) + + try: + blob_vertices = self._object_graph.vs.select(object_type=GitObjectType.BLOB) + except KeyError: + missing_contents = set() + else: + missing_contents = set( + self.storage.content_missing_per_sha1_git(blob_vertices["name"]) + ) + + try: + tree_vertices = self._object_graph.vs.select(object_type=GitObjectType.TREE) + except KeyError: + tree_vertices = set() + + self.log.debug( + "Number of packed blobs that are missing in storage: " + f"{len(missing_contents)}" + ) + tree_hashes: List[bytes] = [] + if len(missing_contents) > 0: + missing_directories = set() + self.log.debug( + "Searching for packed trees which depend on missing packed blobs" + ) + for t in tree_vertices: + tree_hash = t["name"] + have_dep = False + 
for (dep_hash, dep_type) in get_dependencies(tree_hash): + have_dep = True + if dep_type == GitObjectType.BLOB and dep_hash in missing_contents: + self.log.debug( + f"tree {hashutil.hash_to_bytehex(tree_hash)!r} depends on " + f"missing {dep_type} {hashutil.hash_to_bytehex(dep_hash)!r}" ) - else: - raise TypeError(f"Unexpected content type: {content}") - elif obj.type_name == b"tree": - if obj.id in self.ref_object_types: - self.ref_object_types[obj.id] = SnapshotTargetType.DIRECTORY - self.counts["directory"] += 1 - directory = converters.dulwich_tree_to_directory(obj) - storage_summary.update(self.storage.directory_add([directory])) - elif obj.type_name == b"commit": - if obj.id in self.ref_object_types: - self.ref_object_types[obj.id] = SnapshotTargetType.REVISION - self.counts["revision"] += 1 - revision = converters.dulwich_commit_to_revision(obj) - storage_summary.update(self.storage.revision_add([revision])) - elif obj.type_name == b"tag": - if obj.id in self.ref_object_types: - self.ref_object_types[obj.id] = SnapshotTargetType.RELEASE - self.counts["release"] += 1 - release = converters.dulwich_tag_to_release(obj) - storage_summary.update(self.storage.release_add([release])) - else: - raise NotFound(f"object {obj} has bad type {obj.type}") + # We can infer that the tree is also missing. + missing_directories.add(tree_hash) + break + if not have_dep: + # An empty tree has no dependencies. Determine if it is missing. 
+ tree_hashes = [hashutil.hash_to_bytes(tree_hash)] + missing_empty_tree = set( + self.storage.directory_missing(tree_hashes) + ) + if len(missing_empty_tree): + missing_directories.add(tree_hash) + self.log.debug( + "Number of packed trees considered missing by implication: " + f"{len(missing_directories)}" + ) + else: + tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices] + missing_directories = set(self.storage.directory_missing(tree_hashes)) + try: + commit_vertices = self._object_graph.vs.select( + object_type=GitObjectType.COMMIT + ) + except KeyError: + commit_vertices = set() + missing_revisions = set() + if len(missing_contents) > 0 or len(missing_directories) > 0: + self.log.debug( + "Searching for packed commits which depend on missing packed blobs or trees" + ) + for c in commit_vertices: + commit_hash = c["name"] + for (dep_hash, dep_type) in get_dependencies(commit_hash): + if dep_hash in missing_contents or dep_hash in missing_directories: + self.log.debug( + f"commit {hashutil.hash_to_bytehex(commit_hash)!r} depends on " + f"missing {dep_type} {hashutil.hash_to_bytehex(dep_hash)!r}" + ) + # We can infer that the commit is also missing. 
+ missing_revisions.add(commit_hash) + break + self.log.debug( + "Number of packed commits considered missing by implication: " + f"{len(missing_revisions)}" + ) + else: + commit_hashes = [c["name"] for c in commit_vertices] + missing_revisions = set(self.storage.revision_missing(commit_hashes)) + + for blob_hash in missing_contents: + obj = self.pack[hashutil.hash_to_bytehex(blob_hash)] + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.CONTENT + content = converters.dulwich_blob_to_content( + obj, max_content_size=self.max_content_size + ) + if isinstance(content, Content): + self.counts["content"] += 1 + storage_summary.update(self.storage.content_add([content])) + elif isinstance(content, SkippedContent): + self.counts["skipped_content"] += 1 + storage_summary.update(self.storage.skipped_content_add([content])) + else: + raise TypeError(f"Unexpected content type: {content}") + + try: + tag_vertices = self._object_graph.vs.select(object_type=GitObjectType.TAG) + except KeyError: + tag_vertices = set() + missing_releases = set() + if len(missing_revisions) > 0: + self.log.debug( + "Searching for packed tags which depend on missing packed objects" + ) + for t in tag_vertices: + tag_hash = t["name"] + for (dep_hash, dep_type) in get_dependencies(tag_hash): + if ( + dep_hash in missing_revisions + or dep_hash in missing_directories + or dep_hash in missing_contents + ): + self.log.debug( + f"tag {hashutil.hash_to_bytehex(tag_hash)!r} depends on " + f"missing {dep_type} {hashutil.hash_to_bytehex(dep_hash)!r}" + ) + # We can infer that the tag is also missing. 
+ missing_releases.add(tag_hash) + break + self.log.debug( + "Number of packed tag considered missing by implication: " + f"{len(missing_releases)}" + ) + else: + tag_hashes = [t["name"] for t in tag_vertices] + missing_releases = set(self.storage.release_missing(tag_hashes)) + + for tree_hash in missing_directories: + obj = self.pack[hashutil.hash_to_bytehex(tree_hash)] + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.DIRECTORY + self.counts["directory"] += 1 + directory = converters.dulwich_tree_to_directory(obj) + storage_summary.update(self.storage.directory_add([directory])) + + for commit_hash in missing_revisions: + obj = self.pack[hashutil.hash_to_bytehex(commit_hash)] + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.REVISION + self.counts["revision"] += 1 + revision = converters.dulwich_commit_to_revision(obj) + storage_summary.update(self.storage.revision_add([revision])) + + for tag_hash in missing_releases: + obj = self.pack[hashutil.hash_to_bytehex(tag_hash)] + if obj.id in self.ref_object_types: + self.ref_object_types[obj.id] = SnapshotTargetType.RELEASE + self.counts["release"] += 1 + release = converters.dulwich_tag_to_release(obj) + storage_summary.update(self.storage.release_add([release])) + + self.flush() + snapshot = self.get_snapshot() self.counts["snapshot"] += 1 storage_summary.update(self.storage.snapshot_add([snapshot])) @@ -1227,10 +1376,12 @@ class GitLoader(BaseGitLoader): """The load was eventful if the current snapshot is different to the one we retrieved at the beginning of the run""" eventful = False - if self.prev_snapshot and self.snapshot: - eventful = self.snapshot.id != self.prev_snapshot.id - elif self.snapshot: - eventful = bool(self.snapshot.branches) + + if hasattr(self, "snapshot"): + if self.prev_snapshot and self.snapshot: + eventful = self.snapshot.id != self.prev_snapshot.id + elif self.snapshot: + eventful = 
bool(self.snapshot.branches) return {"status": ("eventful" if eventful else "uneventful")}