Commit Diff


commit - a77c88011f70493a154286133edefe6cf85fec7e
commit + 51b5b0c65c78fcde4d9df304a0783a8ef7fa4b87
blob - 2327196ec86d098b8aa391e9660d795e0cde887b
blob + a206c2ad77ab32e299e9813c9dc43db2ee39f4d0
--- swh/loader/git/loader.py
+++ swh/loader/git/loader.py
@@ -7,6 +7,7 @@ import collections
 from dataclasses import dataclass
 import datetime
 from enum import Enum
+import io
 import json
 import logging
 import os
@@ -71,6 +72,28 @@ logger = logging.getLogger(__name__)
 heads_logger = logger.getChild("refs")
 remote_logger = logger.getChild("remote")
 fetch_pack_logger = logger.getChild("fetch_pack")
+
+
+class GraphSwapper:
+    """Write one Graph to a shared swap file and read it back on demand."""
+    def __init__(self, graph: Graph, swapfile: SpooledTemporaryFile):
+        self.graph = graph
+        self.f = swapfile
+        self.offset = -1  # -1 means the graph has not been swapped out yet
+
+    def swap_out(self):
+        if self.offset != -1:
+            return  # already written to the swap file
+        self.f.seek(0, io.SEEK_END)  # append behind previously swapped graphs
+        self.offset = self.f.tell()
+        self.graph.write_picklez(self.f)
+        del self.graph  # drop our reference to the in-memory graph
+
+    def swap_in(self):
+        if self.offset == -1:
+            raise ValueError("graph has not been swapped out")
+        self.f.seek(self.offset, io.SEEK_SET)
+        return Graph.Read_Picklez(self.f)
 
 
 def split_lines_and_remainder(buf: bytes) -> Tuple[List[bytes], bytes]:
@@ -720,7 +743,25 @@ class GitLoader(BaseGitLoader):
     def make_object_graph(self):
         logger.info("Building packed objects graph")
 
-        self._object_graph = Graph(directed=True)
+        # Graph of commits, and tags. Always stored in memory.
+        self._commit_graph = Graph(directed=True)
+
+        # We store trees in an ordered collection of Graphs.
+        # Each Graph can be swapped out to disk in order to reduce memory requirements.
+        # This collection is ordered to allow for LRU cache behaviour: graphs are
+        # evicted from the front with popitem(last=False); see also move_to_end().
+        self._tree_graphs: collections.OrderedDict[bytes, Graph] = (
+            collections.OrderedDict()
+        )
+        self._max_trees_in_mem = 100
+
+        # Graphs which have been swapped out to disk and will be read back on demand.
+        # Uses a SpooledTemporaryFile to avoid unnecessary disk I/O for smaller graphs.
+        # The swap file is shared among all GraphSwappers which is fine because we are
+        # currently not multi-threaded (should that change we'd need a lock in the
+        # GraphSwapper class which covers I/O sequences involving seek + read/write).
+        self._swapfile = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
+        self._swapped_graphs: Dict[bytes, GraphSwapper] = dict()
 
         def get_pos_in_index(id_hex):
             """
@@ -841,29 +882,29 @@ class GitLoader(BaseGitLoader):
         # Calculate how many more objects we still need to add to the graph.
         num_objects_left = self.num_objects - len(tags) - len(commits)
 
-        def add_vertices(new_vertices, object_type):
+        def add_vertices(graph, new_vertices, object_type):
             attributes = dict()
             attributes["object_type"] = [object_type for x in new_vertices]
-            self._object_graph.add_vertices(new_vertices, attributes=attributes)
+            graph.add_vertices(new_vertices, attributes=attributes)
 
         # Add vertices for any directly referenced trees and blobs.
         num_objects_left -= len(new_trees) + len(new_blobs)
         if len(new_trees) > 0:
-            add_vertices(new_trees, GitObjectType.TREE)
+            add_vertices(self._commit_graph, new_trees, GitObjectType.TREE)
             new_trees = []
         if len(new_blobs) > 0:
-            add_vertices(new_blobs, GitObjectType.BLOB)
+            add_vertices(self._commit_graph, new_blobs, GitObjectType.BLOB)
             new_blobs = []
 
         # Add tags, commits and root trees to the graph
-        add_vertices(list(tags.keys()), GitObjectType.TAG)
-        add_vertices(list(commits.keys()), GitObjectType.COMMIT)
-        add_vertices(list(commits.values()), GitObjectType.TREE)
-        self._object_graph.add_edges(zip(tags.keys(), tags.values()))
-        self._object_graph.add_edges(zip(commits.keys(), commits.values()))
-        self._object_graph.add_edges(commit_edges)
+        add_vertices(self._commit_graph, list(tags.keys()), GitObjectType.TAG)
+        add_vertices(self._commit_graph, list(commits.keys()), GitObjectType.COMMIT)
+        add_vertices(self._commit_graph, list(commits.values()), GitObjectType.TREE)
+        self._commit_graph.add_edges(zip(tags.keys(), tags.values()))
+        self._commit_graph.add_edges(zip(commits.keys(), commits.values()))
+        self._commit_graph.add_edges(commit_edges)
 
-        # Populate the graph with trees and blobs
+        # Create per-commit tree graphs populated with subtrees and blobs
         new_edges = []
         submodule_mode = stat.S_IFDIR | stat.S_IFLNK
         i = 0
@@ -871,10 +912,27 @@ class GitLoader(BaseGitLoader):
         num_tree_entries = 0
         logger.info("Adding trees and blobs to the graph...")
         for commit_hash, tree_hash in commits.items():
+            if tree_hash in self._tree_graphs.keys():
+                # Mark as most recently used: eviction pops from the front.
+                self._tree_graphs.move_to_end(tree_hash)
+                continue
+            if tree_hash in self._swapped_graphs.keys():
+                continue
+            if len(self._tree_graphs) >= self._max_trees_in_mem:
+                (other_tree_hash, other_tree_graph) = self._tree_graphs.popitem(
+                    last=False
+                )
+                swapper = GraphSwapper(other_tree_graph, self._swapfile)
+                self._swapped_graphs[other_tree_hash] = swapper
+                swapper.swap_out()
+            tree_graph = Graph(directed=True)
+            self._tree_graphs[tree_hash] = tree_graph
+            new_trees, new_blobs, new_edges = [], [], []  # nothing carried over
             # Show some debug progress output for very large datasets
             if i > 0 and i % 10000 == 0:
                 logger.debug(f"Adding trees and blobs: {i} commits processed...")
             i = i + 1
+            new_trees.append(tree_hash)
             subtrees = [tree_hash]
             while len(subtrees) > 0:
                 tree_hash = subtrees.pop(0)
@@ -906,16 +964,12 @@ class GitLoader(BaseGitLoader):
                 # add new vertices and edges in batches for performance reasons
                 if len(new_trees) + len(new_blobs) > 100000 or len(new_edges) > 1000000:
                     if len(new_trees) > 0:
-                        add_vertices(new_trees, GitObjectType.TREE)
+                        add_vertices(tree_graph, new_trees, GitObjectType.TREE)
                     if len(new_blobs) > 0:
-                        add_vertices(new_blobs, GitObjectType.BLOB)
+                        add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
                     if len(new_edges) > 0:
                         num_tree_entries += len(new_edges)
-                        logger.debug(
-                            f"Adding {len(new_edges)} tree entries to the graph "
-                            f"({num_tree_entries} entries total)"
-                        )
-                        self._object_graph.add_edges(new_edges)
+                        tree_graph.add_edges(new_edges)
                     num_objects_found += len(new_trees) + len(new_blobs)
                     logger.debug(
                         f"Added {int((num_objects_found * 100) / num_objects_left)}% "
@@ -925,18 +979,14 @@ class GitLoader(BaseGitLoader):
                     new_blobs = []
                     new_edges = []
 
-        num_objects_found += len(new_trees) + len(new_blobs)
-        if len(new_trees) > 0:
-            add_vertices(new_trees, GitObjectType.TREE)
-        if len(new_blobs) > 0:
-            add_vertices(new_blobs, GitObjectType.BLOB)
-        if len(new_edges) > 0:
-            num_tree_entries += len(new_edges)
-            logger.debug(
-                f"Adding {len(new_edges)} tree entries to the graph "
-                f"({num_tree_entries} entries total)"
-            )
-            self._object_graph.add_edges(new_edges)
+            num_objects_found += len(new_trees) + len(new_blobs)
+            if len(new_trees) > 0:
+                add_vertices(tree_graph, new_trees, GitObjectType.TREE)
+            if len(new_blobs) > 0:
+                add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
+            if len(new_edges) > 0:
+                num_tree_entries += len(new_edges)
+                tree_graph.add_edges(new_edges)
 
         logger.info(
             f"Added {int((num_objects_found * 100) / num_objects_left)}% "
@@ -971,7 +1021,7 @@ class GitLoader(BaseGitLoader):
         assert self.origin
 
         # No object graph was created if the pack file was empty.
-        if not hasattr(self, "_object_graph"):
+        if not hasattr(self, "_commit_graph"):
             logger.debug("No objects to load")
             return
 
@@ -1004,108 +1054,127 @@ class GitLoader(BaseGitLoader):
                     "fetched pack file nor in local heads nor in the archive"
                 )
 
-        def get_successors(v, object_type=None):
-            vertices = [v]
-            while len(vertices) > 0:
-                v = vertices.pop(0)
-                for s in self._object_graph.successors(v):
-                    if object_type is not None:
-                        if self._object_graph.vs[s]["object_type"] != object_type:
-                            continue
-                    if s not in vertices:
-                        vertices.append(s)
-                    yield (
-                        self._object_graph.vs[s]["name"],
-                        self._object_graph.vs[s]["object_type"],
-                    )
-
-        def get_neighbors(v, object_type=None):
-            for s in self._object_graph.neighbors(v):
+        def get_successors(graph, v, object_type=None):
+            for s in graph.successors(v):
                 if object_type is not None:
-                    if self._object_graph.vs[s]["object_type"] != object_type:
+                    if graph.vs[s]["object_type"] != object_type:
                         continue
-                yield (
-                    self._object_graph.vs[s]["name"],
-                    self._object_graph.vs[s]["object_type"],
-                )
+                yield (s, graph.vs[s]["name"], graph.vs[s]["object_type"])
 
-        try:
-            blob_vertices = self._object_graph.vs.select(object_type=GitObjectType.BLOB)
-        except KeyError:
-            missing_contents = set()
-        else:
-            missing_contents = set(
-                self.storage.content_missing_per_sha1_git(blob_vertices["name"])
-            )
-
-        try:
-            tree_vertices = self._object_graph.vs.select(object_type=GitObjectType.TREE)
-        except KeyError:
-            tree_vertices = set()
+        def get_neighbors(graph, v, object_type=None):
+            for s in graph.neighbors(v):
+                if object_type is not None:
+                    if graph.vs[s]["object_type"] != object_type:
+                        continue
+                yield (graph.vs[s]["name"], graph.vs[s]["object_type"])
 
+        def iter_tree_graphs(root_tree_vertices):
+            for t in root_tree_vertices:
+                tree_hash = t["name"]
+                try:
+                    tree_graph = self._tree_graphs[tree_hash]
+                except KeyError:
+                    swapper = self._swapped_graphs[tree_hash]
+                    tree_graph = swapper.swap_in()
+                yield (tree_hash, tree_graph)
+
+        # Load the set of all blob IDs found in the pack file.
+        root_tree_vertices = self._commit_graph.vs.select(
+            object_type=GitObjectType.TREE
+        )
+        blob_hashes: Set[bytes] = set()
+        for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+            try:
+                blob_vertices = tree_graph.vs.select(object_type=GitObjectType.BLOB)
+                blob_hashes.update(blob_vertices["name"])
+            except KeyError:
+                continue
+
+        # Find out which blobs are missing from the archive.
+        missing_contents = set(
+            self.storage.content_missing_per_sha1_git(list(blob_hashes))
+        )
         self.log.debug(
             "Number of packed blobs that are missing in storage: "
-            f"{len(missing_contents)} of {len(blob_vertices['name'])} packed blobs total"
+            f"{len(missing_contents)} of {len(blob_hashes)} packed blobs total"
         )
-        tree_hashes: List[bytes] = []
-        if len(missing_contents) == len(blob_vertices["name"]):
+
+        missing_directories: Set[bytes] = set()
+        tree_hashes: Set[bytes] = set()
+        if len(missing_contents) == len(blob_hashes):
             # If all blobs are missing then all trees are missing, too.
-            tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices]
-            missing_directories = set(tree_hashes)
+            for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+                missing_directories.add(hashutil.hash_to_bytes(root_hash))
+                try:
+                    tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
+                    missing_directories.update(
+                        map(hashutil.hash_to_bytes, tree_vertices["name"])
+                    )
+                except KeyError:
+                    continue
             self.log.debug(
                 "Number of packed trees considered missing by implication: "
                 f"{len(missing_directories)}"
             )
         elif len(missing_contents) > 0:
-            missing_directories = set()
+            # If a subset of blobs is missing then a subset of trees are missing, too.
             self.log.debug(
-                f"Searching {len(tree_vertices)} packed trees for trees which "
-                "depend on missing packed blobs"
+                "Searching for packed trees which are missing from the archive "
             )
-            nsearched = 0
-            for t in tree_vertices:
-                tree_hash = t["name"]
-                nsearched += 1
-                if (nsearched % 5000) == 0:
-                    self.log.debug(
-                        f"Searched {int((nsearched * 100) / len(tree_vertices))}% "
-                        f"of {len(tree_vertices)} packed trees..."
-                    )
-                if tree_hash in missing_directories:
-                    continue
-                have_dep = False
-                for dep_hash, dep_type in get_successors(t):
-                    have_dep = True
-                    if dep_type == GitObjectType.BLOB and dep_hash in missing_contents:
-                        # We can infer that the tree is also missing.
-                        missing_directories.add(tree_hash)
-                        break
-                if not have_dep:
-                    # An empty tree has no dependencies. Determine if it is missing.
-                    tree_hashes = [hashutil.hash_to_bytes(tree_hash)]
-                    missing_empty_tree = set(
-                        self.storage.directory_missing(tree_hashes)
-                    )
-                    if len(missing_empty_tree):
-                        missing_directories.add(tree_hash)
-            self.log.debug(
-                f"Searched {int((nsearched * 100) / len(tree_vertices))}% "
-                f"of {len(tree_vertices)} packed trees..."
-            )
+            for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+                t = tree_graph.vs.find(root_hash)
+                subtrees = [(t, root_hash)]
+                while len(subtrees) > 0:
+                    (t, tree_hash) = subtrees.pop(0)
+                    if hashutil.hash_to_bytes(tree_hash) in missing_directories:
+                        continue
+                    is_missing = False
+                    for s, dep_hash, dep_type in get_successors(tree_graph, t):
+                        if dep_type == GitObjectType.TREE:
+                            if (s, dep_hash) not in subtrees:
+                                subtrees.append((s, dep_hash))
+                            continue
+                        if (
+                            dep_type == GitObjectType.BLOB
+                            and dep_hash in missing_contents
+                        ):
+                            # Content this tree depends on is missing.
+                            # We can infer that the tree itself is missing.
+                            missing_directories.add(hashutil.hash_to_bytes(tree_hash))
+                            is_missing = True
+                            # No break: later successors may be unqueued subtrees.
+                    if not is_missing:
+                        # This tree is either empty or has no known-missing dependencies.
+                        # Determine if it is missing by searching the archive.
+                        missing_tree = set(
+                            self.storage.directory_missing(
+                                [hashutil.hash_to_bytes(tree_hash)]
+                            )
+                        )
+                        if len(missing_tree):
+                            missing_directories.add(hashutil.hash_to_bytes(tree_hash))
             self.log.debug(
-                "Number of packed trees considered missing by implication: "
-                f"{len(missing_directories)}"
+                f"Number of packed trees found missing: {len(missing_directories)}"
             )
         else:
-            tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices]
-            missing_directories = set(self.storage.directory_missing(tree_hashes))
+            # All blobs are present. Any of these blobs might have been added
+            # to the archive via imports of other origins. We must query the
+            # archive for missing trees specific to the origin we are importing.
+            for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+                tree_hashes.add(root_hash)
+                try:
+                    tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
+                    tree_hashes.update(tree_vertices["name"])  # a list of names
+                except KeyError:
+                    continue
+            missing_directories = set(self.storage.directory_missing(list(tree_hashes)))
             self.log.debug(
                 "Number of missing trees according to archive query: "
                 f"{len(missing_directories)}"
             )
 
         try:
-            commit_vertices = self._object_graph.vs.select(
+            commit_vertices = self._commit_graph.vs.select(
                 object_type=GitObjectType.COMMIT
             )
         except KeyError:
@@ -1126,8 +1195,10 @@ class GitLoader(BaseGitLoader):
                     )
                 if commit_hash in missing_revisions:
                     continue
-                for dep_hash, dep_type in get_neighbors(c, GitObjectType.TREE):
-                    if dep_hash in missing_contents or dep_hash in missing_directories:
+                for dep_hash, dep_type in get_neighbors(
+                    self._commit_graph, c, GitObjectType.TREE
+                ):
+                    if dep_hash in missing_directories:
                         # We can infer that the commit is also missing.
                         missing_revisions.add(commit_hash)
                         break
@@ -1166,7 +1237,7 @@ class GitLoader(BaseGitLoader):
         # XXX This could result in tags pointing at commits which are
         # missing from the archive.
         try:
-            tag_vertices = self._object_graph.vs.select(object_type=GitObjectType.TAG)
+            tag_vertices = self._commit_graph.vs.select(object_type=GitObjectType.TAG)
             tag_hashes = [t["name"] for t in tag_vertices]
             missing_releases = set(self.storage.release_missing(tag_hashes))
         except KeyError: