commit - c902a02aaa278d1ecbb79ad2ccb2dcaf6f2523da
commit + caa004fc71ad2f2e0667e7a1957ca06c86e730a0
blob - 2e222d3645cf7f03df9b55de032db11f60632f79
blob + fe7961526b717bebd3dcabc2dc7471fac685007c
--- swh/loader/git/loader.py
+++ swh/loader/git/loader.py
from bitarray import bitarray
import dulwich.client
from dulwich.object_store import ObjectStoreGraphWalker
-from dulwich.objects import Blob, Commit, ShaFile, Tag, Tree, hex_to_sha
+from dulwich.objects import Blob, Commit, ShaFile, Tag, Tree, hex_to_sha, sha_to_hex
from dulwich.pack import Pack, PackData, PackInflater, bisect_find_sha, load_pack_index
from igraph import Graph
import urllib3.util
def make_object_graph(self):
logger.info("Building packed objects graph")
- # Graph of commits, and tags. Always stored in memory.
+ # Graph of commits and tags.
self._commit_graph = Graph(directed=True)
- # We store trees in an ordered collection of Graphs.
- # Each Graph can be swapped out to disk in order to reduce memory requirements.
- # This collection is ordered to allow for LRU cache behaviour via the methods
- # popitem(last=False) and move_to_end(last=False).
- self._tree_graphs: collections.OrderedDict[bytes, Graph] = (
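+ # Map of tree hash -> bitarray with one bit per packed object, set when
+ # the object is a direct entry of that tree. Kept ordered so entries
+ # can be evicted LRU-style once swapping of bitarrays to disk is
+ # implemented.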
+ self._tree_deps: collections.OrderedDict[bytes, bitarray] = (
collections.OrderedDict()
)
self._max_trees_in_mem = 100
+
+ # Object type markers, indexed by object position in the pack index.
+ # bitarray() leaves bits uninitialised, so clear them explicitly.
+ self._blob_types = bitarray(self.num_objects)
+ self._blob_types.setall(0)
+ self._tree_types = bitarray(self.num_objects)
+ self._tree_types.setall(0)
# Graphs which were swapped out to disk and will be read back on demand.
# Uses a SpooledTemporaryFile to avoid unnecessary disk I/O for smaller graphs.
been traversed (i.e. already has its vertex added to the graph).
Return None if the object is not present in the pack file.
"""
- n = get_pos_in_index(id_hex)
- if n is None:
+ obj_pos = get_pos_in_index(id_hex)
+ if obj_pos is None:
return None
- return bitstr[n] == 1
+ return bitstr[obj_pos] == 1
# Find all tags, commits and corresponding tree roots in the pack file
tags = {}
self._commit_graph.add_edges(zip(commits.keys(), commits.values()))
self._commit_graph.add_edges(commit_edges)
- # Create per-commit tree graphs populated with subtrees and blobs
- new_edges = []
+ # Create an object-reachability bitarray for each tree found in the pack
submodule_mode = stat.S_IFDIR | stat.S_IFLNK
i = 0
last_p = -1
- num_objects_found = 0
- num_tree_entries = 0
num_commits_total = len(commits.items())
logger.info("Adding trees and blobs to the graph...")
for commit_hash, root_tree_hash in commits.items():
logger.debug(
f"Processing commit {i + 1}: {hashutil.hash_to_bytehex(commit_hash)}"
)
- traversed_objects = bitarray(self.num_objects)
# Show some debug progress output for very large datasets
p = int(i * 100 / num_commits_total)
if p != last_p:
if t is None:
continue
if t is True:
- try:
- # Keep graphs for recently used trees cached in memory.
- self._tree_graphs.move_to_end(root_tree_hash, last=False)
- except KeyError:
- pass
+ # Keep bitarrays for recently used trees cached in memory
+ # (disabled until swapping of bitarrays to disk is implemented):
+ # try:
+ #     self._tree_deps.move_to_end(root_tree_hash, last=False)
+ # except KeyError:
+ #     pass
continue
- if len(self._tree_graphs) >= self._max_trees_in_mem:
- (other_tree_hash, other_tree_graph) = self._tree_graphs.popitem(
- last=False
- )
- swapper = GraphSwapper(other_tree_graph, self._swapfile)
- self._swapped_graphs[other_tree_hash] = swapper
- swapper.swap_out()
- tree_graph = Graph(directed=True)
- self._tree_graphs[root_tree_hash] = tree_graph
- self._tree_graphs.move_to_end(root_tree_hash, last=False)
- new_trees.append(root_tree_hash)
+ # if len(self._tree_deps) >= self._max_trees_in_mem:
+ #     (other_tree_hash, other_tree_deps) = self._tree_deps.popitem(
+ #         last=False
+ #     )
+ #     swapper = GraphSwapper(other_tree_deps, self._swapfile)
+ #     self._swapped_graphs[other_tree_hash] = swapper
+ #     swapper.swap_out()
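+ # Traverse this commit's tree breadth-first, recording each tree's
+ # direct entries in a per-tree bitarray.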
subtrees = [root_tree_hash]
while len(subtrees) > 0:
tree_hash = subtrees.pop(0)
tree = self.pack[tree_hex]
except KeyError:
continue
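+ # One bit per packed object, set when the object at that pack index
+ # position is a direct entry of this tree.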
+ tree_deps = self._tree_deps[tree_hash] = bitarray(self.num_objects)
+ tree_deps.setall(0)
+ # self._tree_deps.move_to_end(tree_hash, last=False)
for name, mode, entry_hex in tree.iteritems():
if mode & submodule_mode == submodule_mode:
continue # ignore submodules
t = have_traversed(traversed_objects, entry_hex)
if t is None:
continue # not present in pack file
- entry_hash = hashutil.bytehex_to_hash(entry_hex)
- new_edges.append((tree_hash, entry_hash))
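+ # Record the entry as a dependency of this tree at its pack index
+ # position.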
+ obj_pos = get_pos_in_index(entry_hex)
+ tree_deps[obj_pos] = 1
if t is False:
if mode & stat.S_IFDIR:
- new_trees.append(entry_hash)
+ entry_hash = hashutil.bytehex_to_hash(entry_hex)
subtrees.append(entry_hash)
+ self._tree_types[obj_pos] = 1
else:
- new_blobs.append(entry_hash)
+ self._blob_types[obj_pos] = 1
+ # Entries must still be marked, or have_traversed() above would never
+ # return True; this assumes traversed_objects is now initialised once
+ # for the whole pack.
mark_as_traversed(traversed_objects, entry_hex)
- # add new vertices and edges in batches for performance reasons
- if len(new_trees) + len(new_blobs) > 100000 or len(new_edges) > 1000000:
- if len(new_trees) > 0:
- add_vertices(tree_graph, new_trees, GitObjectType.TREE)
- if len(new_blobs) > 0:
- add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
- if len(new_edges) > 0:
- num_tree_entries += len(new_edges)
- tree_graph.add_edges(new_edges)
- num_objects_found += len(new_trees) + len(new_blobs)
- logger.debug(
- f"Added {int((num_objects_found * 100) / num_objects_left)}% "
- "of packed objects to the graph"
- )
- new_trees = []
- new_blobs = []
- new_edges = []
-
- num_objects_found += len(new_trees) + len(new_blobs)
- if len(new_trees) > 0:
- add_vertices(tree_graph, new_trees, GitObjectType.TREE)
- new_trees = []
- if len(new_blobs) > 0:
- add_vertices(tree_graph, new_blobs, GitObjectType.BLOB)
- new_blobs = []
- if len(new_edges) > 0:
- num_tree_entries += len(new_edges)
- tree_graph.add_edges(new_edges)
- new_edges = []
-
- logger.info(
- f"Added {int((num_objects_found * 100) / num_objects_left)}% "
- "of packed objects to the graph"
- )
logger.info("Packed objects graph has been built")
def save_data(self) -> None:
"fetched pack file nor in local heads nor in the archive"
)
- def get_successors(graph, v, object_type=None):
- for s in graph.successors(v):
- if object_type is not None:
- if graph.vs[s]["object_type"] != object_type:
- continue
- yield (s, graph.vs[s]["name"], graph.vs[s]["object_type"])
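+ # Yield (hash, object type) for each direct dependency recorded in a
+ # tree's reachability bitarray.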
+ def get_successors(tree_hash):
+ tree_deps = self._tree_deps[tree_hash]
+ for obj_pos in tree_deps.search(1):
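+ # _unpack_name() is a private dulwich pack-index helper returning
+ # the binary SHA stored at the given index position.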
+ dep_hex = sha_to_hex(self.pack.index._unpack_name(obj_pos))
+ dep_hash = hashutil.bytehex_to_hash(dep_hex)
+ if self._blob_types[obj_pos] == 1:
+ obj_type = GitObjectType.BLOB
+ elif self._tree_types[obj_pos] == 1:
+ obj_type = GitObjectType.TREE
+ else:
+ raise TypeError(f"Unexpected object type: {obj_type}")
+ yield (dep_hash, obj_type)
def get_neighbors(graph, v, object_type=None):
for s in graph.neighbors(v):
continue
yield (graph.vs[s]["name"], graph.vs[s]["object_type"])
- def iter_tree_graphs(root_tree_vertices):
+ def iter_tree_deps(root_tree_vertices):
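+ # Yield (tree_hash, bitarray) for each root tree vertex; bitarrays are
+ # currently only looked up in memory (swapping in is not implemented).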
for t in root_tree_vertices:
tree_hash = t["name"]
try:
- tree_graph = self._tree_graphs[tree_hash]
+ tree_deps = self._tree_deps[tree_hash]
except KeyError:
- swapper = self._swapped_graphs[tree_hash]
- tree_graph = swapper.swap_in()
- yield (tree_hash, tree_graph)
+ # Swapping bitarrays back in from disk is not implemented yet:
+ # swapper = self._swapped_graphs[tree_hash]
+ # tree_deps = swapper.swap_in()
+ raise
+ yield (tree_hash, tree_deps)
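+ # Recursively walk dependency bitarrays, yielding the hashes of all
+ # objects reachable from the given tree, optionally filtered by type.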
+ def get_recursive_tree_deps(tree_deps, obj_type=None):
+ deps = [tree_deps]
+ seen_trees = set()
+ while len(deps) > 0:
+ tree_deps = deps.pop(0)
+ for obj_pos in tree_deps.search(1):
+ id_hex = sha_to_hex(self.pack.index._unpack_name(obj_pos))
+ id_hash = hashutil.bytehex_to_hash(id_hex)
+ if obj_type is None:
+ yield id_hash
+ elif obj_type == GitObjectType.BLOB:
+ if self._blob_types[obj_pos] == 1:
+ yield id_hash
+ elif obj_type == GitObjectType.TREE:
+ if self._tree_types[obj_pos] == 1:
+ yield id_hash
+ else:
+ raise TypeError(f"Unexpected object type: {obj_type}")
+ if self._tree_types[obj_pos] == 1 and id_hash not in seen_trees:
+ # Avoid re-walking subtrees shared by multiple parent trees.
+ seen_trees.add(id_hash)
+ sub_deps = self._tree_deps[id_hash]
+ deps.append(sub_deps)
+
# Load the set of all blob IDs found in the pack file.
root_tree_vertices = self._commit_graph.vs.select(
object_type=GitObjectType.TREE
)
blob_hashes: Set[bytes] = set()
- for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ for root_hash, tree_deps in iter_tree_deps(root_tree_vertices):
try:
- blob_vertices = tree_graph.vs.select(object_type=GitObjectType.BLOB)
- blob_hashes.update(blob_vertices["name"])
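+ # The generator is lazy, so a KeyError for a subtree bitarray that
+ # was never built surfaces during update() below, skipping the rest
+ # of this root tree.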
+ dep_hashes = get_recursive_tree_deps(
+ tree_deps, obj_type=GitObjectType.BLOB
+ )
+ blob_hashes.update(dep_hashes)
except KeyError:
continue
tree_hashes: Set[bytes] = set()
if len(missing_contents) == len(blob_hashes):
# If all blobs are missing then all trees are missing, too.
- for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ for root_hash, tree_deps in iter_tree_deps(root_tree_vertices):
missing_directories.add(hashutil.hash_to_bytes(root_hash))
try:
- tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
- missing_directories.update(
- map(hashutil.hash_to_bytes, tree_vertices["name"])
+ dep_hashes = get_recursive_tree_deps(
+ tree_deps, obj_type=GitObjectType.TREE
)
+ missing_directories.update(map(hashutil.hash_to_bytes, dep_hashes))
except KeyError:
continue
self.log.debug(
self.log.debug(
"Searching for packed trees which are missing from the archive "
)
- for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
- t = tree_graph.vs.find(root_hash)
- subtrees = [(t, root_hash)]
+ for root_hash, tree_deps in iter_tree_deps(root_tree_vertices):
+ subtrees = [root_hash]
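+ # Walk trees top-down; a tree that references a known-missing blob
+ # must itself be missing from the archive.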
while len(subtrees) > 0:
- (t, tree_hash) = subtrees.pop(0)
+ tree_hash = subtrees.pop(0)
if hashutil.hash_to_bytes(tree_hash) in missing_directories:
continue
is_missing = False
- for s, dep_hash, dep_type in get_successors(tree_graph, t):
+ for dep_hash, dep_type in get_successors(tree_hash):
if dep_type == GitObjectType.TREE:
- if s not in subtrees:
- subtrees.append((s, dep_hash))
- continue
- if (
- dep_type == GitObjectType.BLOB
+ if dep_hash not in subtrees:
+ subtrees.append(dep_hash)
+ elif (
+ not is_missing
+ and dep_type == GitObjectType.BLOB
and dep_hash in missing_contents
):
# Content this tree depends on is missing.
# We can infer that the tree itself is missing.
missing_directories.add(hashutil.hash_to_bytes(tree_hash))
is_missing = True
- break
if not is_missing:
# This tree is either empty or has no known-missing dependencies.
# Determine if it is missing by searching the archive.
# All blobs are present. Any of these blobs might have been added
# to the archive via imports of other origins. We must query the
# archive for missing trees specific to the origin we are importing.
- for root_hash, tree_graph in iter_tree_graphs(root_tree_vertices):
+ for root_hash, tree_deps in iter_tree_deps(root_tree_vertices):
tree_hashes.add(root_hash)
try:
- tree_vertices = tree_graph.vs.select(object_type=GitObjectType.TREE)
- tree_hashes.add(tree_vertices["name"])
+ dep_hashes = get_recursive_tree_deps(
+ tree_deps, obj_type=GitObjectType.TREE
+ )
+ tree_hashes.update(dep_hashes)
except KeyError:
continue
missing_directories = set(self.storage.directory_missing(list(tree_hashes)))