router
This module routes actions from the Merlin CLI to the appropriate tasking logic.
This module is intended to help keep the task management layer (i.e., Celery) decoupled from the logic the tasks are running.
check_merlin_status(args, spec)
Check Merlin workers and queues to determine whether the allocation should be kept alive.
This function checks the status of workers and jobs on the specified task server for the queues defined in the provided Merlin specification. It looks for remaining tasks and active workers, and the allocation is kept alive only while work remains.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `args` | `Namespace` | Parsed command-line interface arguments, including the task server specification and the sleep duration. | required |
| `spec` | `MerlinSpec` | The parsed spec.yaml as a `MerlinSpec` object. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | `True` if there are still tasks being processed, `False` otherwise. |
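The keep-alive decision described above can be sketched as a pure function. This is a minimal sketch, not Merlin's implementation. The helper name `allocation_still_busy` is hypothetical. It assumes the queue statistics have the shape documented for `query_queues`, and that the "workers still processing" flag comes from a check like `check_workers_processing`.

```python
from typing import Dict


def allocation_still_busy(
    queue_stats: Dict[str, Dict[str, int]], workers_processing: bool
) -> bool:
    """Decide whether there is still work that justifies keeping the allocation alive.

    Hypothetical helper, not Merlin's API. `queue_stats` is assumed to have the
    shape returned by `query_queues`: {queue_name: {"consumers": n, "jobs": m}}.
    """
    # Any task still waiting in one of the spec's queues means work remains
    if any(stats.get("jobs", 0) > 0 for stats in queue_stats.values()):
        return True
    # Queues are empty, but a worker may still be running a task it already pulled
    return workers_processing
```

Checking the queues first avoids a round trip to the workers whenever tasks are still waiting.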
Source code in merlin/router.py
check_workers_processing(queues_in_spec, task_server)
Check if any workers are still processing tasks by querying the task server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queues_in_spec` | `List[str]` | A list of queue names to check for active tasks. | required |
| `task_server` | `str` | The task server from which to query the processing status. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | `True` if workers are still processing tasks, `False` otherwise. |
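With Celery, one way to perform this check is to look at the tasks each worker is currently executing and compare their queues against `queues_in_spec`. The sketch below makes three assumptions. It is given a reply shaped like Celery's `app.control.inspect().active()` output. It treats each task's `delivery_info["routing_key"]` as the queue name, which holds for Celery's default direct routing. It uses the hypothetical name `tasks_active_in_queues`, which does not reproduce Merlin's actual code.

```python
from typing import Any, Dict, List, Optional


def tasks_active_in_queues(
    active: Optional[Dict[str, List[Dict[str, Any]]]], queues_in_spec: List[str]
) -> bool:
    """Return True if any worker is executing a task routed from one of `queues_in_spec`.

    `active` is assumed to look like the reply of `app.control.inspect().active()`:
    {worker_name: [task_info, ...]}, where each task_info carries a
    "delivery_info" dict with a "routing_key" entry.
    """
    # Celery inspect calls return None when no worker replies in time
    if not active:
        return False
    wanted = set(queues_in_spec)
    for tasks in active.values():
        for task in tasks:
            # Assumption: routing key == queue name (default direct exchange)
            routing_key = task.get("delivery_info", {}).get("routing_key")
            if routing_key in wanted:
                return True
    return False
```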
dump_queue_info(task_server, query_return, dump_file)
Formats and dumps queue information for the specified task server.
This function prepares the queue data returned from the queue
query and formats it in a way that the Dumper
class can process. It also adds a timestamp to the information before
dumping it to the specified file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server from which to query queues. | required |
| `query_return` | `List[Tuple[str, int, int]]` | The queue information returned by the queue query. | required |
| `dump_file` | `str` | The filepath where the queue information will be dumped. | required |
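The format-then-dump flow described above can be sketched with the standard `csv` module standing in for the `Dumper` class. In this sketch, each call flattens one query into a single timestamped row and appends it to a file. The tuple layout `(queue_name, num_tasks, num_consumers)`, the `<queue>:tasks` / `<queue>:consumers` column names, and the helper names are all assumptions, not Merlin's actual dump format.

```python
import csv
import os
from typing import Dict, List, Tuple, Union

Row = Dict[str, Union[str, int]]


def format_queue_info(query_return: List[Tuple[str, int, int]], timestamp: str) -> Row:
    """Flatten queue statistics into one timestamped row.

    Assumed tuple layout (not confirmed by the docs): (queue_name, num_tasks, num_consumers).
    """
    row: Row = {"Time": timestamp}
    for queue_name, num_tasks, num_consumers in query_return:
        row[f"{queue_name}:tasks"] = num_tasks
        row[f"{queue_name}:consumers"] = num_consumers
    return row


def append_csv_row(row: Row, dump_file: str) -> None:
    """Append `row` to a CSV file, writing the header only if the file is new or empty."""
    write_header = not os.path.exists(dump_file) or os.path.getsize(dump_file) == 0
    with open(dump_file, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if write_header:
            writer.writeheader()
        writer.writerow(row)
```

A timestamp such as `datetime.now().strftime("%Y-%m-%d %H:%M:%S")` can be passed to `format_queue_info`, so repeated calls build a time series. The header is written only once, so this sketch assumes the set of queues stays the same between dumps.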
get_active_queues(task_server)
Retrieve a dictionary of active queues and their associated workers for the specified task server.
This function queries the given task server for its active queues and gathers information about which workers are currently monitoring these queues. It supports the 'celery' task server and returns a structured dictionary containing the queue names as keys and lists of worker names as values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server to query for active queues. | required |
Returns:
| Type | Description |
|---|---|
| `Dict[str, List[str]]` | A dictionary where each key is a queue name and each value is the list of workers monitoring that queue. |
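Celery reports queues per worker, so building the documented queue-to-workers dictionary is essentially an inversion of that mapping. This is a minimal sketch. It assumes an input shaped like the reply of `app.control.inspect().active_queues()` (`{worker: [{"name": queue_name, ...}, ...]}`), and the name `invert_active_queues` is hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional


def invert_active_queues(
    active_queues: Optional[Dict[str, List[Dict[str, Any]]]]
) -> Dict[str, List[str]]:
    """Turn {worker: [queue_info, ...]} into {queue_name: [worker, ...]}."""
    # Celery inspect calls return None when no worker replies
    if not active_queues:
        return {}
    queue_to_workers: Dict[str, List[str]] = defaultdict(list)
    for worker, queues in active_queues.items():
        for queue in queues:
            # Each queue_info dict is assumed to carry the queue's name under "name"
            queue_to_workers[queue["name"]].append(worker)
    return dict(queue_to_workers)
```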
get_workers(task_server)
This function queries the designated task server to obtain a list of all workers that are currently connected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server to query. | required |
Returns:
| Type | Description |
|---|---|
| `List[str]` | A list of all connected workers. If the task server is not supported, an empty list is returned. |
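The supported/unsupported split above can be sketched as follows. For Celery, the ping is passed in as a callable so that it is never invoked for an unsupported server. The callable is assumed to behave like Celery's `app.control.ping()`, which returns a list of single-entry dicts keyed by worker name. `get_workers_sketch` is a hypothetical name.

```python
from typing import Callable, Dict, List

PingReply = List[Dict[str, Dict[str, str]]]


def get_workers_sketch(task_server: str, ping: Callable[[], PingReply]) -> List[str]:
    """Return the names of connected workers, or [] for unsupported task servers.

    `ping` is assumed to behave like `app.control.ping()`, which returns
    replies such as [{"worker@host": {"ok": "pong"}}].
    """
    if task_server != "celery":
        # Unsupported server: mirror the documented behaviour and return an empty list
        return []
    # Each reply dict is keyed by the responding worker's name
    return [name for reply in ping() for name in reply]
```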
launch_workers(spec, steps, worker_args='', disable_logs=False, just_return_command=False)
Launches workers for the specified study based on the provided specification and steps.
This function checks whether Celery is configured as the task server and, if so, starts the specified workers. It accepts additional worker arguments, can disable logging, and can return the launch command instead of running it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spec` | `MerlinSpec` | Specification details necessary for launching the workers. | required |
| `steps` | `List[str]` | The specific steps in the specification that the workers will be associated with. | required |
| `worker_args` | `str` | Additional arguments to be passed to the workers. Defaults to an empty string. | `''` |
| `disable_logs` | `bool` | Flag to disable logging during worker execution. Defaults to `False`. | `False` |
| `just_return_command` | `bool` | If `True`, the function will not execute the command but will return it instead. Defaults to `False`. | `False` |
Returns:
| Type | Description |
|---|---|
| `str` | A string of the worker launch command(s). |
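Returning the command instead of running it (the `just_return_command` case) is easiest when the command is assembled as a string first. The sketch below builds one `celery worker` command per worker and quotes the arguments with `shlex`. Several details are assumptions rather than Merlin's exact command line: the `{worker_name: [queue, ...]}` mapping, the `-A merlin` application name, and the `name@%h` worker naming. The flags used (`-A`, `-n`, `-Q`) are standard Celery worker options.

```python
import shlex
from typing import Dict, List


def build_worker_commands(workers: Dict[str, List[str]], worker_args: str = "") -> str:
    """Build one `celery worker` command per worker and join them into one string.

    `workers` is a hypothetical {worker_name: [queue, ...]} mapping; in Merlin this
    information would come from the spec's worker definitions and the selected steps.
    """
    extra = shlex.split(worker_args)  # e.g. "--concurrency 2" -> ["--concurrency", "2"]
    commands = []
    for name, queues in workers.items():
        cmd = [
            "celery", "-A", "merlin", "worker",
            "-n", f"{name}@%h",       # %h is expanded by Celery to the hostname
            "-Q", ",".join(queues),   # comma-separated list of queues to consume
            *extra,
        ]
        # shlex.join quotes arguments containing shell metacharacters, e.g. "[merlin]_sim"
        commands.append(shlex.join(cmd))
    return "; ".join(commands)
```

`shlex.join` requires Python 3.8 or later. Quoting matters here because queue names with brackets would otherwise be interpreted as glob patterns by the shell.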
purge_tasks(task_server, spec, force, steps)
Purges tasks from the queues of the specified task server.
This function removes tasks from the designated queues associated
with the specified steps. It operates without confirmation if
the force parameter is set to True. The function logs the
steps being purged and checks if Celery is the configured task
server before proceeding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server from which to purge tasks. | required |
| `spec` | `MerlinSpec` | A `MerlinSpec` object. | required |
| `force` | `bool` | If `True`, purge the tasks without any confirmation prompt. | required |
| `steps` | `List[str]` | A list of step names that determine which queues to purge. If not specified, all steps are purged. | required |
Returns:
| Type | Description |
|---|---|
| `int` | The result of the purge operation; -1 if the task server is not supported (i.e., not Celery). |
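The purge flow above has two parts: resolving steps to queues, and purging with an optional confirmation. The sketch below separates them. Several pieces are hypothetical: the `{step_name: queue_name}` mapping, the `"all"` keyword, the injected `purge`/`confirm` callables, and the `0` returned when the user declines. The `-1` for an unsupported server follows the documented behaviour.

```python
from typing import Callable, Dict, List


def select_queues(step_queues: Dict[str, str], steps: List[str]) -> List[str]:
    """Map step names to their deduplicated, sorted queue names.

    `step_queues` is a hypothetical {step_name: queue_name} mapping, and "all"
    is assumed to be the keyword meaning "every step".
    """
    if not steps or "all" in steps:
        return sorted(set(step_queues.values()))
    unknown = [s for s in steps if s not in step_queues]
    if unknown:
        raise ValueError(f"Unknown step(s): {', '.join(unknown)}")
    return sorted({step_queues[s] for s in steps})


def purge_sketch(
    task_server: str,
    queues: List[str],
    force: bool,
    purge: Callable[[List[str]], int],
    confirm: Callable[[str], bool],
) -> int:
    """Purge `queues` via `purge`, asking `confirm` first unless `force` is set."""
    if task_server != "celery":
        return -1  # documented result for an unsupported task server
    if not force and not confirm(f"Purge all tasks in {', '.join(queues)}? [y/N] "):
        return 0  # hypothetical: user declined, nothing purged
    return purge(queues)
```

In an interactive CLI, `confirm` could be something like `lambda prompt: input(prompt).strip().lower() == "y"`.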
query_queues(task_server, spec, steps, specific_queues, verbose=True)
Queries the status of queues from the specified task server.
This function checks the status of queues tied to a given task server, building a list of queues based on the provided steps and specific queue names. It supports querying Celery task servers and returns the results in a structured format. Logging behavior can be controlled with the verbose parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
task_server
|
str
|
The task server from which to query queues. |
required |
spec
|
MerlinSpec
|
A
|
required |
steps
|
List[str]
|
A space-separated list of step names to query. Default is to query all available steps if this is empty. |
required |
specific_queues
|
List[str]
|
A list of specific queue names to query. Can be empty or None to query all relevant queues. |
required |
verbose
|
bool
|
If True, enables logging of query operations. Defaults to True. |
True
|
Returns:
| Type | Description |
|---|---|
| `Dict[str, Dict[str, int]]` | A dictionary where the keys are queue names and the values are dictionaries containing the number of workers (consumers) and tasks (jobs) attached to each queue. |
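The sketch below shows how the queue list and the returned structure could fit together. It assumes the following, none of which is confirmed by the reference above: explicitly named queues take precedence over step-derived ones, the dictionary keys are `"jobs"` and `"consumers"`, and the broker lookup is an injected `get_counts` callable returning `(jobs, consumers)`. `query_queues_sketch` and `step_queues` are hypothetical names.

```python
from typing import Callable, Dict, List, Optional, Tuple


def query_queues_sketch(
    step_queues: Dict[str, str],
    steps: List[str],
    specific_queues: Optional[List[str]],
    get_counts: Callable[[str], Tuple[int, int]],
) -> Dict[str, Dict[str, int]]:
    """Build {queue_name: {"jobs": n, "consumers": m}} for the selected queues."""
    # Assumption: explicit queue names take precedence over step-derived queues
    if specific_queues:
        queues = list(dict.fromkeys(specific_queues))  # dedupe, keep order
    elif not steps or "all" in steps:
        queues = list(dict.fromkeys(step_queues.values()))
    else:
        queues = list(dict.fromkeys(step_queues[s] for s in steps))

    result: Dict[str, Dict[str, int]] = {}
    for queue in queues:
        jobs, consumers = get_counts(queue)  # broker lookup, injected for testing
        result[queue] = {"jobs": jobs, "consumers": consumers}
    return result
```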
query_workers(task_server, spec_worker_names, queues, workers_regex)
Retrieves information from workers associated with the specified task server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server to query. | required |
| `spec_worker_names` | `List[str]` | A list of specific worker names to query. | required |
| `queues` | `List[str]` | A list of queues to search for associated workers. | required |
| `workers_regex` | `str` | A regex pattern used to filter worker names during the query. | required |
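Filtering connected workers by spec name and by `workers_regex` can be sketched with the standard `re` module. This is a minimal sketch, not Merlin's code. It assumes worker names look like `name@host`, and that the spec-name filter and the regex filter are applied one after the other. The name `filter_workers` is hypothetical.

```python
import re
from typing import Iterable, List, Optional


def filter_workers(
    workers: Iterable[str],
    spec_worker_names: List[str],
    workers_regex: Optional[str],
) -> List[str]:
    """Keep workers whose name matches the spec names and/or the regex."""
    selected = list(workers)
    if spec_worker_names:
        wanted = set(spec_worker_names)
        # Assumption: Celery worker names have the form "name@host"
        selected = [w for w in selected if w.split("@", 1)[0] in wanted]
    if workers_regex:
        pattern = re.compile(workers_regex)
        # re.search matches anywhere in the name; anchor the pattern for exact matches
        selected = [w for w in selected if pattern.search(w)]
    return selected
```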
run_task_server(study, run_mode=None)
Creates the task server interface for managing task communications.
This function determines which server to send tasks to. It checks if Celery is set as the task server; if not, it logs an error message. The run mode can be specified to determine how tasks should be executed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `study` | `MerlinStudy` | The study object representing the current experiment setup, containing configuration details for the task server. | required |
| `run_mode` | `str` | The type of run mode to use for task execution. This can include options such as 'local' or 'batch'. | `None` |
stop_workers(task_server, spec_worker_names, queues, workers_regex)
This function sends a command to stop workers that match the specified criteria from the designated task server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server from which to stop workers. | required |
| `spec_worker_names` | `List[str]` | A list of worker names to stop, as defined in a specification. | required |
| `queues` | `List[str]` | A list of queues from which to stop associated workers. | required |
| `workers_regex` | `str` | A regex pattern used to filter the workers to stop. | required |
wait_for_workers(sleep, task_server, spec)
Wait for workers to start up by checking their status at regular intervals.
This function monitors the specified task server for the startup of worker processes. It checks for the existence of the expected workers up to 10 times, sleeping for a specified number of seconds between each check. If no workers are detected after the maximum number of attempts, it raises an error to terminate the monitoring process, indicating a potential issue with the task server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `sleep` | `int` | The number of seconds to pause between each check for worker status. | required |
| `task_server` | `str` | The task server from which to query for worker status. | required |
| `spec` | `MerlinSpec` | An instance of the `MerlinSpec` class. | required |
Raises:
| Type | Description |
|---|---|
| `NoWorkersException` | If no workers are detected after the maximum number of checks. |
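The bounded polling loop described for `wait_for_workers` can be sketched as follows. The sketch makes these assumptions: the worker listing is injected as a callable, expected workers are matched on the `name` part of `name@host`, and `NoWorkersException` is a local stand-in class with the same name as Merlin's. The name `wait_for_workers_sketch` and its error message are hypothetical.

```python
import time
from typing import Callable, List


class NoWorkersException(Exception):
    """Local stand-in for Merlin's exception so the sketch runs on its own."""


def wait_for_workers_sketch(
    sleep: int,
    get_workers: Callable[[], List[str]],
    expected_names: List[str],
    max_checks: int = 10,
) -> List[str]:
    """Poll `get_workers` until one of `expected_names` appears, or give up."""
    for _ in range(max_checks):
        # Assumption: worker names look like "name@host"
        found = [w for w in get_workers() if w.split("@", 1)[0] in expected_names]
        if found:
            return found
        time.sleep(sleep)  # pause before the next check
    raise NoWorkersException(f"No expected workers found after {max_checks} checks")
```

Capping the number of checks bounds the total wait at roughly `max_checks * sleep` seconds, so a misconfigured task server surfaces as an error instead of an indefinite hang.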