commit - a742d967b190b295de060fea86e73294009c5c5f
commit + a0b6043ad8bb7d1709d24648ebe155d125373bf9
blob - 4579d7956a956f7649104843eb1d6129d4971283
blob + 4880ce47f67e486e84c40b56791d6693a02c6b1f
--- swh/loader/git/base.py
+++ swh/loader/git/base.py
# See top-level LICENSE file for more information
import collections
+import logging
+import time
from typing import Dict, Iterable
from swh.loader.core.loader import BaseLoader
Snapshot,
)
+logger = logging.getLogger(__name__)
+# Print a log message at most once every LOGGING_INTERVAL seconds
+LOGGING_INTERVAL = 180
+
+
class BaseGitLoader(BaseLoader):
"""This base class is a pattern for both git loaders
Those loaders are able to load all the data in one go.
"""
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Deadline for the next rate-limited log line; initialized so the
+        # first periodic summary is emitted no sooner than LOGGING_INTERVAL
+        # seconds after the loader is constructed.
+        self.next_log_after = time.monotonic() + LOGGING_INTERVAL
+
def cleanup(self) -> None:
"""Clean up an eventual state installed for computations."""
pass
"""Whether the load was eventful"""
raise NotImplementedError
+    def maybe_log(self, msg: str, *args, level=logging.INFO, force=False, **kwargs):
+        """Log ``msg`` at most once every ``LOGGING_INTERVAL`` seconds.
+
+        Arguments are identical to those of ``logging.Logger.log``, except
+        that callable log-format arguments are invoked only if the record is
+        actually emitted, so expensive summaries are computed lazily.
+
+        Args:
+            msg: log format string, passed through to ``logging.Logger.log``
+            args: format arguments; any callable is called lazily at emit time
+            level: logging level to emit at (defaults to ``logging.INFO``)
+            force: when True, bypass the rate limit and log unconditionally
+        """
+        # Rate limit: bail out unless the interval has elapsed or the caller
+        # explicitly forces the message through.
+        if time.monotonic() < self.next_log_after and not force:
+            return
+
+        if logger.isEnabledFor(level):
+            logger.log(
+                level,
+                msg,
+                # Evaluate lazy (callable) arguments only now that we know
+                # the record will actually be emitted.
+                *(arg() if callable(arg) else arg for arg in args),
+                **kwargs,
+            )
+        # The deadline is advanced even when the level is disabled, so the
+        # next attempt still waits a full interval.
+        self.next_log_after = time.monotonic() + LOGGING_INTERVAL
+
def store_data(self) -> None:
assert self.origin
if self.save_data_path:
counts: Dict[str, int] = collections.defaultdict(int)
storage_summary: Dict[str, int] = collections.Counter()
+ def sum_counts():
+ return sum(counts.values())
+
+ def sum_storage():
+ return sum(storage_summary[f"{object_type}:add"] for object_type in counts)
+
+ def maybe_log_summary(msg, force=False):
+ self.maybe_log(
+ msg + ": processed %s objects, %s are new",
+ sum_counts,
+ sum_storage,
+ force=force,
+ )
+
if self.has_contents():
for obj in self.get_contents():
if isinstance(obj, Content):
else:
raise TypeError(f"Unexpected content type: {obj}")
+ maybe_log_summary("In contents")
+
+ maybe_log_summary("After contents", force=True)
+
if self.has_directories():
for directory in self.get_directories():
counts["directory"] += 1
storage_summary.update(self.storage.directory_add([directory]))
+ maybe_log_summary("In directories")
+ maybe_log_summary("After directories", force=True)
+
if self.has_revisions():
for revision in self.get_revisions():
counts["revision"] += 1
storage_summary.update(self.storage.revision_add([revision]))
+ maybe_log_summary("In revisions")
+ maybe_log_summary("After revisions", force=True)
+
if self.has_releases():
for release in self.get_releases():
counts["release"] += 1
storage_summary.update(self.storage.release_add([release]))
+ maybe_log_summary("In releases")
snapshot = self.get_snapshot()
counts["snapshot"] += 1
self.statsd.increment("filtered_objects_total_sum", filtered, tags=tags)
self.statsd.increment("filtered_objects_total_count", total, tags=tags)
- self.log.info(
- "Fetched %d objects; %d are new",
- sum(counts.values()),
- sum(storage_summary[f"{object_type}:add"] for object_type in counts),
- )
+ maybe_log_summary("After snapshot", force=True)