router
This module routes actions from the Merlin CLI to the appropriate tasking logic.
It is intended to help keep the task management layer (i.e., Celery) decoupled from the logic the tasks are running.
dump_queue_info(task_server, query_return, dump_file)
Formats and dumps queue information for the specified task server.
This function takes the data returned from the queue query and formats it so that the Dumper class can process it. It also adds a timestamp to the information before dumping it to the specified file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server from which to query queues. | required |
| `query_return` | `List[Tuple[str, int, int]]` | The output from the | required |
| `dump_file` | `str` | The filepath where the queue information will be dumped. | required |
Source code in merlin/router.py
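The formatting step described above can be sketched as follows. This is a minimal illustration, not Merlin's implementation: `format_queue_info` and `append_record` are hypothetical helpers, the tuple order (queue name, task count, consumer count) is an assumption, and the standard-library `json` module stands in for the Dumper class.

```python
import json
import os
import tempfile
from datetime import datetime
from typing import Dict, List, Tuple, Union


def format_queue_info(
    query_return: List[Tuple[str, int, int]], timestamp: str
) -> Dict[str, Union[str, int]]:
    """Flatten queue tuples into a single timestamped record.

    Assumption: each tuple is (queue name, task count, consumer count).
    """
    record: Dict[str, Union[str, int]] = {"time": timestamp}
    for name, tasks, consumers in query_return:
        record[f"{name}:tasks"] = tasks
        record[f"{name}:consumers"] = consumers
    return record


def append_record(record: Dict[str, Union[str, int]], dump_file: str) -> None:
    """Append one record to dump_file as a line of JSON (stand-in for Dumper)."""
    with open(dump_file, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")


if __name__ == "__main__":
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    info = format_queue_info([("merlin_queue", 12, 2)], stamp)
    path = os.path.join(tempfile.mkdtemp(), "queue_info.json")
    append_record(info, path)
    print(f"Wrote queue info to {path}")
```

Appending one record per call means repeated dumps to the same file build up a time series of queue states, which is one plausible reason for attaching a timestamp to each dump.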
purge_tasks(task_server, spec, force, steps)
Purges all tasks from the specified task server.
This function removes tasks from the designated queues associated with the specified steps. It operates without confirmation if the force parameter is set to True. The function logs the steps being purged and checks if Celery is the configured task server before proceeding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server from which to purge tasks. | required |
| `spec` | `MerlinSpec` | A | required |
| `force` | `bool` | If True, purge the tasks without any confirmation prompt. | required |
| `steps` | `List[str]` | A space-separated list of step names that define which queues to purge. If not specified, defaults to purging all steps. | required |
Returns:
| Type | Description |
|---|---|
| `int` | The result of the purge operation; -1 if the task server is not supported (i.e., not Celery). |
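The routing behavior described for this function (log what is being purged, check the task server, return -1 when it is unsupported) can be sketched roughly as below. Everything here is illustrative: `purge_tasks_sketch`, the `backends` mapping, and `fake_celery_purge` are hypothetical names, and the sketch takes already-resolved queue names instead of a `MerlinSpec`.

```python
import logging
from typing import Callable, Dict, List

LOG = logging.getLogger(__name__)

# Hypothetical backend signature: (queues, force) -> purge result.
PurgeBackend = Callable[[List[str], bool], int]


def purge_tasks_sketch(
    task_server: str,
    queues: List[str],
    force: bool,
    backends: Dict[str, PurgeBackend],
) -> int:
    """Dispatch a purge to the backend registered for task_server."""
    LOG.info("Purging queues: %s", ", ".join(queues))
    backend = backends.get(task_server)
    if backend is None:
        # Mirrors the documented behavior: unsupported servers yield -1.
        LOG.error("Task server '%s' is not supported.", task_server)
        return -1
    return backend(queues, force)


def fake_celery_purge(queues: List[str], force: bool) -> int:
    """Stand-in backend that pretends one task was removed per queue."""
    return len(queues)


if __name__ == "__main__":
    registry = {"celery": fake_celery_purge}
    print(purge_tasks_sketch("celery", ["q1", "q2"], True, registry))
    print(purge_tasks_sketch("other", ["q1"], True, registry))
```

Keeping the server check in the router and the purge itself in a backend is the decoupling the module description refers to: supporting another task server would only require registering another backend.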
query_queues(task_server, spec, steps, specific_queues, verbose=True)
Queries the status of queues from the specified task server.
This function checks the status of queues tied to a given task server, building a list of queues based on the provided steps and specific queue names. It supports querying Celery task servers and returns the results in a structured format. Logging behavior can be controlled with the verbose parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_server` | `str` | The task server from which to query queues. | required |
| `spec` | `MerlinSpec` | A | required |
| `steps` | `List[str]` | A space-separated list of step names to query. Default is to query all available steps if this is empty. | required |
| `specific_queues` | `List[str]` | A list of specific queue names to query. Can be empty or None to query all relevant queues. | required |
| `verbose` | `bool` | If True, enables logging of query operations. Defaults to True. | `True` |
Returns:
| Type | Description |
|---|---|
| `Dict[str, Dict[str, int]]` | A dictionary where the keys are queue names and the values are dictionaries containing the number of workers (consumers) and tasks (jobs) attached to each queue. |
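To make the queue selection and the return structure more concrete, here is a small sketch. It rests on several assumptions: `select_queues` and `unattended_queues` are hypothetical helpers, the step-to-queue mapping is a plain dictionary in place of a `MerlinSpec`, `specific_queues` is treated as a filter on the selected queues, and the inner dictionary keys are assumed to be `"consumers"` and `"jobs"`.

```python
from typing import Dict, List, Optional


def select_queues(
    step_queues: Dict[str, str],
    steps: List[str],
    specific_queues: Optional[List[str]] = None,
) -> List[str]:
    """Build the list of queues to query.

    An empty `steps` list means "all steps"; an empty or None
    `specific_queues` means "no extra filtering" (assumed semantics).
    """
    chosen_steps = steps or list(step_queues)
    queues = {step_queues[s] for s in chosen_steps if s in step_queues}
    if specific_queues:
        queues &= set(specific_queues)
    return sorted(queues)


def unattended_queues(queue_info: Dict[str, Dict[str, int]]) -> List[str]:
    """Return queues that hold jobs but have no consumers attached."""
    return sorted(
        name
        for name, info in queue_info.items()
        if info["jobs"] > 0 and info["consumers"] == 0
    )


if __name__ == "__main__":
    mapping = {"simulate": "sim_queue", "collect": "post_queue"}
    print(select_queues(mapping, [], None))
    # Example of the documented return shape, with made-up numbers.
    result = {
        "sim_queue": {"consumers": 0, "jobs": 40},
        "post_queue": {"consumers": 2, "jobs": 5},
    }
    print(unattended_queues(result))
```

A result shaped like this is easy to post-process; for instance, queues with pending jobs and no consumers usually indicate that workers have not been started for those steps.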
run_task_server(study, run_mode=None)
Creates the task server interface for managing task communications.
This function determines which server to send tasks to. It checks if Celery is set as the task server; if not, it logs an error message. The run mode can be specified to determine how tasks should be executed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `study` | `MerlinStudy` | The study object representing the current experiment setup, containing configuration details for the task server. | required |
| `run_mode` | `str` | The type of run mode to use for task execution. This can include options such as 'local' or 'batch'. | `None` |
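The check described above (send tasks only when Celery is the configured task server, otherwise log an error) might look roughly like this. It is a sketch under stated assumptions: `run_task_server_sketch` and the `submit` callable are hypothetical, the task server name is passed in directly instead of being read from a `MerlinStudy`, and the boolean return value exists only to make the outcome easy to inspect.

```python
import logging
from typing import Callable, List, Optional

LOG = logging.getLogger(__name__)


def run_task_server_sketch(
    task_server: str,
    submit: Callable[[Optional[str]], None],
    run_mode: Optional[str] = None,
) -> bool:
    """Hand tasks to `submit` only if Celery is the configured server."""
    if task_server != "celery":
        LOG.error("Celery is not specified as the task server.")
        return False
    # run_mode is passed through untouched; its meaning is left to the backend.
    submit(run_mode)
    return True


if __name__ == "__main__":
    seen: List[Optional[str]] = []
    print(run_task_server_sketch("celery", seen.append, run_mode="local"))
    print(seen)
```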