def main(app_name: str, config_dir: str, process_index: int, verbose: bool) -> None:
    """
    Run worker process.

    Sets up logging, loads the configuration and entity model specification,
    creates the core components and plug-in modules, starts them and then blocks
    until a stop is requested (SIGINT/SIGTERM/SIGABRT, or a program component
    releasing the stop lock). Finally, stops every started component and shuts
    down logging.

    Args:
        app_name: Name of the application to distinguish it from other DP3-based apps.
            For example, it's used as a prefix for RabbitMQ queue names.
        config_dir: Path to directory containing configuration files.
        process_index: Index of this worker process. For each application
            there must be N processes running simultaneously, each started with a
            unique index (from 0 to N-1). N is read from configuration
            ('worker_processes' in 'processing_core.yml').
        verbose: More verbose output (set log level to DEBUG).

    Raises:
        SystemExit: With code 2 when the entity model specification
            ('db_entities' configuration) fails validation.
    """
    ##############################################
    # Initialize logging mechanism
    threading.current_thread().name = f"MainThread-{process_index}"
    LOGFORMAT = "%(asctime)-15s,%(threadName)s,%(name)s,[%(levelname)s] %(message)s"
    LOGDATEFORMAT = "%Y-%m-%dT%H:%M:%S"
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO, format=LOGFORMAT, datefmt=LOGDATEFORMAT
    )
    log = logging.getLogger()
    # Disable INFO and DEBUG messages from some libraries
    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("amqpstorm").setLevel(logging.WARNING)

    ##############################################
    # Load configuration
    config_base_path = os.path.abspath(config_dir)
    log.debug(f"Loading config directory {config_base_path}")

    # Whole configuration should be loaded
    config = read_config_dir(config_base_path, recursive=True)
    try:
        model_spec = ModelSpec(config.get("db_entities"))
    except ValidationError as e:
        log.fatal("Invalid model specification: %s", e)
        sys.exit(2)

    # Print whole attribute specification
    log.debug(model_spec)

    num_processes = config.get("processing_core.worker_processes")

    platform_config = PlatformConfig(
        app_name=app_name,
        config_base_path=config_base_path,
        config=config,
        model_spec=model_spec,
        process_index=process_index,
        num_processes=num_processes,
    )

    ##############################################
    # Create instances of core components
    log.info(f"***** {app_name} worker {process_index} of {num_processes} start *****")

    # EventCountLogger
    ecl = EventCountLogger(
        platform_config.config.get("event_logging.groups"),
        platform_config.config.get("event_logging.redis"),
    )
    elog = ecl.get_group("te") or DummyEventGroup()
    elog_by_src = ecl.get_group("tasks_by_src") or DummyEventGroup()

    db = EntityDatabase(config, model_spec, num_processes, process_index, elog)
    # Only worker 0 updates the DB schema; the other workers wait for it to finish.
    if process_index == 0:
        db.update_schema()
    else:
        db.await_updated_schema()

    global_scheduler = scheduler.Scheduler()
    task_executor = TaskExecutor(db, platform_config, elog, elog_by_src)
    snap_shooter = SnapShooter(
        db,
        TaskQueueWriter(app_name, num_processes, config.get("processing_core.msg_broker")),
        platform_config,
        global_scheduler,
        elog,
    )
    updater = Updater(
        db,
        TaskQueueWriter(app_name, num_processes, config.get("processing_core.msg_broker")),
        platform_config,
        global_scheduler,
        elog,
    )
    registrar = CallbackRegistrar(global_scheduler, task_executor, snap_shooter, updater)

    LinkManager(db, platform_config, registrar)
    HistoryManager(db, platform_config, registrar)
    Telemetry(db, platform_config, registrar)
    GarbageCollector(db, platform_config, registrar)

    # Lock used to control when the program stops.
    # It is held while running; releasing it (signal handler or a program
    # component, e.g. TaskDistributor which receives it) requests a stop.
    daemon_stop_lock = threading.Lock()
    daemon_stop_lock.acquire()

    # Signal handler releasing the lock on SIGINT, SIGTERM or SIGABRT
    def sigint_handler(signum, frame):
        log.debug("Signal %s received, stopping worker", signal.Signals(signum).name)
        try:
            daemon_stop_lock.release()
        except RuntimeError:
            # The lock is already released: a stop has already been requested
            # (e.g. two signals delivered in quick succession, or another
            # component released it). Releasing an unlocked Lock raises
            # RuntimeError, which must not escape from the signal handler.
            pass

    signal.signal(signal.SIGINT, sigint_handler)
    signal.signal(signal.SIGTERM, sigint_handler)
    signal.signal(signal.SIGABRT, sigint_handler)

    task_distributor = TaskDistributor(task_executor, platform_config, registrar, daemon_stop_lock)

    control = Control(platform_config)
    control.set_action_handler(ControlAction.make_snapshots, snap_shooter.make_snapshots)
    control.set_action_handler(
        ControlAction.refresh_on_entity_creation,
        partial(refresh_on_entity_creation, task_distributor, task_executor),
    )
    # Filled in after plug-in modules are loaded below; the handler keeps a
    # reference to this same dict object.
    modules = {}
    control.set_action_handler(
        ControlAction.refresh_module_config,
        partial(reload_module_config, log, platform_config, modules),
    )

    global_scheduler.register(control.control_queue.watchdog, second="15,45")

    ##############################################
    # Load all plug-in modules
    module_dir = config.get("processing_core.modules_dir")
    module_dir = os.path.abspath(os.path.join(config_base_path, module_dir))

    loaded_modules = load_modules(
        module_dir,
        config.get("processing_core.enabled_modules"),
        log,
        registrar,
        platform_config,
    )
    modules.update(loaded_modules)

    ################################################
    # Initialization completed, run ...
    log.info("***** Initialization completed, starting all modules *****")

    # Run modules that have their own threads (TODO: there are no such modules, should be kept?)
    # (if they don't, the start() should do nothing)
    for module in loaded_modules.values():
        module.start()

    core_modules = [
        updater,  # Updater will throw exceptions when misconfigured (best start first)
        task_distributor,  # TaskDistributor (which starts TaskExecutors in several worker threads)
        db,
        snap_shooter,
        control,
        global_scheduler,
    ]
    # Only modules whose start() succeeded are stopped during cleanup.
    running_core_modules = []
    try:
        for module in core_modules:
            module.start()
            running_core_modules.append(module)

        # Wait until someone wants to stop the program by releasing this Lock.
        # It may be a user by pressing Ctrl-C or some program module.
        # (try to acquire the lock again,
        # effectively waiting until it's released by signal handler or another thread)
        if os.name == "nt":
            # This is needed on Windows in order to catch Ctrl-C, which doesn't break the waiting.
            while not daemon_stop_lock.acquire(timeout=1):
                pass
        else:
            daemon_stop_lock.acquire()
    except Exception:
        log.exception("Unhandled exception in main thread, stopping worker")

    ################################################
    # Finalization & cleanup

    # Set signal handlers back to their defaults,
    # so the second Ctrl-C closes the program immediately
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGABRT, signal.SIG_DFL)

    log.info("Stopping running components ...")
    # Core modules are stopped in reverse start order, then plug-in modules.
    # A failure in one stop() must not prevent stopping the rest, nor skip
    # the final logging shutdown.
    for module in [*reversed(running_core_modules), *loaded_modules.values()]:
        try:
            module.stop()
        except Exception:
            log.exception("Error while stopping %s", type(module).__name__)

    log.info("***** Finished, main thread exiting. *****")
    logging.shutdown()