Skip to content

dp3.snapshots.snapshot_hooks

Module managing registered hooks and their dependencies on one another.

SnapshotTimeseriesHookContainer

SnapshotTimeseriesHookContainer(log: logging.Logger, model_spec: ModelSpec, elog: EventGroupType)

Container for timeseries analysis hooks

Source code in dp3/snapshots/snapshot_hooks.py
def __init__(self, log: logging.Logger, model_spec: ModelSpec, elog: EventGroupType):
    """Create an empty timeseries-hook container bound to the given model spec and event log."""
    # Shared collaborators, kept as-is.
    self.model_spec = model_spec
    self.elog = elog
    # Dedicated child logger for this container.
    self.log = log.getChild("TimeseriesHooks")
    # (entity_type, attr_type) -> list of hooks bound to that timeseries attribute.
    self._hooks = defaultdict(list)

register

register(hook: Callable[[str, str, list[dict]], list[DataPointTask]], entity_type: str, attr_type: str)

Registers passed timeseries hook to be called during snapshot creation.

Binds hook to the specified entity_type and attr_type (the same hook can be bound multiple times). If entity_type and attr_type do not specify a valid timeseries attribute, a ValueError is raised.

Args:
- hook: callable that receives entity_type, attr_type and the attribute history as arguments, and returns a list of DataPointTask objects.
- entity_type: specifies the entity type.
- attr_type: specifies the attribute type.

Source code in dp3/snapshots/snapshot_hooks.py
def register(
    self,
    hook: Callable[[str, str, list[dict]], list[DataPointTask]],
    entity_type: str,
    attr_type: str,
):
    """
    Bind a timeseries hook to one timeseries attribute, to be run during snapshot creation.

    The same hook may be bound several times.

    Args:
        hook: callable invoked as ``hook(entity_type, attr_type, attr_history)``;
            it should return a list of `Task` objects.
        entity_type: entity type owning the attribute.
        attr_type: attribute the hook is bound to.
    Raises:
        ValueError: if the attribute is not defined in the model spec, or is
            defined but is not a timeseries attribute.
    """
    attr_key = (entity_type, attr_type)
    known_attributes = self.model_spec.attributes

    # Guard clauses: the attribute must exist and must be a timeseries.
    if attr_key not in known_attributes:
        raise ValueError(f"Attribute '{attr_type}' of entity '{entity_type}' does not exist.")
    attr_spec = known_attributes[attr_key]
    if attr_spec.t != AttrType.TIMESERIES:
        raise ValueError(f"'{entity_type}.{attr_type}' is not a timeseries, but '{attr_spec.t}'")

    self._hooks[attr_key].append(hook)
    self.log.debug(f"Added hook: '{get_func_name(hook)}'")

run

run(entity_type: str, attr_type: str, attr_history: list[dict]) -> list[DataPointTask]

Runs registered hooks.

Source code in dp3/snapshots/snapshot_hooks.py
def run(
    self, entity_type: str, attr_type: str, attr_history: list[dict]
) -> list[DataPointTask]:
    """
    Runs registered hooks.

    Every hook bound to ``(entity_type, attr_type)`` is called with
    ``(entity_type, attr_type, attr_history)`` and the tasks it returns are collected.
    A hook may return ``None`` or an empty list to produce no tasks (consistent with
    correlation hooks). If a hook raises, the failure is logged with its traceback and
    counted as ``module_error`` in the event log, and the remaining hooks still run.

    Args:
        entity_type: entity type of the timeseries attribute.
        attr_type: timeseries attribute whose hooks should run.
        attr_history: history of the attribute passed to each hook.
    Returns:
        All tasks returned by the hooks, in registration order.
    """
    tasks = []
    # `.get` avoids inserting empty entries into the `_hooks` defaultdict.
    hooks = self._hooks.get((entity_type, attr_type), [])
    with task_context(self.model_spec):
        for hook in hooks:
            try:
                new_tasks = hook(entity_type, attr_type, attr_history)
                # Treat None / empty results as "no tasks" instead of failing in extend().
                if new_tasks:
                    tasks.extend(new_tasks)
            except Exception as e:
                self.elog.log("module_error")
                self.log.exception("Error during running hook %s: %s", hook, e)
    return tasks

SnapshotCorrelationHookContainer

SnapshotCorrelationHookContainer(log: logging.Logger, model_spec: ModelSpec, elog: EventGroupType)

Container for data fusion and correlation hooks.

Source code in dp3/snapshots/snapshot_hooks.py
def __init__(self, log: logging.Logger, model_spec: ModelSpec, elog: EventGroupType):
    """Create an empty correlation-hook container with its own dependency graph."""
    self.model_spec = model_spec
    self.elog = elog
    self.log = log.getChild("CorrelationHooks")

    # Links referenced by the attribute paths of registered hooks.
    self.used_links = set()
    # Full hook id -> its argument part, used for shorter debug log lines.
    self._short_hook_ids: dict = {}
    # entity_type -> list of (hook_id, hook) pairs.
    self._hooks: defaultdict[str, list[tuple[str, Callable]]] = defaultdict(list)

    # The graph logs under this container's logger, so it is created after `self.log`.
    self._dependency_graph = DependencyGraph(self.log)

register

register(hook: Callable[[str, dict], Union[None, list[DataPointTask]]], entity_type: str, depends_on: list[list[str]], may_change: list[list[str]]) -> str

Registers passed hook to be called during snapshot creation.

Binds hook to specified entity_type (though same hook can be bound multiple times).

The entity_type and the attribute specifications are validated, and a ValueError is raised on failure.

Args:
- hook: callable that receives the entity type as str and the entity's current values, including linked entities, as dict. It can optionally return a list of DataPointTask objects to perform.
- entity_type: specifies the entity type.
- depends_on: each item specifies an attribute the hook depends on, in the form of a path from the specified entity_type to the individual attribute (even on linked entities).
- may_change: each item specifies an attribute the hook may change; the format is identical to depends_on.

Returns: the generated hook id.

Source code in dp3/snapshots/snapshot_hooks.py
def register(
    self,
    hook: Callable[[str, dict], Union[None, list[DataPointTask]]],
    entity_type: str,
    depends_on: list[list[str]],
    may_change: list[list[str]],
) -> str:
    """
    Registers passed hook to be called during snapshot creation.

    Binds hook to specified entity_type. The same hook can be bound multiple times,
    as long as each binding yields a distinct hook id; the id is built from the hook's
    name, the entity_type and the resolved attribute paths.

    The entity_type and the attribute specifications are validated, and a ValueError
    is raised on failure. `used_links`, `_short_hook_ids` and `_hooks` are only
    updated after the hook has been accepted by the dependency graph.

    Args:
        hook: `hook` callable should expect entity type as str
            and its current values, including linked entities, as dict.
            Can optionally return a list of DataPointTask objects to perform.
        entity_type: specifies entity type
        depends_on: each item should specify an attribute that is depended on
            in the form of a path from the specified entity_type to individual attributes
            (even on linked entities).
        may_change: each item should specify an attribute that `hook` may change.
            specification format is identical to `depends_on`.
    Returns:
        Generated hook id.
    Raises:
        ValueError: if the entity type or an attribute specification is invalid,
            if a hook with the same id is already registered, or if the hook would
            introduce a circular dependency.
    """
    if entity_type not in self.model_spec.entities:
        raise ValueError(f"Entity '{entity_type}' does not exist.")

    # Validate both path lists first; links are recorded only once the hook is accepted.
    new_links = self._validate_attr_paths(entity_type, depends_on) | self._validate_attr_paths(
        entity_type, may_change
    )

    depends_on = self._get_attr_path_destinations(entity_type, depends_on)
    may_change = self._get_attr_path_destinations(entity_type, may_change)

    hook_args = f"({entity_type}, [{','.join(depends_on)}], [{','.join(may_change)}])"
    hook_id = f"{get_func_name(hook)}{hook_args}"
    # Raises ValueError on a duplicate hook id or a circular dependency.
    self._dependency_graph.add_hook_dependency(hook_id, depends_on, may_change)

    # The hook is accepted from here on; commit it to the container state.
    self._short_hook_ids[hook_id] = hook_args
    self.used_links |= new_links
    self._hooks[entity_type].append((hook_id, hook))
    self._restore_hook_order(self._hooks[entity_type])

    self.log.info(f"Added hook: '{hook_id}'")
    return hook_id

run

run(entities: dict) -> list[DataPointTask]

Runs registered hooks.

Source code in dp3/snapshots/snapshot_hooks.py
def run(self, entities: dict) -> list[DataPointTask]:
    """
    Runs registered hooks.

    Every hook registered for an entity type present in ``entities`` is run once per
    entity of that type, in the dependency graph's topological order. If a hook raises,
    the failure is logged once (with traceback) and counted as ``module_error`` in the
    event log, and the remaining hooks still run.

    Args:
        entities: mapping of ``(entity_type, entity_id)`` keys to the entity's current values.
    Returns:
        All DataPointTask objects returned by the hooks.
    """
    # Group entity values by entity type: etype -> {eid: values}.
    entities_by_etype = defaultdict(dict)
    for (etype, eid), values in entities.items():
        entities_by_etype[etype][eid] = values

    # Precompute each vertex's position once; `list.index` inside the sort key
    # would rescan the whole topological order for every hook.
    order_position = {
        vertex_id: position
        for position, vertex_id in enumerate(self._dependency_graph.topological_order)
    }
    # `.get` avoids inserting empty entries into the `_hooks` defaultdict.
    hook_subset = [
        (hook_id, hook, etype)
        for etype in entities_by_etype
        for hook_id, hook in self._hooks.get(etype, ())
    ]
    hook_subset.sort(key=lambda item: order_position[item[0]])

    created_tasks = []

    with task_context(self.model_spec):
        for hook_id, hook, etype in hook_subset:
            # Very long hook ids are replaced by their argument part in debug logs.
            short_id = hook_id if len(hook_id) < 160 else self._short_hook_ids[hook_id]
            for eid, entity_values in entities_by_etype[etype].items():
                self.log.debug("Running hook %s on entity %s", short_id, eid)
                try:
                    tasks = hook(etype, entity_values)
                    if tasks:
                        created_tasks.extend(tasks)
                except Exception as e:
                    self.elog.log("module_error")
                    # Single log record with traceback (previously logged twice).
                    self.log.exception("Error during running hook %s: %s", hook_id, e)

    return created_tasks

GraphVertex dataclass

GraphVertex(adj: list = list(), in_degree: int = 0, type: str = 'attr')

Vertex in a graph of dependencies

DependencyGraph

DependencyGraph(log)

Class representing a graph of dependencies between correlation hooks.

Source code in dp3/snapshots/snapshot_hooks.py
def __init__(self, log):
    """Create an empty dependency graph that logs through a child logger named "DependencyGraph"."""
    # Vertex order from the most recent successful topological sort.
    self.topological_order = []
    # Vertex id -> GraphVertex (holding its adjacency list); vertices appear on first access.
    self._vertices = defaultdict(GraphVertex)
    self.log = log.getChild("DependencyGraph")

add_hook_dependency

add_hook_dependency(hook_id: str, depends_on: list[str], may_change: list[str])

Add a hook to the dependency graph and re-run the topological sort to check whether the hook creates a cycle; a ValueError is raised if it does.

Source code in dp3/snapshots/snapshot_hooks.py
def add_hook_dependency(self, hook_id: str, depends_on: list[str], may_change: list[str]):
    """
    Add a hook vertex to the dependency graph and check that no cycle is created.

    An edge is added from every attribute in ``depends_on`` to the hook, and from the
    hook to every attribute in ``may_change``. If the resulting graph contains a cycle,
    every edge and vertex added by this call is removed again before the error is
    raised. The graph therefore stays acyclic and usable for later registrations.

    Args:
        hook_id: unique identifier of the hook vertex.
        depends_on: identifiers of the attributes the hook depends on.
        may_change: identifiers of the attributes the hook may change.
    Raises:
        ValueError: if ``hook_id`` is already a vertex, or if adding the hook
            introduces a circular dependency.
    """
    if hook_id in self._vertices:
        raise ValueError(f"Hook id '{hook_id}' already present in the vertices.")

    # Snapshot the vertex set so a rejected hook can be rolled back completely.
    vertices_before = set(self._vertices)
    added_edges = []
    for path in depends_on:
        self.add_edge(path, hook_id)
        added_edges.append((path, hook_id))
    for path in may_change:
        self.add_edge(hook_id, path)
        added_edges.append((hook_id, path))
    self._vertices[hook_id].type = "hook"

    try:
        self.topological_sort()
    except ValueError as err:
        # add_edge only appends to the source's adjacency list, so undoing the
        # edges in reverse order means popping the last entry of each source.
        for id_from, _ in reversed(added_edges):
            self._vertices[id_from].adj.pop()
        for vertex_id in set(self._vertices) - vertices_before:
            del self._vertices[vertex_id]
        raise ValueError(f"Hook {hook_id} introduces a circular dependency.") from err
    self.check_multiple_writes()

add_edge

add_edge(id_from: Hashable, id_to: Hashable)

Add oriented edge between specified vertices.

Source code in dp3/snapshots/snapshot_hooks.py
def add_edge(self, id_from: Hashable, id_to: Hashable):
    """
    Insert the directed edge ``id_from -> id_to``.

    Both endpoints are materialised as vertices, so a target with no outgoing
    edges is still present when the graph is iterated later.
    """
    vertices = self._vertices
    source = vertices[id_from]
    _ = vertices[id_to]  # touch the target so the defaultdict creates it
    source.adj.append(id_to)

calculate_in_degrees

calculate_in_degrees()

Calculate number of incoming edges for each vertex. Time complexity O(V + E).

Source code in dp3/snapshots/snapshot_hooks.py
def calculate_in_degrees(self):
    """Recompute every vertex's count of incoming edges from the adjacency lists, in O(V + E)."""
    vertices = self._vertices
    # Reset first so repeated calls never accumulate stale counts.
    for vertex in vertices.values():
        vertex.in_degree = 0
    # Each appearance of a vertex in some adjacency list is one incoming edge.
    edge_targets = (target for vertex in vertices.values() for target in vertex.adj)
    for target in edge_targets:
        vertices[target].in_degree += 1

topological_sort

topological_sort()

Implementation of Kahn's algorithm for topological sorting. Raises ValueError if there is a cycle in the graph.

See https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm

Source code in dp3/snapshots/snapshot_hooks.py
def topological_sort(self):
    """
    Implementation of Kahn's algorithm for topological sorting.
    Raises ValueError if there is a cycle in the graph.

    On success the order is stored in ``self.topological_order`` and returned;
    on failure ``self.topological_order`` is left unchanged.

    See https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm
    """
    from collections import deque

    self.calculate_in_degrees()
    # Start from all vertices without incoming edges. A deque gives O(1) pops from
    # the left, keeping the sort O(V + E) instead of O(V^2) with list.pop(0).
    queue = deque(
        (node_id, node) for node_id, node in self._vertices.items() if node.in_degree == 0
    )
    topological_order = []

    while queue:
        curr_node_id, curr_node = queue.popleft()
        topological_order.append(curr_node_id)

        # Decrease neighbouring nodes' in-degree by 1
        for neighbor in curr_node.adj:
            neighbor_node = self._vertices[neighbor]
            neighbor_node.in_degree -= 1
            # If in-degree becomes zero, add it to queue
            if neighbor_node.in_degree == 0:
                queue.append((neighbor, neighbor_node))

    # Vertices on a cycle never reach in-degree 0, so they are never emitted.
    if len(topological_order) != len(self._vertices):
        raise ValueError("Dependency graph contains a cycle.")
    self.topological_order = topological_order
    return topological_order