CLI module for querying active Merlin task server workers.
This module defines the QueryWorkersCommand class, which implements the
query-workers subcommand for the Merlin CLI. The command allows users to
inspect the state of connected workers on a task server (e.g., Celery),
optionally filtering by queues or worker names.
QueryWorkersCommand
Bases: CommandEntryPoint
Handles query-workers CLI command for querying information about Merlin workers.
Methods:
| Name |
Description |
add_parser |
Adds the query-workers command to the CLI parser.
|
process_command |
Processes the CLI input and dispatches the appropriate action.
|
Source code in merlin/cli/commands/query_workers.py
class QueryWorkersCommand(CommandEntryPoint):
    """
    Handles `query-workers` CLI command for querying information about Merlin workers.

    Methods:
        add_parser: Adds the `query-workers` command to the CLI parser.
        process_command: Processes the CLI input and dispatches the appropriate action.
    """

    def add_parser(self, subparsers: ArgumentParser):
        """
        Add the `query-workers` command parser to the CLI argument parser.

        Registers the `--task_server`, `--spec`, `--queues`, and `--workers`
        options and binds `process_command` as the handler via `func`.

        Parameters:
            subparsers (ArgumentParser): The subparsers object to which the `query-workers` command parser will be added.
        """
        query: ArgumentParser = subparsers.add_parser("query-workers", help="List connected task server workers.")
        query.set_defaults(func=self.process_command)
        query.add_argument(
            "--task_server",
            type=str,
            default="celery",
            # Adjacent literals (not a backslash continuation inside the string)
            # so the help text never depends on source indentation.
            help=(
                "Task server type from which to query workers. "
                "Default: %(default)s"
            ),
        )
        query.add_argument(
            "--spec",
            type=str,
            default=None,
            help="Path to a Merlin YAML spec file from which to read worker names to query.",
        )
        query.add_argument("--queues", type=str, default=None, nargs="+", help="Specific queues to query workers from.")
        query.add_argument(
            "--workers",
            type=str,
            action="store",
            nargs="+",
            default=None,
            help="Regex match for specific workers to query.",
        )

    def process_command(self, args: Namespace):
        """
        CLI command for finding all workers.

        This function retrieves and queries the names of any active workers.
        If the `--spec` argument is included, only query the workers defined in the spec file.

        Args:
            args: Parsed command-line arguments, which may include:\n
                - `spec`: Path to the specification file.
                - `task_server`: Type of task server to query (e.g., celery).
                - `queues`: List of queue names to filter workers.
                - `workers`: List of regex patterns matching specific workers to query.
        """
        print(banner_small)

        # Get the workers from the spec file if --spec provided
        worker_names = []
        if args.spec:
            spec_path = verify_filepath(args.spec)
            spec = MerlinSpec.load_specification(spec_path)
            worker_names = spec.get_worker_names()
            for worker_name in worker_names:
                # A "$" left in the name means a variable was never substituted.
                if "$" in worker_name:
                    LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")
            # This command queries workers; the message previously said "to stop".
            LOG.debug(f"Searching for the following workers to query based on the spec {args.spec}: {worker_names}")

        query_workers(args.task_server, worker_names, args.queues, args.workers)
|
add_parser(subparsers)
Add the query-workers command parser to the CLI argument parser.
Parameters:
| Name |
Type |
Description |
Default |
subparsers
|
ArgumentParser
|
The subparsers object to which the query-workers command parser will be added.
|
required
|
Source code in merlin/cli/commands/query_workers.py
def add_parser(self, subparsers: ArgumentParser):
    """
    Add the `query-workers` command parser to the CLI argument parser.

    Registers the `--task_server`, `--spec`, `--queues`, and `--workers`
    options and binds `process_command` as the handler via `func`.

    Parameters:
        subparsers (ArgumentParser): The subparsers object to which the `query-workers` command parser will be added.
    """
    query: ArgumentParser = subparsers.add_parser("query-workers", help="List connected task server workers.")
    query.set_defaults(func=self.process_command)
    query.add_argument(
        "--task_server",
        type=str,
        default="celery",
        # Adjacent literals (not a backslash continuation inside the string)
        # so the help text never depends on source indentation.
        help=(
            "Task server type from which to query workers. "
            "Default: %(default)s"
        ),
    )
    query.add_argument(
        "--spec",
        type=str,
        default=None,
        help="Path to a Merlin YAML spec file from which to read worker names to query.",
    )
    query.add_argument("--queues", type=str, default=None, nargs="+", help="Specific queues to query workers from.")
    query.add_argument(
        "--workers",
        type=str,
        action="store",
        nargs="+",
        default=None,
        help="Regex match for specific workers to query.",
    )
|
process_command(args)
CLI command for finding all workers.
This function retrieves and queries the names of any active workers.
If the --spec argument is included, only query the workers defined in the spec file.
Parameters:
| Name |
Type |
Description |
Default |
args
|
Namespace
|
Parsed command-line arguments, which may include:
spec: Path to the specification file.
task_server: Type of task server to query (e.g., celery).
queues: List of queue names to filter workers.
workers: List of regex patterns matching specific workers to query.
|
required
|
Source code in merlin/cli/commands/query_workers.py
def process_command(self, args: Namespace):
    """
    CLI command for finding all workers.

    This function retrieves and queries the names of any active workers.
    If the `--spec` argument is included, only query the workers defined in the spec file.

    Args:
        args: Parsed command-line arguments, which may include:\n
            - `spec`: Path to the specification file.
            - `task_server`: Type of task server to query (e.g., celery).
            - `queues`: List of queue names to filter workers.
            - `workers`: List of regex patterns matching specific workers to query.
    """
    print(banner_small)

    # Get the workers from the spec file if --spec provided
    worker_names = []
    if args.spec:
        spec_path = verify_filepath(args.spec)
        spec = MerlinSpec.load_specification(spec_path)
        worker_names = spec.get_worker_names()
        for worker_name in worker_names:
            # A "$" left in the name means a variable was never substituted.
            if "$" in worker_name:
                LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")
        # This command queries workers; the message previously said "to stop".
        LOG.debug(f"Searching for the following workers to query based on the spec {args.spec}: {worker_names}")

    query_workers(args.task_server, worker_names, args.queues, args.workers)
|