broker

This module provides utility functions and constants to manage broker configurations and connection strings for various messaging systems, including RabbitMQ and Redis. It supports multiple connection protocols and configurations, such as SSL, Unix sockets, and password inclusion.

The module defines constants for supported brokers and connection string templates, along with functions to construct and retrieve connection strings and SSL configurations based on settings defined in the app.yaml configuration file.

get_connection_string(include_password=True)

Constructs and returns a connection string based on the broker configuration.

This function returns CONFIG.broker.url unchanged if it is set; in that case include_password has no effect. Otherwise, it builds the connection string from the broker name specified in the configuration file (app.yaml). If the broker name is missing or not supported, a ValueError is raised.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| include_password | bool | Whether to include the password in the connection string. | True |

Returns:

| Type | Description |
| --- | --- |
| str | A formatted connection string based on the broker configuration. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the broker name is not supported. |

Source code in merlin/config/broker.py
def get_connection_string(include_password: bool = True) -> str:
    """
    Constructs and returns a connection string based on the broker configuration.

    This function retrieves the connection string from the `CONFIG.broker.url` if available.
    Otherwise, it determines the connection string based on the broker name specified in the
    configuration file (`app.yaml`). If the broker name is not supported, a `ValueError` is raised.

    Args:
        include_password (bool): Whether to include the password in the connection string.

    Returns:
        A formatted connection string based on the broker configuration.

    Raises:
        ValueError: If the broker name is not supported.
    """
    try:
        return CONFIG.broker.url
    except AttributeError:
        # The broker may not have a url
        pass

    try:
        broker = CONFIG.broker.name.lower()
    except AttributeError:
        broker = ""

    if broker not in BROKERS:
        raise ValueError(f"Error: {broker} is not a supported broker.")
    return _sort_valid_broker(broker, include_password)
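
The fallback order above (explicit URL first, then broker name) can be sketched in isolation. This is a minimal illustration, not Merlin's implementation: `SUPPORTED` is a hypothetical stand-in for the `BROKERS` constant, whose contents are not shown on this page, and `pick_connection_source` is an invented helper that only reports which source would be used.

```python
from types import SimpleNamespace

# Hypothetical stand-in for BROKERS; the real constant lives in
# merlin/config/broker.py and may contain different entries.
SUPPORTED = ("rabbitmq", "redis", "rediss", "redis+socket", "amqps", "amqp")


def pick_connection_source(broker_cfg):
    """Return ("url", value) or ("name", value), mirroring the fallback order."""
    try:
        # An explicit url wins and is returned untouched.
        return "url", broker_cfg.url
    except AttributeError:
        pass  # no url configured; fall back to the broker name

    # A missing name becomes "", which is never a supported broker.
    name = getattr(broker_cfg, "name", "").lower()
    if name not in SUPPORTED:
        raise ValueError(f"Error: {name} is not a supported broker.")
    return "name", name


# Usage with throwaway config objects
print(pick_connection_source(SimpleNamespace(url="redis://localhost:6379/0")))
print(pick_connection_source(SimpleNamespace(name="RabbitMQ")))
```

Because the URL branch returns early, any password embedded in a configured URL is returned as written, regardless of `include_password`.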

get_rabbit_connection(include_password, conn='amqps')

Constructs and returns a RabbitMQ connection string based on broker configurations.

This function reads broker configurations (server, port, username, password, and vhost) and formats them into a RabbitMQ connection string. The password is included only if include_password is True; otherwise it is masked as ******. If no port is configured, the port defaults to 5672 for amqp and 5671 for any other protocol.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| include_password | bool | Whether to include the password in the connection string. | required |
| conn | str | The connection protocol to use. Defaults to "amqps". Supported values are "amqp" and "amqps". | 'amqps' |

Returns:

| Type | Description |
| --- | --- |
| str | A formatted RabbitMQ connection string. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the password file path is not provided in the broker configuration, or if the password file does not exist or cannot be read. |

Source code in merlin/config/broker.py
def get_rabbit_connection(include_password: bool, conn: str = "amqps") -> str:
    """
    Constructs and returns a RabbitMQ connection string based on broker configurations.

    This function reads broker configurations (such as server, port, username, password, and vhost)
    and formats them into a RabbitMQ connection string. Optionally, the password can be included
    in the connection string if `include_password` is set to `True`.

    Args:
        include_password (bool): Whether to include the password in the connection string.
        conn (str, optional): The connection protocol to use. Defaults to "amqps".
            Supported values are "amqp" and "amqps".

    Returns:
        A formatted RabbitMQ connection string.

    Raises:
        ValueError: If the password file path is not provided in the broker configuration, or if
            the password file does not exist or cannot be read.
    """
    LOG.debug(f"Broker: connection = {conn}")

    vhost = CONFIG.broker.vhost
    LOG.debug(f"Broker: vhost = {vhost}")

    username = CONFIG.broker.username
    LOG.debug(f"Broker: username = {username}")

    server = CONFIG.broker.server
    LOG.debug(f"Broker: server = {server}")

    try:
        password = resolve_password(CONFIG.broker.password, "Broker")
    except (AttributeError, KeyError) as exc:
        raise ValueError("Broker: No password provided for RabbitMQ") from exc

    try:
        port = CONFIG.broker.port
        LOG.debug(f"Broker: RabbitMQ port = {port}")
    except (AttributeError, KeyError):
        if conn == "amqp":
            port = 5672
        else:
            port = 5671
        LOG.debug(f"Broker: RabbitMQ using default port = {port}")

    # Test configurations.
    rabbitmq_config = {
        "conn": conn,
        "vhost": vhost,
        "username": username,
        "password": "******",
        "server": server,
        "port": port,
    }

    if include_password:
        rabbitmq_config["password"] = password

    return RABBITMQ_CONNECTION.format(**rabbitmq_config)
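
To make the masking and default-port behavior concrete, here is a small sketch. The `RABBITMQ_CONNECTION` template is not shown on this page, so `ASSUMED_RABBITMQ_TEMPLATE` below is a guess at a conventional AMQP URL layout, and `sketch_rabbit_url` is a hypothetical helper that takes plain arguments instead of reading `CONFIG`.

```python
# Assumption: a conventional AMQP URL layout. The real RABBITMQ_CONNECTION
# constant is defined in merlin/config/broker.py and may differ.
ASSUMED_RABBITMQ_TEMPLATE = "{conn}://{username}:{password}@{server}:{port}/{vhost}"


def sketch_rabbit_url(username, password, server, vhost,
                      conn="amqps", port=None, include_password=True):
    """Format a RabbitMQ URL, masking the password unless it is requested."""
    if port is None:
        # Same defaults as the source: 5672 for plain amqp, 5671 otherwise.
        port = 5672 if conn == "amqp" else 5671
    shown = password if include_password else "******"
    return ASSUMED_RABBITMQ_TEMPLATE.format(
        conn=conn, username=username, password=shown,
        server=server, port=port, vhost=vhost,
    )


# Usage: a masked URL that is safe to log
print(sketch_rabbit_url("user", "s3cret", "rabbit.example.com", "myvhost",
                        include_password=False))
```

Note that in the source the password is resolved even when it will be masked, so a missing password raises `ValueError` regardless of `include_password`.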

get_redis_connection(include_password, use_ssl=False)

Constructs and returns a Redis connection string, optionally using SSL and including a password.

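This function retrieves broker configurations (server, port, username, password, and database number) and formats them into a Redis connection string. Missing settings fall back to defaults: port 6379, database number 0, and an empty username. If no password is configured, the credentials portion is omitted entirely; otherwise the password is included when include_password is True and masked as ****** when it is False. Setting use_ssl selects the rediss protocol instead of redis.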

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| include_password | bool | Whether to include the password in the connection string. | required |
| use_ssl | bool | Whether to use the rediss protocol (SSL). | False |

Returns:

| Type | Description |
| --- | --- |
| str | A formatted Redis connection string. |

Source code in merlin/config/broker.py
def get_redis_connection(include_password: bool, use_ssl: bool = False) -> str:  # noqa C901
    """
    Constructs and returns a Redis connection string, optionally using SSL and including a password.

    This function retrieves broker configurations (such as server, port, username, password, and database number)
    and formats them into a Redis connection string. The connection can be configured to use SSL (`rediss` protocol)
    and optionally include the password in the connection string.

    Args:
        include_password (bool): Whether to include the password in the connection string.
        use_ssl (bool, optional): Whether to use the `rediss` protocol (SSL).

    Returns:
        A formatted Redis connection string.
    """
    server = CONFIG.broker.server
    LOG.debug(f"Broker: server = {server}")

    urlbase = "rediss" if use_ssl else "redis"

    try:
        port = CONFIG.broker.port
        LOG.debug(f"Broker: redis port = {port}")
    except (AttributeError, KeyError):
        port = 6379
        LOG.debug(f"Broker: redis using default port = {port}")

    try:
        db_num = CONFIG.broker.db_num
    except (AttributeError, KeyError):
        db_num = 0
        LOG.debug(f"Broker: redis using default db_num = {db_num}")

    try:
        username = CONFIG.broker.username
    except (AttributeError, KeyError):
        username = ""

    try:
        password = resolve_password(CONFIG.broker.password, "Broker")
        if include_password:
            spass = f"{username}:{password}@"
        else:
            spass = f"{username}:******@"
    except (AttributeError, KeyError):
        spass = ""
        LOG.debug(f"Broker: redis using default password = {spass}")

    return f"{urlbase}://{spass}{server}:{port}/{db_num}"
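
The URL assembly above can be exercised without a Merlin configuration. The sketch below reuses the same f-string layout as the source but takes plain arguments; `sketch_redis_url` is a hypothetical helper, and `password=None` stands in for "no password configured".

```python
def sketch_redis_url(server, port=6379, db_num=0, username="",
                     password=None, include_password=True, use_ssl=False):
    """Build a Redis URL the same way the source's final f-string does."""
    urlbase = "rediss" if use_ssl else "redis"
    if password is None:
        spass = ""  # no password configured: omit credentials entirely
    elif include_password:
        spass = f"{username}:{password}@"
    else:
        spass = f"{username}:******@"  # masked form, e.g. for logging
    return f"{urlbase}://{spass}{server}:{port}/{db_num}"


# Usage
print(sketch_redis_url("localhost"))
print(sketch_redis_url("localhost", password="s3cret",
                       include_password=False, use_ssl=True))
```

With an empty username the credentials portion starts with a bare colon, for example `:******@`.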

get_redissock_connection()

Constructs and returns a Redis connection string using a Unix socket.

This function retrieves broker configurations, such as the database number (db_num) and the Unix socket file path (path), and formats them into a Redis connection string.

If the database number is not specified in the configuration, it defaults to 0.

Returns:

| Type | Description |
| --- | --- |
| str | A formatted Redis connection string using a Unix socket. |

Source code in merlin/config/broker.py
def get_redissock_connection() -> str:
    """
    Constructs and returns a Redis connection string using a Unix socket.

    This function retrieves broker configurations, such as the database number (`db_num`) and
    the Unix socket file path (`path`), and formats them into a Redis connection string.

    If the database number is not specified in the configuration, it defaults to `0`.

    Returns:
        A formatted Redis connection string using a Unix socket.
    """
    try:
        db_num = CONFIG.broker.db_num
    except (AttributeError, KeyError):
        db_num = 0
        LOG.debug(f"Broker: redis+socket using default db_num = {db_num}")

    redis_config = {"db_num": db_num, "path": CONFIG.broker.path}

    return REDISSOCK_CONNECTION.format(**redis_config)
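
As a rough illustration of the result, the sketch below formats a socket path and database number. The `REDISSOCK_CONNECTION` template is not shown on this page; `ASSUMED_REDISSOCK_TEMPLATE` is an assumption based on the `redis+socket://` URL form used by Celery's Redis transport, and `sketch_redissock_url` is a hypothetical helper.

```python
# Assumption: the real REDISSOCK_CONNECTION constant may use a different layout.
ASSUMED_REDISSOCK_TEMPLATE = "redis+socket://{path}?virtual_host={db_num}"


def sketch_redissock_url(path, db_num=0):
    """Format a Unix-socket Redis URL; db_num defaults to 0 as in the source."""
    return ASSUMED_REDISSOCK_TEMPLATE.format(path=path, db_num=db_num)


# Usage
print(sketch_redissock_url("/tmp/redis.sock"))
```

Unlike `db_num`, the socket path has no default in the source, so a configuration without `path` would fail when `CONFIG.broker.path` is read.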

get_ssl_config()

Retrieves the SSL configuration for the broker based on the settings in the app.yaml configuration file.

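This function identifies the broker from the scheme of CONFIG.broker.url or, when it is set, from CONFIG.broker.name (the name takes precedence), and then decides whether SSL applies. Only the rabbitmq, rediss, and amqps brokers use SSL; for these the function returns the SSL configuration details, or True if no SSL settings are found. If the broker does not use SSL or is unsupported, the function returns False.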

Returns:

| Type | Description |
| --- | --- |
| Union[bool, Dict[str, Union[str, VerifyMode]]] | False if SSL is not required or the broker is unsupported; True if SSL is required but no SSL settings are configured; otherwise a dictionary containing SSL configuration details, which may include keys such as certificate paths and verification modes. |

Source code in merlin/config/broker.py
def get_ssl_config() -> Union[bool, Dict[str, Union[str, ssl.VerifyMode]]]:
    """
    Retrieves the SSL configuration for the broker based on the settings in the `app.yaml` configuration file.

    This function determines whether SSL should be used for the broker connection and, if applicable,
    returns the SSL configuration details. If the broker does not require SSL or is unsupported,
    the function returns `False`.

    Returns:
        This returns either:\n
            - `False` if SSL is not required or the broker is unsupported.
            - A dictionary containing SSL configuration details if SSL is required.
              The dictionary may include keys such as certificate paths and verification modes.
    """
    broker: Union[bool, str] = ""
    try:
        broker = CONFIG.broker.url.split(":")[0]
    except AttributeError:
        # The broker may not have a url
        pass

    try:
        broker = CONFIG.broker.name.lower()
    except AttributeError:
        # The broker may not have a name
        pass

    if broker not in BROKERS:
        return False

    certs_path: Optional[str]
    try:
        certs_path = CONFIG.celery.certs
    except AttributeError:
        certs_path = None

    broker_ssl: Union[bool, Dict[str, Union[str, ssl.VerifyMode]]] = get_ssl_entries(
        "Broker", broker, CONFIG.broker, certs_path
    )

    if not broker_ssl:
        broker_ssl = True

    if broker in ("rabbitmq", "rediss", "amqps"):
        return broker_ssl

    return False
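
The three possible outcomes (False, True, or a dictionary) can be seen in a reduced sketch of the decision logic. It is not Merlin's code: `sketch_ssl_result` is a hypothetical helper, `supported` stands in for the `BROKERS` constant, and the `ssl_entries` argument replaces the call to `get_ssl_entries`, whose output keys are not documented on this page.

```python
import ssl

# Brokers for which the source returns an SSL configuration.
SSL_BROKERS = ("rabbitmq", "rediss", "amqps")


def sketch_ssl_result(broker, ssl_entries,
                      supported=("rabbitmq", "redis", "rediss",
                                 "redis+socket", "amqps", "amqp")):
    """Mirror the return logic: False, True, or the SSL settings dict."""
    if broker not in supported:
        return False  # unsupported broker
    # An empty settings dict collapses to True ("use SSL with defaults").
    result = ssl_entries if ssl_entries else True
    return result if broker in SSL_BROKERS else False


# Usage; the key names below are illustrative only.
print(sketch_ssl_result("redis", {}))
print(sketch_ssl_result("rediss", {}))
print(sketch_ssl_result("rabbitmq", {"cert_reqs": ssl.CERT_REQUIRED}))
```

Callers therefore need to handle a plain `True` as well as a dictionary when SSL is in use.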