dp3.snapshots.snapshooter ¶
Module managing creation of snapshots, enabling data correlation and saving snapshots to DB.
-
Snapshots are created periodically (user configurable period)
-
When a snapshot is created, several things need to happen:
- all registered timeseries processing modules must be called
- this should result in
observations
orplain
datapoints, which will be saved to db and forwarded in processing - current value must be computed for all observations
- load relevant section of observation's history and perform configured history analysis. Result = plain values
- load plain attributes saved in master collection
- A record of described plain data makes a
profile
- Profile is additionally extended by related entities
- Callbacks for data correlation and fusion should happen here
- Save the complete results into database as snapshots
SnapShooter ¶
SnapShooter(db: EntityDatabase, task_queue_writer: TaskQueueWriter, platform_config: PlatformConfig, scheduler: Scheduler, elog: Optional[EventGroupType] = None)
Class responsible for creating entity snapshots.
Source code in dp3/snapshots/snapshooter.py
start ¶
Connect to RabbitMQ and start consuming from TaskQueue.
Source code in dp3/snapshots/snapshooter.py
stop ¶
Stop consuming from TaskQueue, disconnect from RabbitMQ.
Source code in dp3/snapshots/snapshooter.py
register_timeseries_hook ¶
register_timeseries_hook(hook: Callable[[str, str, list[dict]], list[DataPointTask]], entity_type: str, attr_type: str)
Registers passed timeseries hook to be called during snapshot creation.
Binds hook to specified entity_type
and attr_type
(though same hook can be bound
multiple times).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hook
|
Callable[[str, str, list[dict]], list[DataPointTask]]
|
|
required |
entity_type
|
str
|
specifies entity type |
required |
attr_type
|
str
|
specifies attribute type |
required |
Raises:
Type | Description |
---|---|
ValueError
|
If entity_type and attr_type do not specify a valid timeseries attribute, a ValueError is raised. |
Source code in dp3/snapshots/snapshooter.py
register_correlation_hook ¶
register_correlation_hook(hook: Callable[[str, dict], Union[None, list[DataPointTask]]], entity_type: str, depends_on: list[list[str]], may_change: list[list[str]])
Registers passed hook to be called during snapshot creation.
Binds hook to specified entity_type (though same hook can be bound multiple times).
entity_type
and attribute specifications are validated, ValueError
is raised on failure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hook
|
Callable[[str, dict], Union[None, list[DataPointTask]]]
|
|
required |
entity_type
|
str
|
specifies entity type |
required |
depends_on
|
list[list[str]]
|
each item should specify an attribute that is depended on in the form of a path from the specified entity_type to individual attributes (even on linked entities). |
required |
may_change
|
list[list[str]]
|
each item should specify an attribute that |
required |
Raises:
Type | Description |
---|---|
ValueError
|
On failure of specification validation. |
Source code in dp3/snapshots/snapshooter.py
register_run_init_hook ¶
Registers passed hook to be called before a run of snapshot creation begins.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hook
|
Callable[[], list[DataPointTask]]
|
|
required |
Source code in dp3/snapshots/snapshooter.py
register_run_finalize_hook ¶
Registers passed hook to be called after a run of snapshot creation ends.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hook
|
Callable[[], list[DataPointTask]]
|
|
required |
Source code in dp3/snapshots/snapshooter.py
make_snapshots ¶
Creates snapshots for all entities currently active in database.
Source code in dp3/snapshots/snapshooter.py
get_linked_entities ¶
Get weakly connected components from entity graph.
Source code in dp3/snapshots/snapshooter.py
process_snapshot_task ¶
Acknowledges the received message and makes a snapshot according to the task
.
This function should not be called directly, but set as callback for TaskQueueReader.
Source code in dp3/snapshots/snapshooter.py
make_snapshots_by_hash ¶
Make snapshots for all entities with routing key belonging to this worker.
Source code in dp3/snapshots/snapshooter.py
make_linkless_snapshot ¶
Make a snapshot for given entity master_record
and time
.
Runs timeseries and correlation hooks. The resulting snapshot is saved into DB.
Source code in dp3/snapshots/snapshooter.py
add_mirrored_links ¶
This function adds mirrored links to the dict with current values of an entity.
The links are added in the same format as normal links, i.e. as a list of dicts.
Source code in dp3/snapshots/snapshooter.py
make_snapshot ¶
Make a snapshot for entities and time specified by task
.
Runs timeseries and correlation hooks. The resulting snapshots are saved into DB.
Source code in dp3/snapshots/snapshooter.py
run_timeseries_processing ¶
- all registered timeseries processing modules must be called
- this should result in
observations
orplain
datapoints, which will be saved to db and forwarded in processing
Source code in dp3/snapshots/snapshooter.py
extend_master_record
staticmethod
¶
Update existing master record with datapoints from new tasks
Source code in dp3/snapshots/snapshooter.py
load_linked_entity_ids ¶
Loads the subgraph of entities linked to the current entity, returns a list of their types and ids.
Source code in dp3/snapshots/snapshooter.py
get_linked_entity_ids ¶
Returns a set of tuples (entity_type, entity_id) identifying entities linked by
current_values
.
Source code in dp3/snapshots/snapshooter.py
get_value_at_time ¶
get_value_at_time(attr_spec: AttrSpecObservations, attr_history, time: datetime) -> tuple[Any, float]
Get current value of an attribute from its history. Assumes multi_value = False
.
Source code in dp3/snapshots/snapshooter.py
get_multi_value_at_time ¶
get_multi_value_at_time(attr_spec: AttrSpecObservations, attr_history, time: datetime) -> tuple[list, list[float]]
Get current value of a multi_value attribute from its history.
Source code in dp3/snapshots/snapshooter.py
extrapolate_confidence
staticmethod
¶
extrapolate_confidence(datapoint: dict, time: datetime, history_params: ObservationsHistoryParams) -> float
Get the confidence value at given time.