commit 4c9b38eda9a90a091d25041dc45c8669adf9639e
from: Nicolas Dandrimont
date: Mon Jan 29 16:10:02 2024 UTC

loader: Push remote messages to a logger instead of stderr

Instead of dumping the dulwich remote communication stream to stderr,
add a separate logger for remote messages, and handle the remote stream
as proper log entries.

commit - 8cc7eb12ea0d2cd26be310c5b02e32ac6a47b5a9
commit + 4c9b38eda9a90a091d25041dc45c8669adf9639e
blob - 5217edd9bc9894e37dd088138285c1925d5fe5be
blob + 61d717d8fd989813ca141faf91da108914673579
--- swh/loader/git/loader.py
+++ swh/loader/git/loader.py
@@ -10,8 +10,8 @@ import json
 import logging
 import os
 import pickle
-import sys
 from tempfile import SpooledTemporaryFile
+import time
 from typing import (
     Any,
     Callable,
@@ -63,6 +63,27 @@ from .utils import HexBytes
 
 logger = logging.getLogger(__name__)
 heads_logger = logger.getChild("refs")
+remote_logger = logger.getChild("remote")
+
+# How often to log messages for long-running operations, in seconds
+LOGGING_INTERVAL = 30
+
+
+def split_lines_and_remainder(buf: bytes) -> Tuple[List[bytes], bytes]:
+    """Get newline-terminated (``b"\\r"`` or ``b"\\n"``) lines from `buf`,
+    and the beginning of the last line if it isn't terminated."""
+
+    lines = buf.splitlines(keepends=True)
+    if not lines:
+        return [], b""
+
+    if buf.endswith((b"\r", b"\n")):
+        # The buffer ended with a newline, everything can be sent as lines
+        return lines, b""
+    else:
+        # The buffer didn't end with a newline, we need to keep the
+        # last bit as the beginning of the next line
+        return lines[:-1], lines[-1]
 
 
 class RepoRepresentation:
@@ -367,14 +388,71 @@ class GitLoader(BaseGitLoader):
             statsd=self.statsd,
         )
 
-        def do_progress(msg: bytes) -> None:
-            sys.stderr.buffer.write(msg)
-            sys.stderr.flush()
+        # Remote logging utilities
+
+        # Number of lines (ending with a carriage return) elided when debug
+        # logging is not enabled
+        remote_lines_elided = 0
+
+        # Timestamp where the last elision was logged
+        last_elision_logged = time.monotonic()
+
+        def maybe_log_elision(force: bool = False):
+            nonlocal remote_lines_elided
+            nonlocal last_elision_logged
+
+            if remote_lines_elided and (
+                force
+                # Always log at least every LOGGING_INTERVAL
+                or time.monotonic() > last_elision_logged + LOGGING_INTERVAL
+            ):
+                remote_logger.info(
+                    "%s remote line%s elided",
+                    remote_lines_elided,
+                    "s" if remote_lines_elided > 1 else "",
+                )
+                remote_lines_elided = 0
+                last_elision_logged = time.monotonic()
 
+        def log_remote_message(line: bytes):
+            nonlocal remote_lines_elided
+
+            do_debug = remote_logger.isEnabledFor(logging.DEBUG)
+
+            if not line.endswith(b"\n"):
+                # This is a verbose line, ending with a carriage return only
+                if do_debug:
+                    if stripped := line.strip():
+                        remote_logger.debug(
+                            "remote: %s", stripped.decode("utf-8", "backslashreplace")
+                        )
+                else:
+                    remote_lines_elided += 1
+                    maybe_log_elision()
+            else:
+                # This is the last line in the current section, we will always log it
+                maybe_log_elision(force=True)
+                if stripped := line.strip():
+                    remote_logger.info(
+                        "remote: %s", stripped.decode("utf-8", "backslashreplace")
+                    )
+
+        # This buffer keeps the end of what do_remote has received, across
+        # calls, if it happens to be unterminated
+        next_line_buf = b""
+
+        def do_remote(msg: bytes) -> None:
+            nonlocal next_line_buf
+
+            lines, next_line_buf = split_lines_and_remainder(next_line_buf + msg)
+
+            for line in lines:
+                log_remote_message(line)
+
         try:
             with raise_not_found_repository():
                 fetch_info = self.fetch_pack_from_origin(
-                    self.origin.url, base_repo, do_progress
+                    self.origin.url, base_repo, do_remote
                 )
         except NotFound:
             # NotFound inherits from ValueError and should not be caught
@@ -388,6 +466,10 @@ class GitLoader(BaseGitLoader):
             self.dumb = dumb.check_protocol(self.origin.url, self.requests_extra_kwargs)
             if not self.dumb:
                 raise
+        else:
+            # Always log what remains in the next_line_buf, if it's not empty
+            maybe_log_elision(force=True)
+            log_remote_message(next_line_buf)
 
         logger.debug(
             "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
blob - 0ced59cf70860949970d8df78a860481e61bea48
blob + 25c460ee36f0f78b3c27962a9d05df9edb660190
--- swh/loader/git/tests/test_loader.py
+++ swh/loader/git/tests/test_loader.py
@@ -25,7 +25,7 @@ import pytest
 from requests import HTTPError
 
 from swh.loader.git import converters, dumb
-from swh.loader.git.loader import FetchPackReturn, GitLoader
+from swh.loader.git.loader import FetchPackReturn, GitLoader, split_lines_and_remainder
 from swh.loader.git.tests.test_from_disk import SNAPSHOT1, FullGitLoaderTests
 from swh.loader.tests import (
     assert_last_visit_matches,
@@ -1073,3 +1073,19 @@ def test_loader_too_large_pack_file_for_github_origin(
         f"Pack file too big for repository {repo_url}, "
         f"limit is {loader.pack_size_bytes} bytes, current size is {big_size_kib*1024}"
     )
+
+
+@pytest.mark.parametrize(
+    "input,output",
+    (
+        (b"", ([], b"")),
+        (b"trailing", ([], b"trailing")),
+        (b"line1\r", ([b"line1\r"], b"")),
+        (b"line1\rtrailing", ([b"line1\r"], b"trailing")),
+        (b"line1\r\ntrailing", ([b"line1\r\n"], b"trailing")),
+        (b"line1\r\nline2\ntrailing", ([b"line1\r\n", b"line2\n"], b"trailing")),
+        (b"line1\r\nline2\nline3\r", ([b"line1\r\n", b"line2\n", b"line3\r"], b"")),
+    ),
+)
+def test_split_lines_and_remainder(input, output):
+    assert split_lines_and_remainder(input) == output