commit 6fa8d893b2e007b48d78e75d0f067fcb48c27633
from: Jelmer Vernooij
via: GitHub
date: Sun Jan 15 21:15:38 2023 UTC

Merge pull request #1118 from jelmer/pack-fixes-2

Various other pack fixes:

commit - c18863d65ee8d1e668f14bea3e8a48978b364353
commit + 6fa8d893b2e007b48d78e75d0f067fcb48c27633
blob - 19122b8b7f23662036bce5954a323c9df203e90f
blob + 02ca79adbc6f7f59941548d53a5a530b17806e04
--- dulwich/client.py
+++ dulwich/client.py
@@ -497,27 +497,17 @@ class _v1ReceivePackHeader:
         yield None


-def _read_side_band64k_data(pkt_seq: Iterable[bytes], channel_callbacks: Dict[int, Callable[[bytes], None]]) -> None:
+def _read_side_band64k_data(pkt_seq: Iterable[bytes]) -> Iterator[Tuple[int, bytes]]:
     """Read per-channel data.

     This requires the side-band-64k capability.

     Args:
       pkt_seq: Sequence of packets to read
-      channel_callbacks: Dictionary mapping channels to packet
-        handlers to use. None for a callback discards channel data.
     """
     for pkt in pkt_seq:
         channel = ord(pkt[:1])
-        pkt = pkt[1:]
-        try:
-            cb = channel_callbacks[channel]
-        except KeyError as exc:
-            raise AssertionError(
-                "Invalid sideband channel %d" % channel) from exc
-        else:
-            if cb is not None:
-                cb(pkt)
+        yield channel, pkt[1:]


 def _handle_upload_pack_head(
@@ -630,13 +620,14 @@ def _handle_upload_pack_tail(
             def progress(x):
                 pass

-        _read_side_band64k_data(
-            proto.read_pkt_seq(),
-            {
-                SIDE_BAND_CHANNEL_DATA: pack_data,
-                SIDE_BAND_CHANNEL_PROGRESS: progress,
-            },
-        )
+        for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+            if chan == SIDE_BAND_CHANNEL_DATA:
+                pack_data(data)
+            elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+                progress(data)
+            else:
+                raise AssertionError(
+                    "Invalid sideband channel %d" % chan)
     else:
         while True:
             data = proto.read(rbufsize)
@@ -821,15 +812,13 @@ class GitClient:
         if determine_wants is None:
             determine_wants = target.object_store.determine_wants_all
         if CAPABILITY_THIN_PACK in self._fetch_capabilities:
-            # TODO(jelmer): Avoid reading entire file into memory and
-            # only processing it after the whole file has been fetched.
             from tempfile import SpooledTemporaryFile

             f: IO[bytes] = SpooledTemporaryFile()

             def commit():
                 if f.tell():
                     f.seek(0)
-                    target.object_store.add_thin_pack(f.read, None)
+                    target.object_store.add_thin_pack(f.read, None, progress=progress)
                 f.close()

             def abort():
@@ -935,12 +924,17 @@ class GitClient:
                 def progress(x):
                     pass

-        channel_callbacks = {2: progress}
         if CAPABILITY_REPORT_STATUS in capabilities:
-            channel_callbacks[1] = PktLineParser(
-                self._report_status_parser.handle_packet
-            ).parse
-        _read_side_band64k_data(proto.read_pkt_seq(), channel_callbacks)
+            pktline_parser = PktLineParser(self._report_status_parser.handle_packet)
+        for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+            if chan == SIDE_BAND_CHANNEL_DATA:
+                if CAPABILITY_REPORT_STATUS in capabilities:
+                    pktline_parser.parse(data)
+            elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+                progress(data)
+            else:
+                raise AssertionError(
+                    "Invalid sideband channel %d" % chan)
     else:
         if CAPABILITY_REPORT_STATUS in capabilities:
             for pkt in proto.read_pkt_seq():
@@ -1244,14 +1238,15 @@ class TraditionalGitClient(GitClient):
             ret = proto.read_pkt_line()
             if ret is not None:
                 raise AssertionError("expected pkt tail")
-            _read_side_band64k_data(
-                proto.read_pkt_seq(),
-                {
-                    SIDE_BAND_CHANNEL_DATA: write_data,
-                    SIDE_BAND_CHANNEL_PROGRESS: progress,
-                    SIDE_BAND_CHANNEL_FATAL: write_error,
-                },
-            )
+            for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+                if chan == SIDE_BAND_CHANNEL_DATA:
+                    write_data(data)
+                elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+                    progress(data)
+                elif chan == SIDE_BAND_CHANNEL_FATAL:
+                    write_error(data)
+                else:
+                    raise AssertionError("Invalid sideband channel %d" % chan)


 class TCPGitClient(TraditionalGitClient):
blob - 9201aefc4bdd2ebbc00dd0cc2a47bad55083510c
blob + a55f157decb93b8afadb4d07fcfac1d2495b2d76
--- dulwich/object_store.py
+++ dulwich/object_store.py
@@ -804,7 +804,7 @@ class DiskObjectStore(PackBasedObjectStore):
             suffix = suffix.decode("ascii")
         return os.path.join(self.pack_dir, "pack-" + suffix)

-    def _complete_thin_pack(self, f, path, copier, indexer):
+    def _complete_thin_pack(self, f, path, copier, indexer, progress=None):
         """Move a specific file containing a pack into the pack directory.

         Note: The file should be on the same file system as the
@@ -816,36 +816,49 @@ class DiskObjectStore(PackBasedObjectStore):
           copier: A PackStreamCopier to use for writing pack data.
           indexer: A PackIndexer for indexing the pack.
         """
-        entries = list(indexer)
-
-        # Update the header with the new number of objects.
-        f.seek(0)
-        write_pack_header(f.write, len(entries) + len(indexer.ext_refs()))
+        entries = []
+        for i, entry in enumerate(indexer):
+            if progress is not None:
+                progress(("generating index: %d\r" % i).encode('ascii'))
+            entries.append(entry)

-        # Must flush before reading (http://bugs.python.org/issue3207)
-        f.flush()
+        ext_refs = indexer.ext_refs()
+
+        if ext_refs:
+            # Update the header with the new number of objects.
+            f.seek(0)
+            write_pack_header(f.write, len(entries) + len(ext_refs))

-        # Rescan the rest of the pack, computing the SHA with the new header.
-        new_sha = compute_file_sha(f, end_ofs=-20)
+            # Must flush before reading (http://bugs.python.org/issue3207)
+            f.flush()

-        # Must reposition before writing (http://bugs.python.org/issue3207)
-        f.seek(0, os.SEEK_CUR)
+            # Rescan the rest of the pack, computing the SHA with the new header.
+            new_sha = compute_file_sha(f, end_ofs=-20)

-        # Complete the pack.
-        for ext_sha in indexer.ext_refs():
-            assert len(ext_sha) == 20
-            type_num, data = self.get_raw(ext_sha)
-            offset = f.tell()
-            crc32 = write_pack_object(
-                f.write,
-                type_num,
-                data,
-                sha=new_sha,
-                compression_level=self.pack_compression_level,
-            )
-            entries.append((ext_sha, offset, crc32))
-        pack_sha = new_sha.digest()
-        f.write(pack_sha)
+            # Must reposition before writing (http://bugs.python.org/issue3207)
+            f.seek(0, os.SEEK_CUR)
+
+            # Complete the pack.
+            for i, ext_sha in enumerate(ext_refs):
+                if progress is not None:
+                    progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
+                assert len(ext_sha) == 20
+                type_num, data = self.get_raw(ext_sha)
+                offset = f.tell()
+                crc32 = write_pack_object(
+                    f.write,
+                    type_num,
+                    data,
+                    sha=new_sha,
+                    compression_level=self.pack_compression_level,
+                )
+                entries.append((ext_sha, offset, crc32))
+            pack_sha = new_sha.digest()
+            f.write(pack_sha)
+        else:
+            f.seek(-20, os.SEEK_END)
+            pack_sha = f.read(20)
+
         f.close()

         # Move the pack in.
@@ -875,7 +888,7 @@ class DiskObjectStore(PackBasedObjectStore):
             self._add_cached_pack(pack_base_name, final_pack)
         return final_pack

-    def add_thin_pack(self, read_all, read_some):
+    def add_thin_pack(self, read_all, read_some, progress=None):
         """Add a new thin pack to this object store.

         Thin packs are packs that contain deltas with parents that exist
@@ -897,8 +910,8 @@ class DiskObjectStore(PackBasedObjectStore):
         os.chmod(path, PACK_MODE)
         indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
         copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
-        copier.verify()
-        return self._complete_thin_pack(f, path, copier, indexer)
+        copier.verify(progress=progress)
+        return self._complete_thin_pack(f, path, copier, indexer, progress=progress)

     def move_in_pack(self, path):
         """Move a specific file containing a pack into the pack directory.
@@ -1076,7 +1089,7 @@ class MemoryObjectStore(BaseObjectStore):

         return f, commit, abort

-    def _complete_thin_pack(self, f, indexer):
+    def _complete_thin_pack(self, f, indexer, progress=None):
         """Complete a thin pack by adding external references.

         Args:
@@ -1085,22 +1098,25 @@ class MemoryObjectStore(BaseObjectStore):
         """
         entries = list(indexer)

-        # Update the header with the new number of objects.
-        f.seek(0)
-        write_pack_header(f.write, len(entries) + len(indexer.ext_refs()))
+        ext_refs = indexer.ext_refs()

-        # Rescan the rest of the pack, computing the SHA with the new header.
-        new_sha = compute_file_sha(f, end_ofs=-20)
+        if ext_refs:
+            # Update the header with the new number of objects.
+            f.seek(0)
+            write_pack_header(f.write, len(entries) + len(ext_refs))

-        # Complete the pack.
-        for ext_sha in indexer.ext_refs():
-            assert len(ext_sha) == 20
-            type_num, data = self.get_raw(ext_sha)
-            write_pack_object(f.write, type_num, data, sha=new_sha)
-        pack_sha = new_sha.digest()
-        f.write(pack_sha)
+            # Rescan the rest of the pack, computing the SHA with the new header.
+            new_sha = compute_file_sha(f, end_ofs=-20)

-    def add_thin_pack(self, read_all, read_some):
+            # Complete the pack.
+            for ext_sha in indexer.ext_refs():
+                assert len(ext_sha) == 20
+                type_num, data = self.get_raw(ext_sha)
+                write_pack_object(f.write, type_num, data, sha=new_sha)
+            pack_sha = new_sha.digest()
+            f.write(pack_sha)
+
+    def add_thin_pack(self, read_all, read_some, progress=None):
         """Add a new thin pack to this object store.
         Thin packs are packs that contain deltas with parents that exist
@@ -1117,8 +1133,8 @@ class MemoryObjectStore(BaseObjectStore):
         try:
             indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
             copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
-            copier.verify()
-            self._complete_thin_pack(f, indexer)
+            copier.verify(progress=progress)
+            self._complete_thin_pack(f, indexer, progress=progress)
         except BaseException:
             abort()
             raise
blob - 9dcfa31e67776df61b6c2d09f37f968c35955084
blob + cf7cdd6f77a86dc371e7ef123ebb5ee7efc6032a
--- dulwich/pack.py
+++ dulwich/pack.py
@@ -1069,18 +1069,19 @@ class PackStreamCopier(PackStreamReader):
             self.outfile.write(data)
         return data

-    def verify(self):
+    def verify(self, progress=None):
         """Verify a pack stream and write it to the output file.

         See PackStreamReader.iterobjects for a list of exceptions this may
         throw.
         """
-        if self._delta_iter:
-            for unpacked in self.read_objects():
+        for i, unpacked in enumerate(self.read_objects()):
+            if self._delta_iter:
                 self._delta_iter.record(unpacked)
-        else:
-            for _ in self.read_objects():
-                pass
+            if progress is not None:
+                progress(("copying pack entries: %d/%d\r" % (i, len(self))).encode('ascii'))
+        if progress is not None:
+            progress(("copied %d pack entries\n" % i).encode('ascii'))


 def obj_sha(type, chunks):
@@ -2352,7 +2353,7 @@ class Pack:

     def check_length_and_checksum(self) -> None:
         """Sanity check the length and checksum of the pack index and data."""
-        assert len(self.index) == len(self.data)
+        assert len(self.index) == len(self.data), f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
         idx_stored_checksum = self.index.get_pack_checksum()
         data_stored_checksum = self.data.get_stored_checksum()
         if idx_stored_checksum != data_stored_checksum:
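
Usage note: after this change, _read_side_band64k_data is a generator yielding
(channel, payload) pairs rather than dispatching through a callback table, so
each caller does its own channel dispatch, as the client.py hunks above show.
A minimal sketch of consuming it; the pkt sequence here is synthetic, and the
function is private to dulwich.client, so this is illustrative only:

    from dulwich.client import _read_side_band64k_data
    from dulwich.protocol import (
        SIDE_BAND_CHANNEL_DATA,
        SIDE_BAND_CHANNEL_PROGRESS,
    )

    # Each pkt carries a one-byte channel id followed by its payload.
    pkt_seq = [b"\x01PACK...", b"\x02Counting objects: 42\r"]

    pack_chunks = []
    for chan, data in _read_side_band64k_data(pkt_seq):
        if chan == SIDE_BAND_CHANNEL_DATA:
            pack_chunks.append(data)  # raw pack bytes
        elif chan == SIDE_BAND_CHANNEL_PROGRESS:
            print(data.decode("ascii", "replace"), end="")  # server progress
        else:
            # Unknown-channel handling now lives in the caller.
            raise AssertionError("Invalid sideband channel %d" % chan)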
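
The progress callback threaded through add_thin_pack reaches both
PackStreamCopier.verify and _complete_thin_pack, which emit ASCII byte strings
such as b"generating index: 5\r" (the trailing carriage return lets successive
counters overwrite one terminal line). A minimal sketch of supplying such a
callback, assuming a hypothetical repository path and pack file:

    import sys

    from dulwich.repo import Repo

    def show_progress(msg: bytes) -> None:
        # Progress messages arrive as ASCII-encoded bytes.
        sys.stderr.write(msg.decode("ascii", "replace"))
        sys.stderr.flush()

    repo = Repo("/path/to/repo")  # hypothetical path
    with open("thin.pack", "rb") as f:  # hypothetical thin pack
        # read_some may be None, as in GitClient.fetch above.
        repo.object_store.add_thin_pack(f.read, None, progress=show_progress)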