Manages the shared Link cache and updates links after entity deletion.
Source code in dp3/core/link_manager.py
| def __init__(
self,
db: EntityDatabase,
platform_config: PlatformConfig,
registrar: CallbackRegistrar,
):
self.log = logging.getLogger("LinkManager")
self.model_spec = platform_config.model_spec
self.db = db
self.cache = self.db.get_module_cache("Link")
self._setup_cache_indexes()
self.db.register_on_entity_delete(
self.remove_link_cache_of_deleted, self.remove_link_cache_of_many_deleted
)
self.max_date = datetime.max.replace(tzinfo=None)
for (entity, attr), spec in self.model_spec.relations.items():
if spec.t == AttrType.PLAIN:
if spec.is_iterable:
func = self.add_iterable_plain_to_link_cache
else:
func = self.add_plain_to_link_cache
registrar.register_attr_hook(
"on_new_plain", partial(func, spec.relation_to), entity, attr
)
elif spec.t == AttrType.OBSERVATIONS:
if spec.is_iterable:
func = self.add_iterable_observation_to_link_cache
else:
func = self.add_observation_to_link_cache
registrar.register_attr_hook(
"on_new_observation",
partial(func, spec.relation_to, spec.history_params.post_validity),
entity,
attr,
)
|