TaskDistributor(task_executor: TaskExecutor, platform_config: PlatformConfig, registrar: CallbackRegistrar, daemon_stop_lock: threading.Lock)
TaskDistributor uses task queues to distribute tasks among all running processes.
Tasks are assigned to worker processes based on a hash of the entity key, so each
entity is always processed by the same worker. Therefore, all requests
modifying a particular entity are handled sequentially and no locking is
necessary.
Tasks assigned to the current process are passed to task_executor
for execution.
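The routing rule can be illustrated with a short sketch (a hypothetical helper; dp3's actual hash function may differ):

```python
import hashlib

def worker_for_entity(entity_key: str, num_workers: int) -> int:
    """Map an entity key to a stable worker index (illustrative only).

    Because the mapping is deterministic, every task touching the same
    entity lands on the same worker, which serializes updates to that
    entity without any locking.
    """
    digest = hashlib.sha256(entity_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_workers
```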
Parameters:
- task_executor (TaskExecutor): executor that runs tasks assigned to the current process
- platform_config (PlatformConfig): current platform configuration
- registrar (CallbackRegistrar): registrar used to schedule periodic callbacks
- daemon_stop_lock (threading.Lock): lock used to coordinate daemon shutdown
Source code in dp3/task_processing/task_distributor.py
```python
def __init__(
    self,
    task_executor: TaskExecutor,
    platform_config: PlatformConfig,
    registrar: CallbackRegistrar,
    daemon_stop_lock: threading.Lock,
) -> None:
    assert (
        0 <= platform_config.process_index < platform_config.num_processes
    ), "process index must be smaller than number of processes"
    self.log = logging.getLogger("TaskDistributor")

    self.process_index = platform_config.process_index
    self.num_processes = platform_config.num_processes
    self.model_spec = platform_config.model_spec
    self.daemon_stop_lock = daemon_stop_lock

    self.rabbit_params = platform_config.config.get("processing_core.msg_broker", {})

    self.entity_types = list(
        platform_config.config.get("db_entities").keys()
    )  # List of configured entity types

    self.running = False

    # List of worker threads for processing the update requests
    self._worker_threads = []
    self.num_threads = platform_config.config.get("processing_core.worker_threads", 8)

    # Internal queues for each worker
    self._queues = [queue.Queue(10) for _ in range(self.num_threads)]

    # Connections to main task queue
    # Reader - reads tasks from a pair of queues (one pair per process)
    # and distributes them to worker threads
    self._task_queue_reader = TaskQueueReader(
        callback=self._distribute_task,
        parse_task=partial(parse_data_point_task, model_spec=self.model_spec),
        app_name=platform_config.app_name,
        worker_index=self.process_index,
        rabbit_config=self.rabbit_params,
    )

    registrar.scheduler_register(self._task_queue_reader.watchdog, second="20,50")

    # Writer - allows modules to write new tasks
    self._task_queue_writer = TaskQueueWriter(
        platform_config.app_name, self.num_processes, self.rabbit_params
    )

    self.task_executor = task_executor

    # Object to store thread-local data (e.g. worker-thread index)
    # (each thread sees different object contents)
    self._current_thread_data = threading.local()

    # Number of restarts of threads by watchdog
    self._watchdog_restarts = 0
    # Register watchdog to scheduler
    registrar.scheduler_register(self._watchdog, second="*/30")
```
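The `_current_thread_data` attribute relies on `threading.local`, whose per-thread behavior can be shown with a small standalone sketch (generic Python, not dp3 code):

```python
import threading

data = threading.local()  # each thread sees its own attribute namespace

def worker(index: int) -> None:
    data.index = index  # stored per-thread; invisible to other threads
    print(f"{threading.current_thread().name}: data.index = {data.index}")

threads = [threading.Thread(target=worker, args=(i,), name=f"Worker-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```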
start
start()
Run the worker threads and start consuming from TaskQueue.
Source code in dp3/task_processing/task_distributor.py
```python
def start(self) -> None:
    """Run the worker threads and start consuming from TaskQueue."""
    self.log.info("Connecting to RabbitMQ")
    self._task_queue_reader.connect()
    self._task_queue_reader.check()  # check presence of needed queues
    self._task_queue_writer.connect()
    self._task_queue_writer.check()  # check presence of needed exchanges

    self.log.info(f"Starting {self.num_threads} worker threads")
    self.running = True
    self._worker_threads = [
        threading.Thread(
            target=self._worker_func, args=(i,), name=f"Worker-{self.process_index}-{i}"
        )
        for i in range(self.num_threads)
    ]
    for worker in self._worker_threads:
        worker.start()

    self.log.info("Starting consuming tasks from main queue")
    self._task_queue_reader.start()
```
stop
stop()
Stop the worker threads.
Source code in dp3/task_processing/task_distributor.py
```python
def stop(self) -> None:
    """Stop the worker threads."""
    self.log.info("Waiting for worker threads to finish their current tasks ...")
    # Thread for printing debug messages about worker status
    threading.Thread(target=self._dbg_worker_status_print, daemon=True).start()

    # Stop receiving new tasks from global queue
    self._task_queue_reader.stop()

    # Signal worker threads to stop
    self.running = False

    # Wait until all workers have stopped
    for worker in self._worker_threads:
        worker.join()

    self._task_queue_reader.disconnect()
    self._task_queue_writer.disconnect()

    # Cleanup
    self._worker_threads = []
```
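A minimal lifecycle sketch tying `start()` and `stop()` together (hypothetical wiring; the `task_executor`, `platform_config`, and `registrar` arguments are assumed to be built by the surrounding dp3 application):

```python
import threading
import time

from dp3.task_processing.task_distributor import TaskDistributor

def run_distributor(task_executor, platform_config, registrar) -> None:
    # Hypothetical helper: only the constructor signature, start()/stop(),
    # and the `running` flag come from the documentation above.
    stop_lock = threading.Lock()
    distributor = TaskDistributor(task_executor, platform_config, registrar, stop_lock)
    distributor.start()  # connect to RabbitMQ, spawn workers, begin consuming
    try:
        while distributor.running:  # flag set by start(), cleared by stop()
            time.sleep(1)
    finally:
        distributor.stop()  # drain workers, join threads, disconnect
```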
push_new_tasks
push_new_tasks(new_tasks)
Push new tasks (resulting from hooks) to the priority queue.
(The priority queue is not limited in size, so put_task() never blocks; the normal
queue has a limited size, so if it were used here, a deadlock could occur if all
workers tried to push new tasks to a full queue.)
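The deadlock risk with a bounded queue can be demonstrated in isolation (generic Python sketch, not dp3 code):

```python
import queue

q = queue.Queue(maxsize=1)  # bounded, like the internal per-worker queues
q.put("task-1")             # queue is now full

try:
    # With no consumer draining the queue, a plain q.put() would block
    # forever; the timeout makes the failure visible instead.
    q.put("task-2", timeout=0.5)
except queue.Full:
    print("queue full: a blocking put here could deadlock all workers")
```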
Source code in dp3/task_processing/task_distributor.py
```python
def push_new_tasks(self, new_tasks):
    """Push new tasks (resulting from hooks) to the priority queue.

    (The priority queue is not limited in size, so put_task() never blocks; the
    normal queue has a limited size, so if it were used here, a deadlock could
    occur if all workers tried to push new tasks to a full queue.)
    """
    try:
        for task in new_tasks:
            self._task_queue_writer.put_task(task, priority=True)
    except Exception as e:
        self.log.error(f"Failed to push tasks created from hooks: {e}")
```