commit a0d798169a3e034cee72b7c918670727b94aa2db
from: Jelmer Vernooij
via: GitHub
date: Tue Jan 17 17:51:48 2023 UTC

Merge pull request #1124 from jelmer/extend-pack

Add extend_pack, limit SpooledTemporaryFile memory usage

commit - acde02107412a6064a1ab18ed3d8f4f036126fa2
commit + a0d798169a3e034cee72b7c918670727b94aa2db
blob - 02ca79adbc6f7f59941548d53a5a530b17806e04
blob + 71de0a97318ce099eeb356458c6f8e9153a0ef9a
--- dulwich/client.py
+++ dulwich/client.py
@@ -122,6 +122,7 @@ from dulwich.pack import (
     write_pack_from_container,
     UnpackedObject,
     PackChunkGenerator,
+    PACK_SPOOL_FILE_MAX_SIZE,
 )
 from dulwich.refs import (
     read_info_refs,
@@ -813,7 +814,9 @@ class GitClient:
             determine_wants = target.object_store.determine_wants_all
         if CAPABILITY_THIN_PACK in self._fetch_capabilities:
             from tempfile import SpooledTemporaryFile
-            f: IO[bytes] = SpooledTemporaryFile()
+            f: IO[bytes] = SpooledTemporaryFile(
+                max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-',
+                dir=getattr(target.object_store, 'path', None))
 
             def commit():
                 if f.tell():
blob - d6d1c562b6365070f9fe8bf136c2af40f8b56e28
blob + ed0a493edccb5e0acdc795875c337ea4f82c5035
--- dulwich/cloud/gcs.py
+++ dulwich/cloud/gcs.py
@@ -25,7 +25,7 @@ import posixpath
 import tempfile
 
 from ..object_store import BucketBasedObjectStore
-from ..pack import PackData, Pack, load_pack_index_file
+from ..pack import PackData, Pack, load_pack_index_file, PACK_SPOOL_FILE_MAX_SIZE
 
 
 # TODO(jelmer): For performance, read ranges?
@@ -58,14 +58,14 @@ class GcsObjectStore(BucketBasedObjectStore):
 
     def _load_pack_data(self, name):
         b = self.bucket.blob(posixpath.join(self.subpath, name + '.pack'))
-        f = tempfile.SpooledTemporaryFile()
+        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
         b.download_to_file(f)
         f.seek(0)
         return PackData(name + '.pack', f)
 
     def _load_pack_index(self, name):
         b = self.bucket.blob(posixpath.join(self.subpath, name + '.idx'))
-        f = tempfile.SpooledTemporaryFile()
+        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
         b.download_to_file(f)
         f.seek(0)
         return load_pack_index_file(name + '.idx', f)
blob - a55f157decb93b8afadb4d07fcfac1d2495b2d76
blob + ccef377c35774ca0f40daa705116704d70fdbffa
--- dulwich/object_store.py
+++ dulwich/object_store.py
@@ -63,19 +63,18 @@ from dulwich.pack import (
     PackInflater,
     PackFileDisappeared,
     UnpackedObject,
+    extend_pack,
     load_pack_index_file,
     iter_sha1,
     full_unpacked_object,
     generate_unpacked_objects,
     pack_objects_to_data,
-    write_pack_header,
-    write_pack_index_v2,
+    write_pack_index,
     write_pack_data,
-    write_pack_object,
-    compute_file_sha,
     PackIndexer,
     PackStreamCopier,
     PackedObjectContainer,
+    PACK_SPOOL_FILE_MAX_SIZE,
 )
 from dulwich.protocol import DEPTH_INFINITE
 from dulwich.refs import ANNOTATED_TAG_SUFFIX, Ref
@@ -819,48 +818,15 @@ class DiskObjectStore(PackBasedObjectStore):
         entries = []
         for i, entry in enumerate(indexer):
             if progress is not None:
-                progress(("generating index: %d\r" % i).encode('ascii'))
+                progress(("generating index: %d/%d\r" % (i, len(copier))).encode('ascii'))
             entries.append(entry)
 
-        ext_refs = indexer.ext_refs()
-
-        if ext_refs:
-            # Update the header with the new number of objects.
-            f.seek(0)
-            write_pack_header(f.write, len(entries) + len(ext_refs))
-
-            # Must flush before reading (http://bugs.python.org/issue3207)
-            f.flush()
-
-            # Rescan the rest of the pack, computing the SHA with the new header.
-            new_sha = compute_file_sha(f, end_ofs=-20)
-
-            # Must reposition before writing (http://bugs.python.org/issue3207)
-            f.seek(0, os.SEEK_CUR)
+        pack_sha, extra_entries = extend_pack(
+            f, indexer.ext_refs(), get_raw=self.get_raw, compression_level=self.pack_compression_level,
+            progress=progress)
 
-            # Complete the pack.
-            for i, ext_sha in enumerate(ext_refs):
-                if progress is not None:
-                    progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
-                assert len(ext_sha) == 20
-                type_num, data = self.get_raw(ext_sha)
-                offset = f.tell()
-                crc32 = write_pack_object(
-                    f.write,
-                    type_num,
-                    data,
-                    sha=new_sha,
-                    compression_level=self.pack_compression_level,
-                )
-                entries.append((ext_sha, offset, crc32))
-            pack_sha = new_sha.digest()
-            f.write(pack_sha)
-        else:
-            f.seek(-20, os.SEEK_END)
-            pack_sha = f.read(20)
+        entries.extend(extra_entries)
 
-        f.close()
-
         # Move the pack in.
         entries.sort()
         pack_base_name = self._get_pack_basepath(entries)
@@ -877,7 +843,7 @@ class DiskObjectStore(PackBasedObjectStore):
         # Write the index.
         index_file = GitFile(pack_base_name + ".idx", "wb", mask=PACK_MODE)
         try:
-            write_pack_index_v2(index_file, entries, pack_sha)
+            write_pack_index(index_file, entries, pack_sha)
             index_file.close()
         finally:
             index_file.abort()
@@ -928,7 +894,7 @@ class DiskObjectStore(PackBasedObjectStore):
         index_name = basename + ".idx"
         if not os.path.exists(index_name):
             with GitFile(index_name, "wb", mask=PACK_MODE) as f:
-                write_pack_index_v2(f, entries, p.get_stored_checksum())
+                write_pack_index(f, entries, p.get_stored_checksum())
         for pack in self.packs:
             if pack._basename == basename:
                 return pack
@@ -1076,46 +1042,23 @@ class MemoryObjectStore(BaseObjectStore):
         Returns: Fileobject to write to and a commit function to call
             when the pack is finished.
         """
-        f = BytesIO()
+        from tempfile import SpooledTemporaryFile
+        f = SpooledTemporaryFile(
+            max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
 
         def commit():
-            p = PackData.from_file(BytesIO(f.getvalue()), f.tell())
-            f.close()
+            size = f.tell()
+            f.seek(0)
+            p = PackData.from_file(f, size)
             for obj in PackInflater.for_pack_data(p, self.get_raw):
                 self.add_object(obj)
+            p.close()
 
         def abort():
             pass
 
         return f, commit, abort
-
-    def _complete_thin_pack(self, f, indexer, progress=None):
-        """Complete a thin pack by adding external references.
-
-        Args:
-          f: Open file object for the pack.
-          indexer: A PackIndexer for indexing the pack.
-        """
-        entries = list(indexer)
-
-        ext_refs = indexer.ext_refs()
-
-        if ext_refs:
-            # Update the header with the new number of objects.
-            f.seek(0)
-            write_pack_header(f.write, len(entries) + len(ext_refs))
-
-            # Rescan the rest of the pack, computing the SHA with the new header.
-            new_sha = compute_file_sha(f, end_ofs=-20)
-
-            # Complete the pack.
-            for ext_sha in indexer.ext_refs():
-                assert len(ext_sha) == 20
-                type_num, data = self.get_raw(ext_sha)
-                write_pack_object(f.write, type_num, data, sha=new_sha)
-            pack_sha = new_sha.digest()
-            f.write(pack_sha)
 
     def add_thin_pack(self, read_all, read_some, progress=None):
         """Add a new thin pack to this object store.
 
@@ -1129,12 +1072,11 @@ class MemoryObjectStore(BaseObjectStore):
           read_some: Read function that returns at least one byte, but may
             not return the number of bytes requested.
         """
+
         f, commit, abort = self.add_pack()
         try:
-            indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
-            copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
-            copier.verify(progress=progress)
-            self._complete_thin_pack(f, indexer, progress=progress)
+            copier = PackStreamCopier(read_all, read_some, f)
+            copier.verify()
         except BaseException:
             abort()
             raise
@@ -1601,7 +1543,8 @@ class BucketBasedObjectStore(PackBasedObjectStore):
         """
         import tempfile
 
-        pf = tempfile.SpooledTemporaryFile()
+        pf = tempfile.SpooledTemporaryFile(
+            max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
 
         def commit():
             if pf.tell() == 0:
@@ -1612,9 +1555,10 @@ class BucketBasedObjectStore(PackBasedObjectStore):
             p = PackData(pf.name, pf)
             entries = p.sorted_entries()
             basename = iter_sha1(entry[0] for entry in entries).decode('ascii')
-            idxf = tempfile.SpooledTemporaryFile()
+            idxf = tempfile.SpooledTemporaryFile(
+                max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
             checksum = p.get_stored_checksum()
-            write_pack_index_v2(idxf, entries, checksum)
+            write_pack_index(idxf, entries, checksum)
             idxf.seek(0)
             idx = load_pack_index_file(basename + '.idx', idxf)
             for pack in self.packs:
blob - cf7cdd6f77a86dc371e7ef123ebb5ee7efc6032a
blob + dc4e176fc498dba4c490cec8672befb98c24d8d9
--- dulwich/pack.py
+++ dulwich/pack.py
@@ -50,7 +50,7 @@ from itertools import chain
 import os
 import sys
 
-from typing import Optional, Callable, Tuple, List, Deque, Union, Iterable, Iterator, Dict, TypeVar, Generic, Sequence, Set
+from typing import Optional, Callable, Tuple, List, Deque, Union, Iterable, Iterator, Dict, TypeVar, Generic, Sequence, Set, BinaryIO
 
 try:
     from typing import Protocol
@@ -103,7 +103,10 @@ DELTA_TYPES = (OFS_DELTA, REF_DELTA)
 
 DEFAULT_PACK_DELTA_WINDOW_SIZE = 10
 
+# Keep pack files under 16Mb in memory, otherwise write them out to disk
+PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024
+
 
 OldUnpackedObject = Union[Tuple[Union[bytes, int], List[bytes]], List[bytes]]
 ResolveExtRefFn = Callable[[bytes], Tuple[int, OldUnpackedObject]]
 ProgressFn = Callable[[int, str], None]
@@ -1006,8 +1009,6 @@ class PackStreamReader:
           IOError: if an error occurred writing to the output file.
         """
         pack_version, self._num_objects = read_pack_header(self.read)
-        if pack_version is None:
-            return
 
         for i in range(self._num_objects):
             offset = self.offset
@@ -2559,8 +2560,55 @@ class Pack:
             unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base)
             unpacked.pack_type_num = REF_DELTA
         return unpacked
+
+
+def extend_pack(f: BinaryIO, object_ids: Set[ObjectID], get_raw, *, compression_level=-1, progress=None) -> Tuple[bytes, List]:
+    """Extend a pack file with more objects.
+
+    The caller should make sure that object_ids does not contain any objects
+    that are already in the pack
+    """
+    # Update the header with the new number of objects.
+    f.seek(0)
+    _version, num_objects = read_pack_header(f.read)
+
+    if object_ids:
+        f.seek(0)
+        write_pack_header(f.write, num_objects + len(object_ids))
+
+        # Must flush before reading (http://bugs.python.org/issue3207)
+        f.flush()
+
+    # Rescan the rest of the pack, computing the SHA with the new header.
+    new_sha = compute_file_sha(f, end_ofs=-20)
+
+    # Must reposition before writing (http://bugs.python.org/issue3207)
+    f.seek(0, os.SEEK_CUR)
+
+    extra_entries = []
+
+    # Complete the pack.
+    for i, object_id in enumerate(object_ids):
+        if progress is not None:
+            progress(("writing extra base objects: %d/%d\r" % (i, len(object_ids))).encode("ascii"))
+        assert len(object_id) == 20
+        type_num, data = get_raw(object_id)
+        offset = f.tell()
+        crc32 = write_pack_object(
+            f.write,
+            type_num,
+            data,
+            sha=new_sha,
+            compression_level=compression_level,
+        )
+        extra_entries.append((object_id, offset, crc32))
+    pack_sha = new_sha.digest()
+    f.write(pack_sha)
+    f.close()
+
+    return pack_sha, extra_entries
 
 
 try:
     from dulwich._pack import (  # type: ignore  # noqa: F811
         apply_delta,
blob - 1b02b1aade5feeb9bcf86c3a8a0595d5ac8223ad
blob + cf395d1304c426089959aac005ffb07bdfebac5b
--- dulwich/tests/test_web.py
+++ dulwich/tests/test_web.py
@@ -78,7 +78,7 @@ class MinimalistWSGIInputStream:
         start = self.pos
         end = self.pos + howmuch
         if start >= len(self.data):
-            return ""
+            return b""
         self.pos = end
         return self.data[start:end]
 
@@ -538,7 +538,7 @@ class GunzipTestCase(HTTPGitApplicationTestCase):
 
     def _get_zstream(self, text):
         zstream = BytesIO()
-        zfile = gzip.GzipFile(fileobj=zstream, mode="w")
+        zfile = gzip.GzipFile(fileobj=zstream, mode="wb")
         zfile.write(text)
         zfile.close()
         zlength = zstream.tell()
blob - 5582dda5e71df8dcd4ef861e9531734c444d2adc
blob + e6767dfd8eb94540be263836781e96b46b721a1e
--- dulwich/web.py
+++ dulwich/web.py
@@ -22,9 +22,6 @@
 """HTTP server for dulwich that implements the git smart HTTP protocol."""
 
 from io import BytesIO
-import shutil
-import tempfile
-import gzip
 import os
 import re
 import sys
@@ -466,21 +463,10 @@ class GunzipFilter:
         self.app = application
 
     def __call__(self, environ, start_response):
+        import gzip
         if environ.get("HTTP_CONTENT_ENCODING", "") == "gzip":
-            try:
-                environ["wsgi.input"].tell()
-                wsgi_input = environ["wsgi.input"]
-            except (AttributeError, OSError, NotImplementedError):
-                # The gzip implementation in the standard library of Python 2.x
-                # requires working '.seek()' and '.tell()' methods on the input
-                # stream. Read the data into a temporary file to work around
-                # this limitation.
-                wsgi_input = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
-                shutil.copyfileobj(environ["wsgi.input"], wsgi_input)
-                wsgi_input.seek(0)
-
-            environ["wsgi.input"] = gzip.GzipFile(
-                filename=None, fileobj=wsgi_input, mode="r"
+            environ["wsgi.input"] = gzip.GzipFile(
+                filename=None, fileobj=environ["wsgi.input"], mode="rb"
             )
             del environ["HTTP_CONTENT_ENCODING"]
         if "CONTENT_LENGTH" in environ:
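
The two threads of this change are (1) buffering pack data in tempfile.SpooledTemporaryFile objects capped at PACK_SPOOL_FILE_MAX_SIZE (16 MiB), so large packs spill to disk instead of staying in memory, and (2) consolidating the duplicated thin-pack completion code into the new extend_pack() helper in dulwich/pack.py. The sketch below is illustrative only and not part of the commit: object_ids and object_store are hypothetical placeholders, while SpooledTemporaryFile(max_size=...), PACK_SPOOL_FILE_MAX_SIZE and the extend_pack() signature come from the diff above.

    # Sketch only (not part of the commit): how the spooled buffer behaves and
    # how the new extend_pack() helper is meant to be called.
    import tempfile

    from dulwich.pack import PACK_SPOOL_FILE_MAX_SIZE, extend_pack

    # The spooled file stays in memory until max_size bytes have been written,
    # then transparently rolls over to an on-disk temporary file with the same
    # file-like API.
    f = tempfile.SpooledTemporaryFile(
        max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
    f.write(b'\x00' * 1024)   # small writes are buffered in memory
    f.seek(0)

    # Hypothetical thin-pack completion: object_ids would be the 20-byte binary
    # SHAs of the missing delta bases, and object_store.get_raw resolves a SHA
    # to (type_num, raw_data). extend_pack() rewrites the object count in the
    # pack header, appends the extra objects, recomputes the trailing pack
    # checksum and closes f, returning the new checksum plus the
    # (sha, offset, crc32) entries to merge into the index.
    # pack_sha, extra_entries = extend_pack(
    #     f, object_ids, get_raw=object_store.get_raw,
    #     compression_level=-1, progress=None)

    f.close()

The same spooling pattern is what client.py, object_store.py and cloud/gcs.py now use in place of plain BytesIO buffers or unbounded SpooledTemporaryFile instances.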