Skip to content

merlin_db

This module contains the functionality necessary to interact with everything stored in Merlin's database.

MerlinDatabase

High-level interface for accessing Merlin database entities.

This class provides a unified interface to all entity managers in Merlin.

Attributes:

Name Type Description
backend ResultsBackend

A ResultsBackend instance.

logical_workers LogicalWorkerManager

A LogicalWorkerManager instance.

physical_workers PhysicalWorkerManager

A PhysicalWorkerManager instance.

runs RunManager

A RunManager instance.

studies StudyManager

A StudyManager instance.

Methods:

Name Description
get_db_type

Retrieve the type of the backend being used (e.g., Redis, SQL).

get_db_version

Retrieve the version of the backend.

get_connection_string

Retrieve the backend connection string.

create

Create a new entity of the specified type.

get

Get an entity by type and identifier.

get_all

Get all entities of a specific type.

delete

Delete an entity by type and identifier.

delete_all

Delete all entities of a specific type.

get_everything

Get all entities from all entity managers.

delete_everything

Delete all entities from all entity managers.

Source code in merlin/db_scripts/merlin_db.py
class MerlinDatabase:
    """
    High-level interface for accessing Merlin database entities.

    This class provides a unified interface to all entity managers in Merlin.

    Attributes:
        backend (backends.results_backend.ResultsBackend): A `ResultsBackend` instance.
        logical_workers (db_scripts.entity_managers.logical_worker_manager.LogicalWorkerManager):
            A `LogicalWorkerManager` instance.
        physical_workers (db_scripts.entity_managers.physical_worker_manager.PhysicalWorkerManager):
            A `PhysicalWorkerManager` instance.
        runs (db_scripts.entity_managers.run_manager.RunManager): A `RunManager` instance.
        studies (db_scripts.entity_managers.study_manager.StudyManager): A `StudyManager` instance.

    Methods:
        get_db_type: Retrieve the type of the backend being used (e.g., Redis, SQL).
        get_db_version: Retrieve the version of the backend.
        get_connection_string: Retrieve the backend connection string.
        create: Create a new entity of the specified type.
        get: Get an entity by type and identifier.
        get_all: Get all entities of a specific type.
        delete: Delete an entity by type and identifier.
        delete_all: Delete all entities of a specific type.
        get_everything: Get all entities from all entity managers.
        delete_everything: Flush the entire database (with optional confirmation).
        info: Print a summary of the database contents.
    """

    def __init__(self):
        """
        Initialize a new MerlinDatabase instance.

        The results backend is created from the name in Merlin's `results_backend`
        configuration. One entity manager per supported entity type is built on top
        of that backend. Managers exposing a `set_db_reference` method receive a
        reference to this instance.
        """
        # Deferred import (the pylint disable marks it as intentional); presumably avoids
        # an import cycle or loading the config too early -- TODO confirm.
        from merlin.config.configfile import CONFIG  # pylint: disable=import-outside-toplevel

        self.backend: ResultsBackend = backend_factory.create(CONFIG.results_backend.name.lower())
        self._entity_managers: Dict[str, EntityManager] = {
            "study": StudyManager(self.backend),
            "run": RunManager(self.backend),
            "logical_worker": LogicalWorkerManager(self.backend),
            "physical_worker": PhysicalWorkerManager(self.backend),
        }

        # Set up cross-references for managers that need them
        for manager in self._entity_managers.values():
            if hasattr(manager, "set_db_reference"):
                manager.set_db_reference(self)

    # Provide direct access to entity managers for convenience
    @property
    def studies(self) -> StudyManager:
        """
        Get the study manager.

        Returns:
            A [`StudyManager`][db_scripts.entity_managers.study_manager.StudyManager]
                instance.
        """
        return self._entity_managers["study"]

    @property
    def runs(self) -> RunManager:
        """
        Get the run manager.

        Returns:
            A [`RunManager`][db_scripts.entity_managers.run_manager.RunManager]
                instance.
        """
        return self._entity_managers["run"]

    @property
    def logical_workers(self) -> LogicalWorkerManager:
        """
        Get the logical worker manager.

        Returns:
            A [`LogicalWorkerManager`][db_scripts.entity_managers.logical_worker_manager.LogicalWorkerManager]
                instance.
        """
        return self._entity_managers["logical_worker"]

    @property
    def physical_workers(self) -> PhysicalWorkerManager:
        """
        Get the physical worker manager.

        Returns:
            A [`PhysicalWorkerManager`][db_scripts.entity_managers.physical_worker_manager.PhysicalWorkerManager]
                instance.
        """
        return self._entity_managers["physical_worker"]

    def get_db_type(self) -> str:
        """
        Retrieve the type of backend.

        Returns:
            The type of backend (e.g. redis, sql, etc.).
        """
        return self.backend.get_name()

    def get_db_version(self) -> str:
        """
        Get the version of the backend.

        Returns:
            The version number of the backend.
        """
        return self.backend.get_version()

    def get_connection_string(self) -> str:
        """
        Get the connection string to the backend.

        Returns:
            The connection string to the backend.
        """
        return self.backend.get_connection_string()

    def _validate_entity_type(self, entity_type: str):
        """
        Check to make sure the entity type passed in is supported.

        Args:
            entity_type: The type of entity to validate (study, run, logical_worker, physical_worker).

        Raises:
            EntityManagerNotSupportedError: If `entity_type` has no registered entity manager.
        """
        if entity_type not in self._entity_managers:
            raise EntityManagerNotSupportedError(f"Entity type not supported: {entity_type}")

    def create(self, entity_type: str, *args, **kwargs) -> Any:
        """
        Create a new entity of the specified type.

        Args:
            entity_type: The type of entity to create (study, run, logical_worker, physical_worker).
            *args: Positional arguments forwarded to the manager's `create` method.
            **kwargs: Keyword arguments forwarded to the manager's `create` method.

        Returns:
            The created entity.

        Raises:
            EntityManagerNotSupportedError: If the entity type is not supported.
        """
        self._validate_entity_type(entity_type)
        return self._entity_managers[entity_type].create(*args, **kwargs)

    def get(self, entity_type: str, *args, **kwargs) -> Any:
        """
        Get an entity by type and identifier.

        Args:
            entity_type: The type of entity to get (study, run, logical_worker, physical_worker).
            *args: Positional arguments forwarded to the manager's `get` method.
            **kwargs: Keyword arguments forwarded to the manager's `get` method.

        Returns:
            The requested entity.

        Raises:
            EntityManagerNotSupportedError: If the entity type is not supported.
        """
        self._validate_entity_type(entity_type)
        return self._entity_managers[entity_type].get(*args, **kwargs)

    def get_all(self, entity_type: str, filters: Dict = None) -> List[Any]:
        """
        Get all entities of a specific type, optionally filtering results.

        Args:
            entity_type: The type of entities to get (study, run, logical_worker, physical_worker).
            filters: A dictionary of filter keys and values used to narrow down the query results.
                Filter keys must correspond to supported filters defined in the ENTITY_REGISTRY
                for the given entity type. Values are compared against entity attributes or
                accessor methods (e.g., {"name": "foo"}, {"queues": ["queue1", "queue2"]}).

        Returns:
            A list of all entities of the specified type matching the filters.

        Raises:
            EntityManagerNotSupportedError: If the entity type is not supported.
        """
        self._validate_entity_type(entity_type)
        return self._entity_managers[entity_type].get_all(filters)

    def delete(self, entity_type: str, *args, **kwargs) -> None:
        """
        Delete an entity by type and identifier.

        Args:
            entity_type: The type of entity to delete (study, run, logical_worker, physical_worker).
            *args: Positional arguments forwarded to the manager's `delete` method.
            **kwargs: Keyword arguments forwarded to the manager's `delete` method.

        Raises:
            EntityManagerNotSupportedError: If the entity type is not supported.
        """
        self._validate_entity_type(entity_type)
        self._entity_managers[entity_type].delete(*args, **kwargs)

    def delete_all(self, entity_type: str, **kwargs) -> None:
        """
        Delete all entities of a specific type.

        Args:
            entity_type: The type of entities to delete (study, run, logical_worker, physical_worker).
            **kwargs: Keyword arguments forwarded to the manager's `delete_all` method.

        Raises:
            EntityManagerNotSupportedError: If the entity type is not supported.
        """
        self._validate_entity_type(entity_type)
        self._entity_managers[entity_type].delete_all(**kwargs)

    def get_everything(self) -> List[Any]:
        """
        Get all entities from all entity managers.

        Returns:
            A single flat list of every entity, grouped in manager order
                (studies, runs, logical workers, then physical workers).
        """
        return [entity for manager in self._entity_managers.values() for entity in manager.get_all()]

    def delete_everything(self, force: bool = False) -> None:
        """
        Flush the entire results backend, optionally after asking for confirmation.

        Unless `force` is set, the user is prompted on stdin until they answer 'y' or 'n'.
        On 'y' (or when forced), the backend's `flush_database` method is called in a
        single operation. On 'n', nothing is deleted.

        Args:
            force: If True, skip the confirmation prompt and flush immediately.
        """
        flush_database = False
        if force:
            flush_database = True
        else:
            # Ask the user for confirmation; keep prompting until the answer is y or n
            valid_inputs = ["y", "n"]
            user_input = input("Are you sure you want to flush the entire database? (y/n): ").strip().lower()
            while user_input not in valid_inputs:
                user_input = input("Invalid input. Use 'y' for 'yes' or 'n' for 'no': ").strip().lower()

            if user_input == "y":
                flush_database = True

        if flush_database:
            LOG.info("Flushing the database...")
            self.backend.flush_database()
            LOG.info("Database successfully flushed.")
        else:
            LOG.info("Database flush cancelled.")

    def _fetch_info_data(self, max_preview: int):
        """
        Collect everything `info` needs to display, without printing anything.

        For each entity type, every entity is fetched from its manager. A preview is
        built from the first `max_preview` entities, using the accessor methods listed
        in `display_config`.

        Args:
            max_preview: Maximum number of entities to include in each preview.

        Returns:
            A dictionary keyed by entity type. Each value is a dictionary with:
                - "total": the number of entities of that type,
                - "preview": a list of `{label: value}` dicts, one per previewed entity,
                - "fields": the ordered list of labels used in each preview.
        """
        # Display label -> accessor method name, per entity type
        display_config = {
            "study": {"ID": "get_id", "Name": "get_name"},
            "run": {"ID": "get_id", "Workspace": "get_workspace"},
            "logical_worker": {"ID": "get_id", "Name": "get_name", "Queues": "get_queues"},
            "physical_worker": {"ID": "get_id", "Name": "get_name"},
        }

        entity_summaries = {}

        for entity_type, manager in self._entity_managers.items():
            all_entities = manager.get_all()
            preview_config = display_config.get(entity_type, {})
            preview_data = []

            for entity in all_entities[:max_preview]:
                preview = {}
                for label, method_name in preview_config.items():
                    value = "<unknown>"
                    method = getattr(entity, method_name, None)
                    if callable(method):
                        value = method()
                    else:
                        # Report the method *name*; `method` is usually None here
                        LOG.warning(f"Method '{method_name}' is not callable.")
                    preview[label] = value
                preview_data.append(preview)

            entity_summaries[entity_type] = {
                "total": len(all_entities),
                "preview": preview_data,
                "fields": list(preview_config.keys()),
            }

        return entity_summaries

    def _display_info_data(self, entity_summaries: Dict):
        """
        Print general backend information followed by a summary of each entity type.

        Args:
            entity_summaries: The dictionary produced by `_fetch_info_data`.
        """
        # Display general information
        print("Merlin Database Information")
        print("---------------------------")
        print("General Information:")
        print(f"- Database Type: {self.get_db_type()}")
        print(f"- Database Version: {self.get_db_version()}")
        print(f"- Connection String: {self.get_connection_string()}\n")

        # Display entity-specific information
        for entity_type, summary in entity_summaries.items():
            # e.g. "logical_worker" -> "Logical Worker" -> pluralize the last word
            title_words = entity_type.replace("_", " ").title().split()
            title_words[-1] = pluralize(title_words[-1])
            title = " ".join(title_words)
            print(f"{title}:")
            print(f"- Total: {summary['total']}")

            if summary["total"] > 0 and summary["fields"]:
                print(f"- Recent {title}:")
                for i, preview in enumerate(summary["preview"], start=1):
                    # Field names are already display labels (e.g. "ID"); print them verbatim,
                    # since str.title() would turn "ID" into "Id".
                    detail_str = ", ".join(
                        f"{field}: {preview.get(field, '<unknown>')}" for field in summary["fields"]
                    )
                    print(f"    {i}. {detail_str}")
                remaining = summary["total"] - len(summary["preview"])
                if remaining > 0:
                    # Reuse the pluralized title rather than appending a bare "s"
                    # (which produced e.g. "studys" and "logical_workers").
                    print(f"    (and {remaining} more {title.lower()})")
            print()

    def info(self, max_preview: int = 3):
        """
        Print summarized information about the database contents.

        Args:
            max_preview: Number of entries to preview per entity type. These are the
                first `max_preview` entities returned by each manager's `get_all`.
        """
        # Step 1: Fetch all data first to avoid log statements cluttering output
        entity_summaries = self._fetch_info_data(max_preview)

        # Step 2: Print everything after fetching
        self._display_info_data(entity_summaries)

logical_workers property

Get the logical worker manager.

Returns:

Type Description
LogicalWorkerManager

A LogicalWorkerManager instance.

physical_workers property

Get the physical worker manager.

Returns:

Type Description
PhysicalWorkerManager

A PhysicalWorkerManager instance.

runs property

Get the run manager.

Returns:

Type Description
RunManager

A RunManager instance.

studies property

Get the study manager.

Returns:

Type Description
StudyManager

A StudyManager instance.

__init__()

Initialize a new MerlinDatabase instance.

Source code in merlin/db_scripts/merlin_db.py
def __init__(self):
    """
    Build the results backend and the per-entity-type managers.

    The backend name comes from Merlin's `results_backend` configuration and is
    lower-cased before being handed to the backend factory. Every manager that
    defines `set_db_reference` is then given a handle to this instance.
    """
    # Deferred import (the pylint disable marks it as intentional); presumably avoids
    # an import cycle or loading the config too early -- TODO confirm.
    from merlin.config.configfile import CONFIG  # pylint: disable=import-outside-toplevel

    backend_name = CONFIG.results_backend.name.lower()
    self.backend: ResultsBackend = backend_factory.create(backend_name)

    # (key, manager class) pairs, in the order the managers are constructed
    manager_specs = (
        ("study", StudyManager),
        ("run", RunManager),
        ("logical_worker", LogicalWorkerManager),
        ("physical_worker", PhysicalWorkerManager),
    )
    self._entity_managers: Dict[str, EntityManager] = {
        key: manager_cls(self.backend) for key, manager_cls in manager_specs
    }

    # Hand a back-reference to any manager that wants one
    for mgr in self._entity_managers.values():
        if not hasattr(mgr, "set_db_reference"):
            continue
        mgr.set_db_reference(self)

create(entity_type, *args, **kwargs)

Create a new entity of the specified type.

Parameters:

Name Type Description Default
entity_type str

The type of entity to create (study, run, logical_worker, physical_worker).

required

Returns:

Type Description
Any

The created entity.

Raises:

Type Description
EntityManagerNotSupportedError

If the entity type is not supported.

Source code in merlin/db_scripts/merlin_db.py
def create(self, entity_type: str, *args, **kwargs) -> Any:
    """
    Create an entity by delegating to the manager registered for `entity_type`.

    Args:
        entity_type: The type of entity to create (study, run, logical_worker, physical_worker).
        *args: Positional arguments passed through to the manager's `create`.
        **kwargs: Keyword arguments passed through to the manager's `create`.

    Returns:
        Whatever the manager's `create` returns (the new entity).

    Raises:
        EntityManagerNotSupportedError: If the entity type is not supported.
    """
    self._validate_entity_type(entity_type)
    manager = self._entity_managers[entity_type]
    return manager.create(*args, **kwargs)

delete(entity_type, *args, **kwargs)

Delete an entity by type and identifier.

Parameters:

Name Type Description Default
entity_type str

The type of entity to delete (study, run, logical_worker, physical_worker).

required

Raises:

Type Description
EntityManagerNotSupportedError

If the entity type is not supported.

Source code in merlin/db_scripts/merlin_db.py
def delete(self, entity_type: str, *args, **kwargs) -> None:
    """
    Remove one entity by forwarding the request to the matching entity manager.

    Args:
        entity_type: The type of entity to delete (study, run, logical_worker, physical_worker).
        *args: Positional arguments passed through to the manager's `delete`.
        **kwargs: Keyword arguments passed through to the manager's `delete`.

    Raises:
        EntityManagerNotSupportedError: If the entity type is not supported.
    """
    self._validate_entity_type(entity_type)
    manager = self._entity_managers[entity_type]
    manager.delete(*args, **kwargs)

delete_all(entity_type, **kwargs)

Delete all entities of a specific type.

Parameters:

Name Type Description Default
entity_type str

The type of entities to delete (study, run, logical_worker, physical_worker).

required

Raises:

Type Description
EntityManagerNotSupportedError

If the entity type is not supported.

Source code in merlin/db_scripts/merlin_db.py
def delete_all(self, entity_type: str, **kwargs) -> None:
    """
    Remove every entity of one type via its entity manager.

    Args:
        entity_type: The type of entities to delete (study, run, logical_worker, physical_worker).
        **kwargs: Keyword arguments passed through to the manager's `delete_all`.

    Raises:
        EntityManagerNotSupportedError: If the entity type is not supported.
    """
    self._validate_entity_type(entity_type)
    manager = self._entity_managers[entity_type]
    manager.delete_all(**kwargs)

delete_everything(force=False)

Delete all entities from all entity managers.

Unless force is set, this method first asks the user for confirmation. It then flushes the entire results backend in a single operation.

Source code in merlin/db_scripts/merlin_db.py
def delete_everything(self, force: bool = False) -> None:
    """
    Flush the entire results backend, optionally after asking for confirmation.

    Unless `force` is set, the user is prompted on stdin until they answer 'y' or 'n'.
    On 'y' (or when forced), the backend's `flush_database` method is called in a
    single operation. On 'n', nothing is deleted.

    Args:
        force: If True, skip the confirmation prompt and flush immediately.
    """
    flush_database = False
    if force:
        flush_database = True
    else:
        # Ask the user for confirmation; keep prompting until the answer is y or n
        valid_inputs = ["y", "n"]
        user_input = input("Are you sure you want to flush the entire database? (y/n): ").strip().lower()
        while user_input not in valid_inputs:
            user_input = input("Invalid input. Use 'y' for 'yes' or 'n' for 'no': ").strip().lower()

        if user_input == "y":
            flush_database = True

    if flush_database:
        LOG.info("Flushing the database...")
        self.backend.flush_database()
        LOG.info("Database successfully flushed.")
    else:
        LOG.info("Database flush cancelled.")

get(entity_type, *args, **kwargs)

Get an entity by type and identifier.

Parameters:

Name Type Description Default
entity_type str

The type of entity to get (study, run, logical_worker, physical_worker).

required

Returns:

Type Description
Any

The requested entity.

Raises:

Type Description
EntityManagerNotSupportedError

If the entity type is not supported.

Source code in merlin/db_scripts/merlin_db.py
def get(self, entity_type: str, *args, **kwargs) -> Any:
    """
    Look up a single entity through the manager registered for `entity_type`.

    Args:
        entity_type: The type of entity to get (study, run, logical_worker, physical_worker).
        *args: Positional arguments (typically an identifier) passed to the manager's `get`.
        **kwargs: Keyword arguments passed through to the manager's `get`.

    Returns:
        Whatever the manager's `get` returns (the requested entity).

    Raises:
        EntityManagerNotSupportedError: If the entity type is not supported.
    """
    self._validate_entity_type(entity_type)
    manager = self._entity_managers[entity_type]
    return manager.get(*args, **kwargs)

get_all(entity_type, filters=None)

Get all entities of a specific type, optionally filtering results.

Parameters:

Name Type Description Default
entity_type str

The type of entities to get (study, run, logical_worker, physical_worker).

required
filters Dict

A dictionary of filter keys and values used to narrow down the query results. Filter keys must correspond to supported filters defined in the ENTITY_REGISTRY for the given entity type. Values are compared against entity attributes or accessor methods (e.g., {"name": "foo"}, {"queues": ["queue1", "queue2"]}).

None

Returns:

Type Description
List[Any]

A list of all entities of the specified type matching the filters.

Raises:

Type Description
EntityManagerNotSupportedError

If the entity type is not supported.

Source code in merlin/db_scripts/merlin_db.py
def get_all(self, entity_type: str, filters: Dict = None) -> List[Any]:
    """
    Fetch every entity of one type, letting the manager apply optional filters.

    Args:
        entity_type: The type of entities to get (study, run, logical_worker, physical_worker).
        filters: Optional mapping of filter keys to values used to narrow the results.
            Keys must be filters supported for this entity type in the ENTITY_REGISTRY;
            values are matched against entity attributes or accessor methods
            (e.g., {"name": "foo"}, {"queues": ["queue1", "queue2"]}).

    Returns:
        The list of matching entities, as returned by the manager's `get_all`.

    Raises:
        EntityManagerNotSupportedError: If the entity type is not supported.
    """
    self._validate_entity_type(entity_type)
    manager = self._entity_managers[entity_type]
    return manager.get_all(filters)

get_connection_string()

Get the connection string to the backend.

Returns:

Type Description
str

The connection string to the backend.

Source code in merlin/db_scripts/merlin_db.py
def get_connection_string(self) -> str:
    """
    Report how this database connects to its results backend.

    Returns:
        The connection string provided by the backend's `get_connection_string`.
    """
    backend = self.backend
    return backend.get_connection_string()

get_db_type()

Retrieve the type of backend.

Returns:

Type Description
str

The type of backend (e.g. redis, sql, etc.).

Source code in merlin/db_scripts/merlin_db.py
def get_db_type(self) -> str:
    """
    Report which kind of results backend this database uses.

    Returns:
        The backend name from its `get_name` method (e.g. redis, sql, etc.).
    """
    backend = self.backend
    return backend.get_name()

get_db_version()

Get the version of the backend.

Returns:

Type Description
str

The version number of the backend.

Source code in merlin/db_scripts/merlin_db.py
def get_db_version(self) -> str:
    """
    Report the version of the underlying results backend.

    Returns:
        The version string from the backend's `get_version` method.
    """
    backend = self.backend
    return backend.get_version()

get_everything()

Get all entities from all entity managers.

Returns:

Type Description
List[Any]

A single flat list containing every entity from every entity manager.

Source code in merlin/db_scripts/merlin_db.py
def get_everything(self) -> List[Any]:
    """
    Get all entities from all entity managers.

    Returns:
        A single flat list of every entity, grouped in the iteration order of
            `self._entity_managers` (one manager's entities after another's).
    """
    return [entity for manager in self._entity_managers.values() for entity in manager.get_all()]

info(max_preview=3)

Print summarized information about the database contents.

Parameters:

Name Type Description Default
max_preview int

Number of recent entries to preview per entity type.

3
Source code in merlin/db_scripts/merlin_db.py
def info(self, max_preview: int = 3):
    """
    Print a human-readable summary of what is stored in the database.

    All data is gathered before anything is printed, so log messages emitted during
    the fetch do not interleave with the report.

    Args:
        max_preview: How many entries to preview per entity type (default 3).
    """
    # Fetch first (inner call), then print (outer call)
    self._display_info_data(self._fetch_info_data(max_preview))