dp3.task_processing.task_queue

Functions to work with the main task queue (RabbitMQ)

There are two queues for each worker process:

- a "normal" queue for tasks added by other components, limited to 100 tasks;
- a "priority" queue for tasks added by the workers themselves, with no limit, since workers must not be blocked waiting for the queue.

This wrapper presents both queues as a single one. TaskQueueReader checks the "priority" queue first and reads the "normal" queue only when no priority task is waiting.
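
The read order can be illustrated with a minimal sketch that uses two in-memory deques in place of the real RabbitMQ queues. The `next_task` helper and the task strings are hypothetical and not part of the DP3 API.

```python
from collections import deque
from typing import Optional


def next_task(priority: deque, normal: deque) -> Optional[str]:
    """Return the next task, preferring the priority queue (hypothetical helper)."""
    if priority:
        return priority.popleft()
    if normal:
        return normal.popleft()
    return None  # nothing waiting in either queue


normal = deque(["n1", "n2"])
priority = deque(["p1"])

# Drain both queues; the priority task is served before any normal one.
order = [next_task(priority, normal) for _ in range(4)]
```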

Tasks are distributed to worker processes (and threads) by a hash of the entity to be modified. The message source decides the destination queue, so each source must know how many worker processes there are.
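
A rough sketch of hash-based distribution follows. The hash function used by DP3 is not shown on this page, so SHA-256 is an assumption here, and `worker_index_for` and the entity key format are hypothetical. A stable hash matters because Python's built-in `hash()` for strings is randomized per process, so two sources could otherwise disagree on the destination.

```python
import hashlib


def worker_index_for(entity_key: str, workers: int) -> int:
    """Map an entity key to a worker index with a process-independent hash.

    Illustrative only: DP3's real routing hash may differ.
    """
    digest = hashlib.sha256(entity_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % workers


WORKERS = 4  # every message source must use the same worker count
first = worker_index_for("ip/192.0.2.1", WORKERS)
second = worker_index_for("ip/192.0.2.1", WORKERS)
```

Because the index depends on the worker count, a source configured with a different number of workers would route the same entity to a different queue.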

Exchange and queues must be declared externally!

Related configuration keys and their defaults (these should be part of the global DP3 config files):

rabbitmq:
  host: localhost
  port: 5672
  virtual_host: /
  username: guest
  password: guest

worker_processes: 1
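
As a sketch of how a partial `rabbitmq` section combines with these defaults, the snippet below overlays user-supplied keys on the listed values. The `effective_rabbit_config` helper and the example host name are hypothetical; the snippet uses a plain dict in place of a parsed config file.

```python
from typing import Optional

# Defaults as listed in the configuration block above.
DEFAULTS = {
    "host": "localhost",
    "port": 5672,
    "virtual_host": "/",
    "username": "guest",
    "password": "guest",
}


def effective_rabbit_config(rabbit_config: Optional[dict] = None) -> dict:
    """Overlay user-supplied keys on the documented defaults (hypothetical helper)."""
    merged = {**DEFAULTS, **(rabbit_config or {})}
    merged["port"] = int(merged["port"])  # a port read from text config may be a string
    return merged


cfg = effective_rabbit_config({"host": "rmq.example.org", "port": "5673"})
```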

RobustAMQPConnection

RobustAMQPConnection(rabbit_config: dict = None)

Common TaskQueue wrapper, handles connection to RabbitMQ server with automatic reconnection. TaskQueueWriter and TaskQueueReader are derived from this.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rabbit_config | dict | RabbitMQ connection parameters, dict with following keys (all optional): host, port, virtual_host, username, password | None |

Source code in dp3/task_processing/task_queue.py
def __init__(self, rabbit_config: dict = None) -> None:
    rabbit_config = {} if rabbit_config is None else rabbit_config
    self.log = logging.getLogger("RobustAMQPConnection")
    self.conn_params = {
        "hostname": rabbit_config.get("host", "localhost"),
        "port": int(rabbit_config.get("port", 5672)),
        "virtual_host": rabbit_config.get("virtual_host", "/"),
        "username": rabbit_config.get("username", "guest"),
        "password": rabbit_config.get("password", "guest"),
    }
    self.connection: amqpstorm.Connection = None
    self.channel: amqpstorm.Channel = None
    self._connection_id = 0

connect

connect() -> None

Create a connection (or reconnect after error).

If connection can't be established, try it again indefinitely.

Source code in dp3/task_processing/task_queue.py
def connect(self) -> None:
    """Create a connection (or reconnect after error).

    If connection can't be established, try it again indefinitely.
    """
    if self.connection:
        self.connection.close()
    self._connection_id += 1

    attempts = 0
    while True:
        attempts += 1
        try:
            self.connection = amqpstorm.Connection(**self.conn_params)
            self.log.debug(
                "AMQP connection created, server: "
                "'{hostname}:{port}/{virtual_host}'".format_map(self.conn_params)
            )
            if attempts > 1:
                # This was a repeated attempt, print success message with ERROR level
                self.log.error("... it's OK now, we're successfully connected!")

            self.channel = self.connection.channel()
            self.channel.confirm_deliveries()
            self.channel.basic.qos(PREFETCH_COUNT)
            break
        except amqpstorm.AMQPError as e:
            sleep_time = RECONNECT_DELAYS[min(attempts, len(RECONNECT_DELAYS)) - 1]
            self.log.error(
                f"RabbitMQ connection error (will try to reconnect in {sleep_time}s): {e}"
            )
            time.sleep(sleep_time)
        except KeyboardInterrupt:
            break
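
The delay selection in the retry loop above can be sketched in isolation. The values in `RECONNECT_DELAYS` below are assumed for illustration, since the real constant is not shown on this page; `delay_for_attempt` is a hypothetical helper.

```python
RECONNECT_DELAYS = (1, 2, 5, 10, 30)  # assumed values, not taken from DP3


def delay_for_attempt(attempts: int) -> int:
    """Pick the delay for a 1-based attempt number, clamping to the last entry."""
    return RECONNECT_DELAYS[min(attempts, len(RECONNECT_DELAYS)) - 1]


# Delays grow with each failed attempt and then stay at the final value.
delays = [delay_for_attempt(n) for n in range(1, 8)]
```

Clamping the index means the loop keeps retrying indefinitely at the longest delay instead of running off the end of the sequence.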

TaskQueueWriter

TaskQueueWriter(app_name: str, workers: int = 1, rabbit_config: dict = None, exchange: str = None, priority_exchange: str = None, parent_logger: logging.Logger = None)

Bases: RobustAMQPConnection

Writes tasks into main Task Queue

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| app_name | str | DP3 application name (used as prefix for RMQ queues and exchanges) | required |
| workers | int | Number of worker processes in the system | 1 |
| rabbit_config | dict | RabbitMQ connection parameters, dict with following keys (all optional): host, port, virtual_host, username, password | None |
| exchange | str | Name of the exchange to write tasks to (default: "<app-name>-main-task-exchange") | None |
| priority_exchange | str | Name of the exchange to write priority tasks to (default: "<app-name>-priority-task-exchange") | None |
| parent_logger | Logger | Logger to inherit prefix from. | None |

Source code in dp3/task_processing/task_queue.py
def __init__(
    self,
    app_name: str,
    workers: int = 1,
    rabbit_config: dict = None,
    exchange: str = None,
    priority_exchange: str = None,
    parent_logger: logging.Logger = None,
) -> None:
    rabbit_config = {} if rabbit_config is None else rabbit_config
    assert isinstance(workers, int) and workers >= 1, "count of workers must be positive number"
    assert isinstance(exchange, str) or exchange is None, "exchange argument has to be string!"
    assert (
        isinstance(priority_exchange, str) or priority_exchange is None
    ), "priority_exchange has to be string"

    super().__init__(rabbit_config)

    if parent_logger is not None:
        self.log = parent_logger.getChild("TaskQueueWriter")
    else:
        self.log = logging.getLogger("TaskQueueWriter")

    if exchange is None:
        exchange = DEFAULT_EXCHANGE.format(app_name)
    if priority_exchange is None:
        priority_exchange = DEFAULT_PRIORITY_EXCHANGE.format(app_name)

    self.workers = workers
    self.exchange = exchange
    self.exchange_pri = priority_exchange
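
A small sketch of how the default exchange names are derived from the application name. The template strings are inferred from the documented defaults and are an assumption; `resolve_exchanges` and the application name are hypothetical.

```python
from typing import Optional, Tuple

# Assumed templates, inferred from the documented default names.
DEFAULT_EXCHANGE = "{}-main-task-exchange"
DEFAULT_PRIORITY_EXCHANGE = "{}-priority-task-exchange"


def resolve_exchanges(
    app_name: str,
    exchange: Optional[str] = None,
    priority_exchange: Optional[str] = None,
) -> Tuple[str, str]:
    """Fill in default exchange names when none are given (hypothetical helper)."""
    if exchange is None:
        exchange = DEFAULT_EXCHANGE.format(app_name)
    if priority_exchange is None:
        priority_exchange = DEFAULT_PRIORITY_EXCHANGE.format(app_name)
    return exchange, priority_exchange


defaults = resolve_exchanges("my_app")
custom = resolve_exchanges("my_app", exchange="custom-exchange")
```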

check

check() -> bool

Check that needed exchanges are declared, return True or raise RuntimeError.

If needed exchanges are not declared, reconnect and try again. (max 5 times)

Source code in dp3/task_processing/task_queue.py
def check(self) -> bool:
    """
    Check that needed exchanges are declared, return True or raise RuntimeError.

    If needed exchanges are not declared, reconnect and try again. (max 5 times)
    """
    for attempt, sleep_time in enumerate(RECONNECT_DELAYS):
        if self.check_exchange_existence(self.exchange) and self.check_exchange_existence(
            self.exchange_pri
        ):
            return True
        self.log.warning(
            "RabbitMQ exchange configuration doesn't match (attempt %d of %d, retrying in %ds)",
            attempt + 1,
            len(RECONNECT_DELAYS),
            sleep_time,
        )
        time.sleep(sleep_time)
        self.disconnect()
        self.connect()
    if not self.check_exchange_existence(self.exchange):
        raise ExchangeNotDeclared(self.exchange)
    if not self.check_exchange_existence(self.exchange_pri):
        raise ExchangeNotDeclared(self.exchange_pri)
    return True
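
The retry pattern above can be sketched without a broker. In this simplified version `probe` stands in for the existence checks, `reconnect` for the disconnect and connect pair, and `NotDeclared` for the library's exception; all three names are hypothetical, and the sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, Sequence


class NotDeclared(RuntimeError):
    """Hypothetical stand-in for the library's 'not declared' errors."""


def check_with_retries(
    probe: Callable[[], bool],
    reconnect: Callable[[], None],
    delays: Sequence[float],
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Probe once per delay, reconnecting between attempts, then probe one last time."""
    for delay in delays:
        if probe():
            return True
        sleep(delay)
        reconnect()
    if not probe():
        raise NotDeclared("resource still missing after all retries")
    return True


# Simulated broker state: the resource appears on the third probe.
answers = iter([False, False, True])
slept = []
ok = check_with_retries(lambda: next(answers), lambda: None, (1, 2, 5), sleep=slept.append)
```

Note that the loop probes once per configured delay and then once more after the last reconnect, so the total number of probes is one more than the number of delays.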

broadcast_task

broadcast_task(task: Task, priority: bool = False) -> None

Broadcast task to all workers

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | Task | prepared task | required |
| priority | bool | if true, the task is placed into priority queue (should only be used internally by workers) | False |

Source code in dp3/task_processing/task_queue.py
def broadcast_task(self, task: Task, priority: bool = False) -> None:
    """
    Broadcast task to all workers

    Args:
        task: prepared task
        priority: if true, the task is placed into priority queue
            (should only be used internally by workers)
    """
    if not self.channel:
        self.connect()

    task_str = str(task) if len(str(task)) < 500 else f"{str(task)[:500]}...(truncated)"
    self.log.debug(f"Received new broadcast task: {task_str}")

    body = task.as_message()
    exchange = self.exchange_pri if priority else self.exchange

    for routing_key in range(self.workers):
        self._send_message(routing_key, exchange, body)
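
To make the fan-out concrete, here is a minimal sketch in which a list stands in for the broker. The `send_message` and `broadcast` functions and the exchange names are hypothetical; only the loop over `range(workers)` mirrors the source above.

```python
sent = []  # records (routing_key, exchange, body) instead of publishing


def send_message(routing_key: int, exchange: str, body: str) -> None:
    """Hypothetical stand-in for publishing a message to RabbitMQ."""
    sent.append((routing_key, exchange, body))


def broadcast(body: str, workers: int, exchange: str, priority_exchange: str,
              priority: bool = False) -> None:
    """Send one copy of the message to every worker index."""
    target = priority_exchange if priority else exchange
    for routing_key in range(workers):
        send_message(routing_key, target, body)


broadcast("{}", workers=3, exchange="app-main-task-exchange",
          priority_exchange="app-priority-task-exchange")
```

A broadcast therefore produces as many messages as there are workers, whereas a regular task goes to exactly one routing key.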

put_task

put_task(task: Task, priority: bool = False) -> None

Put task (update_request) to the queue of corresponding worker

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | Task | prepared task | required |
| priority | bool | if true, the task is placed into priority queue (should only be used internally by workers) | False |

Source code in dp3/task_processing/task_queue.py
def put_task(self, task: Task, priority: bool = False) -> None:
    """
    Put task (update_request) to the queue of corresponding worker

    Args:
        task: prepared task
        priority: if true, the task is placed into priority queue
            (should only be used internally by workers)
    """
    if not self.channel:
        self.connect()

    task_str = str(task) if len(str(task)) < 500 else f"{str(task)[:500]}...(truncated)"
    self.log.debug(f"Received new task: {task_str}")

    # Prepare routing key
    body = task.as_message()
    # index of the worker to send the task to
    routing_key = task.hashed_routing_key() % self.workers

    exchange = self.exchange_pri if priority else self.exchange
    self._send_message(routing_key, exchange, body)

TaskQueueReader

TaskQueueReader(callback: Callable, parse_task: Callable[[str], Task], app_name: str, worker_index: int = 0, rabbit_config: dict = None, queue: str = None, priority_queue: Union[str, bool] = None, parent_logger: logging.Logger = None)

Bases: RobustAMQPConnection

TaskQueueReader consumes messages from two RabbitMQ queues (normal and priority one for given worker) and passes them to the given callback function.

Tasks from the priority queue are passed before the normal ones.

Each received message must be acknowledged by calling .ack(msg_tag).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callback | Callable | Function called when a message is received, prototype: func(tag, Task) | required |
| parse_task | Callable[[str], Task] | Function called to parse message body into a task, prototype: func(body) -> Task | required |
| app_name | str | DP3 application name (used as prefix for RMQ queues and exchanges) | required |
| worker_index | int | index of this worker (filled into DEFAULT_QUEUE string using .format() method) | 0 |
| rabbit_config | dict | RabbitMQ connection parameters, dict with following keys (all optional): host, port, virtual_host, username, password | None |
| queue | str | Name of RabbitMQ queue to read from (default: "<app-name>-worker-<index>") | None |
| priority_queue | Union[str, bool] | Name of RabbitMQ queue to read from (priority messages) or False to disable. (default: "<app-name>-worker-<index>-pri") | None |
| parent_logger | Logger | Logger to inherit prefix from. | None |

Source code in dp3/task_processing/task_queue.py
def __init__(
    self,
    callback: Callable,
    parse_task: Callable[[str], Task],
    app_name: str,
    worker_index: int = 0,
    rabbit_config: dict = None,
    queue: str = None,
    priority_queue: Union[str, bool] = None,
    parent_logger: logging.Logger = None,
) -> None:
    rabbit_config = {} if rabbit_config is None else rabbit_config
    assert callable(callback), "callback must be callable object"
    assert (
        isinstance(worker_index, int) and worker_index >= 0
    ), "worker_index must be a non-negative integer"
    assert isinstance(queue, str) or queue is None, "queue must be string"
    assert (
        isinstance(priority_queue, str) or priority_queue is None or priority_queue is False
    ), "priority_queue must be string or False to disable"

    super().__init__(rabbit_config)

    if parent_logger is not None:
        self.log = parent_logger.getChild("TaskQueueReader")
    else:
        self.log = logging.getLogger("TaskQueueReader")

    self.callback = callback
    self.parse_task = parse_task

    if queue is None:
        queue = DEFAULT_QUEUE.format(app_name, worker_index)
    if priority_queue is None:
        priority_queue = DEFAULT_PRIORITY_QUEUE.format(app_name, worker_index)
    elif priority_queue is False:
        priority_queue = None
    self.queue_name = queue
    self.priority_queue_name = priority_queue
    self.worker_index = worker_index

    self.running = False

    self._consuming_thread = None
    self._processing_thread = None

    # Receive messages into 2 temporary queues
    # (max length should be equal to prefetch_count set in RabbitMQReader)
    self.cache = collections.deque()
    self.cache_pri = collections.deque()
    self.cache_full = threading.Event()  # signals that there's something in the cache
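
The queue name resolution, including the `False` value that disables the priority queue, can be sketched as follows. The template strings are inferred from the documented default names and are an assumption; `resolve_queues` and the application name are hypothetical.

```python
from typing import Optional, Tuple, Union

# Assumed templates, inferred from the documented default names.
DEFAULT_QUEUE = "{}-worker-{}"
DEFAULT_PRIORITY_QUEUE = "{}-worker-{}-pri"


def resolve_queues(
    app_name: str,
    worker_index: int = 0,
    queue: Optional[str] = None,
    priority_queue: Union[str, bool, None] = None,
) -> Tuple[str, Optional[str]]:
    """Return (queue, priority_queue); priority_queue is None when disabled."""
    if queue is None:
        queue = DEFAULT_QUEUE.format(app_name, worker_index)
    if priority_queue is None:
        priority_queue = DEFAULT_PRIORITY_QUEUE.format(app_name, worker_index)
    elif priority_queue is False:
        priority_queue = None  # explicit opt-out of the priority queue
    return queue, priority_queue


with_defaults = resolve_queues("my_app", 2)
no_priority = resolve_queues("my_app", 0, priority_queue=False)
```

Here `None` means "use the default name" on input, while `False` means "no priority queue at all", which is why the two values are handled separately.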

start

start() -> None

Start receiving tasks.

Source code in dp3/task_processing/task_queue.py
def start(self) -> None:
    """Start receiving tasks."""
    if self.running:
        raise RuntimeError("Already running")

    if not self.connection:
        self.connect()

    self.log.info("Starting TaskQueueReader")

    # Start thread for message consuming from server
    self.running = True
    self._consuming_thread = threading.Thread(target=self._consuming_thread_func)
    thread_n = self._consuming_thread.name.split("-")[-1]
    self._consuming_thread.name = f"Consumer-{self.worker_index}-{thread_n}"
    self._consuming_thread.start()

    # Start thread for message processing and passing to user's callback
    self._processing_thread = threading.Thread(target=self._msg_processing_thread_func)
    thread_n = self._processing_thread.name.split("-")[-1]
    self._processing_thread.name = f"Processor-{self.worker_index}-{thread_n}"
    self._processing_thread.start()

stop

stop() -> None

Stop receiving tasks.

Source code in dp3/task_processing/task_queue.py
def stop(self) -> None:
    """Stop receiving tasks."""
    if not self.running:
        raise RuntimeError("Not running")

    self.running = False
    self._stop_consuming_thread()
    self._stop_processing_thread()
    self.log.info("TaskQueueReader stopped")

reconnect

reconnect() -> None

Clear local message cache and reconnect to RabbitMQ server.

Source code in dp3/task_processing/task_queue.py
def reconnect(self) -> None:
    """Clear local message cache and reconnect to RabbitMQ server."""
    self.cache.clear()
    self.cache_pri.clear()

    self.connect()

check

check() -> bool

Check that needed queues are declared, return True or raise RuntimeError.

If needed queues are not declared, reconnect and try again. (max 5 times)

Source code in dp3/task_processing/task_queue.py
def check(self) -> bool:
    """
    Check that needed queues are declared, return True or raise RuntimeError.

    If needed queues are not declared, reconnect and try again. (max 5 times)
    """

    for attempt, sleep_time in enumerate(RECONNECT_DELAYS):
        if self.check_queue_existence(self.queue_name) and self.check_queue_existence(
            self.priority_queue_name
        ):
            return True
        self.log.warning(
            "RabbitMQ queue configuration doesn't match (attempt %d of %d, retrying in %ds)",
            attempt + 1,
            len(RECONNECT_DELAYS),
            sleep_time,
        )
        time.sleep(sleep_time)
        self.disconnect()
        self.connect()
    if not self.check_queue_existence(self.queue_name):
        raise QueueNotDeclared(self.queue_name)
    if not self.check_queue_existence(self.priority_queue_name):
        raise QueueNotDeclared(self.priority_queue_name)
    return True

ack

ack(msg_tag: tuple[int, int]) -> bool

Acknowledge processing of the message/task

Will reconnect by itself on channel error.

Args:
    msg_tag: Message tag received as the first param of the callback function.

Returns:
    Whether the message was acknowledged successfully and can be processed further.

Source code in dp3/task_processing/task_queue.py
def ack(self, msg_tag: tuple[int, int]) -> bool:
    """Acknowledge processing of the message/task

    Will reconnect by itself on channel error.
    Args:
        msg_tag: Message tag received as the first param of the callback function.
    Returns:
        Whether the message was acknowledged successfully and can be processed further.
    """
    conn_id, msg_tag = msg_tag
    if conn_id != self._connection_id:
        return False
    try:
        self.channel.basic.ack(delivery_tag=msg_tag)
    except amqpstorm.AMQPChannelError as why:
        self.log.error("Channel error while acknowledging message: %s", why)
        self.reconnect()
        return False
    return True
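
The role of the connection id in the message tag can be shown with a small stand-in class. `FakeReader` is hypothetical and models only the tag check; it does not talk to RabbitMQ. The idea is that delivery tags are only meaningful on the channel that issued them, so a tag obtained before a reconnect must not be acknowledged afterwards.

```python
from typing import List, Tuple


class FakeReader:
    """Hypothetical stand-in for TaskQueueReader; models only the tag check."""

    def __init__(self) -> None:
        self._connection_id = 1
        self.acked: List[int] = []

    def reconnect(self) -> None:
        # Each new connection gets a new id, invalidating older tags.
        self._connection_id += 1

    def ack(self, msg_tag: Tuple[int, int]) -> bool:
        conn_id, delivery_tag = msg_tag
        if conn_id != self._connection_id:
            return False  # tag belongs to a previous connection
        self.acked.append(delivery_tag)
        return True


reader = FakeReader()
first = reader.ack((1, 10))   # tag from the current connection
reader.reconnect()
stale = reader.ack((1, 11))   # tag issued before the reconnect
fresh = reader.ack((2, 1))    # tag from the new connection
```

A callback would typically treat a `False` result as a signal to stop working on that task, since the broker is expected to redeliver unacknowledged messages.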

watchdog

watchdog()

Check whether both threads are running and perform a reset if not.

Register to be called periodically by scheduler.

Source code in dp3/task_processing/task_queue.py
def watchdog(self):
    """
    Check whether both threads are running and perform a reset if not.

    Register to be called periodically by scheduler.
    """
    proc = self._processing_thread.is_alive()
    cons = self._consuming_thread.is_alive()

    if not proc or not cons:
        self.log.error(
            "Dead threads detected, processing=%s, consuming=%s, restarting TaskQueueReader.",
            "alive" if proc else "dead",
            "alive" if cons else "dead",
        )
        self._stop_consuming_thread()
        self._stop_processing_thread()

        self.channel.close()
        self.channel = None
        self.cache.clear()
        self.cache_pri.clear()

        self.connect()
        self.start()
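
The liveness test at the start of `watchdog` can be tried out in isolation with plain threads. The `dead_threads` helper is hypothetical, and the restart steps are left out because they depend on the reader's internals.

```python
import threading
from typing import Dict, List


def dead_threads(threads: Dict[str, threading.Thread]) -> List[str]:
    """Return the names of threads that are no longer alive (hypothetical helper)."""
    return [name for name, thread in threads.items() if not thread.is_alive()]


stop = threading.Event()
processing = threading.Thread(target=stop.wait)    # keeps running until stop is set
consuming = threading.Thread(target=lambda: None)  # exits immediately

processing.start()
consuming.start()
consuming.join()  # make sure the short-lived thread has finished

dead = dead_threads({"processing": processing, "consuming": consuming})

# Clean up the long-running thread.
stop.set()
processing.join()
```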