Skip to content

dp3.core.link_manager

Core module managing links between entities.

LinkManager(db: EntityDatabase, platform_config: PlatformConfig, registrar: CallbackRegistrar)

Manages the shared Link cache and updates links after entity deletion.

Source code in dp3/core/link_manager.py
def __init__(
    self,
    db: EntityDatabase,
    platform_config: PlatformConfig,
    registrar: CallbackRegistrar,
):
    self.log = logging.getLogger("LinkManager")
    self.model_spec = platform_config.model_spec
    self.db = db

    self.cache = self.db.get_module_cache("Link")
    self._setup_cache_indexes()
    self.db.register_on_entity_delete(
        self.remove_link_cache_of_deleted, self.remove_link_cache_of_many_deleted
    )
    self.max_date = datetime.max.replace(tzinfo=None)
    for (entity, attr), spec in self.model_spec.relations.items():
        if spec.t == AttrType.PLAIN:
            if spec.is_iterable:
                func = self.add_iterable_plain_to_link_cache
            else:
                func = self.add_plain_to_link_cache
            registrar.register_attr_hook(
                "on_new_plain", partial(func, spec.relation_to), entity, attr
            )
        elif spec.t == AttrType.OBSERVATIONS:
            if spec.is_iterable:
                func = self.add_iterable_observation_to_link_cache
            else:
                func = self.add_observation_to_link_cache
            registrar.register_attr_hook(
                "on_new_observation",
                partial(func, spec.relation_to, spec.history_params.post_validity),
                entity,
                attr,
            )