commit a0b6043ad8bb7d1709d24648ebe155d125373bf9 from: Nicolas Dandrimont date: Mon Jan 29 16:10:02 2024 UTC Add INFO-level logging every few minutes while loading Git loading tasks can take a pretty long time, and it's not easy to diagnose if it's stuck or if it's just taking a while. Instead of only logging at the end of the task, print a log line after each object type has been fully processed. Also print a log line every 3 minutes while objects are being processed. commit - a742d967b190b295de060fea86e73294009c5c5f commit + a0b6043ad8bb7d1709d24648ebe155d125373bf9 blob - 4579d7956a956f7649104843eb1d6129d4971283 blob + 4880ce47f67e486e84c40b56791d6693a02c6b1f --- swh/loader/git/base.py +++ swh/loader/git/base.py @@ -4,6 +4,8 @@ # See top-level LICENSE file for more information import collections +import logging +import time from typing import Dict, Iterable from swh.loader.core.loader import BaseLoader @@ -17,13 +19,23 @@ from swh.model.model import ( Snapshot, ) +logger = logging.getLogger(__name__) +# Print a log message every LOGGING_INTERVAL +LOGGING_INTERVAL = 180 + + class BaseGitLoader(BaseLoader): """This base class is a pattern for both git loaders Those loaders are able to load all the data in one go. """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.next_log_after = time.monotonic() + LOGGING_INTERVAL + def cleanup(self) -> None: """Clean up an eventual state installed for computations.""" pass @@ -68,6 +80,25 @@ class BaseGitLoader(BaseLoader): """Whether the load was eventful""" raise NotImplementedError + def maybe_log(self, msg: str, *args, level=logging.INFO, force=False, **kwargs): + """Only log if ``LOGGING_INTERVAL`` has elapsed since the last log line was printed. + + Arguments are identical to those of ``logging.Logger.log``, except if the log format + arguments are callable, the call only happens if the log is actually + being printed. + """ + if time.monotonic() < self.next_log_after and not force: + return + + if logger.isEnabledFor(level): + logger.log( + level, + msg, + *(arg() if callable(arg) else arg for arg in args), + **kwargs, + ) + self.next_log_after = time.monotonic() + LOGGING_INTERVAL + def store_data(self) -> None: assert self.origin if self.save_data_path: @@ -76,6 +107,20 @@ class BaseGitLoader(BaseLoader): counts: Dict[str, int] = collections.defaultdict(int) storage_summary: Dict[str, int] = collections.Counter() + def sum_counts(): + return sum(counts.values()) + + def sum_storage(): + return sum(storage_summary[f"{object_type}:add"] for object_type in counts) + + def maybe_log_summary(msg, force=False): + self.maybe_log( + msg + ": processed %s objects, %s are new", + sum_counts, + sum_storage, + force=force, + ) + if self.has_contents(): for obj in self.get_contents(): if isinstance(obj, Content): @@ -87,20 +132,31 @@ class BaseGitLoader(BaseLoader): else: raise TypeError(f"Unexpected content type: {obj}") + maybe_log_summary("In contents") + + maybe_log_summary("After contents", force=True) + if self.has_directories(): for directory in self.get_directories(): counts["directory"] += 1 storage_summary.update(self.storage.directory_add([directory])) + maybe_log_summary("In directories") + maybe_log_summary("After directories", force=True) + if self.has_revisions(): for revision in self.get_revisions(): counts["revision"] += 1 storage_summary.update(self.storage.revision_add([revision])) + maybe_log_summary("In revisions") + maybe_log_summary("After revisions", force=True) + if self.has_releases(): for release in self.get_releases(): counts["release"] += 1 storage_summary.update(self.storage.release_add([release])) + maybe_log_summary("In releases") snapshot = self.get_snapshot() counts["snapshot"] += 1 @@ -129,8 +185,4 @@ class BaseGitLoader(BaseLoader): self.statsd.increment("filtered_objects_total_sum", filtered, tags=tags) self.statsd.increment("filtered_objects_total_count", total, tags=tags) - self.log.info( - "Fetched %d objects; %d are new", - sum(counts.values()), - sum(storage_summary[f"{object_type}:add"] for object_type in counts), - ) + maybe_log_summary("After snapshot", force=True)