
dp3.history_management.history_manager

SnapshotCleaningConfig

Bases: BaseModel

Configuration for snapshot cleaning.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| schedule | CronExpression | Schedule for snapshot cleaning. |
| older_than | ParsedTimedelta | Snapshots older than this will be deleted. |

DPArchivationConfig

Bases: BaseModel

Configuration for datapoint archivation.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| schedule | CronExpression | Schedule for datapoint archivation. |
| older_than | ParsedTimedelta | Datapoints older than this will be archived. |
| archive_dir | Optional[str] | Directory where datapoints are archived. Can be None to only delete them. |

HistoryManagerConfig

Bases: BaseModel

Configuration for history manager.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| aggregation_schedule | CronExpression | Schedule for master document aggregation. |
| datapoint_cleaning_schedule | CronExpression | Schedule for datapoint cleaning. |
| mark_datapoints_schedule | CronExpression | Schedule for marking datapoints in master docs. |
| snapshot_cleaning | SnapshotCleaningConfig | Configuration for snapshot cleaning. |
| datapoint_archivation | DPArchivationConfig | Configuration for datapoint archivation. |
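
Putting the three models together, here is a hedged sketch of validating a history_manager configuration dict, mirroring what `__init__` does with `platform_config.config.get("history_manager")`. The nested keys follow the attribute tables above; the concrete cron field names and the timedelta string format are assumptions, so check CronExpression and ParsedTimedelta for the accepted values.

```python
from dp3.history_management.history_manager import HistoryManagerConfig

# Hypothetical configuration dict. The cron fields ("hour", "minute") and the
# "30d"/"14d" timedelta strings are assumptions about the CronExpression and
# ParsedTimedelta formats, not confirmed by this page.
config = HistoryManagerConfig.model_validate(
    {
        "aggregation_schedule": {"minute": "*/10"},
        "datapoint_cleaning_schedule": {"hour": "2", "minute": "0"},
        "mark_datapoints_schedule": {"hour": "1", "minute": "30"},
        "snapshot_cleaning": {
            "schedule": {"hour": "3", "minute": "0"},
            "older_than": "30d",
        },
        "datapoint_archivation": {
            "schedule": {"hour": "4", "minute": "0"},
            "older_than": "14d",
            "archive_dir": None,  # None: old datapoints are deleted without being archived
        },
    }
)
```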

HistoryManager

HistoryManager(db: EntityDatabase, platform_config: PlatformConfig, registrar: CallbackRegistrar)
Source code in dp3/history_management/history_manager.py
def __init__(
    self, db: EntityDatabase, platform_config: PlatformConfig, registrar: CallbackRegistrar
) -> None:
    self.log = logging.getLogger("HistoryManager")

    self.db = db
    self.model_spec = platform_config.model_spec
    self.worker_index = platform_config.process_index
    self.num_workers = platform_config.num_processes
    self.config = HistoryManagerConfig.model_validate(
        platform_config.config.get("history_manager")
    )

    # Schedule master document aggregation
    registrar.scheduler_register(
        self.aggregate_master_docs, **self.config.aggregation_schedule.model_dump()
    )

    if platform_config.process_index != 0:
        self.log.debug(
            "History management will be disabled in this worker to avoid race conditions."
        )
        return

    # Schedule datapoints cleaning
    datapoint_marking_schedule = self.config.mark_datapoints_schedule
    registrar.scheduler_register(
        self.mark_datapoints_in_master_docs, **datapoint_marking_schedule.model_dump()
    )

    datapoint_cleaning_schedule = self.config.datapoint_cleaning_schedule
    registrar.scheduler_register(
        self.delete_old_dps, **datapoint_cleaning_schedule.model_dump()
    )

    snapshot_cleaning_schedule = self.config.snapshot_cleaning.schedule
    self.keep_snapshot_delta = self.config.snapshot_cleaning.older_than
    registrar.scheduler_register(
        self.delete_old_snapshots, **snapshot_cleaning_schedule.model_dump()
    )

    # Schedule datapoint archivation
    archive_config = self.config.datapoint_archivation
    self.keep_raw_delta = archive_config.older_than
    if archive_config.archive_dir is not None:
        self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
    else:
        self.log_dir = None
    registrar.scheduler_register(self.archive_old_dps, **archive_config.schedule.model_dump())

delete_old_dps

delete_old_dps()

Deletes old data points from master collection.

Source code in dp3/history_management/history_manager.py
def delete_old_dps(self):
    """Deletes old data points from master collection."""
    self.log.debug("Deleting old records ...")

    for etype_attr, attr_conf in self.model_spec.attributes.items():
        etype, attr_name = etype_attr
        max_age = None

        if attr_conf.t == AttrType.OBSERVATIONS:
            max_age = attr_conf.history_params.max_age
        elif attr_conf.t == AttrType.TIMESERIES:
            max_age = attr_conf.timeseries_params.max_age

        if not max_age:
            continue

        t_old = datetime.utcnow() - max_age

        try:
            self.db.delete_old_dps(etype, attr_name, t_old)
        except DatabaseError as e:
            self.log.error(e)

mark_datapoints_in_master_docs

mark_datapoints_in_master_docs()

Marks the timestamps of all datapoints in master documents.

Source code in dp3/history_management/history_manager.py
def mark_datapoints_in_master_docs(self):
    """Marks the timestamps of all datapoints in master documents."""
    self.log.debug("Marking the datapoint timestamps for all entity records ...")

    for entity, attr_conf in self.model_spec.entity_attributes.items():
        attrs_to_mark = []
        for attr, conf in attr_conf.items():
            if conf.t in AttrType.OBSERVATIONS | AttrType.TIMESERIES:
                attrs_to_mark.append(attr)

        if not attrs_to_mark:
            continue
        try:
            res = self.db.mark_all_entity_dps_t2(entity, attrs_to_mark)
            self.log.debug("Marked %s records of %s", res.modified_count, entity)
        except DatabaseError as e:
            self.log.error(e)

delete_old_snapshots

delete_old_snapshots()

Deletes old snapshots.

Source code in dp3/history_management/history_manager.py
def delete_old_snapshots(self):
    """Deletes old snapshots."""
    t_old = datetime.now() - self.keep_snapshot_delta
    self.log.debug("Deleting all snapshots before %s", t_old)

    deleted_total = 0
    try:
        deleted_total = self.db.snapshots.delete_old(t_old)
    except DatabaseError as e:
        self.log.exception(e)
    self.log.debug("Deleted %s snapshots in total.", deleted_total)

archive_old_dps

archive_old_dps()

Archives old data points from raw collection.

Updates already saved archive files, if present.

Source code in dp3/history_management/history_manager.py
def archive_old_dps(self):
    """
    Archives old data points from raw collection.

    Updates already saved archive files, if present.
    """

    t_old = datetime.utcnow() - self.keep_raw_delta
    self.log.debug("Archiving all records before %s ...", t_old)

    for etype in self.model_spec.entities:
        res = self.db.move_raw_to_archive(etype)
        if res:
            self.log.info("Current %s raw collection was moved to archive: %s", etype, res)

    max_date, min_date, total_dps = self._get_raw_dps_summary(t_old)
    if total_dps == 0:
        self.log.debug("Found no datapoints to archive.")
        return
    self.log.debug(
        "Found %s datapoints to archive in the range %s - %s", total_dps, min_date, max_date
    )

    if self.log_dir is None:
        self.log.debug("No archive directory specified, skipping archivation.")
    else:
        min_date_string = min_date.strftime("%Y%m%dT%H%M%S")
        max_date_string = max_date.strftime("%Y%m%dT%H%M%S")
        date_logfile = self.log_dir / f"dp-log-{min_date_string}--{max_date_string}.jsonl"
        datapoints = 0

        with open(date_logfile, "w", encoding="utf-8") as logfile:
            for etype in self.model_spec.entities:
                for result_cursor in self.db.get_archive(etype, after=min_date, before=t_old):
                    for dp in result_cursor:
                        logfile.write(f"{json.dumps(self._reformat_dp(dp), cls=DP3Encoder)}\n")
                        datapoints += 1

        self.log.info("Archived %s datapoints to %s", datapoints, date_logfile)
        compress_file(date_logfile)
        os.remove(date_logfile)
        self.log.debug("Saved archive was compressed")

    deleted_count = 0
    for etype in self.model_spec.entities:
        for deleted_res in self.db.delete_old_archived_dps(etype, before=t_old):
            deleted_count += deleted_res.deleted_count
    self.log.info("Deleted %s datapoints", deleted_count)

    dropped_count = 0
    for etype in self.model_spec.entities:
        dropped_count += self.db.drop_empty_archives(etype)
    if dropped_count:
        self.log.info("Dropped %s empty archive collection(s)", dropped_count)

aggregate_multivalue_dp_history_on_equal

aggregate_multivalue_dp_history_on_equal(history: list[dict], spec: AttrSpecObservations)

Merge multivalue datapoints in the history with equal values and overlapping time validity.

Averages the confidence. Keeps a pool of "active" datapoints and merges them with the next datapoint if they have the same value and overlapping time validity.

FIXME

The average calculation only works for the current iteration, but for the next call of the algorithm, the count of aggregated datapoints is lost.

Source code in dp3/history_management/history_manager.py
def aggregate_multivalue_dp_history_on_equal(history: list[dict], spec: AttrSpecObservations):
    """
    Merge multivalue datapoints in the history with equal values and overlapping time validity.

    Averages the confidence.
    Will keep a pool of "active" datapoints and merge them with the next datapoint
    if they have the same value and overlapping time validity.

    FIXME:
      The average calculation only works for the current iteration,
      but for the next call of the algorithm, the count of aggregated datapoints is lost.
    """
    history = sorted(history, key=lambda x: x["t1"])
    aggregated_history = []
    pre = spec.history_params.pre_validity
    post = spec.history_params.post_validity

    if spec.data_type.hashable:
        current_dps = {}

        for dp in history:
            v = dp["v"]
            if v in current_dps:
                current_dp = current_dps[v]
                if current_dp["t2"] + post >= dp["t1"] - pre:  # Merge with current_dp
                    current_dp["t2"] = max(dp["t2"], current_dp["t2"])
                    current_dp["c"] += dp["c"]
                    current_dp["cnt"] += 1
                else:  # No overlap, finalize current_dp and reset
                    current_dp["c"] /= current_dp["cnt"]
                    del current_dp["cnt"]
                    aggregated_history.append(current_dp)
                    current_dps[v] = dp
                    current_dps[v]["cnt"] = 1
            else:  # New value, initialize current_dp
                current_dps[v] = dp
                current_dps[v]["cnt"] = 1

        for _v, current_dp in current_dps.items():  # Finalize remaining dps
            current_dp["c"] /= current_dp["cnt"]
            del current_dp["cnt"]
            aggregated_history.append(current_dp)
        return aggregated_history
    else:
        current_dps = []

        for dp in history:
            v = dp["v"]
            for i, current_dp in enumerate(current_dps):
                if current_dp["v"] != v:
                    continue

                if current_dp["t2"] + post >= dp["t1"] - pre:  # Merge with current_dp
                    current_dp["t2"] = max(dp["t2"], current_dp["t2"])
                    current_dp["c"] += dp["c"]
                    current_dp["cnt"] += 1
                else:  # No overlap, finalize current_dp and reset
                    current_dp["c"] /= current_dp["cnt"]
                    del current_dp["cnt"]
                    aggregated_history.append(current_dp)
                    dp["cnt"] = 1
                    current_dps[i] = dp
                break
            else:  # New value, initialize current_dp
                dp["cnt"] = 1
                current_dps.append(dp)

        for current_dp in current_dps:  # Finalize remaining dps
            current_dp["c"] /= current_dp["cnt"]
            del current_dp["cnt"]
            aggregated_history.append(current_dp)
        return aggregated_history
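
A minimal usage sketch, calling the function with a stand-in spec built from SimpleNamespace instead of a real AttrSpecObservations; only the attributes the function actually reads are mimicked, and the timestamps and confidences are made up.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace

from dp3.history_management.history_manager import aggregate_multivalue_dp_history_on_equal

# Stand-in for AttrSpecObservations: only history_params.pre_validity/post_validity
# and data_type.hashable are read by the function.
spec = SimpleNamespace(
    history_params=SimpleNamespace(
        pre_validity=timedelta(minutes=5), post_validity=timedelta(minutes=5)
    ),
    data_type=SimpleNamespace(hashable=True),
)
history = [
    {"t1": datetime(2024, 1, 1, 10, 0), "t2": datetime(2024, 1, 1, 10, 10), "v": "a", "c": 1.0},
    {"t1": datetime(2024, 1, 1, 10, 12), "t2": datetime(2024, 1, 1, 10, 20), "v": "a", "c": 0.5},
    {"t1": datetime(2024, 1, 1, 12, 0), "t2": datetime(2024, 1, 1, 12, 10), "v": "b", "c": 1.0},
]
merged = aggregate_multivalue_dp_history_on_equal(history, spec)
# The two overlapping "a" datapoints merge into one spanning 10:00-10:20 with
# confidence (1.0 + 0.5) / 2 = 0.75; the "b" datapoint is kept as-is.
```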

aggregate_dp_history_on_equal

aggregate_dp_history_on_equal(history: list[dict], spec: ObservationsHistoryParams)

Merge datapoints in the history with equal values and overlapping time validity.

Averages the confidence.

FIXME

The average calculation only works for the current iteration, but for the next call of the algorithm, the count of aggregated datapoints is lost.

Source code in dp3/history_management/history_manager.py
def aggregate_dp_history_on_equal(history: list[dict], spec: ObservationsHistoryParams):
    """
    Merge datapoints in the history with equal values and overlapping time validity.

    Averages the confidence.

    FIXME:
      The average calculation only works for the current iteration,
      but for the next call of the algorithm, the count of aggregated datapoints is lost.
    """
    history = sorted(history, key=lambda x: x["t1"])
    aggregated_history = []
    current_dp = None
    merged_cnt = 0
    pre = spec.pre_validity
    post = spec.post_validity

    for dp in history:
        if not current_dp:
            current_dp = dp
            merged_cnt += 1
            continue

        if current_dp["v"] == dp["v"] and current_dp["t2"] + post >= dp["t1"] - pre:
            current_dp["t2"] = max(dp["t2"], current_dp["t2"])
            current_dp["c"] += dp["c"]
            merged_cnt += 1
        else:
            aggregated_history.append(current_dp)
            current_dp["c"] /= merged_cnt

            merged_cnt = 1
            current_dp = dp
    if current_dp:
        current_dp["c"] /= merged_cnt
        aggregated_history.append(current_dp)
    return aggregated_history
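
An analogous usage sketch for the single-value variant, again with a SimpleNamespace standing in for ObservationsHistoryParams, of which only pre_validity and post_validity are read.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace

from dp3.history_management.history_manager import aggregate_dp_history_on_equal

# Stand-in for ObservationsHistoryParams; only pre_validity/post_validity are used.
params = SimpleNamespace(pre_validity=timedelta(minutes=5), post_validity=timedelta(minutes=5))
history = [
    {"t1": datetime(2024, 1, 1, 10, 0), "t2": datetime(2024, 1, 1, 10, 10), "v": 1, "c": 1.0},
    {"t1": datetime(2024, 1, 1, 10, 12), "t2": datetime(2024, 1, 1, 10, 20), "v": 1, "c": 0.5},
]
merged = aggregate_dp_history_on_equal(history, params)
# -> one datapoint spanning 10:00-10:20 with confidence (1.0 + 0.5) / 2 = 0.75
```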