commit b1e497a6d0bae8ab541b45d1ffcd4b58be2c5b25 from: Stefan Sperling date: Mon Oct 21 10:12:37 2024 UTC swap tree entry bitarrays out to disk if needed commit - 568a6ca9ddd97a61bce58ce17be126c949f0c071 commit + b1e497a6d0bae8ab541b45d1ffcd4b58be2c5b25 blob - f61fcf552a5b8c5d03e3eb6fecfbc80f27028991 blob + e78ebf760ea71e81d5e57ff1b70f68d2b3d7b405 --- swh/loader/git/loader.py +++ swh/loader/git/loader.py @@ -75,27 +75,27 @@ remote_logger = logger.getChild("remote") fetch_pack_logger = logger.getChild("fetch_pack") -class GraphSwapper: +class BitarraySwapper: - def __init__(self, graph: Graph, swapfile: SpooledTemporaryFile): - self.graph = graph + def __init__(self, array: bytes, swapfile: SpooledTemporaryFile): + self.bitarray = array # already compressed with sc_encode() self.f = swapfile self.offset = -1 - self.graph_storage_format = "pickle" + self.len = -1 def swap_out(self): if self.offset != -1: return self.f.seek(0, io.SEEK_END) self.offset = self.f.tell() - self.graph.write(self.f, format=self.graph_storage_format) - del self.graph + self.len = self.f.write(self.bitarray) + del self.bitarray def swap_in(self): if self.offset == -1: - raise ValueError("graph has not been swapped out") + raise ValueError("bitarray has not been swapped out") self.f.seek(self.offset, io.SEEK_SET) - return Graph.Read(self.f, format=self.graph_storage_format) + return self.f.read(self.len) # caller will decompress with sc_decode() def split_lines_and_remainder(buf: bytes) -> Tuple[List[bytes], bytes]: @@ -749,10 +749,13 @@ class GitLoader(BaseGitLoader): # Graph of commits, and tags. self._commit_graph = Graph(directed=True) - self._tree_deps: collections.OrderedDict[bytes, bitarray] = ( + # Bitarrays recording direct dependencies of tree objects. + # The bitarrays are stored as bytes rather than bitarray objects, + # compressed with sc_encode() to save space.
+ self._tree_deps: collections.OrderedDict[bytes, bytes] = ( collections.OrderedDict() ) - self._max_trees_in_mem = 100 + self._max_trees_in_mem = 10000 # Object type markers. self._blob_types = bitarray(self.num_objects) @@ -760,11 +763,11 @@ class GitLoader(BaseGitLoader): # Graphs which swapped out to disk and will be read back on demand. # Uses a SpooledTemporaryFile to avoid unnecessary disk I/O for smaller graphs. - # The swap file is shared among all GraphSwappers which is fine because we are + # The swap file is shared among all BitarraySwappers which is fine because we are # currently not multi-threaded (should that change we'd need a lock in the - # GraphSwapper class which covers I/O sequences involving seek + read/write). + # BitarraySwapper class which covers I/O sequences involving seek + read/write). self._swapfile = SpooledTemporaryFile(max_size=self.temp_file_cutoff) - self._swapped_graphs: Dict[bytes, GraphSwapper] = dict() + self._swapped_tree_deps: Dict[bytes, BitarraySwapper] = dict() def get_pos_in_index(id_hex): """ @@ -922,22 +925,13 @@ class GitLoader(BaseGitLoader): i = i + 1 tree_hex = hashutil.hash_to_bytehex(root_tree_hash) t = have_traversed(traversed_objects, tree_hex) - if t is None: - continue - if t is True: - # try: - # Keep bitarrays for recently used trees cached in memory.
- # self._tree_deps.move_to_end(root_tree_hash, last=False) - # except KeyError: - # pass + if t is None or t is True: continue - # if len(self._tree_deps) >= self._max_trees_in_mem: - # (other_tree_hash, other_tree_deps) = self._tree_deps.popitem( - # last=False - # ) - # swapper = GraphSwapper(other_tree_graph, self._swapfile) - # self._swapped_graphs[other_tree_hash] = swapper - # swapper.swap_out() + while len(self._tree_deps) >= self._max_trees_in_mem: + (other_tree_hash, other_tree_deps) = self._tree_deps.popitem(last=False) + swapper = BitarraySwapper(other_tree_deps, self._swapfile) + self._swapped_tree_deps[other_tree_hash] = swapper + swapper.swap_out() subtrees = [root_tree_hash] while len(subtrees) > 0: tree_hash = subtrees.pop(0) @@ -951,7 +945,6 @@ class GitLoader(BaseGitLoader): except KeyError: continue tree_deps = bitarray(self.num_objects) - # self._tree_deps.move_to_end(tree_hash, last=False) for name, mode, entry_hex in tree.iteritems(): if mode & submodule_mode == submodule_mode: continue # ignore submodules @@ -968,6 +961,7 @@ class GitLoader(BaseGitLoader): else: self._blob_types[obj_pos] = 1 self._tree_deps[tree_hash] = sc_encode(tree_deps) + self._tree_deps.move_to_end(tree_hash) logger.info("Packed objects graph has been built") @@ -1029,9 +1023,19 @@ class GitLoader(BaseGitLoader): f"{ref_name!r}: {ref_object!r} was found in neither the " "fetched pack file nor in local heads nor in the archive" ) + + def get_tree_deps(tree_hash): + try: + tree_deps = sc_decode(self._tree_deps[tree_hash]) + # Keep bitarrays for recently used trees cached in memory.
+ self._tree_deps.move_to_end(tree_hash) + except KeyError: + swapper = self._swapped_tree_deps[tree_hash] + tree_deps = sc_decode(swapper.swap_in()) + return tree_deps def get_successors(tree_hash): - tree_deps = sc_decode(self._tree_deps[tree_hash]) + tree_deps = get_tree_deps(tree_hash) for obj_pos in tree_deps.search(1): dep_hex = sha_to_hex(self.pack.index._unpack_name(obj_pos)) dep_hash = hashutil.bytehex_to_hash(dep_hex) @@ -1053,12 +1057,7 @@ class GitLoader(BaseGitLoader): def iter_tree_deps(root_tree_vertices): for t in root_tree_vertices: tree_hash = t["name"] - try: - tree_deps = sc_decode(self._tree_deps[tree_hash]) - except KeyError: - # swapper = self._swapped_graphs[tree_hash] - # tree_graph = swapper.swap_in() - raise + tree_deps = get_tree_deps(tree_hash) yield (tree_hash, tree_deps) def get_recursive_tree_deps(tree_deps, obj_type=None): @@ -1079,7 +1078,7 @@ class GitLoader(BaseGitLoader): else: raise TypeError(f"Unexpected object type: {obj_type}") if self._tree_types[obj_pos] == 1: - sub_deps = sc_decode(self._tree_deps[id_hash]) + sub_deps = get_tree_deps(id_hash) deps.append(sub_deps) # Load the set of all blob IDs found in the pack file.