Skip to content

dp3.core.updater

Core module that executes periodic update callbacks.

UpdaterConfig

Bases: BaseModel

The configuration of the Updater module.

The periodic update is executed in smaller batches for better robustness. The batch size is dynamically adjusted based on the total number of entities and the estimated growth rate.

Attributes:

Name Type Description
update_batch_cron CronExpression

A CRON expression for the periodic update.

update_batch_period ParsedTimedelta

The period of the periodic update. Should be equal to the period of update_batch_cron.

cache_management_cron CronExpression

A CRON expression for the cache management.

cache_max_entries int

The maximum number of finished cache entries per thread_id.

UpdateThreadState

Bases: BaseModel

A cache item describing a state of one configured update thread.

Attributes:

Name Type Description
type Literal['state']

"state"

t_created datetime

Time of creation.

t_last_update datetime

Time of last update.

t_end datetime

Time of predicted period end.

processed int

The number of currently processed entities.

total int

The total number of entities.

iteration int

The current iteration.

total_iterations int

Total number of iterations.

etype str

Entity type.

period float

Period length in seconds.

eid_only bool

Whether only eids are passed to hooks.

hook_ids list[str]

Hook ids.

runtime_secs float

Total hook runtime in seconds.

thread_id property

thread_id: tuple[float, str, bool]

A tuple of (period, entity_type, eid_only).

new classmethod

new(hooks: dict, period: float, entity_type: str, eid_only: bool = False)

Create a new instance initialized with hooks and thread_id components.

Source code in dp3/core/updater.py
@classmethod
def new(cls, hooks: dict, period: float, entity_type: str, eid_only: bool = False):
    """Build a fresh state for the update thread `(period, entity_type, eid_only)`.

    Both timestamps are set to the current time and the predicted end lies
    one `period` (in seconds) after it. The hook ids are the keys of `hooks`.

    Args:
        hooks: Mapping of hook id to hook; only the keys are used here.
        period: Period length in seconds.
        entity_type: Entity type the thread updates.
        eid_only: Whether only eids are passed to the hooks.

    Returns:
        A new instance of `cls`.
    """
    created = datetime.now()
    predicted_end = created + timedelta(seconds=period)
    hook_ids = hooks.keys()
    return cls(
        etype=entity_type,
        period=period,
        eid_only=eid_only,
        hook_ids=hook_ids,
        t_created=created,
        t_last_update=created,
        t_end=predicted_end,
    )

id_attributes staticmethod

id_attributes()

A list of attributes which identify the state in cache.

Source code in dp3/core/updater.py
@staticmethod
def id_attributes():
    """Return the names of the fields that identify a state entry in the cache.

    A new list is returned on every call.
    """
    identifying_fields = ("type", "period", "etype", "eid_only", "t_created")
    return list(identifying_fields)

reset

reset()

Resets counters and timestamps.

Source code in dp3/core/updater.py
def reset(self):
    """Start a new period: restart the timestamps and clear the progress counters."""
    restarted_at = datetime.now()
    period_length = timedelta(seconds=self.period)

    # Timestamps: the new period starts now and ends one `period` from now.
    self.t_created = self.t_last_update = restarted_at
    self.t_end = restarted_at + period_length

    # Progress counters and the finished flag go back to their initial values.
    self.iteration = 0
    self.processed = 0
    self.runtime_secs = 0.0
    self.finished = False

UpdaterCache

UpdaterCache(cache_collection)

The cache collection contains the metadata documents with the state of the update process for each entity type.

Source code in dp3/core/updater.py
def __init__(self, cache_collection):
    """Store the cache collection and set up its indexes.

    Args:
        cache_collection: Collection holding the updater's state documents;
            kept as `self._cache`.
    """
    self._cache = cache_collection
    # NOTE(review): index setup presumably relies on `self._cache` being
    # assigned already - keep this call after the assignment.
    self._setup_cache_indexes()

get_unfinished

get_unfinished() -> Iterator[UpdateThreadState]

Yields all unfinished cache entries from DB.

Source code in dp3/core/updater.py
def get_unfinished(self) -> Iterator[UpdateThreadState]:
    """Lazily yield every cached state entry that is not marked as finished.

    Each document found in the cache collection is validated into an
    `UpdateThreadState` before it is yielded.
    """
    unfinished_query = {"type": "state", "finished": False}
    unfinished_docs = self._cache.find(unfinished_query)
    yield from (UpdateThreadState.model_validate(doc) for doc in unfinished_docs)

upsert

upsert(state: UpdateThreadState) -> UpdateResult

Update or insert a state entry into DB.

Source code in dp3/core/updater.py
def upsert(self, state: UpdateThreadState) -> UpdateResult:
    """Write `state` to the cache, inserting it when no matching entry exists.

    The dumped state is split in two: the identifying attributes form the
    filter, everything else is written with `$set`.

    Returns:
        The result of the underlying `update_one` call.
    """
    dumped = state.model_dump()
    id_keys = state.id_attributes()

    identity = {}
    payload = {}
    for field, value in dumped.items():
        # Identifying fields select the document, the rest is its new content
        target = identity if field in id_keys else payload
        target[field] = value

    return self._cache.update_one(identity, update={"$set": payload}, upsert=True)

register_management

register_management(scheduler: Scheduler, trigger: CronExpression, max_entries: int) -> int

Registers the cache management task with the scheduler.

Source code in dp3/core/updater.py
def register_management(
    self, scheduler: Scheduler, trigger: CronExpression, max_entries: int
) -> int:
    """Schedule the cache management task.

    Args:
        scheduler: Scheduler the task is registered with.
        trigger: CRON expression; its dumped fields are passed to the
            scheduler as keyword arguments.
        max_entries: Passed to the management task as its only argument.

    Returns:
        Whatever `scheduler.register` returns.
    """
    schedule = trigger.model_dump()
    task_args = [max_entries]
    return scheduler.register(self._manage_cache, func_args=task_args, **schedule)

Updater

Updater(db: EntityDatabase, task_queue_writer: TaskQueueWriter, platform_config: PlatformConfig, scheduler: Scheduler, elog: EventGroupType)

Executes periodic update callbacks.

Source code in dp3/core/updater.py
def __init__(
    self,
    db: EntityDatabase,
    task_queue_writer: TaskQueueWriter,
    platform_config: PlatformConfig,
    scheduler: Scheduler,
    elog: EventGroupType,
):
    """Set up the updater from the platform configuration.

    The updater is enabled only when `platform_config.process_index` is 0.
    When it is disabled, the state cache and the hook registry are not created.

    Args:
        db: Entity database; also provides the module cache.
        task_queue_writer: Writer connected in `start` and disconnected in `stop`.
        platform_config: Source of the model specification, the "updater"
            configuration section and the process index.
        scheduler: Scheduler used for cache management and update batches.
        elog: Event log group, stored as `self.elog`.
    """
    self.log = logging.getLogger("Updater")
    self.elog = elog

    self.model_spec = platform_config.model_spec
    updater_section = platform_config.config.get("updater", {})
    self.config = UpdaterConfig.model_validate(updater_section)

    self.db = db
    self.task_queue_writer = task_queue_writer
    self.scheduler = scheduler

    self.enabled = platform_config.process_index == 0
    if self.enabled:
        # State cache, including its periodic management task
        module_cache = self.db.get_module_cache("Updater")
        self.cache = UpdaterCache(module_cache)
        self.cache.register_management(
            scheduler,
            self.config.cache_management_cron,
            self.config.cache_max_entries,
        )

        # Hooks of each update thread, keyed by thread id
        self.update_thread_hooks = defaultdict(dict)

register_record_update_hook

register_record_update_hook(hook: Callable[[str, str, dict], list[DataPointTask]], hook_id: str, entity_type: str, period: ParsedTimedelta)

Registers a hook for periodic update of entities of the specified type.

The hook receives the entity type, the entity ID and the master record.

Source code in dp3/core/updater.py
@validate_call
def register_record_update_hook(
    self,
    hook: Callable[[str, str, dict], list[DataPointTask]],
    hook_id: str,
    entity_type: str,
    period: ParsedTimedelta,
):
    """Registers a record-based hook for periodic update of entities of the given type.

    The hook is called with the entity type, the entity ID and the master record.
    The period is handed on in seconds.
    """
    period_seconds = period.total_seconds()
    self._register_hook(hook, hook_id, entity_type, period_seconds, eid_only=False)

register_eid_update_hook

register_eid_update_hook(hook: Callable[[str, str], list[DataPointTask]], hook_id: str, entity_type: str, period: ParsedTimedelta)

Registers a hook for periodic update of entities of the specified type.

The hook receives the entity type and the entity ID.

Source code in dp3/core/updater.py
@validate_call
def register_eid_update_hook(
    self,
    hook: Callable[[str, str], list[DataPointTask]],
    hook_id: str,
    entity_type: str,
    period: ParsedTimedelta,
):
    """Registers an eid-only hook for periodic update of entities of the given type.

    The hook is called with the entity type and the entity ID only.
    The period is handed on in seconds.
    """
    period_seconds = period.total_seconds()
    self._register_hook(hook, hook_id, entity_type, period_seconds, eid_only=True)

start

start()

Starts the updater.

Will fetch the state of the updater from the cache and schedule the update threads.

Source code in dp3/core/updater.py
def start(self):
    """
    Starts the updater.

    Loads the unfinished thread states from the cache, reconciles them with the
    currently registered hooks, schedules one batch-processing job per active
    update thread and finally connects the task queue writer.

    Does nothing when the updater is disabled.

    Raises:
        ValueError: re-raised (after logging) when it occurs while the iteration
            count of a thread is computed or stored.
    """
    if not self.enabled:
        return

    # Unfinished progress states from the cache, keyed by thread id
    thread_states = {cached.thread_id: cached for cached in self.cache.get_unfinished()}

    # Reconcile every saved state with the hooks that are configured right now
    for thread_id, state in thread_states.items():
        if thread_id in self.update_thread_hooks:
            previous_ids = set(state.hook_ids)
            current_ids = set(self.update_thread_hooks[thread_id].keys())
            removed_ids = previous_ids - current_ids

            if removed_ids:
                self.log.warning(
                    "Previously configured hooks %s were deleted for entity '%s', period: %ss",
                    removed_ids,
                    state.etype,
                    state.period,
                )
            # Persist the hook list whenever hooks were added or removed
            if current_ids != previous_ids:
                state.hook_ids = list(current_ids)
                self.cache.upsert(state)
        else:
            # Nothing is configured for this thread any more: close it
            self.log.warning(
                "Previously configured hooks %s for '%s' entity with period: %ss "
                "match no current configuration, aborting update thread.",
                state.hook_ids,
                state.etype,
                state.period,
            )
            state.finished = True
            self.cache.upsert(state)

    # Configured threads without a saved state start from a fresh one
    for thread_id, hooks in self.update_thread_hooks.items():
        if thread_id in thread_states:
            continue
        thread_states[thread_id] = UpdateThreadState.new(hooks, *thread_id)

    # Schedule a batch job for every thread that is still active
    for thread_id, state in thread_states.items():
        if state.finished:
            continue

        _, entity_type, eid_only = thread_id
        hooks = self.update_thread_hooks[thread_id]
        processing_func = (
            self._process_eid_update_batch if eid_only else self._process_update_batch
        )

        state.total = self.db.get_estimated_entity_count(entity_type)
        try:
            iteration_count = self._calculate_iteration_count(state.period)
            if state.iteration != 0 and iteration_count != state.total_iterations:
                self.log.info("The update period was changed, resetting iteration number")
                state.iteration = 0
            state.total_iterations = iteration_count
        except ValueError:
            self.log.error(
                "Invalid period configuration for thread: %s, "
                "the update batch period must be smaller than the total period ",
                state,
            )
            raise

        cron_fields = self.config.update_batch_cron.model_dump()
        self.scheduler.register(
            processing_func,
            func_args=[entity_type, hooks, state],
            **cron_fields,
        )

    # Connect the queue writer
    self.task_queue_writer.connect()
    self.task_queue_writer.check()  # check presence of needed exchanges

stop

stop()

Stops the updater.

Source code in dp3/core/updater.py
def stop(self):
    """Stops the updater by disconnecting the task queue writer.

    Does nothing when the updater is disabled.
    """
    if self.enabled:
        self.task_queue_writer.disconnect()