orchestrator.Orchestrator

class orchestrator.Orchestrator(model_registry=None, control_system=None, state_manager=None, yaml_compiler=None, error_handler=None, resource_allocator=None, parallel_executor=None, max_concurrent_tasks=10)

Bases: object

Main orchestration engine.

Coordinates the execution of pipelines by managing tasks, dependencies, and control systems.

__init__(model_registry=None, control_system=None, state_manager=None, yaml_compiler=None, error_handler=None, resource_allocator=None, parallel_executor=None, max_concurrent_tasks=10)

Initialize the orchestrator. Every component argument is optional and defaults to None.

Parameters:
  • model_registry (Optional[ModelRegistry]) – Model registry for model selection

  • control_system (Optional[ControlSystem]) – Control system for task execution

  • state_manager (Optional[StateManager]) – State manager for checkpointing

  • yaml_compiler (Optional[YAMLCompiler]) – YAML compiler for pipeline parsing

  • error_handler (Optional[ErrorHandler]) – Error handler for fault tolerance

  • resource_allocator (Optional[ResourceAllocator]) – Resource allocator for task scheduling

  • parallel_executor (Optional[ParallelExecutor]) – Parallel executor for concurrent execution

  • max_concurrent_tasks (int) – Maximum number of tasks to run concurrently (default: 10)
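To make the role of max_concurrent_tasks concrete, here is a minimal sketch of how a cap on concurrent tasks is commonly enforced with asyncio.Semaphore. It is an assumption about what the parameter means, not the Orchestrator's actual implementation; run_with_cap and its dummy tasks are hypothetical names used only for illustration.

```python
import asyncio


async def run_with_cap(task_count: int, max_concurrent_tasks: int = 10) -> int:
    """Run task_count dummy tasks, never more than max_concurrent_tasks at once.

    Returns the highest number of tasks observed running at the same time.
    """
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    running = 0
    peak = 0

    async def task() -> None:
        nonlocal running, peak
        async with semaphore:  # blocks while the cap is reached
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0)  # yield so other tasks get a chance to start
            running -= 1

    await asyncio.gather(*(task() for _ in range(task_count)))
    return peak


# Hypothetical workload: 25 tasks with the documented default cap of 10.
peak = asyncio.run(run_with_cap(task_count=25, max_concurrent_tasks=10))
print(f"peak concurrency: {peak}")
```

The observed peak never exceeds the cap, no matter how many tasks are submitted.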

Methods

  • __init__([model_registry, control_system, ...]) – Initialize the orchestrator.

  • clear_execution_history() – Clear the execution history.

  • execute_pipeline(pipeline[, ...]) – Execute a pipeline.

  • execute_yaml(yaml_content[, context]) – Execute a pipeline from YAML content.

  • execute_yaml_file(yaml_file[, context]) – Execute a pipeline from a YAML file.

  • get_execution_history([limit]) – Get the execution history.

  • get_execution_status(execution_id) – Get the status of an execution.

  • get_performance_metrics() – Get performance metrics for the orchestrator.

  • health_check() – Perform a health check on all components.

  • list_running_pipelines() – List the execution IDs of all running pipelines.

  • recover_pipeline(execution_id[, from_checkpoint]) – Recover a failed pipeline from a checkpoint.

  • shutdown() – Shut down the orchestrator and clean up resources.

async execute_pipeline(pipeline, checkpoint_enabled=True, max_retries=3)

Execute a pipeline.

Parameters:
  • pipeline (Pipeline) – Pipeline to execute

  • checkpoint_enabled (bool) – Whether to enable checkpointing

  • max_retries (int) – Maximum number of retries for failed tasks

Return type:

Dict[str, Any]

Returns:

Execution results

Raises:

ExecutionError – If execution fails
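The interaction between max_retries and ExecutionError can be sketched as follows. This is a minimal illustration, assuming max_retries counts re-attempts after the first failure and that ExecutionError is raised once they are exhausted. The ExecutionError class, run_task_with_retries, and flaky below are hypothetical stand-ins, not the library's code.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class ExecutionError(Exception):
    """Stand-in for the library's ExecutionError (hypothetical definition)."""


async def run_task_with_retries(
    task: Callable[[], Awaitable[Dict[str, Any]]], max_retries: int = 3
) -> Dict[str, Any]:
    """Await task(), retrying up to max_retries times after a failure."""
    last_error = None
    for _attempt in range(max_retries + 1):  # first try plus max_retries retries
        try:
            return await task()
        except Exception as exc:  # broad on purpose: this is only a sketch
            last_error = exc
    raise ExecutionError(f"task failed after {max_retries} retries") from last_error


calls = {"count": 0}


async def flaky() -> Dict[str, Any]:
    """Hypothetical task that fails twice and then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return {"status": "completed", "attempts": calls["count"]}


result = asyncio.run(run_task_with_retries(flaky, max_retries=3))
print(result)
```

Because execute_pipeline is a coroutine, callers either await it from async code or drive it with asyncio.run, as the sketch does.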

async execute_yaml(yaml_content, context=None, **kwargs)

Execute a pipeline from YAML content.

Parameters:
  • yaml_content (str) – YAML pipeline definition

  • context (Optional[Dict[str, Any]]) – Template context variables

  • **kwargs (Any) – Additional execution parameters

Return type:

Dict[str, Any]

Returns:

Execution results

async execute_yaml_file(yaml_file, context=None, **kwargs)

Execute a pipeline from a YAML file.

Parameters:
  • yaml_file (str) – Path to YAML file

  • context (Optional[Dict[str, Any]]) – Template context variables

  • **kwargs (Any) – Additional execution parameters

Return type:

Dict[str, Any]

Returns:

Execution results
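The call shape of the two YAML entry points can be sketched with a stand-in class. StubOrchestrator is hypothetical and only mirrors the documented signatures. The sketch assumes the file variant reads the file and delegates to execute_yaml, and that extra keyword arguments are passed through as execution options. The YAML text is a placeholder, since the pipeline schema is not described in this section.

```python
import asyncio
import os
import tempfile
from typing import Any, Dict, Optional


class StubOrchestrator:
    """Hypothetical stand-in that only mirrors the documented signatures."""

    async def execute_yaml(
        self, yaml_content: str, context: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Dict[str, Any]:
        # A real orchestrator would compile and run the pipeline; the stub
        # echoes what it received so the call shape stays visible.
        return {"yaml_chars": len(yaml_content), "context": context or {}, "options": kwargs}

    async def execute_yaml_file(
        self, yaml_file: str, context: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Dict[str, Any]:
        # Assumption: the file variant reads the file and delegates.
        with open(yaml_file, encoding="utf-8") as handle:
            return await self.execute_yaml(handle.read(), context, **kwargs)


async def main() -> Dict[str, Any]:
    orchestrator = StubOrchestrator()
    # Placeholder YAML written to a temporary file for the demonstration.
    with tempfile.NamedTemporaryFile(
        "w", suffix=".yaml", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write("name: demo\n")
        path = tmp.name
    try:
        return await orchestrator.execute_yaml_file(
            path, context={"topic": "example"}, max_retries=1
        )
    finally:
        os.remove(path)  # always remove the temporary file


result = asyncio.run(main())
print(result)
```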

async recover_pipeline(execution_id, from_checkpoint=None)

Recover a failed pipeline from a checkpoint.

Parameters:
  • execution_id (str) – Execution ID to recover

  • from_checkpoint (Optional[str]) – Specific checkpoint to recover from

Return type:

Dict[str, Any]

Returns:

Recovery results
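One plausible reading of from_checkpoint is sketched below: when it is None, recovery resumes from the most recent checkpoint, otherwise from the named one. That default is an assumption and is not confirmed by this reference. select_checkpoint and the record fields (id, created_at, completed_tasks) are hypothetical.

```python
from typing import Any, Dict, List, Optional


def select_checkpoint(
    checkpoints: List[Dict[str, Any]], from_checkpoint: Optional[str] = None
) -> Dict[str, Any]:
    """Pick the checkpoint to resume from.

    Assumption: from_checkpoint=None means "use the most recent checkpoint".
    """
    if not checkpoints:
        raise LookupError("no checkpoints recorded for this execution")
    if from_checkpoint is None:
        return max(checkpoints, key=lambda cp: cp["created_at"])
    for checkpoint in checkpoints:
        if checkpoint["id"] == from_checkpoint:
            return checkpoint
    raise LookupError(f"unknown checkpoint: {from_checkpoint}")


# Hypothetical checkpoint records for a single execution.
saved = [
    {"id": "cp-1", "created_at": 100, "completed_tasks": ["fetch"]},
    {"id": "cp-2", "created_at": 200, "completed_tasks": ["fetch", "parse"]},
]
latest = select_checkpoint(saved)
explicit = select_checkpoint(saved, from_checkpoint="cp-1")
print(latest["id"], explicit["id"])
```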

get_execution_status(execution_id)

Get execution status.

Parameters:
  • execution_id (str) – Execution ID

Return type:

Dict[str, Any]

Returns:

Status information

list_running_pipelines()

List all running pipeline execution IDs.

Return type:

List[str]
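Unlike the execute_* methods, get_execution_status and list_running_pipelines are plain synchronous calls. The sketch below shows how the two could relate, using a hypothetical StatusTracker. The "status" key and its values are assumptions, since the contents of the status dictionary are not specified here.

```python
from typing import Any, Dict, List


class StatusTracker:
    """Hypothetical in-memory tracker; not the library's implementation."""

    def __init__(self) -> None:
        self._executions: Dict[str, Dict[str, Any]] = {}

    def start(self, execution_id: str) -> None:
        self._executions[execution_id] = {"execution_id": execution_id, "status": "running"}

    def finish(self, execution_id: str, status: str = "completed") -> None:
        self._executions[execution_id]["status"] = status

    def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
        # Return a copy so callers cannot mutate the tracker's state.
        return dict(self._executions[execution_id])

    def list_running_pipelines(self) -> List[str]:
        return [eid for eid, info in self._executions.items() if info["status"] == "running"]


tracker = StatusTracker()
tracker.start("exec-1")
tracker.start("exec-2")
tracker.finish("exec-1")

running = tracker.list_running_pipelines()
status = tracker.get_execution_status("exec-1")
print(running, status)
```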

get_execution_history(limit=100)

Get execution history.

Parameters:
  • limit (int) – Maximum number of records to return

Return type:

List[Dict[str, Any]]

Returns:

List of execution records
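The effect of limit can be sketched with a small in-memory history. ExecutionHistory is hypothetical, and the choice to return the newest records in oldest-first order is an assumption, because this reference does not say which records are kept when the history is longer than limit.

```python
from typing import Any, Dict, List


class ExecutionHistory:
    """Hypothetical in-memory history; not the library's implementation."""

    def __init__(self) -> None:
        self._records: List[Dict[str, Any]] = []

    def record(self, execution_id: str, status: str) -> None:
        self._records.append({"execution_id": execution_id, "status": status})

    def get_execution_history(self, limit: int = 100) -> List[Dict[str, Any]]:
        # Assumption: the newest `limit` records are returned, oldest first.
        return self._records[-limit:] if limit > 0 else []

    def clear_execution_history(self) -> None:
        self._records.clear()


history = ExecutionHistory()
for index in range(5):
    history.record(f"exec-{index}", "completed")

recent = history.get_execution_history(limit=2)
everything = history.get_execution_history()
history.clear_execution_history()
after_clear = history.get_execution_history()
print([r["execution_id"] for r in recent], len(everything), len(after_clear))
```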

clear_execution_history()

Clear execution history.

Return type:

None

async get_performance_metrics()

Get performance metrics for the orchestrator.

Return type:

Dict[str, Any]

async health_check()

Perform health check on all components.

Return type:

Dict[str, Any]

async shutdown()

Shut down the orchestrator and clean up resources.

Return type:

None
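Since health_check, get_performance_metrics, and shutdown are coroutines, they must be awaited. A common pattern is to put shutdown in a finally block so cleanup runs even if an earlier call fails. The sketch below uses a hypothetical StubOrchestrator, and the result dictionaries are placeholder shapes rather than the library's real output.

```python
import asyncio
from typing import Any, Dict


class StubOrchestrator:
    """Hypothetical stand-in mirroring the documented coroutine methods."""

    def __init__(self) -> None:
        self.closed = False

    async def health_check(self) -> Dict[str, Any]:
        return {"overall": "healthy"}  # hypothetical result shape

    async def get_performance_metrics(self) -> Dict[str, Any]:
        return {"total_executions": 0}  # hypothetical result shape

    async def shutdown(self) -> None:
        self.closed = True


async def main() -> Dict[str, Any]:
    orchestrator = StubOrchestrator()
    try:
        health = await orchestrator.health_check()
        metrics = await orchestrator.get_performance_metrics()
        return {"health": health, "metrics": metrics, "orchestrator": orchestrator}
    finally:
        # Runs even if a call above raises, so resources are always released.
        await orchestrator.shutdown()


report = asyncio.run(main())
print(report["health"], report["metrics"], report["orchestrator"].closed)
```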

__repr__()

Return a string representation of the orchestrator.

Return type:

str