Commit Diff


commit - a742d967b190b295de060fea86e73294009c5c5f
commit + a0b6043ad8bb7d1709d24648ebe155d125373bf9
blob - 4579d7956a956f7649104843eb1d6129d4971283
blob + 4880ce47f67e486e84c40b56791d6693a02c6b1f
--- swh/loader/git/base.py
+++ swh/loader/git/base.py
@@ -4,6 +4,8 @@
 # See top-level LICENSE file for more information
 
 import collections
+import logging
+import time
 from typing import Dict, Iterable
 
 from swh.loader.core.loader import BaseLoader
@@ -17,13 +19,23 @@ from swh.model.model import (
     Snapshot,
 )
 
+logger = logging.getLogger(__name__)
 
+# maybe_log() emits unforced messages at most once every LOGGING_INTERVAL seconds
+LOGGING_INTERVAL = 180
+
+
 class BaseGitLoader(BaseLoader):
     """This base class is a pattern for both git loaders
 
     Those loaders are able to load all the data in one go.
     """
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # time.monotonic() deadline before which unforced maybe_log() calls are dropped
+        self.next_log_after = time.monotonic() + LOGGING_INTERVAL
+
     def cleanup(self) -> None:
         """Clean up an eventual state installed for computations."""
         pass
@@ -68,6 +80,25 @@ class BaseGitLoader(BaseLoader):
         """Whether the load was eventful"""
         raise NotImplementedError
 
+    def maybe_log(self, msg: str, *args, level=logging.INFO, force=False, **kwargs):
+        """Log ``msg`` via the module ``logger``, rate-limited by ``LOGGING_INTERVAL``.
+
+        Unless ``force`` is true, calls before ``self.next_log_after`` are dropped,
+        and each emitted line resets it to ``LOGGING_INTERVAL`` seconds from now.
+        Emits ``logger.log(level, msg, *args, **kwargs)`` (``level`` is keyword-only),
+        but callable ``args`` are only called when the line is actually emitted.
+        """
+        if time.monotonic() < self.next_log_after and not force:
+            return
+        if logger.isEnabledFor(level):
+            logger.log(
+                level,
+                msg,
+                *(arg() if callable(arg) else arg for arg in args),
+                **kwargs,
+            )
+            self.next_log_after = time.monotonic() + LOGGING_INTERVAL
+
     def store_data(self) -> None:
         assert self.origin
         if self.save_data_path:
@@ -76,6 +107,20 @@ class BaseGitLoader(BaseLoader):
         counts: Dict[str, int] = collections.defaultdict(int)
         storage_summary: Dict[str, int] = collections.Counter()
 
+        def sum_counts():
+            return sum(counts.values())
+
+        def sum_storage():
+            return sum(storage_summary[f"{object_type}:add"] for object_type in counts)
+
+        def maybe_log_summary(msg, force=False):
+            self.maybe_log(
+                msg + ": processed %s objects, %s are new",
+                sum_counts,
+                sum_storage,
+                force=force,
+            )
+
         if self.has_contents():
             for obj in self.get_contents():
                 if isinstance(obj, Content):
@@ -87,20 +132,31 @@ class BaseGitLoader(BaseLoader):
                 else:
                     raise TypeError(f"Unexpected content type: {obj}")
 
+                maybe_log_summary("In contents")
+
+            maybe_log_summary("After contents", force=True)
+
         if self.has_directories():
             for directory in self.get_directories():
                 counts["directory"] += 1
                 storage_summary.update(self.storage.directory_add([directory]))
+                maybe_log_summary("In directories")
 
+            maybe_log_summary("After directories", force=True)
+
         if self.has_revisions():
             for revision in self.get_revisions():
                 counts["revision"] += 1
                 storage_summary.update(self.storage.revision_add([revision]))
+                maybe_log_summary("In revisions")
 
+            maybe_log_summary("After revisions", force=True)
+
         if self.has_releases():
             for release in self.get_releases():
                 counts["release"] += 1
                 storage_summary.update(self.storage.release_add([release]))
+                maybe_log_summary("In releases")
 
         snapshot = self.get_snapshot()
         counts["snapshot"] += 1
@@ -129,8 +185,4 @@ class BaseGitLoader(BaseLoader):
             self.statsd.increment("filtered_objects_total_sum", filtered, tags=tags)
             self.statsd.increment("filtered_objects_total_count", total, tags=tags)
 
-        self.log.info(
-            "Fetched %d objects; %d are new",
-            sum(counts.values()),
-            sum(storage_summary[f"{object_type}:add"] for object_type in counts),
-        )
+        maybe_log_summary("After snapshot", force=True)