dp3.task_processing.task_queue ¶
Functions to work with the main task queue (RabbitMQ)
There are two queues for each worker process:
- "normal" queue for tasks added by other components; this has a limit of 100 tasks.
- "priority" queue for tasks added by workers themselves; this has no limit, since workers mustn't be stopped by waiting for the queue.
These queues are presented as a single one by this wrapper. The TaskQueueReader first looks into the "priority" queue and only if there is no task waiting, it reads the normal one.
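The priority-first read order described above can be sketched with two in-memory queues. This is only an illustration under stated assumptions: `next_task` and the `queue.Queue` objects are hypothetical stand-ins, not the RabbitMQ-backed implementation in DP3.

```python
import queue
from typing import Optional


def next_task(priority_q: queue.Queue, normal_q: queue.Queue) -> Optional[str]:
    """Return the next task, preferring the priority queue; None if both are empty."""
    for q in (priority_q, normal_q):
        try:
            return q.get_nowait()
        except queue.Empty:
            continue  # nothing waiting here, fall through to the next queue
    return None


priority_q = queue.Queue()             # unbounded, like the "priority" queue
normal_q = queue.Queue(maxsize=100)    # bounded, like the "normal" queue

normal_q.put("n1")
priority_q.put("p1")
normal_q.put("n2")

# The priority task is returned first even though it was added second.
order = [next_task(priority_q, normal_q) for _ in range(4)]
```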
Tasks are distributed to worker processes (and threads) by a hash of the entity which is to be modified. The destination queue is decided by the message source, so each source must know how many worker processes there are.
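A minimal sketch of hash-based routing follows. The key format (`etype:eid`) and the use of MD5 are assumptions made for illustration; the source does not state which hash function DP3 uses. The one firm requirement is that the hash is stable across processes, which rules out Python's built-in `hash()` for strings because it is salted per process.

```python
import hashlib


def worker_for(etype: str, eid: str, workers: int) -> int:
    """Map an entity to a worker index in range(workers) using a stable hash.

    Hypothetical helper: the key format and hash function are assumptions.
    """
    key = f"{etype}:{eid}".encode("utf-8")
    digest = hashlib.md5(key).digest()
    # Use the first 8 bytes as an unsigned integer, then reduce modulo the worker count.
    return int.from_bytes(digest[:8], "big") % workers


index = worker_for("ip", "192.0.2.1", workers=4)
```

Because every sender computes the index itself, all senders must be configured with the same number of workers; otherwise tasks for one entity could land at different workers.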
Exchange and queues must be declared externally!
Related configuration keys and their defaults: (should be part of global DP3 config files)
rabbitmq:
  host: localhost
  port: 5672
  virtual_host: /
  username: guest
  password: guest

worker_processes: 1
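Since all connection keys are optional, a caller can supply only the values that differ from the defaults. The sketch below shows one way such a merge could work; `DEFAULT_RABBIT` and `resolve_rabbit_config` are hypothetical names, not part of the DP3 API.

```python
from typing import Optional

# Defaults copied from the configuration listing above.
DEFAULT_RABBIT = {
    "host": "localhost",
    "port": 5672,
    "virtual_host": "/",
    "username": "guest",
    "password": "guest",
}


def resolve_rabbit_config(user_config: Optional[dict] = None) -> dict:
    """Return the defaults overridden by any keys present in user_config."""
    merged = dict(DEFAULT_RABBIT)      # copy, so the defaults stay untouched
    merged.update(user_config or {})   # None is treated as "no overrides"
    return merged


config = resolve_rabbit_config({"host": "mq.example.org"})
```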
RobustAMQPConnection ¶
Common TaskQueue wrapper that handles the connection to the RabbitMQ server with automatic reconnection. TaskQueueWriter and TaskQueueReader are derived from this.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rabbit_config | dict | RabbitMQ connection parameters, dict with following keys (all optional): host, port, virtual_host, username, password | None |
Source code in dp3/task_processing/task_queue.py
connect ¶
Create a connection (or reconnect after error).
If connection can't be established, try it again indefinitely.
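The "retry indefinitely" behaviour can be sketched as a simple loop. This is an illustration only: `connect_forever`, the `open_connection` callable, the fixed delay, and the choice of `ConnectionError` are all assumptions, not the actual DP3 or RabbitMQ client code.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_forever(
    open_connection: Callable[[], T],
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call open_connection until it succeeds, sleeping between failed attempts."""
    while True:
        try:
            return open_connection()
        except ConnectionError:
            sleep(delay)  # wait before the next attempt; never gives up


# Demo with a fake connection factory that fails twice before succeeding.
calls = []


def flaky_open() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("broker not reachable")
    return "connected"


# The sleep function is replaced so the demo does not actually wait.
result = connect_forever(flaky_open, sleep=lambda seconds: None)
```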
TaskQueueWriter ¶
TaskQueueWriter(app_name: str, workers: int = 1, rabbit_config: dict = None, exchange: str = None, priority_exchange: str = None, parent_logger: logging.Logger = None)
Bases: RobustAMQPConnection
Writes tasks into main Task Queue
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app_name | str | DP3 application name (used as prefix for RMQ queues and exchanges) | required |
workers | int | Number of worker processes in the system | 1 |
rabbit_config | dict | RabbitMQ connection parameters, dict with following keys (all optional): host, port, virtual_host, username, password | None |
exchange | str | Name of the exchange to write tasks to (default: | None |
priority_exchange | str | Name of the exchange to write priority tasks to (default: | None |
parent_logger | Logger | Logger to inherit prefix from. | None |
check ¶
Check that needed exchanges are declared, return True or raise RuntimeError.
If needed exchanges are not declared, reconnect and try again. (max 5 times)
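A bounded check-and-reconnect loop of this kind could look like the sketch below. The helper name and its two callables are hypothetical, and the exact meaning of "max 5 times" (attempts versus reconnects) is an assumption; here it is read as at most five checks.

```python
from typing import Callable


def check_with_retries(
    is_declared: Callable[[], bool],
    reconnect: Callable[[], None],
    max_attempts: int = 5,
) -> bool:
    """Return True once is_declared() succeeds; raise RuntimeError after max_attempts failures."""
    for _ in range(max_attempts):
        if is_declared():
            return True
        reconnect()  # the check failed, so reconnect before checking again
    raise RuntimeError(f"required exchanges still missing after {max_attempts} attempts")


# Demo: the check fails twice, then succeeds on the third attempt.
answers = iter([False, False, True])
reconnects = []
result = check_with_retries(lambda: next(answers), lambda: reconnects.append(1))
```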
broadcast_task ¶
Broadcast task to all workers
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task | prepared task | required |
priority | bool | if true, the task is placed into priority queue (should only be used internally by workers) | False |
put_task ¶
Put task (update_request) to the queue of corresponding worker
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task | prepared task | required |
priority | bool | if true, the task is placed into priority queue (should only be used internally by workers) | False |
TaskQueueReader ¶
TaskQueueReader(callback: Callable, parse_task: Callable[[str], Task], app_name: str, worker_index: int = 0, rabbit_config: dict = None, queue: str = None, priority_queue: Union[str, bool] = None, parent_logger: logging.Logger = None)
Bases: RobustAMQPConnection
TaskQueueReader consumes messages from two RabbitMQ queues (normal and priority one for given worker) and passes them to the given callback function.
Tasks from the priority queue are passed before the normal ones.
Each received message must be acknowledged by calling .ack(msg_tag).
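The callback and acknowledgement flow can be illustrated without a broker. `FakeReader` below is a hypothetical in-memory stand-in that only mimics the documented prototypes (`callback(tag, Task)`, `parse_task(body) -> Task`, and `ack(msg_tag)` returning a bool); it is not the real TaskQueueReader, and plain dicts stand in for Task objects.

```python
import json
from typing import Any, Callable


class FakeReader:
    """Hypothetical in-memory stand-in for TaskQueueReader (no RabbitMQ involved)."""

    def __init__(self, callback: Callable[[int, Any], None], parse_task: Callable[[str], Any]):
        self.callback = callback
        self.parse_task = parse_task
        self.unacked = set()

    def deliver(self, msg_tag: int, body: str) -> None:
        """Simulate a message arriving: parse the body and pass it to the callback."""
        self.unacked.add(msg_tag)
        self.callback(msg_tag, self.parse_task(body))

    def ack(self, msg_tag: int) -> bool:
        """Acknowledge a delivered message; False if the tag is unknown."""
        if msg_tag in self.unacked:
            self.unacked.discard(msg_tag)
            return True
        return False


processed = []


def handle(tag: int, task: Any) -> None:
    # Process the task only if the acknowledgement succeeded.
    if reader.ack(tag):
        processed.append(task)


reader = FakeReader(handle, json.loads)
reader.deliver(1, '{"etype": "ip", "eid": "192.0.2.1"}')
```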
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable | Function called when a message is received, prototype: func(tag, Task) | required |
parse_task | Callable[[str], Task] | Function called to parse message body into a task, prototype: func(body) -> Task | required |
app_name | str | DP3 application name (used as prefix for RMQ queues and exchanges) | required |
worker_index | int | index of this worker (filled into DEFAULT_QUEUE string using .format() method) | 0 |
rabbit_config | dict | RabbitMQ connection parameters, dict with following keys (all optional): host, port, virtual_host, username, password | None |
queue | str | Name of RabbitMQ queue to read from (default: | None |
priority_queue | Union[str, bool] | Name of RabbitMQ queue to read from (priority messages) or | None |
parent_logger | Logger | Logger to inherit prefix from. | None |
start ¶
Start receiving tasks.
stop ¶
Stop receiving tasks.
reconnect ¶
check ¶
Check that needed queues are declared, return True or raise RuntimeError.
If needed queues are not declared, reconnect and try again. (max 5 times)
ack ¶
Acknowledge processing of the message/task
Will reconnect by itself on channel error.
Args:
    msg_tag: Message tag received as the first param of the callback function.
Returns:
    Whether the message was acknowledged successfully and can be processed further.
watchdog ¶
Check whether both threads are running and perform a reset if not.
Register to be called periodically by scheduler.
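A watchdog of this shape can be sketched as follows. The function signature, the `reset` callable, and the way the threads are passed in are assumptions for illustration; the source does not say which two threads are monitored or how the reset is carried out.

```python
import threading
from typing import Callable, Iterable


def watchdog(threads: Iterable[threading.Thread], reset: Callable[[], None]) -> bool:
    """Call reset() if any monitored thread has died; return whether a reset happened."""
    if all(t.is_alive() for t in threads):
        return False
    reset()
    return True


# Demo: one thread keeps running, the other has already finished.
stop = threading.Event()
alive = threading.Thread(target=stop.wait, daemon=True)
dead = threading.Thread(target=lambda: None)
alive.start()
dead.start()
dead.join()  # make sure the second thread has terminated

resets = []
triggered = watchdog([alive, dead], lambda: resets.append("reset"))

# Clean up the still-running thread.
stop.set()
alive.join()
```

In a real deployment the function would be registered with a scheduler so that it runs at a fixed interval.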