tasks
This module contains Celery task definitions. The purpose of this module is to convert the Directed Acyclic Graph (DAG) provided by Maestro into smaller tasks that Celery can manage.
add_merlin_expanded_chain_to_chord(self, task_type, chain_, samples, labels, sample_index, adapter_config, min_sample_id)
Expand tasks in a chain and add the expanded tasks to the current chord.
This Celery task recursively expands a chain of tasks based on provided sample values and their corresponding labels. The expanded tasks are configured with specific parameters and added to the current chord for execution. The function handles both the expansion of tasks and the management of task dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance. | required |
| task_type | Signature | The Celery task signature type for the new tasks to be created. | required |
| chain_ | List[Step] | A list of tasks to expand into a chain. | required |
| samples | List[Any] | The sample values to use for each new task. | required |
| labels | List[str] | The sample labels corresponding to the samples. | required |
| sample_index | SampleIndex | The sample index that contains the directory structure for tasks. | required |
| adapter_config | Dict | Configuration settings for the adapter used in task execution. | required |
| min_sample_id | int | An offset to use for the sample index. | required |
Source code in merlin/common/tasks.py
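To make the expansion concrete, here is a minimal plain-Python sketch. It is not Merlin's implementation: steps and signatures are plain dicts rather than `Step` and `Signature` objects, the recursive walk over the sample directory hierarchy is omitted, and the `$(LABEL)` placeholder syntax for referencing a sample column is an assumption. It shows the two ideas from the description: substituting each sample's values by label, and offsetting each sample's ID by `min_sample_id`.

```python
from typing import Any, Dict, List


def expand_chain_sketch(
    chain_: List[Dict[str, str]],
    samples: List[List[Any]],
    labels: List[str],
    min_sample_id: int,
) -> List[List[Dict[str, Any]]]:
    """Hypothetical stand-in: build one expanded chain of task dicts per sample.

    Each step is a dict with "name" and "cmd"; "$(LABEL)" placeholders in
    "cmd" (assumed syntax) are replaced by the sample's value for that label.
    """
    expanded = []
    for offset, sample in enumerate(samples):
        sample_id = min_sample_id + offset  # global ID = offset + local index
        new_chain = []
        for step in chain_:
            cmd = step["cmd"]
            for label, value in zip(labels, sample):
                cmd = cmd.replace(f"$({label})", str(value))
            new_chain.append({"name": step["name"], "sample_id": sample_id, "cmd": cmd})
        expanded.append(new_chain)
    return expanded


chain = [{"name": "sim", "cmd": "run --x $(X0) --y $(X1)"}]
result = expand_chain_sketch(chain, [[1, 2], [3, 4]], ["X0", "X1"], min_sample_id=10)
print(result[1][0])  # {'name': 'sim', 'sample_id': 11, 'cmd': 'run --x 3 --y 4'}
```

In the real task each expanded entry would become a Celery signature added to the current chord; the dicts here only stand in for that step.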
add_simple_chain_to_chord(self, task_type, chain_, adapter_config)
Add a chain of tasks to the current chord for execution.
This function takes a list of steps, creates a new task signature for each one from the parameters it provides (such as queue, workspace, and task ID), and adds these signatures to the current chord for later execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance invoking this method. | required |
| task_type | Signature | The Celery task signature type that the new tasks should be based on. | required |
| chain_ | List[Step] | A list of tasks to expand into a chain. Each task should provide the parameters needed for signature creation. | required |
| adapter_config | Dict | Configuration settings for the adapter used in task execution. | required |
Source code in merlin/common/tasks.py
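The signature-building loop can be sketched in plain Python. This is an illustration under stated assumptions, not Merlin's code: steps are hypothetical dicts with `name` and `queue` keys, signatures are plain dicts, and the "chord" is just a list that collects them. A fresh UUID plays the role of the task ID.

```python
import uuid
from typing import Any, Dict, List


def add_simple_chain_sketch(
    chord_tasks: List[Dict[str, Any]],
    chain_: List[Dict[str, str]],
    adapter_config: Dict[str, Any],
) -> None:
    """Hypothetical stand-in: append one signature-like dict per step."""
    for step in chain_:
        chord_tasks.append(
            {
                "step": step["name"],
                "adapter_config": dict(adapter_config),  # copy so tasks don't share state
                "options": {"queue": step["queue"], "task_id": str(uuid.uuid4())},
            }
        )


pending: List[Dict[str, Any]] = []
add_simple_chain_sketch(pending, [{"name": "collect", "queue": "merlin"}], {"type": "local"})
print(pending[0]["options"]["queue"])  # merlin
```

With real Celery objects, the analogous calls would likely be `Signature.set(queue=..., task_id=...)` to attach routing options and `Task.add_to_chord()` to add the signature to the running chord.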
chordfinisher(*args, **kwargs)
Synchronization callback for Celery chords.
This function serves as a synchronization point between groups of tasks in a Celery workflow. In Celery, using chain(group, group) does not guarantee that the second group will execute only after the first group has completed. Instead, both groups are executed independently.
To enforce a synchronization point between these groups, this function is used as a callback in a chord. It allows for the declaration of chains of groups dynamically, ensuring that subsequent tasks wait for the completion of all tasks in the preceding groups.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *args | List | Variable length argument list. Needed by Celery. | () |
| **kwargs | Dict | Arbitrary keyword arguments. Needed by Celery. | {} |
Returns:
| Type | Description |
|---|---|
| str | A constant string "SYNC" indicating the synchronization point has been reached. |
Source code in merlin/common/tasks.py
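The barrier that a chord callback provides can be illustrated without a broker. The sketch below uses `concurrent.futures` as a stand-in for Celery: tasks in one group run in parallel, the runner waits for every task in the group to finish, and only then calls a `chordfinisher_sketch` callback (mirroring the documented `"SYNC"` return value) before starting the next group. In Celery itself this would be expressed as a chord whose callback is `chordfinisher`.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Callable, List


def chordfinisher_sketch(*args, **kwargs) -> str:
    """Callback that only marks the barrier; its arguments are ignored."""
    return "SYNC"


def run_groups_with_barrier(groups: List[List[Callable[[], object]]]) -> List[str]:
    """Run each group in parallel, starting the next group only after the
    previous one has fully completed (the role a chord callback plays)."""
    markers = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for group_ in groups:
            futures = [pool.submit(task) for task in group_]
            wait(futures)  # barrier: block until every task in the group is done
            markers.append(chordfinisher_sketch(*[f.result() for f in futures]))
    return markers


order: List[str] = []
groups = [[lambda: order.append("a1"), lambda: order.append("a2")], [lambda: order.append("b1")]]
print(run_groups_with_barrier(groups))  # ['SYNC', 'SYNC']
```

Because of the barrier, `"b1"` is always appended after both `"a1"` and `"a2"`, whatever order the first group's tasks finish in.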
condense_status_files(self, *args, **kwargs)
Condenses status files after a section of the sample tree has completed processing.
This task gathers status information from a specified SampleIndex and condenses it into a single JSON file. It handles potential race conditions by using a file lock during the write operation. If the condensed status file already exists, it merges the new statuses with the existing ones.
Notes
- The task will remove the original status files after condensing them into the JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance. | required |
| *args | Any | Additional positional arguments (not used in this task). | () |
| **kwargs | Any | Keyword arguments containing: | {} |
Returns:
| Type | Description |
|---|---|
| ReturnCode | A |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If the file lock cannot be acquired within the specified timeout period, which triggers a task restart. |
Source code in merlin/common/tasks.py
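The lock-then-merge write can be sketched with the standard library alone. This is an assumption-laden illustration, not Merlin's implementation: the lock is a hypothetical `<file>.lock` created atomically with `O_CREAT | O_EXCL`, the merge is a shallow `dict.update` (new entries win on key collisions), and the original status files are deleted only after the merged JSON is written.

```python
import json
import os
import time
from typing import Dict, List


def write_condensed_statuses(
    condensed_path: str, new_statuses: Dict, files_to_remove: List[str], timeout: float = 5.0
) -> None:
    """Hypothetical sketch: merge statuses into a JSON file under a lock file."""
    lock_path = condensed_path + ".lock"
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation atomic: only one process can hold the lock.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"could not acquire {lock_path}")
            time.sleep(0.05)
    try:
        merged: Dict = {}
        if os.path.exists(condensed_path):
            with open(condensed_path) as f:
                merged = json.load(f)
        merged.update(new_statuses)  # shallow merge; new entries win on collisions
        with open(condensed_path, "w") as f:
            json.dump(merged, f, indent=2)
        for path in files_to_remove:
            os.remove(path)  # originals are removed only after the write succeeds
    finally:
        os.close(fd)
        os.remove(lock_path)
```

Raising `TimeoutError` when the lock cannot be acquired matches the documented behavior; in the real task that error is what triggers the restart.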
expand_tasks_with_samples(self, dag, chain_, samples, labels, task_type, adapter_config, level_max_dirs)
Expands a chain of task names into a group of Celery chains, using samples and labels for variable substitution.
This task determines whether the provided chain of tasks requires expansion based on the structure of the Directed Acyclic Graph (DAG), samples, and labels. If expansion is needed, it generates and queues new tasks for each range of samples. Otherwise, it queues a simple chain task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance. | required |
| dag | DAG | A Merlin Directed Acyclic Graph ( | required |
| chain_ | List[str] | A list of task names to be expanded into a Celery group of chains. | required |
| samples | List[List[str]] | A list of lists containing Merlin sample values for variable substitution. | required |
| labels | List[str] | A list of strings representing the labels associated with each column in the samples. | required |
| task_type | Callable | The Celery task type to create, currently expected to be | required |
| adapter_config | Dict | A configuration dictionary for Maestro script adapters. | required |
| level_max_dirs | int | The maximum number of directories allowed per level in the sample hierarchy. | required |
Source code in merlin/common/tasks.py
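The "one task per range of samples" decision can be sketched as below. The chunking rule is an assumption made for illustration: samples are split into contiguous half-open ranges of at most `level_max_dirs` samples, and each range's start doubles as its `min_sample_id`. A non-expandable chain, or an empty sample list (also an assumption), falls back to a single simple chain. Merlin's actual range computation may differ.

```python
from typing import List, Tuple


def sample_ranges(num_samples: int, level_max_dirs: int) -> List[Tuple[int, int]]:
    """Hypothetical sketch: split [0, num_samples) into contiguous half-open
    ranges of at most level_max_dirs samples each."""
    if level_max_dirs <= 0:
        raise ValueError("level_max_dirs must be positive")
    return [
        (start, min(start + level_max_dirs, num_samples))
        for start in range(0, num_samples, level_max_dirs)
    ]


def plan_expansion(expandable: bool, num_samples: int, level_max_dirs: int) -> List[str]:
    """Describe what would be queued: one simple chain, or one task per range."""
    if not expandable or num_samples == 0:
        return ["simple_chain"]
    return [
        f"expand[{lo}:{hi}] (min_sample_id={lo})"
        for lo, hi in sample_ranges(num_samples, level_max_dirs)
    ]


print(sample_ranges(7, 3))  # [(0, 3), (3, 6), (6, 7)]
```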
gather_statuses(sample_index, workspace, condensed_workspace, files_to_remove)
Traverse the sample index and gather all statuses into a single dictionary.
This function iterates through the provided SampleIndex object, reading status files from each sample's workspace. It condenses the statuses into a single dictionary while tracking which files need to be removed after condensing. The function ensures that only completed statuses are included in the condensed output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sample_index | SampleIndex | A | required |
| workspace | str | The full path to the workspace for the step being condensed. | required |
| condensed_workspace | str | A shortened version of the workspace path that will be used in the status files. | required |
| files_to_remove | List[str] | A list that will be populated with file paths of status files that need to be removed after condensing. | required |
Returns:
| Type | Description |
|---|---|
| Dict | A dictionary containing the condensed statuses gathered from the status files. |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If a timeout occurs while reading a status file, triggering a restart of the task. |
| FileNotFoundError | If a status file is not found during the condensing process. |
Source code in merlin/common/tasks.py
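A simplified version of the traversal is sketched below. It walks the workspace with `os.walk` instead of a `SampleIndex`, skips the file locking that produces the documented `TimeoutError`, and relies on two assumptions labeled in the code: the status file name and the set of states that count as "completed". Keys use `condensed_workspace` in place of the full workspace path, and every condensed file is recorded in `files_to_remove`.

```python
import json
import os
from typing import Dict, List

STATUS_FILE = "MERLIN_STATUS.json"  # assumed status file name
COMPLETED = {"FINISHED", "FAILED"}  # hypothetical set of completed states


def gather_statuses_sketch(
    workspace: str, condensed_workspace: str, files_to_remove: List[str]
) -> Dict[str, Dict]:
    """Collect completed statuses under `workspace`, keyed by shortened path."""
    condensed: Dict[str, Dict] = {}
    for dirpath, _dirnames, filenames in os.walk(workspace):
        if STATUS_FILE not in filenames:
            continue
        path = os.path.join(dirpath, STATUS_FILE)
        with open(path) as f:
            status = json.load(f)
        if status.get("status") not in COMPLETED:
            continue  # still running: leave this file in place
        rel = os.path.relpath(dirpath, workspace)
        key = condensed_workspace if rel == "." else os.path.join(condensed_workspace, rel)
        condensed[key] = status
        files_to_remove.append(path)
    return condensed
```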
get_1d_chain(all_chains)
Convert a 2D list of task chains into a 1D list of task signatures.
This function takes a two-dimensional list of task signatures, where each inner list represents a parallel group of tasks. It transforms this structure into a one-dimensional list suitable for creating a linear chain of tasks. If there is only one chain, it returns that chain directly. If there are multiple chains, it sets up dependencies between tasks to ensure proper execution order.
Notes
- The function processes the chains in reverse order to correctly set up the dependencies before adding them to the final list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| all_chains | List[List[Signature]] | A two-dimensional list of task signatures, where each inner list represents a group of tasks that can be executed in parallel. | required |
Returns:
| Type | Description |
|---|---|
| List[Signature] | A one-dimensional list of task signatures representing a chain of tasks, with dependencies set up for proper execution order. |
Source code in merlin/common/tasks.py
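Taking the parameter description at face value (each inner list is a group of tasks that may run in parallel), the reverse-order linking from the Notes can be sketched with a hypothetical `Group` class standing in for Celery groups and their callbacks. Walking the list backwards means each stage's successor already exists when the stage is created, so the link can be set in one pass. With a single group, the loop simply returns that group unlinked. This illustrates the idea only; Merlin's actual signature wiring may differ.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Group:
    """Hypothetical stand-in for a Celery group: tasks that may run in parallel."""

    tasks: List[str]
    then: Optional["Group"] = None  # stage that must wait for this one


def get_1d_chain_sketch(all_chains: List[List[str]]) -> List[Group]:
    stages: List[Group] = []
    successor: Optional[Group] = None
    # Walk backwards so each stage's successor exists before we link to it.
    for group_tasks in reversed(all_chains):
        stage = Group(list(group_tasks), then=successor)
        stages.insert(0, stage)
        successor = stage
    return stages


stages = get_1d_chain_sketch([["a1", "a2"], ["b1"], ["c1"]])
print([s.tasks for s in stages])  # [['a1', 'a2'], ['b1'], ['c1']]
```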
is_chain_expandable(chain_, labels)
Determine if the steps in the given chain are expandable.
A chain is considered expandable if all steps within the chain require expansion. Conversely, if none of the steps require expansion, the chain is not expandable. If there is a mix of steps that require expansion and those that do not, an InvalidChainException is raised, indicating that the chain is incompatible.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chain_ | List[Step] | A list of | required |
| labels | List[str] | The labels associated with the steps in the chain, used to determine if expansion is needed. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if all steps in the chain are expandable, False if none are expandable. |
Raises:
| Type | Description |
|---|---|
| InvalidChainException | If there is a mix of steps that require expansion and those that do not, indicating an incompatible chain. |
Source code in merlin/common/tasks.py
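The all/none/mixed rule maps directly onto Python's `all()` and `any()`. In the sketch below, `StepSketch` and the local `InvalidChainException` are hypothetical stand-ins for Merlin's classes, and the test for "needs expansion" (the command references a label as `$(LABEL)`) is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import List


class InvalidChainException(Exception):
    """Local stand-in: raised when a chain mixes expandable and non-expandable steps."""


@dataclass
class StepSketch:
    """Hypothetical stand-in for a Merlin Step."""

    name: str
    cmd: str

    def needs_expansion(self, labels: List[str]) -> bool:
        # Assumption: a step needs expansion if its command references a label as $(LABEL).
        return any(f"$({label})" in self.cmd for label in labels)


def is_chain_expandable(chain_: List[StepSketch], labels: List[str]) -> bool:
    flags = [step.needs_expansion(labels) for step in chain_]
    if all(flags):
        return True  # every step needs expansion
    if not any(flags):
        return False  # no step needs expansion
    raise InvalidChainException(f"mixed expansion in chain {[s.name for s in chain_]}")
```

Note that `all([])` is `True`, so under this sketch an empty chain counts as expandable.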
launch_chain(self, chain_1d, condense_sig=None)
Launch a 1D chain of task signatures appropriately based on the execution context.
This function handles the launching of a list of task signatures in a one-dimensional chain. The behavior varies depending on whether the execution is local or remote, and whether the tasks involve sample processing that requires condensing status files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance invoking this method. | required |
| chain_1d | List[Signature] | A one-dimensional list of task signatures to be launched. | required |
| condense_sig | Signature | A signature for condensing the status files after task execution. If None, condensing is not required. | None |
Source code in merlin/common/tasks.py
merlin_step(self, *args, **kwargs)
Executes a Merlin step.
This task executes a step in the Merlin workflow, handling various outcomes such as success, retries, and failures. It can also manage chaining to the next step in the workflow.
Notes
- If the step has already been completed, it will be skipped.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance. | required |
| *args | Any | Positional arguments, one of which should be an instance of | () |
| **kwargs | Any | Optional keyword arguments that include: Example kwargs dict where | {} |
Returns:
| Type | Description |
|---|---|
| ReturnCode | The result of the step execution, which can indicate success, various failure modes, or a request to retry. |
Source code in merlin/common/tasks.py
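The outcome handling described above can be sketched as a dispatch on a return code. Everything here is hypothetical: the `ReturnCode` enum is a small illustrative subset (not Merlin's actual values), retries happen in a local loop rather than through Celery's retry mechanism, and restart and retry are treated the same way. The sketch shows the documented behaviors: skipping completed steps, retrying, soft versus hard failures, and chaining onward on success.

```python
from enum import Enum, auto
from typing import Callable


class ReturnCode(Enum):
    """Hypothetical subset of step outcomes; not Merlin's actual ReturnCode values."""

    OK = auto()
    RESTART = auto()
    RETRY = auto()
    SOFT_FAIL = auto()
    HARD_FAIL = auto()


def run_step_sketch(
    already_done: bool, execute: Callable[[], ReturnCode], max_retries: int = 3
) -> str:
    if already_done:
        return "skipped"  # completed steps are not rerun
    for _ in range(max_retries + 1):
        result = execute()
        if result is ReturnCode.OK:
            return "ok: chain to next step"
        if result in (ReturnCode.RESTART, ReturnCode.RETRY):
            continue  # simplification: both outcomes just run the step again
        if result is ReturnCode.SOFT_FAIL:
            return "soft fail: continue workflow"
        return "hard fail: stop workflow"
    return "retries exhausted"
```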
prepare_chain_workspace(sample_index, chain_)
Prepares a user's workspace for each step in the given chain of dependent steps.
This function iterates through a list of Step objects and prepares the necessary workspace for each step by creating directories and writing sample index files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sample_index | SampleIndex | An object that manages sample indexing and workspace preparation. | required |
| chain_ | List[Step] | A list of | required |
Source code in merlin/common/tasks.py
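A minimal sketch of the per-step preparation follows. It replaces the `SampleIndex` with a hypothetical mapping from sample ID to relative directory and represents each step only by its workspace path. The index file name and its `id: path` line format are assumptions for illustration, not Merlin's on-disk format. `exist_ok=True` makes the directory creation safe to repeat.

```python
import os
from typing import Dict, List


def prepare_chain_workspace_sketch(
    sample_dirs: Dict[int, str], step_workspaces: List[str]
) -> List[str]:
    """Hypothetical sketch: create each step's sample directories and write a
    simple index file mapping sample IDs to relative paths."""
    written = []
    for workspace in step_workspaces:
        for rel_dir in sample_dirs.values():
            os.makedirs(os.path.join(workspace, rel_dir), exist_ok=True)
        index_path = os.path.join(workspace, "sample_index.txt")  # assumed file name
        with open(index_path, "w") as f:
            for sample_id, rel_dir in sorted(sample_dirs.items()):
                f.write(f"{sample_id}: {rel_dir}\n")
        written.append(index_path)
    return written
```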
queue_merlin_study(study, adapter)
Launch a chain of tasks based on a MerlinStudy.
This Celery task initiates a series of tasks derived from a MerlinStudy object. It processes the study's Directed Acyclic Graph (DAG) to group tasks and convert them into a chain of Celery tasks for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| study | MerlinStudy | The study object containing samples, sample labels, and the Directed Acyclic Graph ( | required |
| adapter | Dict | An adapter object used to facilitate interactions with the study's data or processing logic. | required |
Returns:
| Type | Description |
|---|---|
| AsyncResult | An instance representing the asynchronous result of the task chain, allowing for tracking and management of the task's execution. |
Source code in merlin/common/tasks.py
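One common way to turn a DAG into an ordered sequence of parallel groups is a level-by-level topological sort (Kahn's algorithm). The sketch below uses a plain adjacency dict in place of the study's DAG object. It illustrates the grouping idea only and is not necessarily how Merlin groups tasks: every node lands in a level after all of its parents, nodes within a level can run in parallel, and the levels run in order.

```python
from typing import Dict, List


def topological_levels(dag: Dict[str, List[str]]) -> List[List[str]]:
    """Group DAG nodes into levels; every node's parents are in earlier levels.

    `dag` maps each node to the list of nodes that depend on it.
    """
    indegree = {node: 0 for node in dag}
    for children in dag.values():
        for child in children:
            indegree[child] = indegree.get(child, 0) + 1
    current = sorted(n for n, d in indegree.items() if d == 0)
    levels: List[List[str]] = []
    seen = 0
    while current:
        levels.append(current)
        seen += len(current)
        nxt = []
        for node in current:
            for child in dag.get(node, []):
                indegree[child] -= 1
                if indegree[child] == 0:  # all parents are now scheduled
                    nxt.append(child)
        current = sorted(nxt)
    if seen != len(indegree):
        raise ValueError("graph has a cycle")
    return levels


dag = {"_source": ["gen", "setup"], "gen": ["sim"], "setup": ["sim"], "sim": ["post"]}
print(topological_levels(dag))  # [['_source'], ['gen', 'setup'], ['sim'], ['post']]
```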
shutdown_workers(self, shutdown_queues)
Initiates the shutdown of Celery workers.
This task wraps the stop_celery_workers function, allowing for the graceful shutdown of specified Celery worker queues. The task is acknowledged as soon as it starts executing, so it will not be requeued even if the worker running it shuts down.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| self | Task | The current task instance. | required |
| shutdown_queues | List[str] | A list of specific queues to shut down. If None, all queues will be shut down. | required |