Commit Diff


commit - c18863d65ee8d1e668f14bea3e8a48978b364353
commit + 6fa8d893b2e007b48d78e75d0f067fcb48c27633
blob - 19122b8b7f23662036bce5954a323c9df203e90f
blob + 02ca79adbc6f7f59941548d53a5a530b17806e04
--- dulwich/client.py
+++ dulwich/client.py
@@ -497,27 +497,19 @@ class _v1ReceivePackHeader:
         yield None
 
 
-def _read_side_band64k_data(pkt_seq: Iterable[bytes], channel_callbacks: Dict[int, Callable[[bytes], None]]) -> None:
+def _read_side_band64k_data(pkt_seq: Iterable[bytes]) -> Iterator[Tuple[int, bytes]]:
     """Read per-channel data.
 
     This requires the side-band-64k capability.
 
     Args:
       pkt_seq: Sequence of packets to read
-      channel_callbacks: Dictionary mapping channels to packet
-        handlers to use. None for a callback discards channel data.
+
+    Returns: Iterator over (channel, data) tuples
     """
     for pkt in pkt_seq:
         channel = ord(pkt[:1])
-        pkt = pkt[1:]
-        try:
-            cb = channel_callbacks[channel]
-        except KeyError as exc:
-            raise AssertionError(
-                "Invalid sideband channel %d" % channel) from exc
-        else:
-            if cb is not None:
-                cb(pkt)
+        yield channel, pkt[1:]
 
 
 def _handle_upload_pack_head(
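
For orientation, a minimal sketch of the framing the refactored generator decodes (the packet payloads below are hypothetical): in side-band-64k mode, each pkt-line payload begins with a one-byte channel number, followed by that channel's data.

    pkts = [b"\x01PACK...", b"\x02Resolving deltas\r"]  # hypothetical payloads
    for channel, data in _read_side_band64k_data(iter(pkts)):
        print(channel, data)
    # -> 1 b'PACK...'
    # -> 2 b'Resolving deltas\r'
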
@@ -630,13 +620,14 @@ def _handle_upload_pack_tail(
             def progress(x):
                 pass
 
-        _read_side_band64k_data(
-            proto.read_pkt_seq(),
-            {
-                SIDE_BAND_CHANNEL_DATA: pack_data,
-                SIDE_BAND_CHANNEL_PROGRESS: progress,
-            },
-        )
+        for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+            if chan == SIDE_BAND_CHANNEL_DATA:
+                pack_data(data)
+            elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+                progress(data)
+            else:
+                raise AssertionError(
+                    "Invalid sideband channel %d" % chan)
     else:
         while True:
             data = proto.read(rbufsize)
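
The channel numbers dispatched on above are the side-band values fixed by the Git pack protocol; dulwich defines them in dulwich.protocol:

    SIDE_BAND_CHANNEL_DATA = 1      # pack data
    SIDE_BAND_CHANNEL_PROGRESS = 2  # human-readable progress messages
    SIDE_BAND_CHANNEL_FATAL = 3     # fatal error message from the server
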
@@ -821,15 +812,13 @@ class GitClient:
         if determine_wants is None:
             determine_wants = target.object_store.determine_wants_all
         if CAPABILITY_THIN_PACK in self._fetch_capabilities:
-            # TODO(jelmer): Avoid reading entire file into memory and
-            # only processing it after the whole file has been fetched.
             from tempfile import SpooledTemporaryFile
             f: IO[bytes] = SpooledTemporaryFile()
 
             def commit():
                 if f.tell():
                     f.seek(0)
-                    target.object_store.add_thin_pack(f.read, None)
+                    target.object_store.add_thin_pack(f.read, None, progress=progress)
                 f.close()
 
             def abort():
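
One caveat about the buffer used in this hunk: with the default max_size=0, a SpooledTemporaryFile never rolls over to disk on its own (it spills only when rollover() is called explicitly or fileno() forces a real file), so the fetched pack effectively stays in memory until commit() consumes it. A sketch with an explicit, hypothetical threshold:

    from tempfile import SpooledTemporaryFile

    f = SpooledTemporaryFile(max_size=1024 * 1024)  # spill to disk past 1 MiB
    f.write(b"...pack bytes...")
    f.seek(0)  # rewind before handing f.read to add_thin_pack
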
@@ -935,12 +924,17 @@ class GitClient:
                 def progress(x):
                     pass
 
-            channel_callbacks = {2: progress}
             if CAPABILITY_REPORT_STATUS in capabilities:
-                channel_callbacks[1] = PktLineParser(
-                    self._report_status_parser.handle_packet
-                ).parse
-            _read_side_band64k_data(proto.read_pkt_seq(), channel_callbacks)
+                pktline_parser = PktLineParser(self._report_status_parser.handle_packet)
+            for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+                if chan == SIDE_BAND_CHANNEL_DATA:
+                    if CAPABILITY_REPORT_STATUS in capabilities:
+                        pktline_parser.parse(data)
+                elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+                    progress(data)
+                else:
+                    raise AssertionError(
+                        "Invalid sideband channel %d" % chan)
         else:
             if CAPABILITY_REPORT_STATUS in capabilities:
                 for pkt in proto.read_pkt_seq():
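
PktLineParser reassembles pkt-lines from the raw channel-1 bytes before feeding them to the report-status parser. As a reminder of the framing (a sketch, not dulwich's implementation): every pkt-line starts with a four-hex-digit length that includes the length field itself, and b"0000" is the flush packet.

    def pkt_line(data: bytes) -> bytes:
        # pkt_line(b"unpack ok\n") == b"000eunpack ok\n"
        return ("%04x" % (len(data) + 4)).encode("ascii") + data
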
@@ -1244,14 +1238,15 @@ class TraditionalGitClient(GitClient):
             ret = proto.read_pkt_line()
             if ret is not None:
                 raise AssertionError("expected pkt tail")
-            _read_side_band64k_data(
-                proto.read_pkt_seq(),
-                {
-                    SIDE_BAND_CHANNEL_DATA: write_data,
-                    SIDE_BAND_CHANNEL_PROGRESS: progress,
-                    SIDE_BAND_CHANNEL_FATAL: write_error,
-                },
-            )
+            for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+                if chan == SIDE_BAND_CHANNEL_DATA:
+                    write_data(data)
+                elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+                    progress(data)
+                elif chan == SIDE_BAND_CHANNEL_FATAL:
+                    write_error(data)
+                else:
+                    raise AssertionError("Invalid sideband channel %d" % chan)
 
 
 class TCPGitClient(TraditionalGitClient):
blob - 9201aefc4bdd2ebbc00dd0cc2a47bad55083510c
blob + a55f157decb93b8afadb4d07fcfac1d2495b2d76
--- dulwich/object_store.py
+++ dulwich/object_store.py
@@ -804,7 +804,7 @@ class DiskObjectStore(PackBasedObjectStore):
         suffix = suffix.decode("ascii")
         return os.path.join(self.pack_dir, "pack-" + suffix)
 
-    def _complete_thin_pack(self, f, path, copier, indexer):
+    def _complete_thin_pack(self, f, path, copier, indexer, progress=None):
         """Move a specific file containing a pack into the pack directory.
 
         Note: The file should be on the same file system as the
@@ -816,36 +816,50 @@ class DiskObjectStore(PackBasedObjectStore):
           copier: A PackStreamCopier to use for writing pack data.
           indexer: A PackIndexer for indexing the pack.
+          progress: Optional progress callback, called with byte strings
         """
-        entries = list(indexer)
-
-        # Update the header with the new number of objects.
-        f.seek(0)
-        write_pack_header(f.write, len(entries) + len(indexer.ext_refs()))
+        entries = []
+        for i, entry in enumerate(indexer):
+            if progress is not None:
+                progress(("generating index: %d\r" % i).encode('ascii'))
+            entries.append(entry)
 
-        # Must flush before reading (http://bugs.python.org/issue3207)
-        f.flush()
+        ext_refs = indexer.ext_refs()
+
+        if ext_refs:
+            # Update the header with the new number of objects.
+            f.seek(0)
+            write_pack_header(f.write, len(entries) + len(ext_refs))
 
-        # Rescan the rest of the pack, computing the SHA with the new header.
-        new_sha = compute_file_sha(f, end_ofs=-20)
+            # Must flush before reading (http://bugs.python.org/issue3207)
+            f.flush()
 
-        # Must reposition before writing (http://bugs.python.org/issue3207)
-        f.seek(0, os.SEEK_CUR)
+            # Rescan the rest of the pack, computing the SHA with the new header.
+            new_sha = compute_file_sha(f, end_ofs=-20)
 
-        # Complete the pack.
-        for ext_sha in indexer.ext_refs():
-            assert len(ext_sha) == 20
-            type_num, data = self.get_raw(ext_sha)
-            offset = f.tell()
-            crc32 = write_pack_object(
-                f.write,
-                type_num,
-                data,
-                sha=new_sha,
-                compression_level=self.pack_compression_level,
-            )
-            entries.append((ext_sha, offset, crc32))
-        pack_sha = new_sha.digest()
-        f.write(pack_sha)
+            # Must reposition before writing (http://bugs.python.org/issue3207)
+            f.seek(0, os.SEEK_CUR)
+
+            # Complete the pack.
+            for i, ext_sha in enumerate(ext_refs):
+                if progress is not None:
+                    progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
+                assert len(ext_sha) == 20
+                type_num, data = self.get_raw(ext_sha)
+                offset = f.tell()
+                crc32 = write_pack_object(
+                    f.write,
+                    type_num,
+                    data,
+                    sha=new_sha,
+                    compression_level=self.pack_compression_level,
+                )
+                entries.append((ext_sha, offset, crc32))
+            pack_sha = new_sha.digest()
+            f.write(pack_sha)
+        else:
+            f.seek(-20, os.SEEK_END)
+            pack_sha = f.read(20)
+
         f.close()
 
         # Move the pack in.
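
For reference, the header rewritten above is the fixed 12-byte pack header; appending external bases only changes the object-count field, which is why the trailing SHA-1 has to be recomputed over the rescanned file. A parsing sketch (not dulwich code):

    import struct

    def parse_pack_header(buf: bytes):
        magic, version, num_objects = struct.unpack(">4sLL", buf[:12])
        assert magic == b"PACK" and version in (2, 3)
        return version, num_objects
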
@@ -875,7 +888,7 @@ class DiskObjectStore(PackBasedObjectStore):
         self._add_cached_pack(pack_base_name, final_pack)
         return final_pack
 
-    def add_thin_pack(self, read_all, read_some):
+    def add_thin_pack(self, read_all, read_some, progress=None):
         """Add a new thin pack to this object store.
 
         Thin packs are packs that contain deltas with parents that exist
@@ -897,8 +910,8 @@ class DiskObjectStore(PackBasedObjectStore):
             os.chmod(path, PACK_MODE)
             indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
             copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
-            copier.verify()
-            return self._complete_thin_pack(f, path, copier, indexer)
+            copier.verify(progress=progress)
+            return self._complete_thin_pack(f, path, copier, indexer, progress=progress)
 
     def move_in_pack(self, path):
         """Move a specific file containing a pack into the pack directory.
@@ -1076,7 +1089,7 @@ class MemoryObjectStore(BaseObjectStore):
 
         return f, commit, abort
 
-    def _complete_thin_pack(self, f, indexer):
+    def _complete_thin_pack(self, f, indexer, progress=None):
         """Complete a thin pack by adding external references.
 
         Args:
@@ -1085,22 +1098,25 @@ class MemoryObjectStore(BaseObjectStore):
         """
         entries = list(indexer)
 
-        # Update the header with the new number of objects.
-        f.seek(0)
-        write_pack_header(f.write, len(entries) + len(indexer.ext_refs()))
+        ext_refs = indexer.ext_refs()
 
-        # Rescan the rest of the pack, computing the SHA with the new header.
-        new_sha = compute_file_sha(f, end_ofs=-20)
+        if ext_refs:
+            # Update the header with the new number of objects.
+            f.seek(0)
+            write_pack_header(f.write, len(entries) + len(ext_refs))
 
-        # Complete the pack.
-        for ext_sha in indexer.ext_refs():
-            assert len(ext_sha) == 20
-            type_num, data = self.get_raw(ext_sha)
-            write_pack_object(f.write, type_num, data, sha=new_sha)
-        pack_sha = new_sha.digest()
-        f.write(pack_sha)
+            # Rescan the rest of the pack, computing the SHA with the new header.
+            new_sha = compute_file_sha(f, end_ofs=-20)
 
-    def add_thin_pack(self, read_all, read_some):
+            # Complete the pack.
+            for ext_sha in ext_refs:
+                assert len(ext_sha) == 20
+                type_num, data = self.get_raw(ext_sha)
+                write_pack_object(f.write, type_num, data, sha=new_sha)
+            pack_sha = new_sha.digest()
+            f.write(pack_sha)
+
+    def add_thin_pack(self, read_all, read_some, progress=None):
         """Add a new thin pack to this object store.
 
         Thin packs are packs that contain deltas with parents that exist
@@ -1117,8 +1133,8 @@ class MemoryObjectStore(BaseObjectStore):
         try:
             indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
             copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
-            copier.verify()
-            self._complete_thin_pack(f, indexer)
+            copier.verify(progress=progress)
+            self._complete_thin_pack(f, indexer, progress=progress)
         except BaseException:
             abort()
             raise
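
For context, the resolve_ext_ref hook wired into PackIndexer in both stores is what makes thin packs resolvable: a minimal sketch of the contract, assuming a store exposing dulwich's get_raw:

    def resolve_ext_ref(sha: bytes):
        # Called for the 20-byte binary SHA of a delta base that is absent
        # from the incoming stream; returns (type_num, data) from the local
        # store, or raises KeyError if the base is missing there too.
        return store.get_raw(sha)
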
blob - 9dcfa31e67776df61b6c2d09f37f968c35955084
blob + cf7cdd6f77a86dc371e7ef123ebb5ee7efc6032a
--- dulwich/pack.py
+++ dulwich/pack.py
@@ -1069,18 +1069,23 @@ class PackStreamCopier(PackStreamReader):
         self.outfile.write(data)
         return data
 
-    def verify(self):
+    def verify(self, progress=None):
         """Verify a pack stream and write it to the output file.
 
         See PackStreamReader.iterobjects for a list of exceptions this may
         throw.
+
+        Args:
+          progress: Optional progress callback, called with byte strings
         """
-        if self._delta_iter:
-            for unpacked in self.read_objects():
+        i = -1  # remains -1 if read_objects() yields nothing
+        for i, unpacked in enumerate(self.read_objects()):
+            if self._delta_iter:
                 self._delta_iter.record(unpacked)
-        else:
-            for _ in self.read_objects():
-                pass
+            if progress is not None:
+                progress(("copying pack entries: %d/%d\r" % (i, len(self))).encode("ascii"))
+        if progress is not None:
+            progress(("copied %d pack entries\n" % (i + 1)).encode("ascii"))
 
 
 def obj_sha(type, chunks):
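
A small driver sketch for the new verify() signature (the file names are hypothetical): the callback receives byte strings, with "\r" used for in-place updates and "\n" for the final summary.

    import sys

    from dulwich.pack import PackStreamCopier

    with open("in.pack", "rb") as inf, open("out.pack", "wb") as outf:
        copier = PackStreamCopier(inf.read, None, outf)
        copier.verify(progress=sys.stderr.buffer.write)
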
@@ -2352,7 +2353,7 @@ class Pack:
 
     def check_length_and_checksum(self) -> None:
         """Sanity check the length and checksum of the pack index and data."""
-        assert len(self.index) == len(self.data)
+        assert len(self.index) == len(self.data), f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
         idx_stored_checksum = self.index.get_pack_checksum()
         data_stored_checksum = self.data.get_stored_checksum()
         if idx_stored_checksum != data_stored_checksum: