Skip to content

dp3.common.context

entity_context

entity_context(self_spec, entities: dict) -> Iterator[None]

Context manager for AttrSpec initialization.

Source code in dp3/common/context.py
@contextmanager
def entity_context(self_spec, entities: dict) -> Iterator[None]:
    """Context manager for AttrSpec initialization."""
    token = _init_attr_spec_context_var.set({"self": self_spec, "entities": entities})
    try:
        yield
    finally:
        _init_attr_spec_context_var.reset(token)

get_entity_context

get_entity_context() -> dict

Get entity spec context.

Source code in dp3/common/context.py
def get_entity_context() -> dict:
    """Get entity spec context."""
    cxt = _init_attr_spec_context_var.get()
    if cxt is None or not isinstance(cxt, dict):
        raise ValueError("Entity spec context is not set")
    return cxt