Skip to content

results_backend

This module provides functionality for managing and configuring connection strings and SSL settings for various results backends, including MySQL, Redis, Rediss, and SQLite. The module relies on the application's configuration file (app.yaml) to determine backend settings and certificate paths.

get_connection_string(include_password=True)

Determines the appropriate results backend to use based on the package configuration and returns the corresponding connection string.

If a URL is explicitly defined in the configuration (CONFIG.results_backend.url), it is returned as the connection string.

Parameters:

Name Type Description Default
include_password bool

Whether to include the password in the connection string. If True, the password will be included; otherwise, it will be masked.

True

Returns:

Type Description
str

The connection string for the configured results backend.

Raises:

Type Description
ValueError

If the specified results backend in the configuration is not supported.

Source code in merlin/config/results_backend.py
def get_connection_string(include_password: bool = True) -> str:
    """
    Build the connection string for whichever results backend is configured.

    An explicit `CONFIG.results_backend.url` takes precedence and is returned
    unchanged; `include_password` has no effect on that path. Otherwise the
    backend named by `CONFIG.results_backend.name` (lower-cased, or an empty
    string when no name is set) is checked against `BACKENDS` and the string is
    assembled by `_resolve_backend_string`.

    Args:
        include_password (bool, optional): Forwarded to `_resolve_backend_string`.
            If True, the password is included in the connection string;
            otherwise, it is masked.

    Returns:
        The connection string for the configured results backend.

    Raises:
        ValueError: If the configured backend name is not in `BACKENDS`.
    """
    # A URL given directly in the configuration short-circuits everything else.
    try:
        explicit_url = CONFIG.results_backend.url
    except AttributeError:
        pass
    else:
        return explicit_url

    try:
        backend_name = CONFIG.results_backend.name.lower()
    except AttributeError:
        backend_name = ""

    if backend_name not in BACKENDS:
        raise ValueError(f"'{backend_name}' is not a supported results backend")

    # The certs directory is optional; when configured, expand `~` and make it absolute.
    try:
        resolved_certs = os.path.abspath(os.path.expanduser(CONFIG.celery.certs))
    except AttributeError:
        resolved_certs = None

    return _resolve_backend_string(backend_name, resolved_certs, include_password)

get_mysql(certs_path=None, mysql_certs=None, include_password=True)

Constructs and returns a formatted MySQL connection string based on the provided parameters and application configuration.

Parameters:

Name Type Description Default
certs_path str

The path to the directory containing SSL certificates and password files.

None
mysql_certs dict

A dictionary mapping MySQL certificate keys (e.g., 'ssl_key', 'ssl_cert', 'ssl_ca') to their expected filenames. If this is None, it uses the default MYSQL_CONFIG_FILENAMES.

None
include_password bool

Whether to include the password in the connection string. If True, the password will be included; otherwise, it will be masked.

True

Returns:

Type Description
str

A formatted MySQL connection string.

Raises:

Type Description
TypeError
  • If the server configuration is missing or invalid.
  • If the MySQL connection information cannot be set due to missing certificates or configuration.
ValueError
  • If no password is provided for the results backend.
Source code in merlin/config/results_backend.py
def get_mysql(certs_path: str = None, mysql_certs: Dict = None, include_password: bool = True) -> str:
    """
    Constructs and returns a formatted MySQL connection string based on the provided parameters
    and application configuration.

    The database name, server, username and password are read from `CONFIG.results_backend`.
    SSL settings come from `get_mysql_config`; when the default `MYSQL_CONFIG_FILENAMES`
    mapping is in use, any key it did not supply is filled in with the expected file
    location under `certs_path`.

    Args:
        certs_path (str, optional): The path to the directory containing SSL certificates and password files.
        mysql_certs (dict, optional): A dictionary mapping MySQL certificate keys (e.g., 'ssl_key', 'ssl_cert', 'ssl_ca')
            to their expected filenames. If this is None, it uses the default `MYSQL_CONFIG_FILENAMES`.
        include_password (bool, optional): Whether to include the password in the connection string.
            If True, the password will be included; otherwise, it will be masked.

    Returns:
        A formatted MySQL connection string.

    Raises:
        TypeError: \n
            - If the `server` configuration is missing or empty.
            - If the MySQL connection information cannot be set due to missing certificates or configuration.
            - If a default certificate entry is missing and no `certs_path` is available to locate it.
        ValueError: If the password cannot be resolved from the results backend configuration.
    """
    dbname = CONFIG.results_backend.dbname
    server = CONFIG.results_backend.server

    # Only report whether each setting is present; the values themselves are never logged.
    LOG.debug(f"Results backend: database name is {'configured' if dbname else 'missing'}.")
    LOG.debug(f"Results backend: server address is {'configured' if server else 'missing'}.")

    if not server:
        msg = f"Results backend: server {server} does not have a configuration"
        raise TypeError(msg)  # TypeError since server is None and not str

    # The password is resolved even when it will be masked, so that a missing
    # password is reported regardless of `include_password`.
    try:
        password = resolve_password(CONFIG.results_backend.password, "Results backend", certs_path=certs_path)
    except (AttributeError, KeyError) as exc:
        raise ValueError("Results backend: No password provided for SQL") from exc

    if mysql_certs is None:
        mysql_certs = MYSQL_CONFIG_FILENAMES

    mysql_config = get_mysql_config(certs_path, mysql_certs)

    if not mysql_config:
        msg = f"""The connection information for MySQL could not be set, cannot find:\n
        {mysql_certs}\ncheck the celery/certs path or set the ssl information in the app.yaml file."""
        raise TypeError(msg)  # TypeError since mysql_config is None when it shouldn't be

    mysql_config["user"] = CONFIG.results_backend.username
    mysql_config["password"] = password if include_password else "******"
    mysql_config["server"] = server

    # Ensure the ssl_key, ssl_ca, and ssl_cert keys are all set
    if mysql_certs == MYSQL_CONFIG_FILENAMES:
        for key, cert_file in mysql_certs.items():
            if key in mysql_config:
                continue
            if certs_path is None:
                # Without a certs directory there is nowhere to point the missing
                # entry at; fail with an explicit message instead of letting
                # os.path.join raise an opaque TypeError on None.
                msg = (
                    f"Results backend: MySQL setting '{key}' is not configured and no certs path "
                    f"is available to locate '{cert_file}'"
                )
                raise TypeError(msg)
            mysql_config[key] = os.path.join(certs_path, cert_file)

    return MYSQL_CONNECTION_STRING.format(**mysql_config)

get_mysql_config(certs_path, mysql_certs)

Determines whether all required information for connecting to MySQL as the Celery results backend is available, and returns the MySQL SSL configuration or certificate paths.

Parameters:

Name Type Description Default
certs_path str

The path to the directory containing SSL certificates and password files.

required
mysql_certs Dict

A dictionary mapping certificate keys (e.g., 'ssl_key', 'ssl_cert', 'ssl_ca') to their expected filenames.

required

Returns:

Type Description
Dict

A dictionary containing the paths to the required MySQL certificates if they exist.

Source code in merlin/config/results_backend.py
def get_mysql_config(certs_path: str, mysql_certs: Dict) -> Dict:
    """
    Determines whether all required information for connecting to MySQL as the Celery
    results backend is available, and returns the MySQL SSL configuration or certificate paths.

    The SSL settings reported by `get_ssl_config` take precedence. Only when those are
    empty is `certs_path` searched for the certificate files named in `mysql_certs`.

    Args:
        certs_path (str): The path to the directory containing SSL certificates and password files.
            May be None or empty, in which case no directory search is attempted.
        mysql_certs (Dict): A dictionary mapping certificate keys (e.g., 'ssl_key', 'ssl_cert', 'ssl_ca')
            to their expected filenames.

    Returns:
        The value returned by `get_ssl_config` when it is truthy. Otherwise, a dictionary
            mapping each key of `mysql_certs` whose file is present in `certs_path` to that
            file's full path (possibly empty), or `False` when `certs_path` is unset or is
            not an existing directory.
    """
    mysql_ssl = get_ssl_config(celery_check=False)
    if mysql_ssl:
        return mysql_ssl

    # Guard against an unset path (os.path.exists(None) raises TypeError) and
    # against a path that exists but is not a directory (os.listdir would raise).
    if not certs_path or not os.path.isdir(certs_path):
        return False

    # A set gives constant-time membership checks for each expected filename.
    available = set(os.listdir(certs_path))

    return {
        key: os.path.join(certs_path, filename)
        for key, filename in mysql_certs.items()
        if filename in available
    }

get_redis(certs_path=None, include_password=True, ssl=False)

Constructs and returns a Redis or Rediss connection URL based on the provided parameters and configuration.

Parameters:

Name Type Description Default
certs_path str

The path to SSL certificates and password files.

None
include_password bool

Whether to include the password in the connection URL. If True, the password will be included; otherwise, it will be masked.

True
ssl bool

Flag indicating whether to use SSL for the connection (Rediss). If True, the connection URL will use the "rediss" protocol; otherwise, it will use "redis".

False

Returns:

Type Description
str

A Redis or Rediss connection URL formatted based on the provided parameters and configuration.

Source code in merlin/config/results_backend.py
def get_redis(certs_path: str = None, include_password: bool = True, ssl: bool = False) -> str:  # noqa C901
    """
    Assemble the Redis (or Rediss) URL for the results backend from `CONFIG.results_backend`.

    Port, database number, username and password are each optional in the
    configuration: the port falls back to 6379, the database number to 0, the
    username to an empty string, and a missing password leaves the credentials
    section out of the URL entirely. The server address is read without a fallback.

    Args:
        certs_path (str, optional): The path to SSL certificates and password files,
            forwarded to `resolve_password`.
        include_password (bool, optional): Whether to include the password in the connection URL.
            If True, the password will be included; otherwise, it will be masked.
        ssl (bool, optional): Flag indicating whether to use SSL for the connection (Rediss).
            If True, the connection URL will use the "rediss" protocol; otherwise, it will use "redis".

    Returns:
        A URL of the form `<scheme>://[<username>:<password>@]<server>:<port>/<db_num>`.
    """
    scheme = "rediss" if ssl else "redis"

    try:
        port = CONFIG.results_backend.port
    except (KeyError, AttributeError):
        LOG.debug("Results backend: using default Redis port.")
        port = 6379

    try:
        db_num = CONFIG.results_backend.db_num
    except (KeyError, AttributeError):
        LOG.debug("Results backend: using default Redis database number.")
        db_num = 0

    try:
        username = CONFIG.results_backend.username
    except (KeyError, AttributeError):
        username = ""

    # The credentials prefix is only emitted when a password can be resolved.
    try:
        secret = resolve_password(CONFIG.results_backend.password, "Results backend", certs_path=certs_path)
        shown_secret = secret if include_password else "******"
        credentials = f"{username}:{shown_secret}@"
    except (KeyError, AttributeError):
        credentials = ""
        LOG.debug("Results backend: no Redis password configured in backend config.")

    server = CONFIG.results_backend.server
    LOG.debug(f"Results backend: Redis server address {'configured' if server else 'not found in config'}.")

    return f"{scheme}://{credentials}{server}:{port}/{db_num}"

get_ssl_config(celery_check=False)

Retrieves the SSL configuration for the results backend based on the settings specified in the app.yaml configuration file.

This function determines whether SSL should be enabled for the results backend and returns the appropriate configuration. It supports various backend types such as MySQL, Redis, and Rediss.

Parameters:

Name Type Description Default
celery_check bool

If True, the function returns the SSL settings specifically for configuring Celery.

False

Returns:

Type Description
bool

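The SSL configuration for the results backend. For a Rediss backend this is the configured SSL entries, or True when none are configured. For a MySQL backend (when celery_check is False) it is the configured SSL entries. In all other cases it is False.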

Source code in merlin/config/results_backend.py
def get_ssl_config(celery_check: bool = False) -> bool:
    """
    Look up the SSL settings that apply to the results backend configured in `app.yaml`.

    The backend type is taken from the scheme of `CONFIG.results_backend.url` and
    then overridden by `CONFIG.results_backend.name` when a name is set. Backends
    not listed in `BACKENDS` never use SSL. For the remaining backends the SSL
    entries are gathered with `get_ssl_entries`.

    Args:
        celery_check (bool, optional): If True, the function returns the SSL settings
            specifically for configuring Celery; MySQL SSL entries are not returned
            in that case.

    Returns:
        For a "rediss" backend, the entries from `get_ssl_entries`, or `True` when
            those are empty. For a backend whose type contains "mysql", the entries
            from `get_ssl_entries` when `celery_check` is False. `False` in every
            other case.
    """
    backend_kind = ""

    # A URL is optional; when present, its scheme identifies the backend.
    try:
        backend_kind = CONFIG.results_backend.url.split(":")[0]
    except AttributeError:
        pass

    # A name is optional too; when present, it wins over the URL scheme.
    try:
        backend_kind = CONFIG.results_backend.name.lower()
    except AttributeError:
        pass

    if backend_kind not in BACKENDS:
        return False

    try:
        certs_dir = CONFIG.celery.certs
    except AttributeError:
        certs_dir = None

    ssl_entries = get_ssl_entries("Results Backend", backend_kind, CONFIG.results_backend, certs_dir)

    if backend_kind == "rediss":
        # Rediss always implies SSL, even without explicit entries.
        return ssl_entries or True

    if "mysql" in backend_kind and not celery_check:
        return ssl_entries

    return False