celeryadapter
This module provides an adapter to the Celery Distributed Task Queue.
build_csv_queue_info(query_return, date)
Construct a dictionary containing queue information and column labels for writing to a CSV file.
This function processes the output from the query_queues function and organizes the data into a format suitable for CSV export. It includes a timestamp to indicate when the status was recorded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query_return` | `List[Tuple[str, int, int]]` | The output from the `query_queues` function. | required |
| `date` | `str` | A timestamp indicating when the queue status was recorded. | required |
Returns:
| Type | Description |
|---|---|
| `Dict[str, List]` | A dictionary where keys are column labels and values are lists containing the corresponding queue information, formatted for CSV output. |
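To make the returned structure concrete, here is a minimal sketch assuming one `time` column plus a `<queue>:tasks` and a `<queue>:consumers` column per queue. The function name `build_csv_queue_info_sketch` and these column labels are hypothetical; Merlin's actual labels may differ.

```python
from typing import Dict, List, Tuple


def build_csv_queue_info_sketch(
    query_return: List[Tuple[str, int, int]], date: str
) -> Dict[str, List]:
    """Turn (queue, tasks, consumers) tuples into a column-label -> values mapping."""
    # A "time" column records when the snapshot was taken (label is an assumption).
    columns: Dict[str, List] = {"time": [date]}
    for queue_name, num_tasks, num_consumers in query_return:
        # Two columns per queue; the "<queue>:tasks" naming is hypothetical.
        columns[f"{queue_name}:tasks"] = [num_tasks]
        columns[f"{queue_name}:consumers"] = [num_consumers]
    return columns


info = build_csv_queue_info_sketch([("hello_queue", 3, 1)], "2024-01-01 12:00:00")
print(info)
```

Storing each value in a one-element list keeps every column the same shape, so later snapshots can be appended as new rows.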
Source code in merlin/study/celeryadapter.py
build_json_queue_info(query_return, date)
Construct a dictionary containing queue information for JSON export.
This function processes the output from the query_queues function and organizes the data into a structured format suitable for JSON serialization. It includes a timestamp to indicate when the queue status was recorded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query_return` | `List[Tuple[str, int, int]]` | The output from the `query_queues` function. | required |
| `date` | `str` | A timestamp indicating when the queue status was recorded. | required |
Returns:
| Type | Description |
|---|---|
| `Dict` | A dictionary structured for JSON output, where the keys are timestamps and the values are dictionaries containing queue names and their corresponding statistics (tasks and consumers). |
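The nesting described above can be sketched as follows, assuming the per-queue statistics are stored under `"tasks"` and `"consumers"` keys. The function name and the exact key names are hypothetical.

```python
import json
from typing import Dict, List, Tuple


def build_json_queue_info_sketch(
    query_return: List[Tuple[str, int, int]], date: str
) -> Dict[str, Dict[str, Dict[str, int]]]:
    """Nest queue statistics under the timestamp at which they were recorded."""
    snapshot: Dict[str, Dict[str, int]] = {}
    for queue_name, num_tasks, num_consumers in query_return:
        # The "tasks"/"consumers" key names are assumptions for illustration.
        snapshot[queue_name] = {"tasks": num_tasks, "consumers": num_consumers}
    return {date: snapshot}


info = build_json_queue_info_sketch(
    [("hello_queue", 3, 1), ("world_queue", 0, 2)], "2024-01-01 12:00:00"
)
print(json.dumps(info, indent=2))
```

Keying the outer dictionary by timestamp means several snapshots can live side by side in one JSON object without colliding.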
build_set_of_queues(spec, steps, specific_queues, verbose=True, app=None)
Construct a set of queues to query based on the provided parameters.
This function builds a set of queues by querying a MerlinSpec object for queues associated with specified steps and/or filtering for specific queue names. If neither a spec nor specific queues are provided, it defaults to querying active queues from the Celery application.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spec` | `MerlinSpec` | A `MerlinSpec` object to query for the queues associated with the given steps. | required |
| `steps` | `List[str]` | A list of step names to query. If empty, all steps are considered. | required |
| `specific_queues` | `List[str]` | A list of specific queue names to filter on. Can be `None`. | required |
| `verbose` | `bool` | If `True`, log statements will be output. | `True` |
| `app` | `Celery` | A Celery application instance. If `None`, it will be imported. | `None` |
Returns:
| Type | Description |
|---|---|
| `Set[str]` | A set of queue names to investigate based on the provided parameters. |
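The selection logic described above can be sketched with plain data structures. This is a minimal sketch under stated assumptions: the spec is replaced by a hypothetical step-name to queue-name dictionary, the active queues are passed in directly instead of being queried from Celery, and the function name is hypothetical.

```python
from typing import Dict, Iterable, List, Optional, Set


def build_set_of_queues_sketch(
    step_queues: Optional[Dict[str, str]],
    steps: List[str],
    specific_queues: Optional[List[str]],
    active_queues: Iterable[str] = (),
) -> Set[str]:
    """Pick the queues to query from a step->queue map, a name filter, or a fallback."""
    # Fallback: with neither a spec nor specific queues, use the active queues.
    if step_queues is None and not specific_queues:
        return set(active_queues)

    if step_queues is None:
        # No spec: the requested queue names are used directly.
        return set(specific_queues)

    # An empty step list means "consider every step".
    selected_steps = steps or list(step_queues)
    queues = {step_queues[step] for step in selected_steps if step in step_queues}

    # Optionally narrow the result down to the specific queue names requested.
    if specific_queues:
        queues &= set(specific_queues)
    return queues


# Hypothetical stand-in for what a MerlinSpec would report.
step_map = {"hello": "hello_queue", "world": "world_queue"}
print(build_set_of_queues_sketch(step_map, ["hello"], None))
```

Returning a set removes duplicates automatically when several steps share one queue.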
celerize_queues(queues, config=None)
Prepend a queue tag to each queue in the provided list to conform to Celery's queue naming requirements.
This function modifies the input list of queues in place by prepending the queue tag specified in the configuration. If no configuration is provided, it defaults to using the global configuration settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `List[str]` | A list of queue names that need the queue tag prepended. | required |
| `config` | `SimpleNamespace` | A SimpleNamespace of configuration settings. If not provided, the function will use the default configuration. | `None` |
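A minimal sketch of the in-place prefixing, assuming the tag is available as a flat `queue_tag` attribute on the configuration object. Both that attribute location and the `"[merlin]_"` value used below are illustrative assumptions, as is the function name.

```python
from types import SimpleNamespace
from typing import List


def celerize_queues_sketch(queues: List[str], config: SimpleNamespace) -> None:
    """Prepend config.queue_tag to every queue name, modifying the list in place."""
    for index, queue in enumerate(queues):
        queues[index] = f"{config.queue_tag}{queue}"


# Hypothetical configuration; where Merlin actually stores the tag may differ.
config = SimpleNamespace(queue_tag="[merlin]_")
queues = ["hello_queue", "world_queue"]
celerize_queues_sketch(queues, config)
print(queues)
```

Because the list is mutated in place, no return value is needed, which is consistent with the absence of a Returns table for this function.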
dump_celery_queue_info(query_return, dump_file)
Format and dump Celery queue information to a specified file.
This function processes the output from the query_queues function, formats the data according to the file type (CSV or JSON), and adds a timestamp to the information before writing it to the specified file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query_return` | `List[Tuple[str, int, int]]` | The output from the `query_queues` function. | required |
| `dump_file` | `str` | The filepath of the file where the queue information will be written. The file extension determines the format (CSV or JSON). | required |
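The extension-based dispatch can be sketched with the standard `csv` and `json` modules. Assumptions, all hypothetical: the column labels and JSON keys, the timestamp format, raising `ValueError` for other extensions, and overwriting the file (the description above does not say how an existing file is treated).

```python
import csv
import json
import os
import tempfile
from datetime import datetime
from typing import List, Tuple


def dump_queue_info_sketch(query_return: List[Tuple[str, int, int]], dump_file: str) -> None:
    """Write a timestamped queue snapshot as CSV or JSON, chosen by file extension."""
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    extension = os.path.splitext(dump_file)[1].lower()

    if extension == ".csv":
        # Hypothetical column labels: time, then <queue>:tasks and <queue>:consumers.
        header = ["time"]
        row = [date]
        for name, tasks, consumers in query_return:
            header += [f"{name}:tasks", f"{name}:consumers"]
            row += [tasks, consumers]
        with open(dump_file, "w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            writer.writerow(row)
    elif extension == ".json":
        # Nest per-queue statistics under the timestamp.
        snapshot = {
            name: {"tasks": tasks, "consumers": consumers}
            for name, tasks, consumers in query_return
        }
        with open(dump_file, "w") as handle:
            json.dump({date: snapshot}, handle, indent=4)
    else:
        raise ValueError(f"Unsupported dump file type: {extension!r}")


out_dir = tempfile.mkdtemp()
json_path = os.path.join(out_dir, "queues.json")
dump_queue_info_sketch([("hello_queue", 3, 1)], json_path)
with open(json_path) as handle:
    print(json.load(handle))
```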
get_active_celery_queues(app)
Retrieve all active queues and their associated workers for a Celery application.
This function queries the application's server to obtain a comprehensive view of active queues and the workers connected to them. It returns a dictionary where each key is a queue name and the value is a list of workers attached to that queue. Additionally, it provides a list of all active workers in the application.
Note
Unlike get_running_queues, this function goes through the application's server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `app` | `Celery` | The Celery application instance. | required |
Returns:
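| Type | Description |
|---|---|
| `Tuple[Dict[str, List[str]], List[str]]` | A tuple containing: (1) a dictionary mapping each active queue name to the list of workers attached to it, and (2) a list of all active workers in the application. |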
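The queue-to-workers mapping can be built by inverting a worker-to-queues reply. In a live application that reply would typically come from Celery's `app.control.inspect().active_queues()`, which maps each worker name to a list of queue-description dictionaries carrying a `"name"` key and may be `None` when no worker answers. The sketch below takes the reply as a plain argument so it runs without a broker; the helper name `invert_active_queues` is hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def invert_active_queues(
    reply: Optional[Dict[str, List[Dict[str, str]]]]
) -> Tuple[Dict[str, List[str]], List[str]]:
    """Turn a worker -> queue-info mapping into queue -> workers plus a worker list."""
    queue_to_workers: Dict[str, List[str]] = {}
    # An inspect reply may be None when no worker responds; treat it as empty.
    reply = reply or {}
    for worker, queue_infos in reply.items():
        for info in queue_infos:
            queue_to_workers.setdefault(info["name"], []).append(worker)
    return queue_to_workers, list(reply)


# A hand-written reply in the assumed shape (not captured Celery output).
sample_reply = {
    "celery@node1": [{"name": "hello_queue"}, {"name": "world_queue"}],
    "celery@node2": [{"name": "hello_queue"}],
}
queues, workers = invert_active_queues(sample_reply)
print(queues)
print(workers)
```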
get_running_queues(celery_app_name, test_mode=False)
Check for running Celery workers and retrieve their associated queues.
This function inspects currently running processes to identify active Celery workers. It extracts queue names from the -Q flag in the command line of each worker process. The returned list contains only unique Celery queue names. This function must be executed on the allocation where the workers are running.
Note
Unlike get_active_celery_queues, this function does not go through the application's server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `celery_app_name` | `str` | The name of the Celery app (typically "merlin" unless in test mode). | required |
| `test_mode` | `bool` | If `True`, the function runs in test mode. | `False` |
Returns:
| Type | Description |
|---|---|
| `List[str]` | A unique list of Celery queue names with workers attached to them. |
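The -Q parsing step can be sketched on plain command-line strings, so it runs anywhere; in practice the command lines would come from the process table of the node where the workers run. Assumptions, all hypothetical: the helper name, the filter that keeps only lines mentioning `celery`, `worker`, and the app name, and the comma-separated `-Q` value format.

```python
import shlex
from typing import List


def queues_from_cmdlines(cmdlines: List[str], app_name: str = "merlin") -> List[str]:
    """Collect unique queue names from the -Q argument of celery worker command lines."""
    found: List[str] = []
    for cmdline in cmdlines:
        args = shlex.split(cmdline)
        # Only consider celery worker processes for the requested app (assumed filter).
        if "celery" not in cmdline or app_name not in args or "worker" not in args:
            continue
        # Stop one short of the end so args[index + 1] always exists.
        for index, arg in enumerate(args[:-1]):
            if arg == "-Q":
                for queue in args[index + 1].split(","):
                    # Preserve first-seen order while skipping duplicates.
                    if queue and queue not in found:
                        found.append(queue)
    return found


cmdlines = [
    "celery -A merlin worker -l INFO -Q [merlin]_hello,[merlin]_world",
    "celery -A merlin worker -Q [merlin]_hello",
    "python train.py -Q not_a_worker",
]
print(queues_from_cmdlines(cmdlines))
```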
purge_celery_tasks(queues, force)
Purge Celery tasks from the specified queues.
This function constructs and executes a command to purge tasks from the specified Celery queues. If the force parameter is set to True, the purge operation will be executed without prompting for confirmation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `str` | A comma-separated string of the queue name(s) from which tasks should be purged. | required |
| `force` | `bool` | If `True`, the purge operation will be executed without asking for user confirmation. | required |
Returns:
| Type | Description |
|---|---|
| `int` | The return code from the subprocess execution. A return code of 0 indicates success, while any non-zero value indicates an error occurred during the purge operation. |
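A minimal sketch of building and running such a command, assuming it is Celery's `celery purge` subcommand with its `-Q` (comma-separated queues) and `-f` (skip the confirmation prompt) options, and assuming an app name of `merlin`. The helper names are hypothetical.

```python
import subprocess
from typing import List


def build_purge_command(queues: str, force: bool, app_name: str = "merlin") -> List[str]:
    """Assemble a `celery purge` invocation for a comma-separated list of queues."""
    command = ["celery", "-A", app_name, "purge", "-Q", queues]
    if force:
        # -f skips Celery's interactive confirmation prompt.
        command.append("-f")
    return command


def purge_sketch(queues: str, force: bool) -> int:
    """Run the purge command and return its exit status (0 means success)."""
    completed = subprocess.run(build_purge_command(queues, force))
    return completed.returncode


print(build_purge_command("[merlin]_hello,[merlin]_world", force=True))
```

Passing an argument list rather than a shell string keeps queue names containing brackets from being interpreted as shell glob patterns.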
query_celery_queues(queues, app=None, config=None)
Retrieve information about the number of jobs and consumers for specified Celery queues.
This function constructs a dictionary containing details about the number of jobs and consumers associated with each queue provided in the input list. It connects to the Celery application to gather this information, handling both Redis and RabbitMQ brokers.
Notes
- If the specified queue does not exist or has no jobs, it will be handled gracefully.
- For Redis brokers, the function counts consumers by inspecting active queues since Redis does not track consumers like RabbitMQ does.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `List[str]` | A list of queue names for which to gather information. | required |
| `app` | `Celery` | The Celery application instance. Defaults to `None`, which triggers an import for testing purposes. | `None` |
| `config` | `Config` | A configuration object containing broker details. Defaults to `None`, which also triggers an import for testing. | `None` |
Returns:
| Type | Description |
|---|---|
| `Dict[str, Dict[str, int]]` | A dictionary where each key is a queue name and the value is another dictionary containing the number of jobs and the number of consumers for that queue. |
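The Redis note above says consumers are counted by inspecting active queues. A minimal sketch of that counting step, assuming the reply has the worker-to-queue-list shape of Celery's `app.control.inspect().active_queues()` and that each listening worker counts as one consumer; the function name is hypothetical.

```python
from collections import Counter
from typing import Dict, List, Optional


def count_consumers(
    queues: List[str],
    active_queues_reply: Optional[Dict[str, List[Dict[str, str]]]],
) -> Dict[str, int]:
    """Count how many workers consume each requested queue, from an inspect reply."""
    counts: Counter = Counter()
    for queue_infos in (active_queues_reply or {}).values():
        for info in queue_infos:
            counts[info["name"]] += 1
    # Requested queues with no attached worker get an explicit zero.
    return {queue: counts[queue] for queue in queues}


reply = {
    "celery@node1": [{"name": "hello_queue"}],
    "celery@node2": [{"name": "hello_queue"}, {"name": "world_queue"}],
}
print(count_consumers(["hello_queue", "world_queue", "idle_queue"], reply))
```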
run_celery(study, run_mode=None)
Run the given MerlinStudy object with optional Celery configuration.
This function executes the provided MerlinStudy object. If run_mode is set to "local", it configures Celery to run in local mode (without utilizing workers). Otherwise, it connects to the Celery server to queue tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `study` | `MerlinStudy` | The study object to be executed. | required |
| `run_mode` | `str` | The mode in which to run the study. If set to "local", Celery runs locally. | `None` |
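One common way to run Celery tasks locally without workers is eager execution, controlled by Celery's `task_always_eager` and `task_eager_propagates` settings. Whether `run_celery` uses exactly this mechanism is an assumption. The sketch uses a `SimpleNamespace` stand-in for `app.conf` so it runs without Celery installed, and the function name is hypothetical.

```python
from types import SimpleNamespace
from typing import Optional


def configure_run_mode(conf: SimpleNamespace, run_mode: Optional[str] = None) -> SimpleNamespace:
    """Switch a Celery-style config into eager (local) mode when run_mode is "local"."""
    if run_mode == "local":
        # Eager mode executes tasks in the calling process instead of sending them to a broker.
        conf.task_always_eager = True
        # Re-raise exceptions from eagerly executed tasks in the caller.
        conf.task_eager_propagates = True
    return conf


# Stand-in for a real Celery app's `app.conf` (an assumption for this sketch).
conf = SimpleNamespace(task_always_eager=False, task_eager_propagates=False)
configure_run_mode(conf, "local")
print(conf.task_always_eager)
```

Any other run_mode leaves the configuration untouched, so tasks are sent to the broker as usual.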