celery_worker
Implements a Celery-based MerlinWorker.
This module defines the CeleryWorker class, which extends the abstract
MerlinWorker base class to implement worker launching and management using
Celery. Celery workers are responsible for processing tasks from specified queues
and can be launched either locally or through a batch system.
CeleryWorker
Bases: MerlinWorker
Concrete implementation of a single Celery-based Merlin worker.
This class provides logic for validating configuration, constructing launch commands, checking launch eligibility, and launching Celery workers that process jobs from specific task queues.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | The name of the worker. |
| `config` | `dict` | Configuration settings for the worker. |
| `env` | `dict` | Environment variables used by the worker process. |
| `args` | `str` | Additional CLI arguments passed to Celery. |
| `queues` | `List[str]` | Queues the worker listens to. |
| `batch` | `dict` | Optional batch submission settings. |
| `machines` | `List[str]` | List of hostnames the worker is allowed to run on. |
| `overlap` | `bool` | Whether this worker can overlap queues with others. |
Methods:
| Name | Description |
|---|---|
| `_verify_args` | Validate and adjust CLI args based on worker setup. |
| `get_launch_command` | Construct the Celery launch command. |
| `should_launch` | Determine whether the worker should be launched based on system state. |
| `start` | Launch the worker using subprocess. |
| `get_metadata` | Return identifying metadata about the worker. |
Source code in merlin/workers/celery_worker.py
__init__(name, config, env=None, overlap=False)
Constructor for Celery workers.
Sets up attributes used throughout this worker object and saves this worker to the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | The name of the worker. | required |
| `config` | `Dict` | A dictionary containing optional configuration settings for this worker including: | required |
| `env` | `Dict[str, str]` | A dictionary of environment variables set by the user. | `None` |
| `overlap` | `bool` | If True, multiple workers can pull tasks from overlapping queues. | `False` |
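As a rough illustration of how constructor arguments like these could map onto the attributes listed for the class, here is a minimal sketch. `WorkerSettings` is a hypothetical stand-in, not the Merlin class; the config keys (`args`, `queues`, `batch`, `machines`) and their defaults are assumptions taken from the attribute names, and the database save is left out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class WorkerSettings:
    """Hypothetical stand-in for the state a worker keeps after construction."""

    name: str
    config: Dict
    env: Optional[Dict[str, str]] = None
    overlap: bool = False
    # Filled in from `config` by __post_init__; keys and defaults are assumptions.
    args: str = field(init=False)
    queues: List[str] = field(init=False)
    batch: Dict = field(init=False)
    machines: List[str] = field(init=False)

    def __post_init__(self) -> None:
        # Copy the environment so later edits do not leak back to the caller.
        self.env = dict(self.env) if self.env is not None else {}
        self.args = self.config.get("args", "")
        self.queues = list(self.config.get("queues", []))
        self.batch = dict(self.config.get("batch", {}))
        self.machines = list(self.config.get("machines", []))


settings = WorkerSettings(
    name="sim_worker",
    config={"args": "--concurrency 4", "queues": ["sim_queue"]},
)
```

Settings missing from `config` fall back to empty values here, which keeps later checks (for example on `machines`) simple.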
get_launch_command(override_args='', disable_logs=False)
Construct the shell command to launch this Celery worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `override_args` | `str` | If provided, these arguments will replace the default | `''` |
| `disable_logs` | `bool` | If True, logging level will not be added to the command. | `False` |
Returns:
| Type | Description |
|---|---|
| `str` | A shell command string suitable for subprocess execution. |
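The following sketch shows one way a command string of this kind could be assembled. `build_launch_command` is a hypothetical free function, not the method itself; the app name `merlin`, the default `INFO` level, and the exact ordering of flags are assumptions. The Celery options used (`-A`, `-n`, `-Q`, `-l`) are standard `celery worker` options.

```python
from typing import List


def build_launch_command(
    name: str,
    queues: List[str],
    args: str = "",
    override_args: str = "",
    disable_logs: bool = False,
    log_level: str = "INFO",
) -> str:
    """Hypothetical helper: assemble a `celery worker` command as one string."""
    # Non-empty override_args replace the worker's own args wholesale.
    worker_args = override_args if override_args else args
    parts = ["celery", "-A", "merlin", "worker", "-n", name, "-Q", ",".join(queues)]
    if worker_args:
        parts.append(worker_args)
    # Add a log level only when allowed and not already present in the args.
    tokens = worker_args.split()
    has_level = any(
        t in ("-l", "--loglevel") or t.startswith("--loglevel=") for t in tokens
    )
    if not disable_logs and not has_level:
        parts.extend(["-l", log_level])
    return " ".join(parts)


command = build_launch_command("w1", ["a", "b"], args="--concurrency 2")
```

With these inputs, `command` is `celery -A merlin worker -n w1 -Q a,b --concurrency 2 -l INFO`.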
get_metadata()
Return metadata about this worker instance.
Returns:
| Type | Description |
|---|---|
| `Dict` | A dictionary containing key details about this worker. |
should_launch()
Determine whether this worker should be launched.
Performs checks on allowed machines and queue overlap (if applicable).
Returns:
| Type | Description |
|---|---|
| `bool` | True if the worker should be launched, False otherwise. |
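The two checks described above (allowed machines and queue overlap) might look roughly like this. It is a sketch under assumptions: `should_launch` here is a standalone function, `active_queues` stands in for however the real code discovers queues that already have workers, and an empty `machines` list is treated as "no restriction".

```python
import socket
from typing import Iterable, List, Optional


def should_launch(
    queues: List[str],
    machines: Optional[List[str]] = None,
    overlap: bool = False,
    active_queues: Iterable[str] = (),
    hostname: Optional[str] = None,
) -> bool:
    """Hypothetical eligibility check for launching a worker on this host."""
    host = hostname if hostname is not None else socket.gethostname()
    # Machine check: only applies when a list of allowed hosts was given.
    if machines and host not in machines:
        return False
    # Overlap check: refuse to share a queue unless overlap is permitted.
    if not overlap and set(active_queues) & set(queues):
        return False
    return True
```

Passing `hostname` explicitly makes the check easy to exercise without depending on the machine it runs on.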
start(override_args='', disable_logs=False)
Launch the worker as a subprocess using the constructed launch command.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `override_args` | `str` | Optional CLI arguments to override the default worker args. | `''` |
| `disable_logs` | `bool` | If True, suppresses automatic logging level injection. | `False` |
Raises:
| Type | Description |
|---|---|
| `MerlinWorkerLaunchError` | If the worker fails to launch. |
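A minimal sketch of launching a command as a subprocess and converting launch failures into a dedicated exception follows. `WorkerLaunchError` and `launch` are hypothetical stand-ins for `MerlinWorkerLaunchError` and the method; splitting the command with `shlex` instead of running it through a shell is a choice made for this sketch, not a statement about the real implementation.

```python
import os
import shlex
import subprocess
from typing import Dict, Optional


class WorkerLaunchError(Exception):
    """Hypothetical stand-in for MerlinWorkerLaunchError."""


def launch(command: str, env: Optional[Dict[str, str]] = None) -> subprocess.Popen:
    """Start `command` in the background and return the process handle."""
    # Layer worker-specific variables over the current environment.
    full_env = {**os.environ, **(env or {})}
    try:
        return subprocess.Popen(
            shlex.split(command),
            env=full_env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, ValueError) as exc:
        # A missing executable surfaces as FileNotFoundError, an OSError subclass.
        raise WorkerLaunchError(f"failed to launch worker: {command}") from exc
```

Returning the `Popen` handle gives the caller access to the PID, which is what a later stop step would need.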
stop()
Stop the worker process.
If the worker has a known PID, sends a SIGTERM to terminate it. Otherwise, logs a warning that the worker cannot be stopped.
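The behaviour described for `stop()` can be sketched with the standard library as below. `stop_worker` is a hypothetical free function that takes the PID as an argument; the boolean return value and the handling of an already-exited process are assumptions added for illustration. The sketch targets POSIX systems.

```python
import logging
import os
import signal
from typing import Optional

LOG = logging.getLogger(__name__)


def stop_worker(pid: Optional[int]) -> bool:
    """Send SIGTERM to `pid`; return True if the signal was delivered."""
    if pid is None:
        LOG.warning("Worker has no known PID; cannot stop it.")
        return False
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process is already gone, so there is nothing left to stop.
        LOG.warning("No process with PID %d; it may have already exited.", pid)
        return False
    return True
```

SIGTERM asks the process to terminate and can be handled by it, whereas a signal such as SIGKILL could not be.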