Skip to content

dp3.testing

Testing helpers for DP3 applications.

DP3ModuleTestCase

Bases: ModuleAssertions, TestCase, Generic[ModuleT]

Base class for unit tests of DP3 secondary modules.

By default the app configuration directory is read from DP3_CONFIG_DIR. Subclasses may set config_dir explicitly when they need a fixed fixture config.

make_timeseries_datapoint

make_timeseries_datapoint(etype: str, eid: Any, attr: str, v: Mapping[str, Sequence[Any]], src: str = 'test', t1: datetime | None = None, t2: datetime | None = None, **fields) -> DataPointBase

Create a validated timeseries datapoint.

For regular timeseries attributes, t2 is inferred when omitted by using the attribute's configured time_step and the number of samples in v: t2 = t1 + len(series) * time_step. For irregular timeseries, t1 is inferred from the first time value when omitted. For irregular-interval timeseries, t1 is inferred from the first time_first value when omitted.

Source code in dp3/testing/case.py
def make_timeseries_datapoint(
    self,
    etype: str,
    eid: Any,
    attr: str,
    v: Mapping[str, Sequence[Any]],
    src: str = "test",
    t1: datetime | None = None,
    t2: datetime | None = None,
    **fields,
) -> DataPointBase:
    """Build a validated datapoint for a timeseries attribute.

    Start time: an explicit ``t1`` wins; otherwise the result of
    ``_infer_timeseries_t1`` is used (for irregular timeseries the first ``time``
    value, for irregular-interval timeseries the first ``time_first`` value); if
    that yields nothing, the current UTC time is used.

    End time: for ``"regular"`` timeseries with no explicit ``t2``, it is computed
    as ``t1 + len(series) * time_step`` from the attribute's configured
    ``time_step``. For other timeseries types an omitted ``t2`` is simply not
    passed on.

    Raises:
        ValueError: If ``etype``/``attr`` is not a timeseries attribute, or if a
            regular timeseries attribute has no ``time_step`` while ``t2`` must
            be inferred.
    """
    spec = self.model_spec.attributes[etype, attr]
    if spec.t != AttrType.TIMESERIES:
        raise ValueError(f"Attribute {etype}/{attr} is not a timeseries attribute.")

    # Work on a plain dict copy so the caller's mapping is never handed on directly.
    series = dict(v)

    # Resolve the start time: explicit value, then inferred value, then "now".
    start = t1
    if not start:
        start = self._infer_timeseries_t1(spec, series)
    if not start:
        start = datetime.now(UTC)

    # Only regular timeseries have a fixed step from which the end can be derived.
    end = t2
    if end is None and spec.timeseries_type == "regular":
        step = spec.timeseries_params.time_step
        if step is None:
            raise ValueError(f"Regular timeseries attribute {etype}/{attr} has no time_step.")
        end = start + self._timeseries_length(series) * step

    extra = {"t1": start, **fields}
    if end is not None:
        extra["t2"] = end
    return self._make_datapoint(etype, eid, attr, series, src=src, **extra)

registered

registered(kind: str | None = None, **fields) -> list[HookRegistration]

Return registrations matching kind and the supplied registration fields.

Source code in dp3/testing/case.py
def registered(self, kind: str | None = None, **fields) -> list[HookRegistration]:
    """Collect the captured registrations that match ``kind`` and ``fields``.

    Every registration held by ``self.registrar`` is passed, in order, to
    ``_registration_matches`` together with ``kind`` and the ``fields`` dict;
    those accepted are returned in their original order.
    """
    selected: list[HookRegistration] = []
    for candidate in self.registrar.registrations:
        if self._registration_matches(candidate, kind, fields):
            selected.append(candidate)
    return selected

assert_registered

assert_registered(kind: str, **fields) -> HookRegistration

Assert that at least one callback registration matches the supplied fields.

Source code in dp3/testing/case.py
def assert_registered(self, kind: str, **fields) -> HookRegistration:
    """Fail the test unless some registration matches ``kind`` and ``fields``.

    Returns the first matching registration. When nothing matches, ``self.fail``
    is called with a message that lists all captured registrations.
    """
    found = self.registered(kind, **fields)
    if not found:
        known = self.registrar.registrations
        message = (
            f"No registration matched kind={kind!r}, fields={fields!r}. "
            f"Registered callbacks: {known!r}"
        )
        self.fail(message)
    return found[0]

assert_registered_once

assert_registered_once(kind: str, **fields) -> HookRegistration

Assert that exactly one callback registration matches the supplied fields.

Source code in dp3/testing/case.py
def assert_registered_once(self, kind: str, **fields) -> HookRegistration:
    """Fail the test unless exactly one registration matches ``kind`` and ``fields``.

    Returns that single registration. With zero or several matches, ``self.fail``
    is called with a message reporting how many were found and what they were.
    """
    found = self.registered(kind, **fields)
    count = len(found)
    if count != 1:
        self.fail(
            f"Expected one registration matching kind={kind!r}, fields={fields!r}; "
            f"found {count}: {found!r}"
        )
    return found[0]

assert_registered_attrs

assert_registered_attrs(entity: str, expected_attrs: Iterable[str], *, kind: str = 'on_new_attr', exact: bool = True) -> list[HookRegistration]

Assert that attribute hook registrations exist for the supplied entity attributes.

Source code in dp3/testing/case.py
def assert_registered_attrs(
    self,
    entity: str,
    expected_attrs: Iterable[str],
    *,
    kind: str = "on_new_attr",
    exact: bool = True,
) -> list[HookRegistration]:
    """Check which attributes of ``entity`` have registrations of the given ``kind``.

    The registrations of ``kind`` for ``entity`` are looked up and the set of their
    non-``None`` ``attr`` values is compared with ``expected_attrs``:

    * ``exact=True``: both sets must be equal (checked with ``assertEqual``).
    * ``exact=False``: every expected attribute must be present; extra registered
      attributes are tolerated.

    Returns the matching registrations whose ``attr`` is one of the expected ones.
    """
    wanted = set(expected_attrs)
    hooks = self.registered(kind, entity=entity)

    # Registrations without an attribute do not take part in the comparison.
    seen = set()
    for hook in hooks:
        if hook.attr is not None:
            seen.add(hook.attr)

    if not exact:
        absent = wanted - seen
        if absent:
            self.fail(f"Missing registrations for attributes: {sorted(absent)!r}")
    else:
        self.assertEqual(wanted, seen)

    return [hook for hook in hooks if hook.attr in wanted]

assert_scheduler_registered

assert_scheduler_registered(**fields) -> HookRegistration

Assert that at least one scheduler callback registration matches the supplied fields.

Source code in dp3/testing/case.py
def assert_scheduler_registered(self, **fields) -> HookRegistration:
    """Shortcut for ``assert_registered`` with the kind fixed to ``"scheduler"``.

    All keyword arguments are forwarded unchanged as registration fields, and the
    registration returned by ``assert_registered`` is passed back to the caller.
    """
    scheduler_kind = "scheduler"
    return self.assert_registered(scheduler_kind, **fields)

HookRegistration dataclass

HookRegistration(kind: str, hook: Callable, entity: str | None = None, attr: str | None = None, hook_type: str | None = None, hook_id: str | None = None, entity_type: str | None = None, attr_type: str | None = None, depends_on: list[list[str]] = list(), may_change: list[list[str]] = list(), refresh: Any = None, period: Any = None, deprecated: bool = False, extra: dict[str, Any] = dict())

Captured callback registration made by a secondary module.

TestCallbackRegistrar

TestCallbackRegistrar(model_spec: ModelSpec, log: Logger | None = None, update_batch_period: Any = None)

Callback registrar implementation for module unit tests.

Source code in dp3/testing/registrar.py
def __init__(
    self,
    model_spec: ModelSpec,
    log: logging.Logger | None = None,
    update_batch_period: Any = None,
):
    """Set up an empty registrar for the given model specification.

    Args:
        model_spec: Model specification, stored and passed to the snapshot hook
            containers.
        log: Logger to use; when not supplied, a logger named after this class
            is obtained from ``logging``.
        update_batch_period: Stored as-is on the instance.
    """
    self.model_spec = model_spec
    self.log = log if log else logging.getLogger(self.__class__.__name__)
    self.update_batch_period = update_batch_period

    # Flat list of every captured registration.
    self.registrations: list[HookRegistration] = []

    # Per-category hook stores; all of them start out empty.
    self._task_hooks: defaultdict[str, list[HookRegistration]] = defaultdict(list)
    self._allow_creation_hooks: defaultdict[str, list[HookRegistration]] = defaultdict(list)
    self._on_creation_hooks: defaultdict[str, list[HookRegistration]] = defaultdict(list)
    self._attr_hooks: defaultdict[tuple[str, str], list[HookRegistration]] = defaultdict(list)
    self._snapshot_init_hooks: list[HookRegistration] = []
    self._snapshot_finalize_hooks: list[HookRegistration] = []

    # Periodic hooks: a dict of registrations per (float, str, bool) key.
    self._periodic_record_hooks: defaultdict[
        tuple[float, str, bool], dict[str, HookRegistration]
    ] = defaultdict(dict)
    self._periodic_eid_hooks: defaultdict[
        tuple[float, str, bool], dict[str, HookRegistration]
    ] = defaultdict(dict)
    self._scheduler_jobs: list[HookRegistration] = []

    # Each snapshot hook container gets its own dummy event group.
    hook_log = self.log
    self._correlation_hooks = SnapshotCorrelationHookContainer(
        hook_log, model_spec, DummyEventGroup()
    )
    self._timeseries_hooks = SnapshotTimeseriesHookContainer(
        hook_log, model_spec, DummyEventGroup()
    )

get_scheduler_job

get_scheduler_job(job: int | str | Callable | HookRegistration) -> HookRegistration

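Return a registered scheduler job by id, callable, or callable name. An existing HookRegistration is also accepted and is returned unchanged, provided its kind is "scheduler".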

Source code in dp3/testing/registrar.py
def get_scheduler_job(self, job: int | str | Callable | HookRegistration) -> HookRegistration:
    """Resolve ``job`` to one of the registered scheduler jobs.

    * ``int``: the first job whose ``extra["job_id"]`` equals ``job``.
    * ``HookRegistration``: returned as given, provided its kind is ``"scheduler"``.
    * anything else (callable or name): the single job whose hook is accepted by
      ``_callable_matches``.

    Raises:
        ValueError: If no job has the given id, the registration is not a
            scheduler job, or the callable lookup yields no match or several.
    """
    if isinstance(job, int):
        by_id = next(
            (entry for entry in self._scheduler_jobs if entry.extra["job_id"] == job),
            None,
        )
        if by_id is None:
            raise ValueError(f"No scheduler job has id {job!r}.")
        return by_id

    if isinstance(job, HookRegistration):
        if job.kind == "scheduler":
            return job
        raise ValueError(f"Registration kind '{job.kind}' is not a scheduler job.")

    candidates = [
        entry for entry in self._scheduler_jobs if _callable_matches(entry.hook, job)
    ]
    if len(candidates) == 1:
        return candidates[0]
    if not candidates:
        raise ValueError(f"No scheduler job matches {job!r}.")
    raise ValueError(f"Multiple scheduler jobs match {job!r}.")

resolve_config_dir

resolve_config_dir(config_dir: str | None = None, env_var: str = CONFIG_DIR_ENV) -> str

Return an absolute DP3 config directory path.

Explicit config_dir values take precedence. If no explicit path is supplied, the path is read from env_var.

Source code in dp3/testing/config.py
def resolve_config_dir(config_dir: str | None = None, env_var: str = CONFIG_DIR_ENV) -> str:
    """Work out the DP3 config directory and return it as an absolute path.

    A non-empty ``config_dir`` argument is used directly; otherwise the value of
    the environment variable named by ``env_var`` is taken.

    Raises:
        ValueError: If neither source provides a non-empty path.
    """
    candidate = config_dir if config_dir else os.environ.get(env_var)
    if candidate:
        return os.path.abspath(candidate)
    raise ValueError(
        f"DP3 module tests require a config directory. Set {env_var} or pass "
        "config_dir explicitly."
    )