celeryadapter
This module provides an adapter to the Celery Distributed Task Queue.
build_csv_queue_info(query_return, date)
Construct a dictionary containing queue information and column labels for writing to a CSV file.
This function processes the output from the query_queues
function and organizes the data into a format suitable for CSV export. It includes
a timestamp to indicate when the status was recorded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query_return` | `List[Tuple[str, int, int]]` | The output from the `query_queues` function. | required |
| `date` | `str` | A timestamp indicating when the queue status was recorded. | required |

Returns:
| Type | Description |
|---|---|
| `Dict[str, List]` | A dictionary where keys are column labels and values are lists containing the corresponding queue information, formatted for CSV output. |
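The shape of the returned dictionary can be sketched in a few lines. This is a minimal illustration, not the module's implementation: the helper name `csv_queue_info_sketch`, the tuple order `(queue name, tasks, consumers)`, and the column labels are all assumptions.

```python
from typing import Dict, List, Tuple


def csv_queue_info_sketch(query_return: List[Tuple[str, int, int]], date: str) -> Dict[str, List]:
    """Hypothetical stand-in for build_csv_queue_info."""
    # One column for the timestamp (label "time" is an assumption).
    csv_info: Dict[str, List] = {"time": [date]}
    # Two columns per queue; the "<queue>:tasks" / "<queue>:consumers"
    # labels are assumed for illustration only.
    for name, tasks, consumers in query_return:
        csv_info[f"{name}:tasks"] = [str(tasks)]
        csv_info[f"{name}:consumers"] = [str(consumers)]
    return csv_info


example = csv_queue_info_sketch([("sim_queue", 12, 2)], "2024-01-01 12:00:00")
```

Each value is a one-element list so that the dictionary maps directly onto a single CSV row beneath its header.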
Source code in merlin/study/celeryadapter.py
build_json_queue_info(query_return, date)
Construct a dictionary containing queue information for JSON export.
This function processes the output from the query_queues
function and organizes the data into a structured format suitable for JSON
serialization. It includes a timestamp to indicate when the queue status was
recorded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query_return` | `List[Tuple[str, int, int]]` | The output from the `query_queues` function. | required |
| `date` | `str` | A timestamp indicating when the queue status was recorded. | required |

Returns:
| Type | Description |
|---|---|
| `Dict` | A dictionary structured for JSON output, where the keys are timestamps and the values are dictionaries containing queue names and their corresponding statistics (tasks and consumers). |
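The nested structure described above (timestamp, then queue name, then statistics) can be pictured with the following sketch. The helper name `json_queue_info_sketch`, the tuple order, and the key names `"tasks"` and `"consumers"` are assumptions made for illustration.

```python
import json
from typing import Dict, List, Tuple


def json_queue_info_sketch(query_return: List[Tuple[str, int, int]], date: str) -> Dict:
    """Hypothetical stand-in for build_json_queue_info."""
    # Outer key: the timestamp. Inner keys: queue names mapped to their statistics.
    return {
        date: {
            name: {"tasks": tasks, "consumers": consumers}
            for name, tasks, consumers in query_return
        }
    }


info = json_queue_info_sketch([("sim_queue", 12, 2), ("post_queue", 0, 1)], "2024-01-01 12:00:00")
serialized = json.dumps(info, indent=2)  # the structure is directly JSON-serializable
```

Keying the output by timestamp would let several snapshots be merged into one JSON document without overwriting each other.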
Source code in merlin/study/celeryadapter.py
build_set_of_queues(spec, steps, specific_queues, verbose=True, app=None)
Construct a set of queues to query based on the provided parameters.
This function builds a set of queues by querying a MerlinSpec
object for queues associated with specified steps and/or filtering for specific queue names.
If no spec or specific queues are provided, it defaults to querying active queues from the Celery
application.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spec` | `MerlinSpec` | A `MerlinSpec` object. | required |
| `steps` | `List[str]` | A list of step names to query. If empty, all steps are considered. | required |
| `specific_queues` | `List[str]` | A list of specific queue names to filter. Can be None. | required |
| `verbose` | `bool` | If True, log statements will be output. | `True` |
| `app` | `Celery` | A Celery application instance. If None, it will be imported. | `None` |

Returns:
| Type | Description |
|---|---|
| `Set[str]` | A set of queue names to investigate based on the provided parameters. |
Source code in merlin/study/celeryadapter.py
celerize_queues(queues, config=None)
Prepend a queue tag to each queue in the provided list to conform to Celery's queue naming requirements.
This function modifies the input list of queues by adding a specified queue tag from the configuration. If no configuration is provided, it defaults to using the global configuration settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `List[str]` | A list of queue names that need the queue tag prepended. | required |
| `config` | `SimpleNamespace` | A SimpleNamespace of configuration settings. If not provided, the function will use the default configuration. | `None` |
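A minimal sketch of the in-place tagging described above follows. The helper name `celerize_queues_sketch`, the `config.celery.queue_tag` attribute path, and the tag value `"[merlin]_"` are assumptions for illustration, not confirmed details of the module.

```python
from types import SimpleNamespace
from typing import List, Optional

# Hypothetical default configuration; attribute path and tag value are assumed.
DEFAULT_CONFIG = SimpleNamespace(celery=SimpleNamespace(queue_tag="[merlin]_"))


def celerize_queues_sketch(queues: List[str], config: Optional[SimpleNamespace] = None) -> None:
    """Prepend the configured queue tag to every entry of `queues`, in place."""
    if config is None:
        config = DEFAULT_CONFIG
    for index, queue in enumerate(queues):
        queues[index] = f"{config.celery.queue_tag}{queue}"


queue_names = ["sim_queue", "post_queue"]
celerize_queues_sketch(queue_names)  # the list itself is modified; nothing is returned
```

Because the list is modified in place, callers that still need the untagged names should pass a copy.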
Source code in merlin/study/celeryadapter.py
check_celery_workers_processing(queues_in_spec, app)
Check if any Celery workers are currently processing tasks from specified queues.
This function queries the Celery application to determine if there are any active tasks being processed by workers for the given list of queues. It returns a boolean indicating whether any tasks are currently active.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues_in_spec` | `List[str]` | A list of queue names to check for active tasks. | required |
| `app` | `Celery` | The Celery application instance used for querying. | required |

Returns:
| Type | Description |
|---|---|
| `bool` | True if any workers are processing tasks in the specified queues; False otherwise. |
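The check can be sketched against the kind of reply Celery's `app.control.inspect().active()` produces: a mapping from worker names to lists of task descriptions, or `None` when no worker answers. To stay runnable without a broker, the sketch below takes that reply as a plain dictionary; the helper name `workers_processing_sketch` is hypothetical, and matching on `delivery_info["routing_key"]` is an assumption about how queues are identified.

```python
from typing import Dict, List, Optional


def workers_processing_sketch(queues_in_spec: List[str], active_tasks: Optional[Dict[str, List[dict]]]) -> bool:
    """Return True if any active task was routed to one of the given queues."""
    if not active_tasks:  # None or {} means no worker reported active tasks
        return False
    for tasks in active_tasks.values():
        for task in tasks:
            if task["delivery_info"]["routing_key"] in queues_in_spec:
                return True
    return False


# Hand-written reply in the assumed shape, standing in for inspect().active().
reply = {"celery@worker1": [{"id": "abc", "delivery_info": {"routing_key": "[merlin]_sim"}}]}
busy = workers_processing_sketch(["[merlin]_sim"], reply)
```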
Source code in merlin/study/celeryadapter.py
dump_celery_queue_info(query_return, dump_file)
Format and dump Celery queue information to a specified file.
This function processes the output from the query_queues function, formats
the data according to the file type (CSV or JSON), and adds a timestamp
to the information before writing it to the specified file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query_return` | `List[Tuple[str, int, int]]` | The output from the `query_queues` function. | required |
| `dump_file` | `str` | The filepath of the file where the queue information will be written. The file extension determines the format (CSV or JSON). | required |
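Dispatching on the file extension might look like the sketch below. It is an illustration under several assumptions: the helper name `dump_queue_info_sketch`, the column and key labels, the timestamp format, and the choice to overwrite rather than append are not taken from the module.

```python
import csv
import json
import os
import tempfile
from datetime import datetime


def dump_queue_info_sketch(query_return, dump_file):
    """Write queue info to `dump_file` as CSV or JSON, chosen by extension."""
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # assumed timestamp format
    extension = os.path.splitext(dump_file)[1].lower()
    if extension == ".csv":
        row = {"time": date}
        for name, tasks, consumers in query_return:
            row[f"{name}:tasks"] = tasks
            row[f"{name}:consumers"] = consumers
        with open(dump_file, "w", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=list(row))
            writer.writeheader()
            writer.writerow(row)
    elif extension == ".json":
        payload = {date: {name: {"tasks": t, "consumers": c} for name, t, c in query_return}}
        with open(dump_file, "w") as handle:
            json.dump(payload, handle, indent=2)
    else:
        raise ValueError(f"Unsupported dump file extension: {extension!r}")


with tempfile.TemporaryDirectory() as tmp:
    json_path = os.path.join(tmp, "queues.json")
    dump_queue_info_sketch([("sim_queue", 12, 2)], json_path)
    with open(json_path) as handle:
        dumped = json.load(handle)
```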
Source code in merlin/study/celeryadapter.py
examine_and_log_machines(worker_val, yenv)
Determine if a worker should be skipped based on machine availability and log any errors.
This function checks the specified machines for a worker and determines
whether the worker can be started. If the machines are not available,
it logs an error message regarding the output path for the Celery worker.
If the environment variables (yenv) are not provided or do not specify
an output path, a warning is logged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `worker_val` | `Dict` | A dictionary containing worker configuration, including the list of machines associated with the worker. | required |
| `yenv` | `Dict[str, str]` | A dictionary of environment variables that may include the output path for logging. | required |

Returns:
| Type | Description |
|---|---|
| `bool` | Returns |
Source code in merlin/study/celeryadapter.py
get_active_celery_queues(app)
Retrieve all active queues and their associated workers for a Celery application.
This function queries the application's server to obtain a comprehensive view of active queues and the workers connected to them. It returns a dictionary where each key is a queue name and the value is a list of workers attached to that queue. Additionally, it provides a list of all active workers in the application.
Note
Unlike get_running_queues,
this function goes through the application's server.
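Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `app` | `Celery` | The Celery application instance. | required |

Returns:
| Type | Description |
|---|---|
| `Tuple[Dict[str, List[str]], List[str]]` | A tuple containing a dictionary that maps each queue name to the list of workers attached to it, and a list of all active workers in the application. |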
Example
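As a hedged illustration of the return shape, the sketch below groups workers by queue from a reply shaped like the one Celery's `app.control.inspect().active_queues()` returns (worker name mapped to a list of queue descriptions, each with a `"name"` key). The reply here is a hand-written dictionary so the example runs without a broker, and the helper name `group_workers_by_queue` is hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def group_workers_by_queue(
    active_queues_reply: Optional[Dict[str, List[dict]]],
) -> Tuple[Dict[str, List[str]], List[str]]:
    """Build (queue -> workers, all workers) from an active_queues-style reply."""
    queues: Dict[str, List[str]] = {}
    workers: List[str] = []
    # A reply of None (no workers answered) is treated as an empty mapping.
    for worker, worker_queues in (active_queues_reply or {}).items():
        workers.append(worker)
        for queue in worker_queues:
            queues.setdefault(queue["name"], []).append(worker)
    return queues, workers


reply = {
    "celery@worker1": [{"name": "[merlin]_sim"}, {"name": "[merlin]_post"}],
    "celery@worker2": [{"name": "[merlin]_sim"}],
}
queue_map, worker_names = group_workers_by_queue(reply)
```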
Source code in merlin/study/celeryadapter.py
get_active_workers(app)
Retrieve a mapping of active workers to their associated queues for a Celery application.
This function serves as the inverse of
get_active_celery_queues(). It constructs
a dictionary where each key is a worker's name and the corresponding value is a
list of queues that the worker is connected to. This allows for easy identification
of which queues are being handled by each worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `app` | `Celery` | The Celery application instance. | required |

Returns:
| Type | Description |
|---|---|
| `Dict[str, List[str]]` | A dictionary mapping active worker names to lists of queue names they are attached to. If no active workers are found, an empty dictionary is returned. |
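The inversion itself is a small dictionary transformation. The sketch below works on a plain queue-to-workers mapping rather than a live Celery application; the helper name `invert_queue_map` is hypothetical.

```python
from typing import Dict, List


def invert_queue_map(queue_to_workers: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Turn a queue -> workers mapping into a worker -> queues mapping."""
    worker_to_queues: Dict[str, List[str]] = {}
    for queue, workers in queue_to_workers.items():
        for worker in workers:
            worker_to_queues.setdefault(worker, []).append(queue)
    return worker_to_queues


inverted = invert_queue_map({
    "[merlin]_sim": ["celery@worker1", "celery@worker2"],
    "[merlin]_post": ["celery@worker2"],
})
```

An empty input yields an empty dictionary, which matches the documented behavior when no active workers are found.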
Source code in merlin/study/celeryadapter.py
get_celery_cmd(queue_names, worker_args='', just_return_command=False)
Construct the command to launch Celery workers for the specified queues.
This function generates a command string that can be used to start Celery workers associated with the provided queue names. It allows for optional worker arguments to be included and can return the command without executing it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queue_names` | `str` | A comma-separated string of the queue name(s) to which the worker will be associated. | required |
| `worker_args` | `str` | Additional command-line arguments for the Celery worker. | `''` |
| `just_return_command` | `bool` | If True, the function will return the constructed command without executing it. | `False` |

Returns:
| Type | Description |
|---|---|
| `str` | The constructed command string for launching the Celery worker. If |
Source code in merlin/study/celeryadapter.py
get_running_queues(celery_app_name, test_mode=False)
Check for running Celery workers and retrieve their associated queues.
This function inspects currently running processes to identify active
Celery workers. It extracts queue names from the -Q tag in the
command line of the worker processes. The returned list contains
only unique Celery queue names. This function must be executed
on the allocation where the workers are running.
Note
Unlike get_active_celery_queues,
this function does not go through the application's server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `celery_app_name` | `str` | The name of the Celery app (typically "merlin" unless in test mode). | required |
| `test_mode` | `bool` | If True, the function runs in test mode. | `False` |

Returns:
| Type | Description |
|---|---|
| `List[str]` | A unique list of Celery queue names with workers attached to them. |
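Extracting queue names from the `-Q` flag can be sketched as follows. To keep the example self-contained, it parses a list of command-line strings instead of inspecting live processes; the helper name `queues_from_cmdlines` and the way worker processes are recognized are assumptions.

```python
import shlex
from typing import List


def queues_from_cmdlines(cmdlines: List[str], celery_app_name: str) -> List[str]:
    """Collect unique queue names from the -Q flag of Celery worker command lines."""
    unique_queues: List[str] = []
    for cmdline in cmdlines:
        tokens = shlex.split(cmdline)
        # Skip anything that does not look like a Celery worker for this app.
        if "celery" not in tokens or celery_app_name not in tokens or "-Q" not in tokens:
            continue
        index = tokens.index("-Q")
        if index + 1 >= len(tokens):
            continue  # -Q given without a value
        for queue in tokens[index + 1].split(","):
            if queue and queue not in unique_queues:
                unique_queues.append(queue)
    return unique_queues


running = queues_from_cmdlines(
    [
        "celery -A merlin worker -l INFO -Q [merlin]_sim,[merlin]_post",
        "python train.py -Q unrelated",
        "celery -A merlin worker -Q [merlin]_post",
    ],
    "merlin",
)
```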
Source code in merlin/study/celeryadapter.py
get_workers_from_app()
Retrieve a list of all workers connected to the Celery application.
This function uses the Celery control interface to inspect the current state of the application and returns a list of workers that are currently connected. If no workers are found, an empty list is returned.
Returns:
| Type | Description |
|---|---|
| `List[str]` | A list of worker names that are currently connected to the Celery application. If no workers are connected, an empty list is returned. |
Source code in merlin/study/celeryadapter.py
launch_celery_worker(worker_cmd, worker_list, kwargs)
Launch a Celery worker using the specified command and parameters.
This function executes the provided Celery command to start a worker as a subprocess. It appends the command to the given list of worker commands for tracking purposes. If the worker fails to start, an error is logged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `worker_cmd` | `str` | The command string used to launch the Celery worker. | required |
| `worker_list` | `List[str]` | A list that will be updated to include the launched worker command for tracking active workers. | required |
| `kwargs` | `Dict` | A dictionary of additional keyword arguments to pass to | required |

Raises:
| Type | Description |
|---|---|
| `Exception` | If the worker fails to start, an error is logged, and the exception is re-raised. |

Side Effects
- Launches a Celery worker process in the background.
- Modifies `worker_list` by appending the launched worker command.
Source code in merlin/study/celeryadapter.py
purge_celery_tasks(queues, force)
Purge Celery tasks from the specified queues.
This function constructs and executes a command to purge tasks from the
specified Celery queues. If the force parameter is set to True, the
purge operation will be executed without prompting for confirmation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `str` | A comma-separated string of the queue name(s) from which tasks should be purged. | required |
| `force` | `bool` | If True, the purge operation will be executed without asking for user confirmation. | required |

Returns:
| Type | Description |
|---|---|
| `int` | The return code from the subprocess execution. A return code of 0 indicates success, while any non-zero value indicates an error occurred during the purge operation. |
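The command construction can be sketched with Celery's `purge` subcommand, which accepts `-f` to skip the confirmation prompt and `-Q` to restrict the purge to named queues. The helper name `build_purge_command` and the app name `merlin` are assumptions, and the sketch only builds the string; it does not run it.

```python
def build_purge_command(queues: str, force: bool) -> str:
    """Build a `celery purge` command for a comma-separated list of queues."""
    parts = ["celery", "-A", "merlin", "purge"]  # app name "merlin" is assumed
    if force:
        parts.append("-f")  # skip the interactive confirmation prompt
    parts.extend(["-Q", queues])
    return " ".join(parts)


forced = build_purge_command("[merlin]_sim,[merlin]_post", force=True)
prompted = build_purge_command("[merlin]_sim", force=False)
# Running the command is left out here; with a broker available, something like
# subprocess.run(shlex.split(forced)).returncode would give the integer result.
```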
Source code in merlin/study/celeryadapter.py
query_celery_queues(queues, app=None, config=None)
Retrieve information about the number of jobs and consumers for specified Celery queues.
This function constructs a dictionary containing details about the number of jobs and consumers associated with each queue provided in the input list. It connects to the Celery application to gather this information, handling both Redis and RabbitMQ brokers.
Notes
- If the specified queue does not exist or has no jobs, it will be handled gracefully.
- For Redis brokers, the function counts consumers by inspecting active queues since Redis does not track consumers like RabbitMQ does.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `List[str]` | A list of queue names for which to gather information. | required |
| `app` | `Celery` | The Celery application instance. Defaults to None, which triggers an import for testing purposes. | `None` |
| `config` | `Config` | A configuration object containing broker details. Defaults to None, which also triggers an import for testing. | `None` |

Returns:
| Type | Description |
|---|---|
| `Dict[str, Dict[str, int]]` | A dictionary where each key is a queue name and the value is another dictionary containing the number of jobs and the number of consumers for that queue. |
Source code in merlin/study/celeryadapter.py
query_celery_workers(spec_worker_names, queues, workers_regex)
Query and filter existing Celery workers based on specified criteria, and print a table of the workers along with their associated queues.
This function retrieves the list of active Celery workers and filters them according to the provided specifications, including worker names from a spec file, specific queues, and regular expressions for worker names. It then constructs and displays a table of the matching workers and their associated queues.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spec_worker_names` | `List[str]` | A list of worker names defined in a spec file to filter the workers. | required |
| `queues` | `List[str]` | A list of queues to filter the workers by. | required |
| `workers_regex` | `List[str]` | A list of regular expressions to filter the worker names. | required |
Source code in merlin/study/celeryadapter.py
run_celery(study, run_mode=None)
Run the given MerlinStudy object with optional
Celery configuration.
This function executes the provided MerlinStudy
object. If the run_mode is set to "local", it configures Celery to run in
local mode (without utilizing workers). Otherwise, it connects to the Celery
server to queue tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `study` | `MerlinStudy` | The study object to be executed. | required |
| `run_mode` | `str` | The mode in which to run the study. If set to "local", Celery runs locally. | `None` |
Source code in merlin/study/celeryadapter.py
start_celery_workers(spec, steps, celery_args, disable_logs, just_return_command)
Start Celery workers based on the provided specifications and steps.
This function initializes and starts Celery workers for the specified steps
in the given MerlinSpec. It constructs
the necessary command-line arguments and handles the launching of subprocesses
for each worker. If the just_return_command flag is set to True, it will
return the command(s) to start the workers without actually launching them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spec` | `MerlinSpec` | A `MerlinSpec` object. | required |
| `steps` | `List[str]` | A list of steps for which to start workers. | required |
| `celery_args` | `str` | A string of additional arguments to pass to the Celery workers. | required |
| `disable_logs` | `bool` | A flag to disable logging for the Celery workers. | required |
| `just_return_command` | `bool` | If True, return the command(s) to start the workers without launching them. | required |

Returns:
| Type | Description |
|---|---|
| `str` | A string containing all the worker launch commands. |

Side Effects
- Starts subprocesses for each worker that is launched, so long as `just_return_command` is not True.
Example
Below is an example configuration for Merlin workers:
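As a rough illustration, a worker configuration can be pictured as a mapping from worker names to their Celery arguments and the steps they serve. Everything in this sketch is an assumption made for demonstration: the dictionary layout, the step-to-queue mapping, the helper name `build_launch_commands`, and the `celery -A merlin worker` command prefix. It only assembles command strings and launches nothing.

```python
from typing import Dict, List

# Hypothetical worker configuration: worker name -> Celery args and served steps.
WORKERS = {
    "simworkers": {"args": "-l INFO --concurrency 3", "steps": ["run", "data"]},
    "postworkers": {"args": "-l INFO --concurrency 1", "steps": ["post"]},
}
# Hypothetical mapping from step names to the queues their tasks are sent to.
STEP_QUEUES = {"run": "run_queue", "data": "data_queue", "post": "post_queue"}


def build_launch_commands(workers: Dict[str, dict], step_queues: Dict[str, str], steps: List[str]) -> List[str]:
    """Build one launch command per worker that serves at least one requested step."""
    commands = []
    for settings in workers.values():
        selected = [step for step in settings["steps"] if step in steps]
        if not selected:
            continue  # this worker serves none of the requested steps
        queues = ",".join(step_queues[step] for step in selected)
        commands.append(f"celery -A merlin worker {settings['args']} -Q {queues}")
    return commands


commands = build_launch_commands(WORKERS, STEP_QUEUES, ["run", "post"])
```

Returning the commands instead of executing them mirrors what the `just_return_command` flag is described as doing.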
Source code in merlin/study/celeryadapter.py
stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None)
Send a stop command to Celery workers.
This function sends a shutdown command to Celery workers associated with specified queues. By default, it stops all connected workers, but it can be configured to target specific workers based on queue names or regular expression patterns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues` | `List[str]` | A list of queue names to which the stop command will be sent. If None, all connected workers across all queues will be stopped. | `None` |
| `spec_worker_names` | `List[str]` | A list of specific worker names to stop, in addition to those matching the | `None` |
| `worker_regex` | `List[str]` | A regular expression string used to match worker names. If None, no regex filtering will be applied. | `None` |
Side Effects
- Broadcasts a shutdown signal to Celery workers
Example
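The target selection can be sketched without a running broker. The helper below is hypothetical: it takes a plain queue-to-workers mapping, keeps workers attached to the requested queues, and then applies any regular expressions. How the real function combines these filters is an assumption here.

```python
import re
from typing import Dict, List, Optional


def select_workers_to_stop(
    queue_to_workers: Dict[str, List[str]],
    queues: Optional[List[str]] = None,
    worker_regex: Optional[List[str]] = None,
) -> List[str]:
    """Pick the workers a shutdown signal would be sent to."""
    if queues is None:
        queues = list(queue_to_workers)  # no queue filter: consider every queue
    targets: List[str] = []
    for queue in queues:
        for worker in queue_to_workers.get(queue, []):
            if worker not in targets:
                targets.append(worker)
    if worker_regex:
        targets = [w for w in targets if any(re.search(p, w) for p in worker_regex)]
    return targets


queue_map = {
    "[merlin]_sim": ["celery@sim1", "celery@sim2"],
    "[merlin]_post": ["celery@sim2", "celery@post1"],
}
everyone = select_workers_to_stop(queue_map)
only_post = select_workers_to_stop(queue_map, queues=["[merlin]_post"], worker_regex=[r"post\d+$"])
# With a live Celery app, the signal itself could be sent with
# app.control.broadcast("shutdown", destination=only_post).
```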
Source code in merlin/study/celeryadapter.py
verify_args(spec, worker_args, worker_name, overlap, disable_logs=False)
Validate and enhance the arguments passed to a Celery worker for completeness.
This function checks the provided worker arguments to ensure that they include
recommended settings for running parallel tasks. It adds default values for
concurrency, prefetch multiplier, and logging level if they are not specified.
Additionally, it generates a unique worker name based on the current time if
the -n argument is not provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spec` | `MerlinSpec` | A `MerlinSpec` object. | required |
| `worker_args` | `str` | A string of arguments passed to the worker that may need validation. | required |
| `worker_name` | `str` | The name of the worker, used for generating a unique worker identifier. | required |
| `overlap` | `bool` | A flag indicating whether multiple workers can overlap in their queue processing. | required |
| `disable_logs` | `bool` | A flag to disable logging configuration for the worker. | `False` |

Returns:
| Type | Description |
|---|---|
| `str` | The validated and potentially modified worker arguments string. |
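Filling in missing worker arguments can be sketched as below. The flags themselves (`--concurrency`, `--prefetch-multiplier`, `-n`, `-l`) are standard Celery worker options, but the helper name `fill_worker_args`, the default values, the name format, and the treatment of `disable_logs` are assumptions; the `spec` and `overlap` parameters are left out of this sketch.

```python
import time
from typing import List, Optional


def _has_flag(tokens: List[str], *names: str) -> bool:
    """True if any token is one of `names`, in `--flag value` or `--flag=value` form."""
    return any(token.split("=", 1)[0] in names for token in tokens)


def fill_worker_args(
    worker_args: str,
    worker_name: str,
    disable_logs: bool = False,
    default_concurrency: int = 4,  # hypothetical default
    now: Optional[float] = None,
) -> str:
    """Append assumed defaults for any worker settings that were not supplied."""
    tokens = worker_args.split()
    if not _has_flag(tokens, "--concurrency", "-c"):
        tokens += ["--concurrency", str(default_concurrency)]
    if not _has_flag(tokens, "--prefetch-multiplier"):
        tokens += ["--prefetch-multiplier", "1"]
    if not _has_flag(tokens, "-n", "--hostname"):
        # Time-based suffix keeps names unique across launches.
        stamp = int(time.time() if now is None else now)
        tokens += ["-n", f"{worker_name}.{stamp}"]
    if not disable_logs and not _has_flag(tokens, "-l", "--loglevel"):
        tokens += ["-l", "INFO"]
    return " ".join(tokens)


completed = fill_worker_args("--concurrency 2", "simworkers", now=1700000000.0)
```

Passing `now` explicitly makes the generated name deterministic, which is convenient for testing.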