Skip to content

celeryadapter

This module provides an adapter to the Celery Distributed Task Queue.

build_csv_queue_info(query_return, date)

Construct a dictionary containing queue information and column labels for writing to a CSV file.

This function processes the output from the query_queues function and organizes the data into a format suitable for CSV export. It includes a timestamp to indicate when the status was recorded.

Parameters:

Name Type Description Default
query_return List[Tuple[str, int, int]]

The output from the query_queues function, containing queue names and their associated statistics.

required
date str

A timestamp indicating when the queue status was recorded.

required

Returns:

Type Description
Dict[str, List]

A dictionary where keys are column labels and values are lists containing the corresponding queue information, formatted for CSV output.

Source code in merlin/study/celeryadapter.py
def build_csv_queue_info(query_return: List[Tuple[str, int, int]], date: str) -> Dict[str, List]:
    """
    Construct a dictionary containing queue information and column labels
    for writing to a CSV file.

    This function processes the output from the [`query_queues`][router.query_queues]
    function and organizes the data into a format suitable for CSV export. It includes
    a timestamp to indicate when the status was recorded.

    Args:
        query_return: The output from the [`query_queues`][router.query_queues] function,
            containing queue names and their associated statistics.
        date: A timestamp indicating when the queue status was recorded.

    Returns:
        A dictionary where keys are column labels and values are lists containing the
            corresponding queue information, formatted for CSV output.
    """
    # Build the list of labels if necessary
    csv_to_dump = {"time": [date]}
    for queue_name, queue_stats in query_return.items():
        csv_to_dump[f"{queue_name}:tasks"] = [str(queue_stats["jobs"])]
        csv_to_dump[f"{queue_name}:consumers"] = [str(queue_stats["consumers"])]

    return csv_to_dump

build_json_queue_info(query_return, date)

Construct a dictionary containing queue information for JSON export.

This function processes the output from the query_queues function and organizes the data into a structured format suitable for JSON serialization. It includes a timestamp to indicate when the queue status was recorded.

Parameters:

Name Type Description Default
query_return List[Tuple[str, int, int]]

The output from the query_queues function, containing queue names and their associated statistics.

required
date str

A timestamp indicating when the queue status was recorded.

required

Returns:

Type Description
Dict

A dictionary structured for JSON output, where the keys are timestamps and the values are dictionaries containing queue names and their corresponding statistics (tasks and consumers).

Source code in merlin/study/celeryadapter.py
def build_json_queue_info(query_return: List[Tuple[str, int, int]], date: str) -> Dict:
    """
    Construct a dictionary containing queue information for JSON export.

    This function processes the output from the [`query_queues`][router.query_queues]
    function and organizes the data into a structured format suitable for JSON
    serialization. It includes a timestamp to indicate when the queue status was
    recorded.

    Args:
        query_return: The output from the [`query_queues`][router.query_queues]
            function, containing queue names and their associated statistics.
        date: A timestamp indicating when the queue status was recorded.

    Returns:
        A dictionary structured for JSON output, where the keys are timestamps
            and the values are dictionaries containing queue names and their
            corresponding statistics (tasks and consumers).
    """
    # Get the datetime so we can track different entries and initalize a new json entry
    json_to_dump = {date: {}}

    # Add info for each queue (name)
    for queue_name, queue_stats in query_return.items():
        json_to_dump[date][queue_name] = {"tasks": queue_stats["jobs"], "consumers": queue_stats["consumers"]}

    return json_to_dump

build_set_of_queues(spec, steps, specific_queues, verbose=True, app=None)

Construct a set of queues to query based on the provided parameters.

This function builds a set of queues by querying a MerlinSpec object for queues associated with specified steps and/or filtering for specific queue names. If no spec or specific queues are provided, it defaults to querying active queues from the Celery application.

Parameters:

Name Type Description Default
spec MerlinSpec

A MerlinSpec object that defines the context for the query. Can be None.

required
steps List[str]

A list of step names to query. If empty, all steps are considered.

required
specific_queues List[str]

A list of specific queue names to filter. Can be None.

required
verbose bool

If True, log statements will be output. Defaults to True.

True
app Celery

A Celery application instance. If None, it will be imported.

None

Returns:

Type Description
Set[str]

A set of queue names to investigate based on the provided parameters.

Source code in merlin/study/celeryadapter.py
def build_set_of_queues(
    spec: MerlinSpec,
    steps: List[str],
    specific_queues: List[str],
    verbose: bool = True,
    app: Celery = None,
) -> Set[str]:
    """
    Construct a set of queues to query based on the provided parameters.

    This function builds a set of queues by querying a [`MerlinSpec`][spec.specification.MerlinSpec]
    object for queues associated with specified steps and/or filtering for specific queue names.
    If no spec or specific queues are provided, it defaults to querying active queues from the Celery
    application.

    Args:
        spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec]
            object that defines the context for the query. Can be None.
        steps: A list of step names to query. If empty, all steps are considered.
        specific_queues: A list of specific queue names to filter. Can be None.
        verbose: If True, log statements will be output. Defaults to True.
        app: A Celery application instance. If None, it will be imported.

    Returns:
        A set of queue names to investigate based on the provided parameters.
    """
    if app is None:
        from merlin.celery import app  # pylint: disable=C0415

    queues = set()
    # If the user provided a spec file, get the queues from that spec
    if spec:
        if verbose:
            LOG.info(f"Querying queues for steps = {steps}")
        queues = set(spec.get_queue_list(steps))

    # If the user provided specific queues, search for those
    if specific_queues:
        queues = _get_specific_queues(queues, specific_queues, spec, verbose=verbose)

    # Default behavior with no options provided; display active queues
    if not spec and not specific_queues:
        if verbose:
            LOG.info("Querying active queues")
        existing_queues, _ = get_active_celery_queues(app)

        # Check if there's any active queues currently
        if len(existing_queues) == 0:
            if verbose:
                LOG.warning("No active queues found. Are your workers running yet?")
            return set()

        # Set the queues we're going to check to be all existing queues by default
        queues = set(existing_queues.keys())

    return queues

celerize_queues(queues, config=None)

Prepend a queue tag to each queue in the provided list to conform to Celery's queue naming requirements.

This function modifies the input list of queues by adding a specified queue tag from the configuration. If no configuration is provided, it defaults to using the global configuration settings.

Parameters:

Name Type Description Default
queues List[str]

A list of queue names that need the queue tag prepended.

required
config SimpleNamespace

A SimpleNamespace of configuration settings. If not provided, the function will use the default configuration.

None
Source code in merlin/study/celeryadapter.py
def celerize_queues(queues: List[str], config: SimpleNamespace = None):
    """
    Prepend a queue tag to each queue in the provided list to conform to Celery's
    queue naming requirements.

    This function modifies the input list of queues by adding a specified queue tag
    from the configuration. If no configuration is provided, it defaults to using
    the global configuration settings.

    Args:
        queues: A list of queue names that need the queue tag prepended.
        config: A SimpleNamespace of configuration settings. If not provided, the
            function will use the default configuration.
    """
    if config is None:
        from merlin.config.configfile import CONFIG as config  # pylint: disable=C0415

    for i, queue in enumerate(queues):
        queues[i] = f"{config.celery.queue_tag}{queue}"

dump_celery_queue_info(query_return, dump_file)

Format and dump Celery queue information to a specified file.

This function processes the output from the query_queues function, formats the data according to the file type (CSV or JSON), and adds a timestamp to the information before writing it to the specified file.

Parameters:

Name Type Description Default
query_return List[Tuple[str, int, int]]

The output from the query_queues function, containing queue names and their associated statistics.

required
dump_file str

The filepath of the file where the queue information will be written. The file extension determines the format (CSV or JSON).

required
Source code in merlin/study/celeryadapter.py
def dump_celery_queue_info(query_return: List[Tuple[str, int, int]], dump_file: str):
    """
    Format and dump Celery queue information to a specified file.

    This function processes the output from the `query_queues` function, formats
    the data according to the file type (CSV or JSON), and adds a timestamp
    to the information before writing it to the specified file.

    Args:
        query_return: The output from the [`query_queues`][router.query_queues]
            function, containing queue names and their associated statistics.
        dump_file: The filepath of the file where the queue information
            will be written. The file extension determines the format (CSV or JSON).
    """
    # Get a timestamp for this dump
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Handle different file types
    if dump_file.endswith(".csv"):
        # Build the lists of information/labels we'll need
        dump_info = build_csv_queue_info(query_return, date)
    elif dump_file.endswith(".json"):
        # Build the dict of info to dump to the json file
        dump_info = build_json_queue_info(query_return, date)

    # Dump the information
    dump_handler(dump_file, dump_info)

get_active_celery_queues(app)

Retrieve all active queues and their associated workers for a Celery application.

This function queries the application's server to obtain a comprehensive view of active queues and the workers connected to them. It returns a dictionary where each key is a queue name and the value is a list of workers attached to that queue. Additionally, it provides a list of all active workers in the application.

Note

Unlike get_running_queues, this function goes through the application's server.

Parameters:

Name Type Description Default
app Celery

The Celery application instance.

required

Returns:

Type Description
Tuple[Dict[str, List[str]], List[str]]

A tuple containing:

  • A dictionary mapping queue names to lists of workers connected to them.
  • A list of all active workers in the application.
Example
from merlin.celery import app
queues, workers = get_active_celery_queues(app)
queue_names = list(queues)
workers_on_q0 = queues[queue_names[0]]
workers_not_on_q0 = [worker for worker in workers if worker not in workers_on_q0]
Source code in merlin/study/celeryadapter.py
def get_active_celery_queues(app: Celery) -> Tuple[Dict[str, List[str]], List[str]]:
    """
    Retrieve all active queues and their associated workers for a Celery application.

    This function queries the application's server to obtain a comprehensive
    view of active queues and the workers connected to them. It returns a
    dictionary where each key is a queue name and the value is a list of
    workers attached to that queue. Additionally, it provides a list of all
    active workers in the application.

    Note:
        Unlike [`get_running_queues`][study.celeryadapter.get_running_queues],
        this function goes through the application's server.

    Args:
        app: The Celery application instance.

    Returns:
        A tuple containing:\n
            - A dictionary mapping queue names to lists of workers connected to them.
            - A list of all active workers in the application.

    Example:
        ```python
        from merlin.celery import app
        queues, workers = get_active_celery_queues(app)
        queue_names = list(queues)
        workers_on_q0 = queues[queue_names[0]]
        workers_not_on_q0 = [worker for worker in workers if worker not in workers_on_q0]
        ```
    """
    i = app.control.inspect()
    active_workers = i.active_queues()
    if active_workers is None:
        active_workers = {}
    queues = {}
    for worker in active_workers:
        for my_queue in active_workers[worker]:
            try:
                queues[my_queue["name"]].append(worker)
            except KeyError:
                queues[my_queue["name"]] = [worker]
    return queues, [*active_workers]

get_running_queues(celery_app_name, test_mode=False)

Check for running Celery workers and retrieve their associated queues.

This function inspects currently running processes to identify active Celery workers. It extracts queue names from the -Q tag in the command line of the worker processes. The returned list contains only unique Celery queue names. This function must be executed on the allocation where the workers are running.

Note

Unlike get_active_celery_queues, this function does not go through the application's server.

Parameters:

Name Type Description Default
celery_app_name str

The name of the Celery app (typically "merlin" unless in test mode).

required
test_mode bool

If True, the function runs in test mode.

False

Returns:

Type Description
List[str]

A unique list of Celery queue names with workers attached to them.

Source code in merlin/study/celeryadapter.py
def get_running_queues(celery_app_name: str, test_mode: bool = False) -> List[str]:
    """
    Check for running Celery workers and retrieve their associated queues.

    This function inspects currently running processes to identify active
    Celery workers. It extracts queue names from the `-Q` tag in the
    command line of the worker processes. The returned list contains
    only unique Celery queue names. This function must be executed
    on the allocation where the workers are running.

    Note:
        Unlike [`get_active_celery_queues`][study.celeryadapter.get_active_celery_queues],
        this function does _not_ go through the application's server.

    Args:
        celery_app_name: The name of the Celery app (typically "merlin"
            unless in test mode).
        test_mode: If True, the function runs in test mode.

    Returns:
        A unique list of Celery queue names with workers attached to them.
    """
    running_queues = []

    if not is_running(f"{celery_app_name} worker"):
        return running_queues

    proc_name = "celery" if not test_mode else "sh"
    procs = get_procs(proc_name)
    for _, lcmd in procs:
        lcmd = list(filter(None, lcmd))
        cmdline = " ".join(lcmd)
        if "-Q" in cmdline:
            if test_mode:
                echo_cmd = lcmd.pop(2)
                lcmd.extend(echo_cmd.split())
            running_queues.extend(lcmd[lcmd.index("-Q") + 1].split(","))

    running_queues = list(set(running_queues))

    return running_queues

purge_celery_tasks(queues, force)

Purge Celery tasks from the specified queues.

This function constructs and executes a command to purge tasks from the specified Celery queues. If the force parameter is set to True, the purge operation will be executed without prompting for confirmation.

Parameters:

Name Type Description Default
queues str

A comma-separated string of the queue name(s) from which tasks should be purged.

required
force bool

If True, the purge operation will be executed without asking for user confirmation.

required

Returns:

Type Description
int

The return code from the subprocess execution. A return code of 0 indicates success, while any non-zero value indicates an error occurred during the purge operation.

Source code in merlin/study/celeryadapter.py
def purge_celery_tasks(queues: str, force: bool) -> int:
    """
    Purge Celery tasks from the specified queues.

    This function constructs and executes a command to purge tasks from the
    specified Celery queues. If the `force` parameter is set to True, the
    purge operation will be executed without prompting for confirmation.

    Args:
        queues: A comma-separated string of the queue name(s) from which
            tasks should be purged.
        force: If True, the purge operation will be executed without asking
            for user confirmation.

    Returns:
        The return code from the subprocess execution. A return code of
            0 indicates success, while any non-zero value indicates an error
            occurred during the purge operation.
    """
    # This version will purge all queues.
    # from merlin.celery import app
    # app.control.purge()
    force_com = ""
    if force:
        force_com = " -f "
    purge_command = " ".join(["celery -A merlin purge", force_com, "-Q", queues])
    LOG.debug(purge_command)
    return subprocess.run(purge_command, shell=True).returncode

query_celery_queues(queues, app=None, config=None)

Retrieve information about the number of jobs and consumers for specified Celery queues.

This function constructs a dictionary containing details about the number of jobs and consumers associated with each queue provided in the input list. It connects to the Celery application to gather this information, handling both Redis and RabbitMQ brokers.

Notes
  • If the specified queue does not exist or has no jobs, it will be handled gracefully.
  • For Redis brokers, the function counts consumers by inspecting active queues since Redis does not track consumers like RabbitMQ does.

Parameters:

Name Type Description Default
queues List[str]

A list of queue names for which to gather information.

required
app Celery

The Celery application instance. Defaults to None, which triggers an import for testing purposes.

None
config Config

A configuration object containing broker details. Defaults to None, which also triggers an import for testing.

None

Returns:

Type Description
Dict[str, Dict[str, int]]

A dictionary where each key is a queue name and the value is another dictionary containing:

  • jobs: The number of jobs in the queue.
  • consumers: The number of consumers attached to the queue.
Source code in merlin/study/celeryadapter.py
def query_celery_queues(queues: List[str], app: Celery = None, config: Config = None) -> Dict[str, Dict[str, int]]:
    """
    Retrieve information about the number of jobs and consumers for specified Celery queues.

    This function constructs a dictionary containing details about the number of jobs
    and consumers associated with each queue provided in the input list. It connects
    to the Celery application to gather this information, handling both Redis and
    RabbitMQ brokers.

    Notes:
        - If the specified queue does not exist or has no jobs, it will be handled gracefully.
        - For Redis brokers, the function counts consumers by inspecting active queues
          since Redis does not track consumers like RabbitMQ does.

    Args:
        queues: A list of queue names for which to gather information.
        app: The Celery application instance. Defaults to None, which triggers an import
            for testing purposes.
        config (config.Config): A configuration object containing broker details.
            Defaults to None, which also triggers an import for testing.

    Returns:
        A dictionary where each key is a queue name and the value is another dictionary
            containing:\n
            - `jobs`: The number of jobs in the queue.
            - `consumers`: The number of consumers attached to the queue.
    """
    if app is None:
        from merlin.celery import app  # pylint: disable=C0415
    if config is None:
        from merlin.config.configfile import CONFIG as config  # pylint: disable=C0415

    # Initialize the dictionary with the info we want about our queues
    queue_info = {queue: {"consumers": 0, "jobs": 0} for queue in queues}

    # Open a connection via our Celery app
    with app.connection() as conn:
        # Open a channel inside our connection
        with conn.channel() as channel:
            # Loop through all the queues we're searching for
            for queue in queues:
                try:
                    # Count the number of jobs and consumers for each queue
                    _, queue_info[queue]["jobs"], queue_info[queue]["consumers"] = channel.queue_declare(
                        queue=queue, passive=True
                    )
                # Redis likes to throw this error when a queue we're looking for has no jobs
                except ChannelError:
                    pass

    # Redis doesn't keep track of consumers attached to queues like rabbit does
    # so we have to count this ourselves here
    if config.broker.name in ("rediss", "redis"):
        # Get a dict of active queues by querying the celery app
        active_queues = app.control.inspect().active_queues()
        if active_queues is not None:
            # Loop through each active queue that was found
            for active_queue_list in active_queues.values():
                # Loop through each queue that each worker is watching
                for active_queue in active_queue_list:
                    # If this is a queue we're looking for, increment the consumer count
                    if active_queue["name"] in queues:
                        queue_info[active_queue["name"]]["consumers"] += 1

    return queue_info

run_celery(study, run_mode=None)

Run the given MerlinStudy object with optional Celery configuration.

This function executes the provided MerlinStudy object. If the run_mode is set to "local", it configures Celery to run in local mode (without utilizing workers). Otherwise, it connects to the Celery server to queue tasks.

Parameters:

Name Type Description Default
study MerlinStudy

The study object to be executed.

required
run_mode str

The mode in which to run the study. If set to "local", Celery runs locally.

None
Source code in merlin/study/celeryadapter.py
def run_celery(study: MerlinStudy, run_mode: str = None):
    """
    Run the given [`MerlinStudy`][study.study.MerlinStudy] object with optional
    Celery configuration.

    This function executes the provided [`MerlinStudy`][study.study.MerlinStudy]
    object. If the `run_mode` is set to "local", it configures Celery to run in
    local mode (without utilizing workers). Otherwise, it connects to the Celery
    server to queue tasks.

    Args:
        study (study.study.MerlinStudy): The study object to be executed.
        run_mode: The mode in which to run the study. If set to "local",
            Celery runs locally.
    """
    # Only import celery stuff if we want celery in charge
    # Pylint complains about circular import between merlin.common.tasks -> merlin.router -> merlin.study.celeryadapter
    # For now I think this is still the best way to do this so we'll ignore it
    from merlin.celery import app  # pylint: disable=C0415
    from merlin.common.tasks import queue_merlin_study  # pylint: disable=C0415, R0401

    adapter_config = study.get_adapter_config(override_type="local")

    if run_mode == "local":
        app.conf.task_always_eager = True
        app.conf.task_eager_propogates = True
    else:
        # Check for server
        app.connection().connect()

    # Send the tasks to the server
    queue_merlin_study(study, adapter_config)