Skip to content

dp3.common.task

Task

Bases: BaseModel, ABC

A generic task type class.

An abstraction for the task queue classes to depend upon.

routing_key abstractmethod

routing_key() -> str

Returns:

Type Description
str

A string to be used as a routing key between workers.

Source code in dp3/common/task.py
@abstractmethod
def routing_key(self) -> str:
    """Provide the key by which this task is routed between workers.

    Abstract: the base implementation has no body, so concrete task
    types must override it.

    Returns:
        A string to be used as a routing key between workers.
    """

hashed_routing_key

hashed_routing_key() -> int

Returns:

Type Description
int

An integer to be used as a hashed routing key between workers.

Source code in dp3/common/task.py
def hashed_routing_key(self) -> int:
    """Hash this task's routing key.

    Returns:
        The integer obtained by passing `routing_key()` through `HASH`,
        to be used as a hashed routing key between workers.
    """
    key = self.routing_key()
    return HASH(key)

as_message abstractmethod

as_message() -> str

Returns:

Type Description
str

A string representation of the object.

Source code in dp3/common/task.py
@abstractmethod
def as_message(self) -> str:
    """Render this task as a string.

    Abstract: the base implementation has no body, so concrete task
    types must override it.

    Returns:
        A string representation of the object.
    """

DataPointTask

DataPointTask(__pydantic_self__, **data: Any)

Bases: Task

DataPointTask

Contains a single task to be pushed to the TaskQueue and processed.

Attributes:

- etype: Entity type
- eid: Entity ID / key
- data_points: List of DataPoints to process
- tags: List of tags
- ttl_tokens: Dictionary of TTL tokens
- delete: If True, delete the entity

Set the model_spec context variable and initialize the Task.

See https://docs.pydantic.dev/latest/concepts/validators/#validation-context

Source code in dp3/common/task.py
def __init__(__pydantic_self__, **data: Any) -> None:
    """Initialize the Task, validating `data` with the current init context.

    The current value of the `_init_context_var` context variable is passed
    to the validator as the validation context.

    See https://docs.pydantic.dev/latest/concepts/validators/#validation-context

    Args:
        **data: Field values to validate into this instance.
    """
    validator = __pydantic_self__.__pydantic_validator__
    # Validation populates this very instance (`self_instance`) in place.
    validator.validate_python(
        data,
        self_instance=__pydantic_self__,
        context=_init_context_var.get(),
    )

Snapshot

Bases: Task

Snapshot

Contains a list of entities, the meaning of which depends on the type. If type is "task", then the list contains linked entities for which a snapshot should be created. Otherwise type is "linked_entities", indicating which entities must be skipped in a parallelized creation of unlinked entities.

Attributes:

Name Type Description
entities list[EntityTuple]

List of (entity_type, entity_id)

time datetime

timestamp for snapshot creation

final bool

If True, this is the last linked snapshot for the given time

HASH

HASH(key: str) -> int

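Hash function used to distribute tasks to worker processes.

Args: key: the string to be hashed.

Returns: the last 4 hexadecimal digits (i.e. the last 2 bytes) of the MD5 digest of the key, as an integer in the range 0–65535.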

Source code in dp3/common/task.py
def HASH(key: str) -> int:
    """Hash function used to distribute tasks to worker processes.

    Args:
        key: String to be hashed; it is UTF-8 encoded before hashing.

    Returns:
        The last 2 bytes of the MD5 digest of `key` (equivalently, its last
        4 hexadecimal digits) as an integer in the range 0..65535.
    """
    # Reading the final two digest bytes big-endian gives exactly
    # int(hexdigest()[-4:], 16), without the hex-string round trip.
    digest = hashlib.md5(key.encode("utf8")).digest()
    return int.from_bytes(digest[-2:], "big")

task_context

task_context(model_spec: ModelSpec) -> Iterator[None]

Context manager for setting the model_spec context variable.

Source code in dp3/common/task.py
@contextmanager
def task_context(model_spec: ModelSpec) -> Iterator[None]:
    """Context manager for setting the `model_spec` context variable.

    While the managed block runs, `_init_context_var` holds
    `{"model_spec": model_spec}`; on exit the variable is reset to its
    previous value, even if the block raises.

    Args:
        model_spec: Model specification to expose through the context variable.
    """
    init_context = {"model_spec": model_spec}
    reset_token = _init_context_var.set(init_context)
    try:
        yield
    finally:
        # Reset with the token so the prior value is restored.
        _init_context_var.reset(reset_token)

parse_eids_from_cache

parse_eids_from_cache(model_spec: ModelSpec, link_entity_entries: list[str]) -> list[AnyEidT]

Parses entity IDs from the "Link" cache.

Parameters:

Name Type Description Default
model_spec ModelSpec

Model specification.

required
link_entity_entries list[str]

List of entity entries from the cache.

required

Returns:

Type Description
list[AnyEidT]

List of entity IDs.

Source code in dp3/common/task.py
def parse_eids_from_cache(model_spec: ModelSpec, link_entity_entries: list[str]) -> list[AnyEidT]:
    """Parses entity IDs from the "Link" cache.

    Each entry is split at its first "#" and the resulting pairs are passed
    through `validate_entities`; only the entity IDs are kept.

    Args:
        model_spec: Model specification.
        link_entity_entries: List of entity entries from the cache.

    Returns:
        List of entity IDs.
    """
    split_entries = [entry.split("#", maxsplit=1) for entry in link_entity_entries]
    validated = validate_entities(model_spec, split_entries)
    # The entity type of each validated pair is not needed here.
    return [entity_id for _etype, entity_id in validated]

parse_eid_tuples_from_cache

parse_eid_tuples_from_cache(model_spec: ModelSpec, link_entity_entries: list[str]) -> list[tuple[str, AnyEidT]]

Parses (entity type, entity ID) tuples from the "Link" cache.

Parameters:

Name Type Description Default
model_spec ModelSpec

Model specification.

required
link_entity_entries list[str]

List of entity entries from the cache.

required

Returns:

Type Description
list[tuple[str, AnyEidT]]

List of tuples (entity type, entity ID).

Source code in dp3/common/task.py
def parse_eid_tuples_from_cache(
    model_spec: ModelSpec, link_entity_entries: list[str]
) -> list[tuple[str, AnyEidT]]:
    """Parses (entity type, entity ID) tuples from the "Link" cache.

    Each entry is split at its first "#" and the resulting pairs are passed
    through `validate_entities`, whose result is returned as is.

    Args:
        model_spec: Model specification.
        link_entity_entries: List of entity entries from the cache.

    Returns:
        List of tuples (entity type, entity ID).
    """
    split_entries = [entry.split("#", maxsplit=1) for entry in link_entity_entries]
    validated = validate_entities(model_spec, split_entries)
    return validated