Skip to content

dp3.common.control

Module enabling remote control of the platform's internal events.

Control

Control(platform_config: PlatformConfig)

Class enabling remote control of the platform's internal events.

Source code in dp3/common/control.py
def __init__(
    self,
    platform_config: PlatformConfig,
) -> None:
    """
    Set up the logger, load the control configuration and build the control queue reader.

    The "control" section of the platform configuration is validated with
    `ControlConfig`, and its `allowed_actions` are kept as a set. Handlers for
    these actions start out empty and are registered separately.

    Args:
        platform_config: Platform configuration; its `config`, `app_name` and
            `process_index` attributes are read here.
    """
    self.log = logging.getLogger("Control")
    self.action_handlers: dict[ControlAction, Callable] = {}

    control_section = platform_config.config.get("control")
    self.config = ControlConfig.model_validate(control_section)
    self.allowed_actions = set(self.config.allowed_actions)
    self.log.debug("Allowed actions: %s", self.allowed_actions)

    app_name = platform_config.app_name
    worker_index = platform_config.process_index
    # One control queue per worker process; the same name is passed as both
    # the regular and the priority queue of the reader.
    control_queue_name = f"{app_name}-worker-{worker_index}-control"
    broker_config = platform_config.config.get("processing_core.msg_broker", {})

    self.control_queue = TaskQueueReader(
        callback=self.process_control_task,
        parse_task=ControlMessage.model_validate_json,
        app_name=app_name,
        worker_index=worker_index,
        rabbit_config=broker_config,
        queue=control_queue_name,
        priority_queue=control_queue_name,
        parent_logger=self.log,
    )

start

start()

Connect to RabbitMQ and start consuming from TaskQueue.

Source code in dp3/common/control.py
def start(self):
    """
    Connect to RabbitMQ and start consuming from TaskQueue.

    Before connecting, verifies that every allowed action has a handler registered.

    Raises:
        ValueError: If at least one allowed action has no registered handler.
    """
    unconfigured_handlers = self.allowed_actions - set(self.action_handlers)
    if unconfigured_handlers:
        raise ValueError(
            f"The following configured actions are missing handlers: {unconfigured_handlers}"
        )

    self.log.info("Connecting to RabbitMQ")
    self.control_queue.connect()
    self.control_queue.check()  # check presence of needed queues
    self.control_queue.start()

    # Iterate over the dict *values* (the handler callables). Iterating the dict
    # itself yields the action keys, which are not the functions being named here.
    self.log.debug(
        "Configured handlers: %s",
        ", ".join(get_func_name(f) for f in self.action_handlers.values()),
    )

stop

stop()

Stop consuming from TaskQueue, disconnect from RabbitMQ.

Source code in dp3/common/control.py
def stop(self):
    """Halt consumption from the TaskQueue first, then disconnect from RabbitMQ."""
    queue_reader = self.control_queue
    queue_reader.stop()
    queue_reader.disconnect()

set_action_handler

set_action_handler(action: ControlAction, handler: Callable)

Sets the handler for the given action.

Source code in dp3/common/control.py
def set_action_handler(self, action: ControlAction, handler: Callable):
    """
    Register `handler` as the callable to run for `action`.

    A handler stored earlier for the same action is replaced.
    """
    handler_name = get_func_name(handler)
    self.log.debug("Setting handler for action %s: %s", action, handler_name)
    self.action_handlers[action] = handler

process_control_task

process_control_task(msg_id, task: ControlMessage)

Acknowledges the received message and executes an action according to the task.

This function should not be called directly, but set as callback for TaskQueueReader.

Source code in dp3/common/control.py
def process_control_task(self, msg_id, task: ControlMessage):
    """
    Acknowledge the received message, then run the handler registered for `task.action`.

    The message is acknowledged first, so it is acked whether or not the action
    is allowed. An action outside `allowed_actions` is only logged as an error.
    The handler is called with `task.kwargs` expanded as keyword arguments.

    This function should not be called directly, but set as callback for TaskQueueReader.
    """
    self.control_queue.ack(msg_id)

    action = task.action
    if action not in self.allowed_actions:
        self.log.error("Action not allowed: %s", action)
        return

    self.log.info("Executing action: %s", action)
    handler = self.action_handlers[action]
    handler(**task.kwargs)
    self.log.info("Action finished: %s", action)

refresh_on_entity_creation

refresh_on_entity_creation(task_distributor: TaskDistributor, task_executor: TaskExecutor, etype: str)

Refreshes hooks called on new entity creation for all entities in DB.

Source code in dp3/common/control.py
def refresh_on_entity_creation(
    task_distributor: TaskDistributor, task_executor: TaskExecutor, etype: str
):
    """
    Refreshes hooks called on new entity creation for all entities in DB.

    Asks the executor for the refresh tasks of `etype`, passing this worker's
    index and the total number of worker processes taken from the distributor,
    and pushes the returned tasks back through the distributor.

    Args:
        task_distributor: Supplies `process_index` and `num_processes`, and receives
            the resulting tasks via `push_new_tasks`.
        task_executor: Produces the tasks via its `refresh_on_entity_creation` method.
        etype: Entity type passed through to the executor.
    """
    worker_index = task_distributor.process_index
    worker_total = task_distributor.num_processes
    new_tasks = task_executor.refresh_on_entity_creation(
        etype=etype, worker_id=worker_index, worker_cnt=worker_total
    )
    task_distributor.push_new_tasks(new_tasks)