celery_monitor
This module provides the CeleryMonitor class, a concrete implementation of the
TaskServerMonitor interface for monitoring
Celery task servers. Celery is a distributed task queue system commonly used for executing
asynchronous tasks and managing worker nodes.
The CeleryMonitor class combines task and worker monitoring functionality specific to Celery.
It provides methods to:
- Wait for workers to start.
- Check for tasks in the queues.
- Monitor worker activity.
- Run health checks to ensure workers are alive and functioning.
CeleryMonitor
Bases: TaskServerMonitor
Implementation of TaskServerMonitor
for Celery task servers. This class provides methods to monitor Celery workers, tasks,
and workflows.
Attributes:
| Name | Type | Description |
|---|---|---|
worker_handler |
CeleryWorkerHandler
|
The worker handler for managing Celery workers. |
Methods:
| Name | Description |
|---|---|
wait_for_workers |
Wait for Celery workers to start up. |
check_workers_processing |
Check if any Celery workers are still processing tasks. |
_restart_workers |
Restart a list of (dead) Celery workers. |
_get_dead_workers |
Get a list of dead Celery workers. |
run_worker_health_check |
Check the health of Celery workers and restart any that are dead. |
check_tasks |
Checks the status of tasks in the Celery queues for a given workflow run. |
Source code in merlin/monitor/celery_monitor.py
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | |
__init__(merlin_db=None, app=None)
Constructor for CeleryMonitor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
merlin_db
|
MerlinDatabase
|
The MerlinDatabase instance or None. |
None
|
app
|
Celery
|
The Celery application instance or None. |
None
|
Source code in merlin/monitor/celery_monitor.py
check_tasks(run)
Check the status of tasks in Celery queues for the given workflow run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
run
|
RunEntity
|
A |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if tasks are active in the workflow (i.e., jobs are present in the queues), False otherwise. |
Source code in merlin/monitor/celery_monitor.py
check_workers_processing(queues)
Check if any Celery workers are still processing tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
queues
|
List[str]
|
A list of queue names to check for active tasks. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if workers are processing tasks in the specified queues, False otherwise. |
Source code in merlin/monitor/celery_monitor.py
run_worker_health_check(workers)
Check the health of Celery workers and restart any that are dead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
workers
|
List[str]
|
A list of worker names or IDs to check for health. |
required |
Raises:
| Type | Description |
|---|---|
WorkerRestartException
|
If a worker fails to restart. |
Source code in merlin/monitor/celery_monitor.py
wait_for_workers(workers, sleep)
Wait for Celery workers to start up.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
workers
|
List[str]
|
A list of worker names or IDs to wait for. |
required |
sleep
|
int
|
The interval (in seconds) between checks for worker availability. |
required |
Raises:
| Type | Description |
|---|---|
NoWorkersException
|
When workers don't start in ( |