dp3.task_processing.task_distributor

TaskDistributor

TaskDistributor(task_executor: TaskExecutor, platform_config: PlatformConfig, registrar: CallbackRegistrar, daemon_stop_lock: threading.Lock)

TaskDistributor uses task queues to distribute tasks among all running worker processes.

Tasks are assigned to worker processes based on a hash of the entity key, so each entity is always processed by the same worker. Therefore, all requests modifying a particular entity are handled sequentially and no locking is necessary.

Tasks that are assigned to the current process are passed to task_executor for execution.
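To illustrate the routing, here is a minimal sketch of hash-based assignment. It is not the actual dp3 implementation (the real distribution happens in the task queues and in the reader callback); the hash function and key encoding are assumptions chosen only to show why a stable hash keeps each entity on a single worker:

import hashlib

def worker_index_for(entity_key: str, num_workers: int) -> int:
    # Illustrative only: derive a stable worker index from the entity key,
    # so the same entity always ends up on the same worker.
    digest = hashlib.sha256(entity_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_workers

# Repeated calls with the same key give the same index, so all updates of
# one entity are serialized on a single worker and need no locking.
assert worker_index_for("203.0.113.7", 4) == worker_index_for("203.0.113.7", 4)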

Parameters:

Name              Type               Description                                                    Default
platform_config   PlatformConfig     Platform config                                                required
task_executor     TaskExecutor       Instance of TaskExecutor                                       required
registrar         CallbackRegistrar  Interface for callback registration                            required
daemon_stop_lock  Lock               Lock used to control when the program stops (see dp3.worker)   required

Source code in dp3/task_processing/task_distributor.py
def __init__(
    self,
    task_executor: TaskExecutor,
    platform_config: PlatformConfig,
    registrar: CallbackRegistrar,
    daemon_stop_lock: threading.Lock,
) -> None:
    assert (
        0 <= platform_config.process_index < platform_config.num_processes
    ), "process index must be smaller than number of processes"

    self.log = logging.getLogger("TaskDistributor")

    self.process_index = platform_config.process_index
    self.num_processes = platform_config.num_processes
    self.model_spec = platform_config.model_spec
    self.daemon_stop_lock = daemon_stop_lock

    self.rabbit_params = platform_config.config.get("processing_core.msg_broker", {})

    self.entity_types = list(
        platform_config.config.get("db_entities").keys()
    )  # List of configured entity types

    self.running = False

    # List of worker threads for processing the update requests
    self._worker_threads = []
    self.num_threads = platform_config.config.get("processing_core.worker_threads", 8)

    # Internal queues for each worker
    self._queues = [queue.Queue(10) for _ in range(self.num_threads)]

    # Connections to main task queue
    # Reader - reads tasks from a pair of queues (one pair per process)
    # and distributes them to worker threads
    self._task_queue_reader = TaskQueueReader(
        callback=self._distribute_task,
        parse_task=partial(parse_data_point_task, model_spec=self.model_spec),
        app_name=platform_config.app_name,
        worker_index=self.process_index,
        rabbit_config=self.rabbit_params,
    )
    registrar.scheduler_register(self._task_queue_reader.watchdog, second="20,50")

    # Writer - allows modules to write new tasks
    self._task_queue_writer = TaskQueueWriter(
        platform_config.app_name, self.num_processes, self.rabbit_params
    )
    self.task_executor = task_executor
    # Object to store thread-local data (e.g. worker-thread index)
    # (each thread sees different object contents)
    self._current_thread_data = threading.local()

    # Number of restarts of threads by watchdog
    self._watchdog_restarts = 0
    # Register watchdog to scheduler
    registrar.scheduler_register(self._watchdog, second="*/30")
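
As a usage sketch (the task_executor, platform_config and registrar objects below are placeholders; in a real deployment dp3.worker builds them from the loaded configuration):

import threading

from dp3.task_processing.task_distributor import TaskDistributor

# Sketch only: assumes task_executor, platform_config and registrar already exist.
daemon_stop_lock = threading.Lock()
distributor = TaskDistributor(
    task_executor=task_executor,
    platform_config=platform_config,
    registrar=registrar,
    daemon_stop_lock=daemon_stop_lock,
)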

start

start() -> None

Run the worker threads and start consuming from TaskQueue.

Source code in dp3/task_processing/task_distributor.py
def start(self) -> None:
    """Run the worker threads and start consuming from TaskQueue."""
    self.log.info("Connecting to RabbitMQ")
    self._task_queue_reader.connect()
    self._task_queue_reader.check()  # check presence of needed queues
    self._task_queue_writer.connect()
    self._task_queue_writer.check()  # check presence of needed exchanges

    self.log.info(f"Starting {self.num_threads} worker threads")
    self.running = True
    self._worker_threads = [
        threading.Thread(
            target=self._worker_func, args=(i,), name=f"Worker-{self.process_index}-{i}"
        )
        for i in range(self.num_threads)
    ]
    for worker in self._worker_threads:
        worker.start()

    self.log.info("Starting consuming tasks from main queue")
    self._task_queue_reader.start()

stop

stop() -> None

Stop the worker threads.

Source code in dp3/task_processing/task_distributor.py
def stop(self) -> None:
    """Stop the worker threads."""
    self.log.info("Waiting for worker threads to finish their current tasks ...")
    # Thread for printing debug messages about worker status
    threading.Thread(target=self._dbg_worker_status_print, daemon=True).start()

    # Stop receiving new tasks from global queue
    self._task_queue_reader.stop()

    # Signalize stop to worker threads
    self.running = False

    # Wait until all workers stopped
    for worker in self._worker_threads:
        worker.join()

    self._task_queue_reader.disconnect()
    self._task_queue_writer.disconnect()

    # Cleanup
    self._worker_threads = []
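
A possible shutdown sequence, continuing the construction sketch above; how daemon_stop_lock is actually driven is the responsibility of dp3.worker and is only approximated here with a plain sleep loop:

import time

# Sketch only: start the distributor, run until interrupted, then stop cleanly.
distributor.start()
try:
    while True:
        time.sleep(1)  # the real daemon waits on daemon_stop_lock (see dp3.worker)
except KeyboardInterrupt:
    distributor.stop()  # waits for workers to finish their current tasks, then disconnects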

push_new_tasks

push_new_tasks(new_tasks)

Push new tasks (resulting from hooks) to the priority queue.

(The priority queue is not limited in size, so put_task() never blocks. The normal queue has a limited size, so if it were used here, a deadlock could occur when all workers try to push new tasks to a full queue.)

Source code in dp3/task_processing/task_distributor.py
def push_new_tasks(self, new_tasks):
    """Push new tasks (resulting from hooks) to the priority queue.

    (The priority queue is not limited in size, so put_task() never blocks. The
    normal queue has a limited size, so if it were used here, a deadlock could
    occur when all workers try to push new tasks to a full queue.)
    """
    try:
        for task in new_tasks:
            self._task_queue_writer.put_task(task, priority=True)
    except Exception as e:
        self.log.error(f"Failed to push tasks created from hooks: {e}")