
run_manager

Module for managing run entities within the Merlin database.

This module defines the RunManager class, which extends the generic EntityManager base class to provide CRUD operations for runs associated with studies, workspaces, and queues. The manager coordinates with the study and logical worker managers for consistent data handling during create and delete operations.

RunManager

Bases: EntityManager[RunEntity, RunModel]

Manager for run entities.

This class handles creation, retrieval, updating, and deletion of runs in the database. It maintains consistency by coordinating with study and logical worker entities during operations such as run creation and deletion.

Attributes:

backend: The database backend used for storing run entities.

db: Reference to the main Merlin database, allowing access to other entity managers such as studies and logical workers.

_filter_accessor_map (Dict[str, Callable[[T], Any]]): A dictionary mapping supported filter keys to accessor functions for the entity type. Used by filtering logic (e.g., in get_all) to dynamically retrieve values from entity instances. Subclasses must override this to enable filtering support.
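As a rough illustration of how an accessor map like `_filter_accessor_map` can drive filtering, the sketch below applies one to a list of toy objects. `ToyRun`, `ACCESSORS`, and `filter_runs` are hypothetical names, not part of Merlin, and plain equality is assumed for the comparison; the real `EntityManager` filtering logic is not shown on this page and may treat list-valued filters differently.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ToyRun:
    """Hypothetical stand-in for a run entity; not Merlin's RunEntity."""

    study_id: str
    status: str
    queues: List[str] = field(default_factory=list)


# Same shape as _filter_accessor_map: filter key -> function that reads the value.
ACCESSORS: Dict[str, Callable[[ToyRun], Any]] = {
    "study_id": lambda run: run.study_id,
    "status": lambda run: run.status,
    "queues": lambda run: run.queues,
}


def filter_runs(runs: List[ToyRun], filters: Dict[str, Any]) -> List[ToyRun]:
    """Keep runs whose accessor value equals the wanted value for every filter key."""
    unknown = set(filters) - set(ACCESSORS)
    if unknown:
        raise ValueError(f"Unsupported filter keys: {sorted(unknown)}")
    return [
        run
        for run in runs
        if all(ACCESSORS[key](run) == wanted for key, wanted in filters.items())
    ]


runs = [
    ToyRun("study-a", "RUNNING", ["q1"]),
    ToyRun("study-a", "FINISHED", ["q1", "q2"]),
    ToyRun("study-b", "RUNNING", ["q2"]),
]
matching = filter_runs(runs, {"study_id": "study-a", "status": "RUNNING"})
```

Keeping the accessors in a dictionary means a new filter key only needs one new entry, and unsupported keys can be rejected before any entity is inspected.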

Methods:

create: Create a new run associated with a study and workspace.

get: Retrieve a run by its ID or workspace identifier.

get_all: Retrieve all runs from the database.

delete: Delete a run and perform cleanup of related entities.

delete_all: Delete all runs in the database.

set_db_reference: Set the reference to the main Merlin database for cross-entity operations.

Source code in merlin/db_scripts/entity_managers/run_manager.py
class RunManager(EntityManager[RunEntity, RunModel]):
    """
    Manager for run entities.

    This class handles creation, retrieval, updating, and deletion of runs in the database.
    It maintains consistency by coordinating with study and logical worker entities during
    operations such as run creation and deletion.

    Attributes:
        backend: The database backend used for storing run entities.
        db: Reference to the main Merlin database, allowing access to other entity managers
            such as studies and logical workers.
        _filter_accessor_map: A dictionary mapping supported filter keys to accessor functions
            for the entity type. Used by filtering logic (e.g., in `get_all`) to dynamically
            retrieve values from entity instances. Subclasses must override this to enable
            filtering support.

    Methods:
        create: Create a new run associated with a study and workspace.
        get: Retrieve a run by its ID or workspace identifier.
        get_all: Retrieve all runs from the database.
        delete: Delete a run and perform cleanup of related entities.
        delete_all: Delete all runs in the database.
        set_db_reference: Set the reference to the main Merlin database for cross-entity operations.
    """

    _filter_accessor_map: Dict[str, Callable[[T], Any]] = {
        "study_id": lambda e: e.get_study_id(),
        "status": lambda e: e.get_run_status().value,
        "queues": lambda e: e.get_queues(),
        "workers": lambda e: e.get_workers(),
    }

    def __init__(self, backend: ResultsBackend):
        """
        Initialize the RunManager with the given backend.

        This sets up the manager to handle run entities by specifying
        the associated entity class and entity type string. These are used by the
        base EntityManager to perform generic operations like retrieving and filtering entities.

        Args:
            backend (ResultsBackend): The backend used to persist and retrieve run data.
        """
        super().__init__(backend)
        self._entity_class = RunEntity
        self._entity_type = "run"

    def create(self, study_name: str, workspace: str, queues: List[str], **kwargs: Any) -> RunEntity:
        """
        Create a new run associated with a study, workspace, and queues.

        This method ensures the study exists (creating it if necessary), then creates
        and saves a new run entity. Additional keyword arguments that correspond to valid
        [`RunModel`][db_scripts.data_models.RunModel] fields are included; other kwargs are
        stored as additional data.

        Args:
            study_name (str): Name of the study this run belongs to.
            workspace (str): Workspace identifier for the run.
            queues (List[str]): List of queues associated with the run.
            **kwargs (Any): Additional optional fields for the run entity.

        Returns:
            The created run entity.
        """
        # Create the study if it doesn't exist
        study_entity = self.db.studies.create(study_name)

        # Filter valid fields for the RunModel
        valid_fields = {f.name for f in RunModel.get_class_fields()}
        valid_kwargs = {key: val for key, val in kwargs.items() if key in valid_fields}
        additional_data = {key: val for key, val in kwargs.items() if key not in valid_fields}

        # Create the RunModel and save it
        new_run = RunModel(
            study_id=study_entity.get_id(),
            workspace=workspace,
            queues=queues,
            **valid_kwargs,
            additional_data=additional_data,
        )
        run_entity = RunEntity(new_run, self.backend)
        run_entity.save()

        # Add the run ID to the study
        study_entity.add_run(run_entity.get_id())

        return run_entity

    def get(self, run_id_or_workspace: str) -> RunEntity:
        """
        Retrieve a run entity by its unique ID or workspace identifier.

        Args:
            run_id_or_workspace (str): The unique identifier or workspace string of the run.

        Returns:
            The run entity corresponding to the given identifier.

        Raises:
            RunNotFoundError: If no run is found matching the identifier.
        """
        return self._get_entity(RunEntity, run_id_or_workspace)

    def delete(self, run_id_or_workspace: str):
        """
        Delete a run entity by its ID or workspace identifier.

        Performs cleanup operations to maintain consistency:\n
        - Removes the run from its associated study.
        - Removes the run from all logical workers referencing it.

        Args:
            run_id_or_workspace (str): The unique identifier or workspace string of the run to delete.

        Raises:
            RunNotFoundError: If no run is found matching the identifier.
        """

        def cleanup_run(run):
            # Remove from study
            try:
                study = self.db.studies.get(run.get_study_id())
                study.remove_run(run.get_id())
            except StudyNotFoundError:
                LOG.warning(f"Couldn't find study with id {run.get_study_id()}. Continuing with run delete.")

            # Remove from logical workers
            for worker_id in run.get_workers():
                try:
                    logical_worker = self.db.logical_workers.get(worker_id=worker_id)
                    logical_worker.remove_run(run.get_id())
                except WorkerNotFoundError:
                    LOG.warning(f"Couldn't find logical worker with id {worker_id}. Continuing with run delete.")

        self._delete_entity(RunEntity, run_id_or_workspace, cleanup_fn=cleanup_run)

    def delete_all(self):
        """
        Delete all run entities from the database.

        This method calls `delete` on each run entity to ensure proper cleanup.
        """
        self._delete_all_by_type(get_all_fn=self.get_all, delete_fn=self.delete, entity_name="runs")

    def set_db_reference(self, db: MerlinDatabase):  # noqa: F821  pylint: disable=undefined-variable
        """
        Set a reference to the main Merlin database object for cross-entity operations.

        This allows the manager to access other entity managers (e.g., logical workers) when
        performing operations like cleanup during deletions.

        Args:
            db (db_scripts.merlin_db.MerlinDatabase): The database object that provides
                access to related entity managers.
        """
        self.db = db

    def get_all(self, filters: Dict[str, Any] = None) -> List[RunEntity]:
        """
        Retrieve all run entities managed by this run entity manager, optionally filtered by attributes.

        Args:
            filters: A dictionary of filter keys and values used to narrow down the query results.
                 Filter keys must correspond to supported filters defined in the ENTITY_REGISTRY
                 for the run entity type; for runs, `_filter_accessor_map` provides "study_id",
                 "status", "queues", and "workers". Values are compared against entity attributes
                 or accessor methods (e.g., {"queues": ["queue1", "queue2"]}). A "status" value
                 is upper-cased in place before filtering.

        Returns:
            A list of all entities of the specified type matching the filters.
        """
        if filters and "status" in filters:
            filters["status"] = filters["status"].upper()
        return super().get_all(filters=filters)

__init__(backend)

Initialize the RunManager with the given backend.

This sets up the manager to handle run entities by specifying the associated entity class and entity type string. These are used by the base EntityManager to perform generic operations like retrieving and filtering entities.

Parameters:

backend (ResultsBackend, required): The backend used to persist and retrieve run data.

Source code in merlin/db_scripts/entity_managers/run_manager.py
def __init__(self, backend: ResultsBackend):
    """
    Initialize the RunManager with the given backend.

    This sets up the manager to handle run entities by specifying
    the associated entity class and entity type string. These are used by the
    base EntityManager to perform generic operations like retrieving and filtering entities.

    Args:
        backend (ResultsBackend): The backend used to persist and retrieve run data.
    """
    super().__init__(backend)
    self._entity_class = RunEntity
    self._entity_type = "run"

create(study_name, workspace, queues, **kwargs)

Create a new run associated with a study, workspace, and queues.

This method ensures the study exists (creating it if necessary), then creates and saves a new run entity. Additional keyword arguments that correspond to valid RunModel fields are included; other kwargs are stored as additional data.

Parameters:

study_name (str, required): Name of the study this run belongs to.

workspace (str, required): Workspace identifier for the run.

queues (List[str], required): List of queues associated with the run.

**kwargs (Any, default {}): Additional optional fields for the run entity.

Returns:

RunEntity: The created run entity.
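The way create splits keyword arguments between model fields and additional data can be sketched with a plain dataclass. `ToyRunModel` and `build_model` are hypothetical, the `parent` field is illustrative only, and `dataclasses.fields` stands in for `RunModel.get_class_fields()` on the assumption that the latter returns objects with a `name` attribute, as the source listing suggests.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Optional


@dataclass
class ToyRunModel:
    """Hypothetical model; the field names here are illustrative only."""

    study_id: str
    workspace: str
    queues: List[str]
    parent: Optional[str] = None
    additional_data: Dict[str, Any] = field(default_factory=dict)


def build_model(study_id: str, workspace: str, queues: List[str], **kwargs: Any) -> ToyRunModel:
    """Route known keyword arguments to model fields and everything else to additional_data."""
    valid_fields = {f.name for f in fields(ToyRunModel)}
    valid_kwargs = {key: val for key, val in kwargs.items() if key in valid_fields}
    additional_data = {key: val for key, val in kwargs.items() if key not in valid_fields}
    return ToyRunModel(
        study_id=study_id,
        workspace=workspace,
        queues=queues,
        **valid_kwargs,
        additional_data=additional_data,
    )


model = build_model("study-1", "/tmp/ws", ["q1"], parent="run-0", note="first attempt")
```

In this sketch, `parent` lands on the model because it is a declared field, while `note` ends up in `additional_data`. Passing a keyword that the function already sets explicitly, such as `additional_data`, would raise a `TypeError` for a duplicated keyword argument.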

Source code in merlin/db_scripts/entity_managers/run_manager.py
def create(self, study_name: str, workspace: str, queues: List[str], **kwargs: Any) -> RunEntity:
    """
    Create a new run associated with a study, workspace, and queues.

    This method ensures the study exists (creating it if necessary), then creates
    and saves a new run entity. Additional keyword arguments that correspond to valid
    [`RunModel`][db_scripts.data_models.RunModel] fields are included; other kwargs are
    stored as additional data.

    Args:
        study_name (str): Name of the study this run belongs to.
        workspace (str): Workspace identifier for the run.
        queues (List[str]): List of queues associated with the run.
        **kwargs (Any): Additional optional fields for the run entity.

    Returns:
        The created run entity.
    """
    # Create the study if it doesn't exist
    study_entity = self.db.studies.create(study_name)

    # Filter valid fields for the RunModel
    valid_fields = {f.name for f in RunModel.get_class_fields()}
    valid_kwargs = {key: val for key, val in kwargs.items() if key in valid_fields}
    additional_data = {key: val for key, val in kwargs.items() if key not in valid_fields}

    # Create the RunModel and save it
    new_run = RunModel(
        study_id=study_entity.get_id(),
        workspace=workspace,
        queues=queues,
        **valid_kwargs,
        additional_data=additional_data,
    )
    run_entity = RunEntity(new_run, self.backend)
    run_entity.save()

    # Add the run ID to the study
    study_entity.add_run(run_entity.get_id())

    return run_entity

delete(run_id_or_workspace)

Delete a run entity by its ID or workspace identifier.

Performs cleanup operations to maintain consistency:

  • Removes the run from its associated study.
  • Removes the run from all logical workers referencing it.

Parameters:

run_id_or_workspace (str, required): The unique identifier or workspace string of the run to delete.

Raises:

RunNotFoundError: If no run is found matching the identifier.
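The cleanup described above, including the tolerance for references that no longer exist, can be sketched with an in-memory store. `ToyStore`, `ToyRun`, and `delete_run` are hypothetical and not part of Merlin; the sketch also assumes cleanup happens before the run record itself is removed, which this page does not confirm for `_delete_entity`.

```python
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Set

LOG = logging.getLogger("toy_run_cleanup")


@dataclass
class ToyRun:
    """Hypothetical run record holding only the references that need cleanup."""

    study_id: str
    workers: List[str] = field(default_factory=list)


class ToyStore:
    """Hypothetical in-memory stand-in for the database; not a Merlin class."""

    def __init__(self) -> None:
        self.runs: Dict[str, ToyRun] = {}
        self.study_runs: Dict[str, Set[str]] = {}   # study ID -> run IDs
        self.worker_runs: Dict[str, Set[str]] = {}  # worker ID -> run IDs


def delete_run(store: ToyStore, run_id: str) -> None:
    """Detach a run from its study and workers, then remove the run itself."""
    if run_id not in store.runs:
        raise KeyError(f"No run with id {run_id}")
    run = store.runs[run_id]

    # A missing study is logged and skipped so the delete can still complete.
    study_runs = store.study_runs.get(run.study_id)
    if study_runs is None:
        LOG.warning("Couldn't find study %s. Continuing with run delete.", run.study_id)
    else:
        study_runs.discard(run_id)

    # Missing workers are handled the same way, one worker at a time.
    for worker_id in run.workers:
        worker_runs = store.worker_runs.get(worker_id)
        if worker_runs is None:
            LOG.warning("Couldn't find worker %s. Continuing with run delete.", worker_id)
            continue
        worker_runs.discard(run_id)

    del store.runs[run_id]


store = ToyStore()
store.runs["run-1"] = ToyRun("study-a", ["worker-1", "worker-gone"])
store.study_runs["study-a"] = {"run-1"}
store.worker_runs["worker-1"] = {"run-1"}
delete_run(store, "run-1")  # "worker-gone" only produces a warning
```

Logging and continuing, rather than raising, keeps a stale reference in one place from blocking removal of the run everywhere else.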

Source code in merlin/db_scripts/entity_managers/run_manager.py
def delete(self, run_id_or_workspace: str):
    """
    Delete a run entity by its ID or workspace identifier.

    Performs cleanup operations to maintain consistency:\n
    - Removes the run from its associated study.
    - Removes the run from all logical workers referencing it.

    Args:
        run_id_or_workspace (str): The unique identifier or workspace string of the run to delete.

    Raises:
        RunNotFoundError: If no run is found matching the identifier.
    """

    def cleanup_run(run):
        # Remove from study
        try:
            study = self.db.studies.get(run.get_study_id())
            study.remove_run(run.get_id())
        except StudyNotFoundError:
            LOG.warning(f"Couldn't find study with id {run.get_study_id()}. Continuing with run delete.")

        # Remove from logical workers
        for worker_id in run.get_workers():
            try:
                logical_worker = self.db.logical_workers.get(worker_id=worker_id)
                logical_worker.remove_run(run.get_id())
            except WorkerNotFoundError:
                LOG.warning(f"Couldn't find logical worker with id {worker_id}. Continuing with run delete.")

    self._delete_entity(RunEntity, run_id_or_workspace, cleanup_fn=cleanup_run)

delete_all()

Delete all run entities from the database.

This method calls delete on each run entity to ensure proper cleanup.
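The reason for deleting run by run, rather than wiping the run records in bulk, is that each delete also detaches the run from related entities. The sketch below illustrates that idea with plain dictionaries; `delete_all_by_type`, `delete_run`, and the two dictionaries are hypothetical, and the actual behavior of `_delete_all_by_type` is not shown on this page.

```python
from typing import Callable, Dict, Iterable, Set

# Hypothetical in-memory state: run ID -> study ID, and study ID -> run IDs.
run_to_study: Dict[str, str] = {"run-1": "study-a", "run-2": "study-a", "run-3": "study-b"}
study_runs: Dict[str, Set[str]] = {"study-a": {"run-1", "run-2"}, "study-b": {"run-3"}}


def delete_run(run_id: str) -> None:
    """Remove one run and the back-reference held by its study."""
    study_id = run_to_study.pop(run_id)
    study_runs[study_id].discard(run_id)


def delete_all_by_type(get_all_ids: Callable[[], Iterable[str]], delete_fn: Callable[[str], None]) -> int:
    """Call delete_fn once per ID and report how many entities were deleted."""
    # Snapshot the IDs first, since delete_fn mutates the underlying collection.
    ids = list(get_all_ids())
    for entity_id in ids:
        delete_fn(entity_id)
    return len(ids)


deleted = delete_all_by_type(get_all_ids=lambda: run_to_study.keys(), delete_fn=delete_run)
```

Clearing `run_to_study` directly would have left three stale run IDs behind in `study_runs`; routing every deletion through `delete_run` avoids that.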

Source code in merlin/db_scripts/entity_managers/run_manager.py
def delete_all(self):
    """
    Delete all run entities from the database.

    This method calls `delete` on each run entity to ensure proper cleanup.
    """
    self._delete_all_by_type(get_all_fn=self.get_all, delete_fn=self.delete, entity_name="runs")

get(run_id_or_workspace)

Retrieve a run entity by its unique ID or workspace identifier.

Parameters:

run_id_or_workspace (str, required): The unique identifier or workspace string of the run.

Returns:

RunEntity: The run entity corresponding to the given identifier.

Raises:

RunNotFoundError: If no run is found matching the identifier.
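One way a single string argument can serve as either an ID or a workspace is to try the ID lookup first and fall back to matching the workspace. This is only an assumption about the general approach: how `_get_entity` and `RunEntity` resolve the identifier is not shown on this page, and `find_run`, `RunNotFound`, and the sample data below are hypothetical.

```python
from typing import Dict


class RunNotFound(LookupError):
    """Hypothetical stand-in for Merlin's RunNotFoundError."""


def find_run(runs_by_id: Dict[str, Dict[str, str]], run_id_or_workspace: str) -> Dict[str, str]:
    """Return the run whose ID or workspace matches the given string."""
    # Direct ID lookup is the cheap path.
    run = runs_by_id.get(run_id_or_workspace)
    if run is not None:
        return run
    # Otherwise scan for a run stored under that workspace.
    for candidate in runs_by_id.values():
        if candidate["workspace"] == run_id_or_workspace:
            return candidate
    raise RunNotFound(f"No run matches {run_id_or_workspace!r}")


runs = {
    "run-1": {"id": "run-1", "workspace": "/studies/demo_1"},
    "run-2": {"id": "run-2", "workspace": "/studies/demo_2"},
}
by_id = find_run(runs, "run-2")
by_workspace = find_run(runs, "/studies/demo_1")
```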

Source code in merlin/db_scripts/entity_managers/run_manager.py
def get(self, run_id_or_workspace: str) -> RunEntity:
    """
    Retrieve a run entity by its unique ID or workspace identifier.

    Args:
        run_id_or_workspace (str): The unique identifier or workspace string of the run.

    Returns:
        The run entity corresponding to the given identifier.

    Raises:
        RunNotFoundError: If no run is found matching the identifier.
    """
    return self._get_entity(RunEntity, run_id_or_workspace)

get_all(filters=None)

Retrieve all run entities managed by this run entity manager, optionally filtered by attributes.

Parameters:

filters (Dict[str, Any], default None): A dictionary of filter keys and values used to narrow down the query results. Filter keys must correspond to supported filters defined in the ENTITY_REGISTRY for the run entity type; for runs, _filter_accessor_map provides "study_id", "status", "queues", and "workers". Values are compared against entity attributes or accessor methods (e.g., {"queues": ["queue1", "queue2"]}). A "status" value is upper-cased in place before filtering.

Returns:

List[RunEntity]: A list of all run entities matching the filters.
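The source listing for get_all upper-cases a "status" filter by assigning back into the dictionary it was given, so the caller's dictionary changes too. Callers who want to avoid that side effect could normalize a copy first. The helper below is a hypothetical sketch of that idea, not part of Merlin.

```python
from typing import Any, Dict, Optional


def normalize_filters(filters: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return filters with an upper-cased "status", leaving the input untouched."""
    if not filters or "status" not in filters:
        return filters
    normalized = dict(filters)  # shallow copy so the caller's dict is not modified
    normalized["status"] = str(filters["status"]).upper()
    return normalized


original = {"status": "running", "queues": ["q1"]}
normalized = normalize_filters(original)
```

The result could then be passed as the `filters` argument; the later in-place upper-casing would only touch the copy.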

Source code in merlin/db_scripts/entity_managers/run_manager.py
def get_all(self, filters: Dict[str, Any] = None) -> List[RunEntity]:
    """
    Retrieve all run entities managed by this run entity manager, optionally filtered by attributes.

    Args:
        filters: A dictionary of filter keys and values used to narrow down the query results.
             Filter keys must correspond to supported filters defined in the ENTITY_REGISTRY
             for the run entity type; for runs, `_filter_accessor_map` provides "study_id",
             "status", "queues", and "workers". Values are compared against entity attributes
             or accessor methods (e.g., {"queues": ["queue1", "queue2"]}). A "status" value
             is upper-cased in place before filtering.

    Returns:
        A list of all entities of the specified type matching the filters.
    """
    if filters and "status" in filters:
        filters["status"] = filters["status"].upper()
    return super().get_all(filters=filters)

set_db_reference(db)

Set a reference to the main Merlin database object for cross-entity operations.

This allows the manager to access other entity managers (e.g., logical workers) when performing operations like cleanup during deletions.

Parameters:

db (MerlinDatabase, required): The database object that provides access to related entity managers.

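The two-step wiring that set_db_reference enables can be sketched as follows: each manager is constructed on its own, and the database object then hands itself to every manager so they can reach one another. All three classes are hypothetical stand-ins; how `MerlinDatabase` actually constructs and wires its managers is not shown on this page.

```python
from typing import List, Optional


class ToyStudyManager:
    """Hypothetical manager that records study names."""

    def __init__(self) -> None:
        self.db: Optional["ToyDatabase"] = None
        self.names: List[str] = []

    def set_db_reference(self, db: "ToyDatabase") -> None:
        self.db = db

    def create(self, name: str) -> str:
        # Creating an existing study is a no-op, so callers can use it as "ensure exists".
        if name not in self.names:
            self.names.append(name)
        return name


class ToyRunManager:
    """Hypothetical manager that needs the study manager when creating a run."""

    def __init__(self) -> None:
        self.db: Optional["ToyDatabase"] = None
        self.runs: List[str] = []

    def set_db_reference(self, db: "ToyDatabase") -> None:
        self.db = db

    def create(self, study_name: str, workspace: str) -> str:
        if self.db is None:
            raise RuntimeError("set_db_reference must be called before create")
        self.db.studies.create(study_name)  # cross-manager call through the db reference
        self.runs.append(workspace)
        return workspace


class ToyDatabase:
    """Hypothetical facade that owns the managers and wires them together."""

    def __init__(self) -> None:
        self.studies = ToyStudyManager()
        self.runs = ToyRunManager()
        for manager in (self.studies, self.runs):
            manager.set_db_reference(self)


db = ToyDatabase()
db.runs.create("demo_study", "/studies/demo_1")
```

Passing the database in after construction avoids a circular dependency at construction time: no manager has to exist before any other can be built.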
Source code in merlin/db_scripts/entity_managers/run_manager.py
def set_db_reference(self, db: MerlinDatabase):  # noqa: F821  pylint: disable=undefined-variable
    """
    Set a reference to the main Merlin database object for cross-entity operations.

    This allows the manager to access other entity managers (e.g., logical workers) when
    performing operations like cleanup during deletions.

    Args:
        db (db_scripts.merlin_db.MerlinDatabase): The database object that provides
            access to related entity managers.
    """
    self.db = db