worker_handler
Defines an abstract base class for worker handlers in the Merlin workflow framework.
Worker handlers are responsible for launching, stopping, and querying the status of task server workers (e.g., Celery workers). This interface allows support for different task servers to be plugged in with consistent behavior.
MerlinWorkerHandler
Bases: ABC
Abstract base class for launching and managing Merlin worker processes.
Subclasses must implement the methods to launch, stop, and query workers using a particular task server (e.g., Celery, Kafka, etc.).
Attributes:
| Name | Type | Description |
|---|---|---|
| `merlin_db` | `MerlinDatabase` | The database instance used for worker management. |
Methods:
| Name | Description |
|---|---|
| `start_workers` | Launch a list of MerlinWorker instances with optional configuration. |
| `stop_workers` | Stop running worker processes managed by this handler. |
| `query_workers` | Query the status of running workers and return summary information. |
Source code in merlin/workers/handlers/worker_handler.py
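As a rough illustration of the contract described above, the sketch below defines a stand-in abstract class with the three documented method signatures and a toy subclass. It does not import Merlin: `WorkerHandlerSketch` and `InMemoryHandler` are hypothetical names, workers are represented as `(name, queues)` tuples rather than `MerlinWorker` objects, and the return value of `query_workers` is an assumption made for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class WorkerHandlerSketch(ABC):
    """Stand-in that mirrors the documented MerlinWorkerHandler signatures."""

    def __init__(self, merlin_db: Optional[Any] = None):
        # The documented class stores a MerlinDatabase; any object works here.
        self.merlin_db = merlin_db

    @abstractmethod
    def start_workers(self, workers: List[Any], **kwargs):
        """Launch a list of workers."""

    @abstractmethod
    def stop_workers(self, queues: Optional[List[str]] = None, workers: Optional[List[str]] = None):
        """Stop workers, optionally filtered by queue or worker name."""

    @abstractmethod
    def query_workers(self, formatter: str, queues=None, workers=None, local_db: bool = False):
        """Report on currently running workers."""


class InMemoryHandler(WorkerHandlerSketch):
    """Hypothetical handler that records workers in a dict instead of launching processes."""

    def __init__(self, merlin_db: Optional[Any] = None):
        super().__init__(merlin_db)
        self.running: Dict[str, List[str]] = {}  # worker name -> queue names

    def start_workers(self, workers, **kwargs):
        # Assumption: each worker is a (name, queues) tuple in this sketch.
        for name, queues in workers:
            self.running[name] = list(queues)

    def stop_workers(self, queues=None, workers=None):
        for name in list(self.running):
            queue_ok = queues is None or any(q in self.running[name] for q in queues)
            name_ok = workers is None or name in workers
            if queue_ok and name_ok:
                del self.running[name]

    def query_workers(self, formatter, queues=None, workers=None, local_db=False):
        # The formatter and filters are ignored here; only running names are returned.
        return sorted(self.running)
```

Because every abstract method is overridden, `InMemoryHandler()` can be instantiated, whereas instantiating `WorkerHandlerSketch` directly raises `TypeError`. This is the mechanism that forces each task-server backend to supply all three operations.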
__init__(merlin_db=None)
Initialize the worker handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `merlin_db` | `MerlinDatabase` | The database instance used for worker management or None. | `None` |
query_workers(formatter, queues=None, workers=None, local_db=False)
abstractmethod
Query the status of all currently running workers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `formatter` | `str` | The worker formatter to use (rich or json). | required |
| `queues` | `List[str]` | List of queue names to filter by (optional). | `None` |
| `workers` | `List[str]` | List of worker names to filter by (optional). | `None` |
| `local_db` | `bool` | Whether to use the local database for querying (optional). | `False` |
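One way a subclass might act on the `formatter` argument is a simple dispatch on its value. The sketch below is an assumption-laden illustration, not Merlin's implementation: `RECORDS`, `filter_records`, and `render` are hypothetical names, the record layout is invented for the example, and the "rich" branch emits plain text so that the code has no third-party dependency.

```python
import json
from typing import Dict, List, Optional

# Hypothetical worker records; a real handler would obtain these from the
# task server or the database.
RECORDS = [
    {"name": "worker1", "queues": ["hello_queue"], "status": "running"},
    {"name": "worker2", "queues": ["world_queue"], "status": "running"},
]


def filter_records(
    records: List[Dict],
    queues: Optional[List[str]] = None,
    workers: Optional[List[str]] = None,
) -> List[Dict]:
    """Keep records that pass every filter that was supplied (assumed semantics)."""
    selected = []
    for rec in records:
        if queues is not None and not set(queues) & set(rec["queues"]):
            continue
        if workers is not None and rec["name"] not in workers:
            continue
        selected.append(rec)
    return selected


def render(records: List[Dict], formatter: str) -> str:
    """Render records according to the requested formatter name."""
    if formatter == "json":
        return json.dumps(records, indent=2)
    if formatter == "rich":
        # Plain-text stand-in for a rich table.
        return "\n".join(
            f"{r['name']}  {','.join(r['queues'])}  {r['status']}" for r in records
        )
    raise ValueError(f"Unknown formatter: {formatter!r}")


print(render(filter_records(RECORDS, queues=["hello_queue"]), "json"))
```

Rejecting unknown formatter names with an explicit error, rather than silently falling back to a default, keeps a typo such as `"jsno"` from producing output in an unexpected format.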
start_workers(workers, **kwargs)
abstractmethod
Launch a list of worker instances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `workers` | `List[MerlinWorker]` | The list of workers to launch. | required |
| `**kwargs` | | Optional keyword arguments passed to subclass-specific logic. | `{}` |
stop_workers(queues=None, workers=None)
abstractmethod
Stop worker processes, optionally filtered by queue or worker name.
This method terminates active worker processes based on the provided filters. The behavior varies by implementation:
- If both `queues` and `workers` are None, all active workers are stopped.
- If `queues` is provided, only workers attached to those queues are stopped.
- If `workers` is provided, only workers matching those names/patterns are stopped.
- If both are provided, workers must match both criteria (intersection).
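The four rules above can be sketched as a single selection function. This is a minimal illustration rather than the logic of any particular handler: `select_workers_to_stop` and the `active` mapping are hypothetical, and treating each entry in `workers` as a regular expression that must match the whole physical worker name is an assumption made for the example.

```python
import re
from typing import Dict, List, Optional


def select_workers_to_stop(
    active: Dict[str, List[str]],
    queues: Optional[List[str]] = None,
    workers: Optional[List[str]] = None,
) -> List[str]:
    """Return the names of active workers that satisfy every supplied filter."""
    selected = []
    for name, worker_queues in active.items():
        # A filter left as None places no restriction on the worker.
        queue_match = queues is None or any(q in worker_queues for q in queues)
        name_match = workers is None or any(re.fullmatch(p, name) for p in workers)
        if queue_match and name_match:
            selected.append(name)
    return selected


# Hypothetical active workers: physical worker name -> attached queues.
active = {
    "celery@worker1.host": ["hello_queue"],
    "celery@worker2.host": ["hello_queue", "world_queue"],
}

print(select_workers_to_stop(active))
print(select_workers_to_stop(active, queues=["world_queue"]))
print(select_workers_to_stop(active, workers=[r"celery@worker1.*"]))
print(select_workers_to_stop(active, queues=["world_queue"], workers=[r"celery@worker1.*"]))
```

In the last call the result is empty: `celery@worker1.host` matches the name pattern but is not attached to `world_queue`, so it fails the intersection rule.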
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `List[str]` | Optional list of queue names to filter workers by. Queue names will be normalized with the appropriate task server prefix if needed. | `None` |
| `workers` | `List[str]` | Optional list of worker names or patterns to match. For Celery, these can be logical worker names from the spec or regex patterns matching physical worker names (e.g., "celery@worker1.*"). | `None` |
Example

    handler = CeleryWorkerHandler()

    # Stop all workers
    handler.stop_workers()

    # Stop workers on specific queues
    handler.stop_workers(queues=['hello_queue', 'world_queue'])

    # Stop specific workers by name
    handler.stop_workers(workers=['worker1', 'worker2'])

    # Stop workers matching both criteria
    handler.stop_workers(queues=['hello_queue'], workers=['worker1.*'])