API Reference
Complete API documentation for the Orchestrator framework.
Overview
The Orchestrator API is organized into several key modules:
Core Module (core)
The foundation of the framework, containing:
Task: Individual units of work
Pipeline: Collections of tasks with dependencies
ControlSystem: Pluggable execution backends
Model: AI model abstraction
Compiler Module (compiler)
YAML pipeline compilation and processing:
YAMLCompiler: Main compilation engine
AmbiguityResolver: AUTO tag resolution
SchemaValidator: Pipeline validation
TemplateEngine: Jinja2 template processing
Models Module (models)
AI model management and selection:
ModelRegistry: Central model repository
ModelSelector: Intelligent model selection
ModelCapabilities: Model capability definitions
ResourceManager: Resource allocation and monitoring
Tools Module (tools)
Tool system and integrations:
ToolRegistry: Tool management
MCPServer: Model Context Protocol integration
WebTools: Internet interaction tools
SystemTools: File and command execution
DataTools: Data processing and validation
Orchestrator Module (orchestrator)
Main orchestration engine:
Orchestrator: Primary orchestration class
StateManager: Pipeline state and checkpointing
ExecutionEngine: Task execution and coordination
ErrorHandler: Error management and recovery
Quick Reference
Core Classes
|
Core task abstraction for the orchestrator. |
|
Pipeline represents a collection of tasks with dependencies. |
|
Main orchestration engine. |
|
Compiles YAML definitions into executable pipelines. |
Central registry for all available models. |
Main Functions
|
Initialize the pool of available models by reading models.yaml and environment. |
|
Compile a YAML pipeline file into an executable OrchestratorPipeline. |
|
Compile a YAML pipeline file into an executable OrchestratorPipeline (async version). |
Key Exceptions
Usage Examples
Basic Usage
import orchestrator as orc
# Initialize models
registry = orc.init_models()
# Compile pipeline
pipeline = orc.compile("my_pipeline.yaml")
# Execute
result = pipeline.run(input_param="value")
Advanced Usage
from orchestrator import Orchestrator
from orchestrator.core.control_system import MockControlSystem
from orchestrator.models.model_registry import ModelRegistry
# Create custom orchestrator
control_system = MockControlSystem()
orchestrator = Orchestrator(control_system=control_system)
# Use custom model registry
registry = ModelRegistry()
# ... configure models
# Compile with custom settings
pipeline = orchestrator.compile(
yaml_content,
config={"timeout": 3600}
)
Type Annotations
The Orchestrator framework uses comprehensive type annotations for better IDE support and type checking:
from typing import Dict, Any, List, Optional
from orchestrator import Pipeline, Task
def process_pipeline(
pipeline: Pipeline,
inputs: Dict[str, Any],
timeout: Optional[int] = None
) -> Dict[str, Any]:
return pipeline.run(**inputs)
Environment Variables
The framework recognizes these environment variables:
Variable |
Description |
|---|---|
|
Home directory for configuration and cache |
|
Logging level (DEBUG, INFO, WARNING, ERROR) |
|
Default timeout for model operations |
|
Default timeout for tool operations |
|
Auto-start MCP server when tools detected |
Configuration
Default configuration can be overridden using a config file at ~/.orchestrator/config.yaml:
models:
default: "ollama:gemma2:27b"
fallback: "ollama:llama3.2:1b"
timeout: 300
tools:
mcp_port: 8000
auto_start: true
execution:
parallel: true
checkpoint: true
timeout: 3600
logging:
level: "INFO"
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
Performance Considerations
Model Loading
Models are loaded lazily and cached. For better performance:
# Initialize models once at startup
orc.init_models()
# Reuse compiled pipelines
pipeline = orc.compile("pipeline.yaml")
# Multiple executions reuse the same pipeline
for inputs in input_batches:
result = pipeline.run(**inputs)
Memory Management
Large pipelines and datasets can consume significant memory:
# Enable checkpointing for long pipelines
pipeline = orc.compile("pipeline.yaml", config={
"checkpoint": True,
"memory_limit": "8GB"
})
# Process data in batches
for batch in data_batches:
result = pipeline.run(data=batch)
# Results are automatically checkpointed
Error Handling
The framework provides structured error handling:
from orchestrator import CompilationError, ExecutionError
try:
pipeline = orc.compile("pipeline.yaml")
result = pipeline.run(input="value")
except CompilationError as e:
print(f"Pipeline compilation failed: {e}")
print(f"Error details: {e.details}")
except ExecutionError as e:
print(f"Pipeline execution failed: {e}")
print(f"Failed step: {e.step_id}")
print(f"Error context: {e.context}")
Debugging
Enable detailed logging for debugging:
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# Compile with debug information
pipeline = orc.compile("pipeline.yaml", debug=True)
# Execute with verbose output
result = pipeline.run(input="value", _verbose=True)
Extension Points
The framework provides several extension points:
Custom Control Systems
from orchestrator.core.control_system import ControlSystem
class MyControlSystem(ControlSystem):
async def execute_task(self, task: Task, context: dict) -> dict:
# Custom execution logic
pass
Custom Tools
from orchestrator.tools.base import Tool
class MyTool(Tool):
def __init__(self):
super().__init__("my-tool", "Description")
async def execute(self, **kwargs) -> dict:
# Tool implementation
pass
Custom Models
from orchestrator.core.model import Model
class MyModel(Model):
async def generate(self, prompt: str, **kwargs) -> str:
# Model implementation
pass
Thread Safety
The framework is designed to be thread-safe:
import concurrent.futures
# Safe to use across threads
pipeline = orc.compile("pipeline.yaml")
def process_input(input_data):
return pipeline.run(**input_data)
# Parallel execution
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_input, data)
for data in input_datasets]
results = [f.result() for f in futures]
Testing
Testing utilities and patterns:
from orchestrator.testing import MockModel, TestRunner
def test_my_pipeline():
# Use mock model for testing
with MockModel() as mock:
mock.set_response("test response")
pipeline = orc.compile("test_pipeline.yaml")
result = pipeline.run(input="test")
assert result == "expected"
# Test runner for pipeline validation
runner = TestRunner("pipelines/")
runner.validate_all() # Validates all YAML files
runner.test_compilation() # Tests compilation
runner.run_smoke_tests() # Basic execution tests
Troubleshooting
Common Issues
ImportError: No module named ‘orchestrator’
Ensure the package is installed:
pip install py-orcCheck virtual environment activation
Model Loading Failures
Verify model availability:
ollama listCheck API keys for cloud models
Ensure sufficient memory for local models
Pipeline Compilation Errors
Validate YAML syntax
Check required fields in pipeline definition
Verify template syntax
Tool Execution Failures
Check tool dependencies (e.g., pandoc for PDF generation)
Verify network connectivity for web tools
Check file permissions for system tools
Getting Help
Documentation: https://orc.readthedocs.io
GitHub Issues: https://github.com/ContextLab/orchestrator/issues
Discussions: https://github.com/ContextLab/orchestrator/discussions