Skip to content

dp3.history_management.telemetry

TelemetryReader

TelemetryReader(db: EntityDatabase)

Reader of telemetry data

Used by API. Not contained inside Telemetry class due to usage of CallbackRegistrar and all of it's requirements (doesn't make sense for API).

Source code in dp3/history_management/telemetry.py
def __init__(self, db: EntityDatabase) -> None:
    self.db = db
    self.cache_col = self.db.get_module_cache("Telemetry")

get_sources_validity

get_sources_validity() -> dict[str, datetime]

Return timestamps (datetimes) of current validity of all sources.

Source code in dp3/history_management/telemetry.py
def get_sources_validity(self) -> dict[str, datetime]:
    """Return timestamps (datetimes) of current validity of all sources."""
    src_data = self.cache_col.find({}).sort([("_id", ASCENDING)])

    return {src["_id"]: src["src_t"] for src in src_data}