commit - c18863d65ee8d1e668f14bea3e8a48978b364353
commit + 6fa8d893b2e007b48d78e75d0f067fcb48c27633
blob - 19122b8b7f23662036bce5954a323c9df203e90f
blob + 02ca79adbc6f7f59941548d53a5a530b17806e04
--- dulwich/client.py
+++ dulwich/client.py
yield None
-def _read_side_band64k_data(pkt_seq: Iterable[bytes], channel_callbacks: Dict[int, Callable[[bytes], None]]) -> None:
+def _read_side_band64k_data(pkt_seq: Iterable[bytes]) -> Iterator[Tuple[int, bytes]]:
"""Read per-channel data.
This requires the side-band-64k capability.
Args:
pkt_seq: Sequence of packets to read
- channel_callbacks: Dictionary mapping channels to packet
- handlers to use. None for a callback discards channel data.
+ Returns: Iterator over (channel, data) tuples, one per packet read
"""
for pkt in pkt_seq:
channel = ord(pkt[:1])
- pkt = pkt[1:]
- try:
- cb = channel_callbacks[channel]
- except KeyError as exc:
- raise AssertionError(
- "Invalid sideband channel %d" % channel) from exc
- else:
- if cb is not None:
- cb(pkt)
+ yield channel, pkt[1:]
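Note: the helper is now a plain generator over (channel, data) pairs. Each side-band-64k pkt-line payload begins with a one-byte band number, which is all the generator peels off. A standalone sketch of that framing (wire layout only, no dulwich API assumed):

    # Side-band-64k framing: <band byte><band data> inside each pkt payload.
    payload = b"\x02Counting objects: 42\r"  # band 2 = progress text
    channel = ord(payload[:1])               # -> 2
    data = payload[1:]                       # -> b"Counting objects: 42\r"
    assert channel == 2 and data.startswith(b"Counting")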
def _handle_upload_pack_head(
def progress(x):
pass
- _read_side_band64k_data(
- proto.read_pkt_seq(),
- {
- SIDE_BAND_CHANNEL_DATA: pack_data,
- SIDE_BAND_CHANNEL_PROGRESS: progress,
- },
- )
+ for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+ if chan == SIDE_BAND_CHANNEL_DATA:
+ pack_data(data)
+ elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+ progress(data)
+ else:
+ raise AssertionError(
+ "Invalid sideband channel %d" % chan)
else:
while True:
data = proto.read(rbufsize)
if determine_wants is None:
determine_wants = target.object_store.determine_wants_all
if CAPABILITY_THIN_PACK in self._fetch_capabilities:
- # TODO(jelmer): Avoid reading entire file into memory and
- # only processing it after the whole file has been fetched.
from tempfile import SpooledTemporaryFile
f: IO[bytes] = SpooledTemporaryFile()
def commit():
if f.tell():
f.seek(0)
- target.object_store.add_thin_pack(f.read, None)
+ target.object_store.add_thin_pack(f.read, None, progress=progress)
f.close()
def abort():
def progress(x):
pass
- channel_callbacks = {2: progress}
if CAPABILITY_REPORT_STATUS in capabilities:
- channel_callbacks[1] = PktLineParser(
- self._report_status_parser.handle_packet
- ).parse
- _read_side_band64k_data(proto.read_pkt_seq(), channel_callbacks)
+ pktline_parser = PktLineParser(self._report_status_parser.handle_packet)
+ for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+ if chan == SIDE_BAND_CHANNEL_DATA:
+ # Without report-status there is no parser for the status
+ # channel, so its data is simply discarded.
+ if CAPABILITY_REPORT_STATUS in capabilities:
+ pktline_parser.parse(data)
+ elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+ progress(data)
+ else:
+ raise AssertionError(
+ "Invalid sideband channel %d" % chan)
else:
if CAPABILITY_REPORT_STATUS in capabilities:
for pkt in proto.read_pkt_seq():
ret = proto.read_pkt_line()
if ret is not None:
raise AssertionError("expected pkt tail")
- _read_side_band64k_data(
- proto.read_pkt_seq(),
- {
- SIDE_BAND_CHANNEL_DATA: write_data,
- SIDE_BAND_CHANNEL_PROGRESS: progress,
- SIDE_BAND_CHANNEL_FATAL: write_error,
- },
- )
+ for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
+ if chan == SIDE_BAND_CHANNEL_DATA:
+ write_data(data)
+ elif chan == SIDE_BAND_CHANNEL_PROGRESS:
+ progress(data)
+ elif chan == SIDE_BAND_CHANNEL_FATAL:
+ write_error(data)
+ else:
+ raise AssertionError("Invalid sideband channel %d" % chan)
class TCPGitClient(TraditionalGitClient):
blob - 9201aefc4bdd2ebbc00dd0cc2a47bad55083510c
blob + a55f157decb93b8afadb4d07fcfac1d2495b2d76
--- dulwich/object_store.py
+++ dulwich/object_store.py
suffix = suffix.decode("ascii")
return os.path.join(self.pack_dir, "pack-" + suffix)
- def _complete_thin_pack(self, f, path, copier, indexer):
+ def _complete_thin_pack(self, f, path, copier, indexer, progress=None):
"""Move a specific file containing a pack into the pack directory.
Note: The file should be on the same file system as the
copier: A PackStreamCopier to use for writing pack data.
indexer: A PackIndexer for indexing the pack.
+ progress: Optional progress reporting callback, called with bytes.
"""
- entries = list(indexer)
-
- # Update the header with the new number of objects.
- f.seek(0)
- write_pack_header(f.write, len(entries) + len(indexer.ext_refs()))
+ entries = []
+ for i, entry in enumerate(indexer):
+ if progress is not None:
+ progress(("generating index: %d\r" % i).encode('ascii'))
+ entries.append(entry)
- # Must flush before reading (http://bugs.python.org/issue3207)
- f.flush()
+ ext_refs = indexer.ext_refs()
+
+ if ext_refs:
+ # Update the header with the new number of objects.
+ f.seek(0)
+ write_pack_header(f.write, len(entries) + len(ext_refs))
- # Rescan the rest of the pack, computing the SHA with the new header.
- new_sha = compute_file_sha(f, end_ofs=-20)
+ # Must flush before reading (http://bugs.python.org/issue3207)
+ f.flush()
- # Must reposition before writing (http://bugs.python.org/issue3207)
- f.seek(0, os.SEEK_CUR)
+ # Rescan the rest of the pack, computing the SHA with the new header.
+ new_sha = compute_file_sha(f, end_ofs=-20)
- # Complete the pack.
- for ext_sha in indexer.ext_refs():
- assert len(ext_sha) == 20
- type_num, data = self.get_raw(ext_sha)
- offset = f.tell()
- crc32 = write_pack_object(
- f.write,
- type_num,
- data,
- sha=new_sha,
- compression_level=self.pack_compression_level,
- )
- entries.append((ext_sha, offset, crc32))
- pack_sha = new_sha.digest()
- f.write(pack_sha)
+ # Must reposition before writing (http://bugs.python.org/issue3207)
+ f.seek(0, os.SEEK_CUR)
+
+ # Complete the pack.
+ for i, ext_sha in enumerate(ext_refs):
+ if progress is not None:
+ progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
+ assert len(ext_sha) == 20
+ type_num, data = self.get_raw(ext_sha)
+ offset = f.tell()
+ crc32 = write_pack_object(
+ f.write,
+ type_num,
+ data,
+ sha=new_sha,
+ compression_level=self.pack_compression_level,
+ )
+ entries.append((ext_sha, offset, crc32))
+ pack_sha = new_sha.digest()
+ f.write(pack_sha)
+ else:
+ f.seek(-20, os.SEEK_END)
+ pack_sha = f.read(20)
+
f.close()
# Move the pack in.
self._add_cached_pack(pack_base_name, final_pack)
return final_pack
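Note: the new else branch relies on the pack trailer invariant: if no external base objects were appended, the file is byte-for-byte the pack that was received, so its final 20 bytes are still a valid SHA-1 over the rest. A quick sketch of that invariant (verify_pack_trailer is a hypothetical helper; f is assumed to be an open binary pack file):

    import hashlib
    import os

    def verify_pack_trailer(f):
        # The last 20 bytes of a pack are the SHA-1 of everything before them.
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(0)
        body = f.read(size - 20)
        return hashlib.sha1(body).digest() == f.read(20)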
- def add_thin_pack(self, read_all, read_some):
+ def add_thin_pack(self, read_all, read_some, progress=None):
"""Add a new thin pack to this object store.
Thin packs are packs that contain deltas with parents that exist
os.chmod(path, PACK_MODE)
indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
- copier.verify()
- return self._complete_thin_pack(f, path, copier, indexer)
+ copier.verify(progress=progress)
+ return self._complete_thin_pack(f, path, copier, indexer, progress=progress)
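Note: with progress threaded through here, a caller can surface the new messages directly. A minimal sketch, assuming an existing DiskObjectStore named store and thin-pack reader callables read_all/read_some:

    import sys

    def progress(msg: bytes) -> None:
        # \r-terminated messages overwrite in place, \n-terminated ones
        # end the line, so raw writes to a terminal render correctly.
        sys.stderr.buffer.write(msg)
        sys.stderr.buffer.flush()

    pack = store.add_thin_pack(read_all, read_some, progress=progress)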
def move_in_pack(self, path):
"""Move a specific file containing a pack into the pack directory.
return f, commit, abort
- def _complete_thin_pack(self, f, indexer):
+ def _complete_thin_pack(self, f, indexer, progress=None):
"""Complete a thin pack by adding external references.
Args:
"""
entries = list(indexer)
- # Update the header with the new number of objects.
- f.seek(0)
- write_pack_header(f.write, len(entries) + len(indexer.ext_refs()))
+ ext_refs = indexer.ext_refs()
- # Rescan the rest of the pack, computing the SHA with the new header.
- new_sha = compute_file_sha(f, end_ofs=-20)
+ if ext_refs:
+ # Update the header with the new number of objects.
+ f.seek(0)
+ write_pack_header(f.write, len(entries) + len(ext_refs))
- # Complete the pack.
- for ext_sha in indexer.ext_refs():
- assert len(ext_sha) == 20
- type_num, data = self.get_raw(ext_sha)
- write_pack_object(f.write, type_num, data, sha=new_sha)
- pack_sha = new_sha.digest()
- f.write(pack_sha)
+ # Rescan the rest of the pack, computing the SHA with the new header.
+ new_sha = compute_file_sha(f, end_ofs=-20)
- def add_thin_pack(self, read_all, read_some):
+ # Complete the pack, reporting progress as in the disk-backed store.
+ for i, ext_sha in enumerate(ext_refs):
+ if progress is not None:
+ progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
+ assert len(ext_sha) == 20
+ type_num, data = self.get_raw(ext_sha)
+ write_pack_object(f.write, type_num, data, sha=new_sha)
+ pack_sha = new_sha.digest()
+ f.write(pack_sha)
+
+ def add_thin_pack(self, read_all, read_some, progress=None):
"""Add a new thin pack to this object store.
Thin packs are packs that contain deltas with parents that exist
try:
indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
- copier.verify()
- self._complete_thin_pack(f, indexer)
+ copier.verify(progress=progress)
+ self._complete_thin_pack(f, indexer, progress=progress)
except BaseException:
abort()
raise
blob - 9dcfa31e67776df61b6c2d09f37f968c35955084
blob + cf7cdd6f77a86dc371e7ef123ebb5ee7efc6032a
--- dulwich/pack.py
+++ dulwich/pack.py
self.outfile.write(data)
return data
- def verify(self):
+ def verify(self, progress=None):
"""Verify a pack stream and write it to the output file.
See PackStreamReader.iterobjects for a list of exceptions this may
throw.
"""
- if self._delta_iter:
- for unpacked in self.read_objects():
+ i = 0  # stays defined even if read_objects() yields nothing
+ for i, unpacked in enumerate(self.read_objects(), 1):
+ if self._delta_iter:
self._delta_iter.record(unpacked)
- else:
- for _ in self.read_objects():
- pass
+ if progress is not None:
+ progress(("copying pack entries: %d/%d\r" % (i, len(self))).encode("ascii"))
+ if progress is not None:
+ progress(("copied %d pack entries\n" % i).encode("ascii"))
def obj_sha(type, chunks):
def check_length_and_checksum(self) -> None:
"""Sanity check the length and checksum of the pack index and data."""
- assert len(self.index) == len(self.data)
+ assert len(self.index) == len(self.data), f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
idx_stored_checksum = self.index.get_pack_checksum()
data_stored_checksum = self.data.get_stored_checksum()
if idx_stored_checksum != data_stored_checksum: