Skip to content

dp3.task_processing.task_hooks

TaskGenericHooksContainer

TaskGenericHooksContainer(log: logging.Logger, elog: EventGroupType)

Container for generic hooks

Possible hooks:

  • on_task_start: receives Task, no return value requirements
Source code in dp3/task_processing/task_hooks.py
def __init__(self, log: logging.Logger, elog: EventGroupType):
    self.log = log.getChild("genericHooks")
    self.elog = elog

    self._on_start = []

TaskEntityHooksContainer

TaskEntityHooksContainer(entity: str, model_spec: ModelSpec, log: logging.Logger, elog: EventGroupType)

Container for entity hooks

Possible hooks:

  • allow_entity_creation: receives eid and Task, may prevent entity record creation (by returning False)
  • on_entity_creation: receives eid and Task, may return list of DataPointTasks
Source code in dp3/task_processing/task_hooks.py
def __init__(
    self, entity: str, model_spec: ModelSpec, log: logging.Logger, elog: EventGroupType
):
    self.entity = entity
    self.log = log.getChild(f"entityHooks.{entity}")
    self.elog = elog
    self.model_spec = model_spec

    self._allow_creation = []
    self._on_creation = []

TaskAttrHooksContainer

TaskAttrHooksContainer(entity: str, attr: str, attr_type: AttrType, model_spec: ModelSpec, log: logging.Logger, elog: EventGroupType)

Container for attribute hooks

Possible hooks:

  • on_new_plain, on_new_observation, on_new_ts_chunk: receives eid and DataPointBase, may return a list of DataPointTasks
Source code in dp3/task_processing/task_hooks.py
def __init__(
    self,
    entity: str,
    attr: str,
    attr_type: AttrType,
    model_spec: ModelSpec,
    log: logging.Logger,
    elog: EventGroupType,
):
    self.entity = entity
    self.attr = attr
    self.log = log.getChild(f"attributeHooks.{entity}.{attr}")
    self.elog = elog
    self.model_spec = model_spec

    if attr_type == AttrType.PLAIN:
        self.on_new_hook_type = "on_new_plain"
    elif attr_type == AttrType.OBSERVATIONS:
        self.on_new_hook_type = "on_new_observation"
    elif attr_type == AttrType.TIMESERIES:
        self.on_new_hook_type = "on_new_ts_chunk"
    else:
        raise ValueError(f"Invalid attribute type '{attr_type}'")

    self._on_new = []