commit 51b5b0c65c78fcde4d9df304a0783a8ef7fa4b87
from: Stefan Sperling
date: Fri Oct 4 13:31:30 2024 UTC

store trees in subgraphs which can be swapped out to a temporary file

This should reduce memory requirements significantly. Storing all trees
of all commits with all tree entries in a single igraph uses too much
memory to load repositories such as git.git.

commit - a77c88011f70493a154286133edefe6cf85fec7e
commit + 51b5b0c65c78fcde4d9df304a0783a8ef7fa4b87
blob - 2327196ec86d098b8aa391e9660d795e0cde887b
blob + a206c2ad77ab32e299e9813c9dc43db2ee39f4d0
--- swh/loader/git/loader.py
+++ swh/loader/git/loader.py
@@ -7,6 +7,7 @@ import collections
 from dataclasses import dataclass
 import datetime
 from enum import Enum
+import io
 import json
 import logging
 import os
@@ -71,6 +72,28 @@ logger = logging.getLogger(__name__)
 heads_logger = logger.getChild("refs")
 remote_logger = logger.getChild("remote")
 fetch_pack_logger = logger.getChild("fetch_pack")
+
+
+class GraphSwapper:
+
+    def __init__(self, graph: Graph, swapfile: SpooledTemporaryFile):
+        self.graph = graph
+        self.f = swapfile
+        self.offset = -1
+
+    def swap_out(self):
+        if self.offset != -1:
+            return
+        self.f.seek(0, io.SEEK_END)
+        self.offset = self.f.tell()
+        self.graph.write_picklez(self.f)
+        del self.graph
+
+    def swap_in(self):
+        if self.offset == -1:
+            raise ValueError("graph has not been swapped out")
+        self.f.seek(self.offset, io.SEEK_SET)
+        return Graph.Read_Picklez(self.f)
 
 
 def split_lines_and_remainder(buf: bytes) -> Tuple[List[bytes], bytes]:
@@ -720,7 +743,25 @@ class GitLoader(BaseGitLoader):
 
     def make_object_graph(self):
         logger.info("Building packed objects graph")
-        self._object_graph = Graph(directed=True)
+        # Graph of commits and tags. Always stored in memory.
+        self._commit_graph = Graph(directed=True)
+
+        # We store trees in an ordered collection of Graphs.
+        # Each Graph can be swapped out to disk in order to reduce memory requirements.
+        # This collection is ordered to allow for LRU cache behaviour via the methods
+        # popitem(last=False) and move_to_end(last=False).
+        self._tree_graphs: collections.OrderedDict[bytes, Graph] = (
+            collections.OrderedDict()
+        )
+        self._max_trees_in_mem = 100
+
+        # Graphs which were swapped out to disk and will be read back on demand.
+        # Uses a SpooledTemporaryFile to avoid unnecessary disk I/O for smaller graphs.
+        # The swap file is shared among all GraphSwappers, which is fine because we are
+        # currently not multi-threaded (should that change we'd need a lock in the
+        # GraphSwapper class which covers I/O sequences involving seek + read/write).
+        self._swapfile = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
+        self._swapped_graphs: Dict[bytes, GraphSwapper] = dict()
 
         def get_pos_in_index(id_hex):
             """
@@ -841,29 +882,29 @@ class GitLoader(BaseGitLoader):
         # Calculate how many more objects we still need to add to the graph.
         num_objects_left = self.num_objects - len(tags) - len(commits)
 
-        def add_vertices(new_vertices, object_type):
+        def add_vertices(graph, new_vertices, object_type):
             attributes = dict()
             attributes["object_type"] = [object_type for x in new_vertices]
-            self._object_graph.add_vertices(new_vertices, attributes=attributes)
+            graph.add_vertices(new_vertices, attributes=attributes)
 
         # Add vertices for any directly referenced trees and blobs.
         num_objects_left -= len(new_trees) + len(new_blobs)
         if len(new_trees) > 0:
-            add_vertices(new_trees, GitObjectType.TREE)
+            add_vertices(self._commit_graph, new_trees, GitObjectType.TREE)
             new_trees = []
         if len(new_blobs) > 0:
-            add_vertices(new_blobs, GitObjectType.BLOB)
+            add_vertices(self._commit_graph, new_blobs, GitObjectType.BLOB)
             new_blobs = []
 
         # Add tags, commits and root trees to the graph
-        add_vertices(list(tags.keys()), GitObjectType.TAG)
-        add_vertices(list(commits.keys()), GitObjectType.COMMIT)
-        add_vertices(list(commits.values()), GitObjectType.TREE)
-        self._object_graph.add_edges(zip(tags.keys(), tags.values()))
-        self._object_graph.add_edges(zip(commits.keys(), commits.values()))
-        self._object_graph.add_edges(commit_edges)
+        add_vertices(self._commit_graph, list(tags.keys()), GitObjectType.TAG)
+        add_vertices(self._commit_graph, list(commits.keys()), GitObjectType.COMMIT)
+        add_vertices(self._commit_graph, list(commits.values()), GitObjectType.TREE)
+        self._commit_graph.add_edges(zip(tags.keys(), tags.values()))
+        self._commit_graph.add_edges(zip(commits.keys(), commits.values()))
+        self._commit_graph.add_edges(commit_edges)
 
-        # Populate the graph with trees and blobs
+        # Create per-commit tree graphs populated with subtrees and blobs
         new_edges = []
         submodule_mode = stat.S_IFDIR | stat.S_IFLNK
         i = 0
@@ -871,10 +912,27 @@ class GitLoader(BaseGitLoader):
         num_tree_entries = 0
         logger.info("Adding trees and blobs to the graph...")
         for commit_hash, tree_hash in commits.items():
+            if tree_hash in self._tree_graphs.keys():
+                # Keep recently used trees cached in memory.
+                self._tree_graphs.move_to_end(tree_hash, last=False)
+                continue
+            if tree_hash in self._swapped_graphs.keys():
+                continue
+            if len(self._tree_graphs) >= self._max_trees_in_mem:
+                (other_tree_hash, other_tree_graph) = self._tree_graphs.popitem(
+                    last=False
+                )
+                swapper = GraphSwapper(other_tree_graph, self._swapfile)
+                self._swapped_graphs[other_tree_hash] = swapper
+                swapper.swap_out()
+            tree_graph = Graph(directed=True)
+            self._tree_graphs[tree_hash] = tree_graph
+            self._tree_graphs.move_to_end(tree_hash, last=False)
             # Show some debug progress output for very large datasets
             if i > 0 and i % 10000 == 0:
                 logger.debug(f"Adding trees and blobs: {i} commits processed...")
             i = i + 1
+            new_trees.append(tree_hash)
             subtrees = [tree_hash]
             while len(subtrees) > 0:
                 tree_hash = subtrees.pop(0)
@@ -906,16 +964,12 @@ class GitLoader(BaseGitLoader):
                 # add new vertices and edges in batches for performance reasons
                 if len(new_trees) + len(new_blobs) > 100000 or len(new_edges) > 1000000:
                     if len(new_trees) > 0:
-                        add_vertices(new_trees, GitObjectType.TREE)
+                        add_vertices(tree_graph, new_trees, GitObjectType.TREE)
                     if len(new_blobs) > 0:
-                        add_vertices(new_blobs, GitObjectType.BLOB)
+                        add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
                     if len(new_edges) > 0:
                         num_tree_entries += len(new_edges)
-                        logger.debug(
-                            f"Adding {len(new_edges)} tree entries to the graph "
-                            f"({num_tree_entries} entries total)"
-                        )
-                        self._object_graph.add_edges(new_edges)
+                        tree_graph.add_edges(new_edges)
                     num_objects_found += len(new_trees) + len(new_blobs)
                     logger.debug(
                         f"Added {int((num_objects_found * 100) / num_objects_left)}% "
@@ -925,18 +979,14 @@ class GitLoader(BaseGitLoader):
                     new_blobs = []
                     new_edges = []
 
-        num_objects_found += len(new_trees) + len(new_blobs)
-        if len(new_trees) > 0:
-            add_vertices(new_trees, GitObjectType.TREE)
-        if len(new_blobs) > 0:
-            add_vertices(new_blobs, GitObjectType.BLOB)
-        if len(new_edges) > 0:
-            num_tree_entries += len(new_edges)
-            logger.debug(
-                f"Adding {len(new_edges)} tree entries to the graph "
-                f"({num_tree_entries} entries total)"
-            )
-            self._object_graph.add_edges(new_edges)
+        num_objects_found += len(new_trees) + len(new_blobs)
+        if len(new_trees) > 0:
+            add_vertices(tree_graph, new_trees, GitObjectType.TREE)
+        if len(new_blobs) > 0:
+            add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
+        if len(new_edges) > 0:
+            num_tree_entries += len(new_edges)
+            tree_graph.add_edges(new_edges)
 
         logger.info(
             f"Added {int((num_objects_found * 100) / num_objects_left)}% "
@@ -971,7 +1021,7 @@ class GitLoader(BaseGitLoader):
 
         assert self.origin
 
         # No object graph was created if the pack file was empty.
-        if not hasattr(self, "_object_graph"):
+        if not hasattr(self, "_commit_graph"):
             logger.debug("No objects to load")
             return
@@ -1004,108 +1054,127 @@ class GitLoader(BaseGitLoader):
                 "fetched pack file nor in local heads nor in the archive"
             )
 
-        def get_successors(v, object_type=None):
-            vertices = [v]
-            while len(vertices) > 0:
-                v = vertices.pop(0)
-                for s in self._object_graph.successors(v):
-                    if object_type is not None:
-                        if self._object_graph.vs[s]["object_type"] != object_type:
-                            continue
-                    if s not in vertices:
-                        vertices.append(s)
-                    yield (
-                        self._object_graph.vs[s]["name"],
-                        self._object_graph.vs[s]["object_type"],
-                    )
-
-        def get_neighbors(v, object_type=None):
-            for s in self._object_graph.neighbors(v):
+        def get_successors(graph, v, object_type=None):
+            for s in graph.successors(v):
                 if object_type is not None:
-                    if self._object_graph.vs[s]["object_type"] != object_type:
+                    if graph.vs[s]["object_type"] != object_type:
                         continue
-                yield (
-                    self._object_graph.vs[s]["name"],
-                    self._object_graph.vs[s]["object_type"],
-                )
+                yield (s, graph.vs[s]["name"], graph.vs[s]["object_type"])
 
-        try:
-            blob_vertices = self._object_graph.vs.select(object_type=GitObjectType.BLOB)
-        except KeyError:
-            missing_contents = set()
-        else:
-            missing_contents = set(
-                self.storage.content_missing_per_sha1_git(blob_vertices["name"])
-            )
-
-        try:
-            tree_vertices = self._object_graph.vs.select(object_type=GitObjectType.TREE)
-        except KeyError:
-            tree_vertices = set()
+        def get_neighbors(graph, v, object_type=None):
+            for s in graph.neighbors(v):
+                if object_type is not None:
+                    if graph.vs[s]["object_type"] != object_type:
+                        continue
+                yield (graph.vs[s]["name"], graph.vs[s]["object_type"])
 
+        def iter_tree_graphs(root_tree_vertices):
+            for t in root_tree_vertices:
+                tree_hash = t["name"]
+                try:
+                    tree_graph = self._tree_graphs[tree_hash]
+                except KeyError:
+                    swapper = self._swapped_graphs[tree_hash]
+                    tree_graph = swapper.swap_in()
+                yield (tree_hash, tree_graph)
+
+        # Load the set of all blob IDs found in the pack file.
+        root_tree_vertices = self._commit_graph.vs.select(
+            object_type=GitObjectType.TREE
+        )
+        blob_hashes: Set[bytes] = set()
+        for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+            try:
+                blob_vertices = tree_graph.vs.select(object_type=GitObjectType.BLOB)
+                blob_hashes.update(blob_vertices["name"])
+            except KeyError:
+                continue
+
+        # Find out which blobs are missing from the archive.
+        missing_contents = set(
+            self.storage.content_missing_per_sha1_git(list(blob_hashes))
+        )
         self.log.debug(
             "Number of packed blobs that are missing in storage: "
-            f"{len(missing_contents)} of {len(blob_vertices['name'])} packed blobs total"
+            f"{len(missing_contents)} of {len(blob_hashes)} packed blobs total"
         )
-        tree_hashes: List[bytes] = []
-        if len(missing_contents) == len(blob_vertices["name"]):
+
+        missing_directories: Set[bytes] = set()
+        tree_hashes: Set[bytes] = set()
+        if len(missing_contents) == len(blob_hashes):
             # If all blobs are missing then all trees are missing, too.
-            tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices]
-            missing_directories = set(tree_hashes)
+            for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+                missing_directories.add(hashutil.hash_to_bytes(root_hash))
+                try:
+                    tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
+                    missing_directories.update(
+                        map(hashutil.hash_to_bytes, tree_vertices["name"])
+                    )
+                except KeyError:
+                    continue
             self.log.debug(
                 "Number of packed trees considered missing by implication: "
                 f"{len(missing_directories)}"
            )
         elif len(missing_contents) > 0:
-            missing_directories = set()
+            # If a subset of blobs is missing then a subset of trees is missing, too.
             self.log.debug(
-                f"Searching {len(tree_vertices)} packed trees for trees which "
-                "depend on missing packed blobs"
+                "Searching for packed trees which are missing from the archive"
             )
-            nsearched = 0
-            for t in tree_vertices:
-                tree_hash = t["name"]
-                nsearched += 1
-                if (nsearched % 5000) == 0:
-                    self.log.debug(
-                        f"Searched {int((nsearched * 100) / len(tree_vertices))}% "
-                        f"of {len(tree_vertices)} packed trees..."
-                    )
-                if tree_hash in missing_directories:
-                    continue
-                have_dep = False
-                for dep_hash, dep_type in get_successors(t):
-                    have_dep = True
-                    if dep_type == GitObjectType.BLOB and dep_hash in missing_contents:
-                        # We can infer that the tree is also missing.
-                        missing_directories.add(tree_hash)
-                        break
-                if not have_dep:
-                    # An empty tree has no dependencies. Determine if it is missing.
-                    tree_hashes = [hashutil.hash_to_bytes(tree_hash)]
-                    missing_empty_tree = set(
-                        self.storage.directory_missing(tree_hashes)
-                    )
-                    if len(missing_empty_tree):
-                        missing_directories.add(tree_hash)
-            self.log.debug(
-                f"Searched {int((nsearched * 100) / len(tree_vertices))}% "
-                f"of {len(tree_vertices)} packed trees..."
-            )
+            for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+                t = tree_graph.vs.find(root_hash)
+                subtrees = [(t, root_hash)]
+                while len(subtrees) > 0:
+                    (t, tree_hash) = subtrees.pop(0)
+                    if hashutil.hash_to_bytes(tree_hash) in missing_directories:
+                        continue
+                    is_missing = False
+                    for s, dep_hash, dep_type in get_successors(tree_graph, t):
+                        if dep_type == GitObjectType.TREE:
+                            if (s, dep_hash) not in subtrees:
+                                subtrees.append((s, dep_hash))
+                            continue
+                        if (
+                            dep_type == GitObjectType.BLOB
+                            and dep_hash in missing_contents
+                        ):
+                            # Content this tree depends on is missing.
+                            # We can infer that the tree itself is missing.
+                            missing_directories.add(hashutil.hash_to_bytes(tree_hash))
+                            is_missing = True
+                            break
+                    if not is_missing:
+                        # This tree is either empty or has no known-missing dependencies.
+                        # Determine if it is missing by searching the archive.
+                        missing_tree = set(
+                            self.storage.directory_missing(
+                                [hashutil.hash_to_bytes(tree_hash)]
+                            )
+                        )
+                        if len(missing_tree):
+                            missing_directories.add(hashutil.hash_to_bytes(tree_hash))
             self.log.debug(
-                "Number of packed trees considered missing by implication: "
-                f"{len(missing_directories)}"
+                f"Number of packed trees found missing: {len(missing_directories)}"
             )
         else:
-            tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices]
-            missing_directories = set(self.storage.directory_missing(tree_hashes))
+            # All blobs are present. Any of these blobs might have been added
+            # to the archive via imports of other origins. We must query the
+            # archive for missing trees specific to the origin we are importing.
+            for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+                tree_hashes.add(hashutil.hash_to_bytes(root_hash))
+                try:
+                    tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
+                    tree_hashes.update(map(hashutil.hash_to_bytes, tree_vertices["name"]))
                except KeyError:
+                    continue
+            missing_directories = set(self.storage.directory_missing(list(tree_hashes)))
             self.log.debug(
                 "Number of missing trees according to archive query: "
                 f"{len(missing_directories)}"
             )
 
         try:
-            commit_vertices = self._object_graph.vs.select(
+            commit_vertices = self._commit_graph.vs.select(
                 object_type=GitObjectType.COMMIT
             )
         except KeyError:
@@ -1126,8 +1195,10 @@ class GitLoader(BaseGitLoader):
             )
             if commit_hash in missing_revisions:
                 continue
-            for dep_hash, dep_type in get_neighbors(c, GitObjectType.TREE):
-                if dep_hash in missing_contents or dep_hash in missing_directories:
+            for dep_hash, dep_type in get_neighbors(
+                self._commit_graph, c, GitObjectType.TREE
+            ):
+                if dep_hash in missing_directories:
                     # We can infer that the commit is also missing.
                     missing_revisions.add(commit_hash)
                     break
@@ -1166,7 +1237,7 @@ class GitLoader(BaseGitLoader):
         # XXX This could result in tags pointing at commits which are
         # missing from the archive.
         try:
-            tag_vertices = self._object_graph.vs.select(object_type=GitObjectType.TAG)
+            tag_vertices = self._commit_graph.vs.select(object_type=GitObjectType.TAG)
             tag_hashes = [t["name"] for t in tag_vertices]
             missing_releases = set(self.storage.release_missing(tag_hashes))
         except KeyError:
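
For reference, the swap mechanism implemented by GraphSwapper amounts to
appending gzip-compressed pickles of igraph Graph objects to one shared
SpooledTemporaryFile and remembering each graph's byte offset. The following
self-contained sketch illustrates the same round-trip outside the loader; the
4 MiB spool threshold and the b"tree1" key are illustrative stand-ins (the
loader uses self.temp_file_cutoff and real tree hashes):

    # Minimal sketch of the swap-out/swap-in pattern used by GraphSwapper.
    # Assumes python-igraph, whose Graph.write_picklez()/Graph.Read_Picklez()
    # accept file-like objects; that is what lets many graphs share one file.
    import io
    from tempfile import SpooledTemporaryFile

    from igraph import Graph

    # Spill to disk only once the file grows past 4 MiB (illustrative value).
    swapfile = SpooledTemporaryFile(max_size=4 * 1024 * 1024)
    offsets = {}  # graph key -> byte offset of its compressed pickle

    def swap_out(key, graph):
        # Append the compressed pickle at the end of the shared swap file
        # and remember where it starts; the in-memory graph can be dropped.
        swapfile.seek(0, io.SEEK_END)
        offsets[key] = swapfile.tell()
        graph.write_picklez(swapfile)

    def swap_in(key):
        # Seek back to the recorded offset and unpickle the graph from there.
        swapfile.seek(offsets[key], io.SEEK_SET)
        return Graph.Read_Picklez(swapfile)

    g = Graph(directed=True)
    g.add_vertices(["tree", "blob"])
    g.add_edges([("tree", "blob")])
    swap_out(b"tree1", g)  # b"tree1" is a made-up key, not a real tree hash
    restored = swap_in(b"tree1")
    assert restored.vcount() == 2 and restored.ecount() == 1

Because graphs are only ever appended and offsets never move, sharing a single
swap file is safe as long as one thread at a time performs the seek plus
read/write sequence, which is exactly the constraint the in-diff comment on
self._swapfile calls out.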
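The in-memory cap on tree graphs behaves like a bounded cache built from
collections.OrderedDict. Note that the diff keeps its most recently used entry
at the front (move_to_end(last=False)) and evicts with popitem(last=False);
the sketch below shows the same bounded-cache idea in the more common
orientation, with recent entries at the end and the oldest evicted from the
front. The capacity of 3 and the string stand-ins for Graph objects are
illustrative only:

    # Minimal sketch of LRU-style eviction with collections.OrderedDict,
    # standing in for the loader's bounded self._tree_graphs cache.
    from collections import OrderedDict

    MAX_IN_MEM = 3  # illustrative; the loader uses _max_trees_in_mem = 100
    cache: OrderedDict[bytes, str] = OrderedDict()
    swapped: dict[bytes, str] = {}  # stands in for _swapped_graphs

    def use(tree_hash: bytes, graph: str) -> None:
        if tree_hash in cache:
            # Cache hit: mark the entry as most recently used.
            cache.move_to_end(tree_hash)
            return
        if len(cache) >= MAX_IN_MEM:
            # Evict the least recently used entry; the loader would hand
            # the evicted graph to a GraphSwapper at this point.
            old_hash, old_graph = cache.popitem(last=False)
            swapped[old_hash] = old_graph
        cache[tree_hash] = graph

    for h in (b"t1", b"t2", b"t3", b"t1", b"t4"):
        use(h, "graph-" + h.decode())

    assert list(cache) == [b"t3", b"t1", b"t4"]  # t2 was least recently used
    assert b"t2" in swapped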