commit - a77c88011f70493a154286133edefe6cf85fec7e
commit + 51b5b0c65c78fcde4d9df304a0783a8ef7fa4b87
blob - 2327196ec86d098b8aa391e9660d795e0cde887b
blob + a206c2ad77ab32e299e9813c9dc43db2ee39f4d0
--- swh/loader/git/loader.py
+++ swh/loader/git/loader.py
from dataclasses import dataclass
import datetime
from enum import Enum
+import io
import json
import logging
import os
heads_logger = logger.getChild("refs")
remote_logger = logger.getChild("remote")
fetch_pack_logger = logger.getChild("fetch_pack")
+
+
+class GraphSwapper:
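+ """Swap an igraph Graph out to a shared temporary file and read it back in on demand."""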
+
+ def __init__(self, graph: Graph, swapfile: SpooledTemporaryFile):
+ self.graph = graph
+ self.f = swapfile
+ self.offset = -1  # -1 means the graph has not been swapped out yet
+
+ def swap_out(self):
+ if self.offset != -1:
+ return  # already swapped out
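+ # Append the compressed, pickled graph at the current end of the shared swap file.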
+ self.f.seek(0, io.SEEK_END)
+ self.offset = self.f.tell()
+ self.graph.write_picklez(self.f)
+ del self.graph  # drop the in-memory reference so the graph can be garbage collected
+
+ def swap_in(self):
+ if self.offset == -1:
+ raise ValueError("graph has not been swapped out")
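+ # Seek to where this graph was written and unpickle it back into memory.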
+ self.f.seek(self.offset, io.SEEK_SET)
+ return Graph.Read_Picklez(self.f)
def split_lines_and_remainder(buf: bytes) -> Tuple[List[bytes], bytes]:
def make_object_graph(self):
logger.info("Building packed objects graph")
- self._object_graph = Graph(directed=True)
+ # Graph of commits and tags. Always kept in memory.
+ self._commit_graph = Graph(directed=True)
+
+ # We store trees in an ordered collection of Graphs.
+ # Each Graph can be swapped out to disk in order to reduce memory requirements.
+ # This collection is ordered to allow for LRU cache behaviour: move_to_end()
+ # marks a tree graph as most recently used, while popitem(last=False) evicts
+ # the least recently used one.
+ self._tree_graphs: collections.OrderedDict[bytes, Graph] = (
+ collections.OrderedDict()
+ )
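+ # Upper bound on the number of tree graphs kept in memory at any one time.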
+ self._max_trees_in_mem = 100
+
+ # Graphs which have been swapped out to disk and will be read back on demand.
+ # Uses a SpooledTemporaryFile to avoid unnecessary disk I/O for smaller graphs.
+ # The swap file is shared among all GraphSwappers, which is fine because we are
+ # currently not multi-threaded (should that change, we would need a lock in the
+ # GraphSwapper class covering I/O sequences which involve seek + read/write).
+ self._swapfile = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
+ self._swapped_graphs: Dict[bytes, GraphSwapper] = dict()
def get_pos_in_index(id_hex):
"""
# Calculate how many more objects we still need to add to the graph.
num_objects_left = self.num_objects - len(tags) - len(commits)
- def add_vertices(new_vertices, object_type):
+ def add_vertices(graph, new_vertices, object_type):
attributes = dict()
attributes["object_type"] = [object_type for x in new_vertices]
- self._object_graph.add_vertices(new_vertices, attributes=attributes)
+ graph.add_vertices(new_vertices, attributes=attributes)
# Add vertices for any directly referenced trees and blobs.
num_objects_left -= len(new_trees) + len(new_blobs)
if len(new_trees) > 0:
- add_vertices(new_trees, GitObjectType.TREE)
+ add_vertices(self._commit_graph, new_trees, GitObjectType.TREE)
new_trees = []
if len(new_blobs) > 0:
- add_vertices(new_blobs, GitObjectType.BLOB)
+ add_vertices(self._commit_graph, new_blobs, GitObjectType.BLOB)
new_blobs = []
# Add tags, commits and root trees to the graph
- add_vertices(list(tags.keys()), GitObjectType.TAG)
- add_vertices(list(commits.keys()), GitObjectType.COMMIT)
- add_vertices(list(commits.values()), GitObjectType.TREE)
- self._object_graph.add_edges(zip(tags.keys(), tags.values()))
- self._object_graph.add_edges(zip(commits.keys(), commits.values()))
- self._object_graph.add_edges(commit_edges)
+ add_vertices(self._commit_graph, list(tags.keys()), GitObjectType.TAG)
+ add_vertices(self._commit_graph, list(commits.keys()), GitObjectType.COMMIT)
+ add_vertices(self._commit_graph, list(commits.values()), GitObjectType.TREE)
+ self._commit_graph.add_edges(zip(tags.keys(), tags.values()))
+ self._commit_graph.add_edges(zip(commits.keys(), commits.values()))
+ self._commit_graph.add_edges(commit_edges)
- # Populate the graph with trees and blobs
+ # Create per-commit tree graphs populated with subtrees and blobs
new_edges = []
submodule_mode = stat.S_IFDIR | stat.S_IFLNK
i = 0
num_tree_entries = 0
logger.info("Adding trees and blobs to the graph...")
for commit_hash, tree_hash in commits.items():
+ if tree_hash in self._tree_graphs:
+ # Keep recently used tree graphs cached in memory: mark this graph
+ # as most recently used by moving it to the back of the queue.
+ self._tree_graphs.move_to_end(tree_hash)
+ continue
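+ # Tree graphs already swapped out to disk stay there until they are needed again.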
+ if tree_hash in self._swapped_graphs:
+ continue
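+ # If the in-memory cache is full, swap the least recently used tree graph out to disk.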
+ if len(self._tree_graphs) >= self._max_trees_in_mem:
+ (other_tree_hash, other_tree_graph) = self._tree_graphs.popitem(
+ last=False
+ )
+ swapper = GraphSwapper(other_tree_graph, self._swapfile)
+ self._swapped_graphs[other_tree_hash] = swapper
+ swapper.swap_out()
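+ # Build a new graph for this commit's tree and cache it in memory.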
+ tree_graph = Graph(directed=True)
+ # A newly inserted tree graph starts out at the most recently used end of the queue.
+ self._tree_graphs[tree_hash] = tree_graph
# Show some debug progress output for very large datasets
if i > 0 and i % 10000 == 0:
logger.debug(f"Adding trees and blobs: {i} commits processed...")
i = i + 1
+ new_trees.append(tree_hash)
subtrees = [tree_hash]
while len(subtrees) > 0:
tree_hash = subtrees.pop(0)
# add new vertices and edges in batches for performance reasons
if len(new_trees) + len(new_blobs) > 100000 or len(new_edges) > 1000000:
if len(new_trees) > 0:
- add_vertices(new_trees, GitObjectType.TREE)
+ add_vertices(tree_graph, new_trees, GitObjectType.TREE)
if len(new_blobs) > 0:
- add_vertices(new_blobs, GitObjectType.BLOB)
+ add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
if len(new_edges) > 0:
num_tree_entries += len(new_edges)
- logger.debug(
- f"Adding {len(new_edges)} tree entries to the graph "
- f"({num_tree_entries} entries total)"
- )
- self._object_graph.add_edges(new_edges)
+ tree_graph.add_edges(new_edges)
num_objects_found += len(new_trees) + len(new_blobs)
logger.debug(
f"Added {int((num_objects_found * 100) / num_objects_left)}% "
new_blobs = []
new_edges = []
- num_objects_found += len(new_trees) + len(new_blobs)
- if len(new_trees) > 0:
- add_vertices(new_trees, GitObjectType.TREE)
- if len(new_blobs) > 0:
- add_vertices(new_blobs, GitObjectType.BLOB)
- if len(new_edges) > 0:
- num_tree_entries += len(new_edges)
- logger.debug(
- f"Adding {len(new_edges)} tree entries to the graph "
- f"({num_tree_entries} entries total)"
- )
- self._object_graph.add_edges(new_edges)
+ num_objects_found += len(new_trees) + len(new_blobs)
+ if len(new_trees) > 0:
+ add_vertices(tree_graph, new_trees, GitObjectType.TREE)
+ if len(new_blobs) > 0:
+ add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
+ if len(new_edges) > 0:
+ num_tree_entries += len(new_edges)
+ tree_graph.add_edges(new_edges)
logger.info(
f"Added {int((num_objects_found * 100) / num_objects_left)}% "
assert self.origin
# No object graph was created if the pack file was empty.
- if not hasattr(self, "_object_graph"):
+ if not hasattr(self, "_commit_graph"):
logger.debug("No objects to load")
return
"fetched pack file nor in local heads nor in the archive"
)
- def get_successors(v, object_type=None):
- vertices = [v]
- while len(vertices) > 0:
- v = vertices.pop(0)
- for s in self._object_graph.successors(v):
- if object_type is not None:
- if self._object_graph.vs[s]["object_type"] != object_type:
- continue
- if s not in vertices:
- vertices.append(s)
- yield (
- self._object_graph.vs[s]["name"],
- self._object_graph.vs[s]["object_type"],
- )
-
- def get_neighbors(v, object_type=None):
- for s in self._object_graph.neighbors(v):
+ def get_successors(graph, v, object_type=None):
+ for s in graph.successors(v):
if object_type is not None:
- if self._object_graph.vs[s]["object_type"] != object_type:
+ if graph.vs[s]["object_type"] != object_type:
continue
- yield (
- self._object_graph.vs[s]["name"],
- self._object_graph.vs[s]["object_type"],
- )
+ yield (s, graph.vs[s]["name"], graph.vs[s]["object_type"])
- try:
- blob_vertices = self._object_graph.vs.select(object_type=GitObjectType.BLOB)
- except KeyError:
- missing_contents = set()
- else:
- missing_contents = set(
- self.storage.content_missing_per_sha1_git(blob_vertices["name"])
- )
-
- try:
- tree_vertices = self._object_graph.vs.select(object_type=GitObjectType.TREE)
- except KeyError:
- tree_vertices = set()
+ def get_neighbors(graph, v, object_type=None):
+ for s in graph.neighbors(v):
+ if object_type is not None:
+ if graph.vs[s]["object_type"] != object_type:
+ continue
+ yield (graph.vs[s]["name"], graph.vs[s]["object_type"])
+ def iter_tree_graphs(root_tree_vertices):
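+ """Yield (tree hash, tree graph) pairs, swapping graphs back in from disk as needed."""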
+ for t in root_tree_vertices:
+ tree_hash = t["name"]
+ try:
+ tree_graph = self._tree_graphs[tree_hash]
+ except KeyError:
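+ # Not cached in memory; read the graph back in from the shared swap file.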
+ swapper = self._swapped_graphs[tree_hash]
+ tree_graph = swapper.swap_in()
+ yield (tree_hash, tree_graph)
+
+ # Load the set of all blob IDs found in the pack file.
+ root_tree_vertices = self._commit_graph.vs.select(
+ object_type=GitObjectType.TREE
+ )
+ blob_hashes: Set[bytes] = set()
+ for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ try:
+ blob_vertices = tree_graph.vs.select(object_type=GitObjectType.BLOB)
+ blob_hashes.update(blob_vertices["name"])
+ except KeyError:
+ continue
+
+ # Find out which blobs are missing from the archive.
+ missing_contents = set(
+ self.storage.content_missing_per_sha1_git(list(blob_hashes))
+ )
self.log.debug(
"Number of packed blobs that are missing in storage: "
- f"{len(missing_contents)} of {len(blob_vertices['name'])} packed blobs total"
+ f"{len(missing_contents)} of {len(blob_hashes)} packed blobs total"
)
- tree_hashes: List[bytes] = []
- if len(missing_contents) == len(blob_vertices["name"]):
+
+ missing_directories: Set[bytes] = set()
+ tree_hashes: Set[bytes] = set()
+ if len(missing_contents) == len(blob_hashes):
# If all blobs are missing then all trees are missing, too.
- tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices]
- missing_directories = set(tree_hashes)
+ for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ missing_directories.add(hashutil.hash_to_bytes(root_hash))
+ try:
+ tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
+ missing_directories.update(
+ map(hashutil.hash_to_bytes, tree_vertices["name"])
+ )
+ except KeyError:
+ continue
self.log.debug(
"Number of packed trees considered missing by implication: "
f"{len(missing_directories)}"
)
elif len(missing_contents) > 0:
- missing_directories = set()
+ # If a subset of the blobs is missing, then a subset of the trees is missing, too.
self.log.debug(
- f"Searching {len(tree_vertices)} packed trees for trees which "
- "depend on missing packed blobs"
+ "Searching for packed trees which are missing from the archive"
)
- nsearched = 0
- for t in tree_vertices:
- tree_hash = t["name"]
- nsearched += 1
- if (nsearched % 5000) == 0:
- self.log.debug(
- f"Searched {int((nsearched * 100) / len(tree_vertices))}% "
- f"of {len(tree_vertices)} packed trees..."
- )
- if tree_hash in missing_directories:
- continue
- have_dep = False
- for dep_hash, dep_type in get_successors(t):
- have_dep = True
- if dep_type == GitObjectType.BLOB and dep_hash in missing_contents:
- # We can infer that the tree is also missing.
- missing_directories.add(tree_hash)
- break
- if not have_dep:
- # An empty tree has no dependencies. Determine if it is missing.
- tree_hashes = [hashutil.hash_to_bytes(tree_hash)]
- missing_empty_tree = set(
- self.storage.directory_missing(tree_hashes)
- )
- if len(missing_empty_tree):
- missing_directories.add(tree_hash)
- self.log.debug(
- f"Searched {int((nsearched * 100) / len(tree_vertices))}% "
- f"of {len(tree_vertices)} packed trees..."
- )
+ for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ t = tree_graph.vs.find(root_hash)
+ subtrees = [(t, root_hash)]
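+ # Walk this tree graph breadth-first, deciding for each tree whether it is missing.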
+ while len(subtrees) > 0:
+ (t, tree_hash) = subtrees.pop(0)
+ if hashutil.hash_to_bytes(tree_hash) in missing_directories:
+ continue
+ is_missing = False
+ for s, dep_hash, dep_type in get_successors(tree_graph, t):
+ if dep_type == GitObjectType.TREE:
+ if (s, dep_hash) not in subtrees:
+ subtrees.append((s, dep_hash))
+ continue
+ if (
+ dep_type == GitObjectType.BLOB
+ and dep_hash in missing_contents
+ ):
+ # Content this tree depends on is missing.
+ # We can infer that the tree itself is missing.
+ missing_directories.add(hashutil.hash_to_bytes(tree_hash))
+ is_missing = True
+ break
+ if not is_missing:
+ # This tree is either empty or has no known-missing dependencies.
+ # Determine if it is missing by searching the archive.
+ missing_tree = set(
+ self.storage.directory_missing(
+ [hashutil.hash_to_bytes(tree_hash)]
+ )
+ )
+ if len(missing_tree):
+ missing_directories.add(hashutil.hash_to_bytes(tree_hash))
self.log.debug(
- "Number of packed trees considered missing by implication: "
- f"{len(missing_directories)}"
+ f"Number of packed trees found missing: {len(missing_directories)}"
)
else:
- tree_hashes = [hashutil.hash_to_bytes(t["name"]) for t in tree_vertices]
- missing_directories = set(self.storage.directory_missing(tree_hashes))
+ # All blobs are present. Any of these blobs might have been added
+ # to the archive via imports of other origins. We must query the
+ # archive for missing trees specific to the origin we are importing.
+ for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ tree_hashes.add(hashutil.hash_to_bytes(root_hash))
+ try:
+ tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
+ tree_hashes.update(map(hashutil.hash_to_bytes, tree_vertices["name"]))
+ except KeyError:
+ continue
+ missing_directories = set(self.storage.directory_missing(list(tree_hashes)))
self.log.debug(
"Number of missing trees according to archive query: "
f"{len(missing_directories)}"
)
try:
- commit_vertices = self._object_graph.vs.select(
+ commit_vertices = self._commit_graph.vs.select(
object_type=GitObjectType.COMMIT
)
except KeyError:
)
if commit_hash in missing_revisions:
continue
- for dep_hash, dep_type in get_neighbors(c, GitObjectType.TREE):
- if dep_hash in missing_contents or dep_hash in missing_directories:
+ for dep_hash, dep_type in get_neighbors(
+ self._commit_graph, c, GitObjectType.TREE
+ ):
+ if dep_hash in missing_directories:
# We can infer that the commit is also missing.
missing_revisions.add(commit_hash)
break
# XXX This could result in tags pointing at commits which are
# missing from the archive.
try:
- tag_vertices = self._object_graph.vs.select(object_type=GitObjectType.TAG)
+ tag_vertices = self._commit_graph.vs.select(object_type=GitObjectType.TAG)
tag_hashes = [t["name"] for t in tag_vertices]
missing_releases = set(self.storage.release_missing(tag_hashes))
except KeyError: