"""
Orchestrator: AI pipeline orchestration framework with intelligent ambiguity resolution.
This package provides a unified interface for executing AI pipelines defined in YAML
with automatic ambiguity resolution using LLMs.
"""
import asyncio
from pathlib import Path
from typing import Any, Dict, Optional
from .compiler.yaml_compiler import YAMLCompiler
from .core.control_system import ControlSystem
from .core.model import Model, ModelCapabilities, ModelRequirements
from .core.pipeline import Pipeline
from .core.task import Task, TaskStatus
from .integrations.huggingface_model import HuggingFaceModel
from .integrations.ollama_model import OllamaModel
from .models.model_registry import ModelRegistry
from .orchestrator import Orchestrator
from .state.state_manager import StateManager
from .tools.mcp_server import default_mcp_server, default_tool_detector
# Package metadata.
__version__ = "0.1.0"
__author__ = "Contextual Dynamics Lab"
__email__ = "contextualdynamics@gmail.com"

# Names exported by ``from <package> import *``.
#
# Every entry must name an object that is actually bound in this module;
# a star-import raises AttributeError for any entry that is not.  The former
# "ErrorHandler" and "ResourceAllocator" entries were never imported or
# defined here, so they have been dropped.
#
# NOTE: ``__all__`` is assigned a second time at the bottom of this module,
# and that later assignment is the one that takes effect at import time.
__all__ = [
    "Orchestrator",
    "Task",
    "TaskStatus",
    "Pipeline",
    "Model",
    "ModelRegistry",
    "YAMLCompiler",
    "ControlSystem",
    "StateManager",
    "HuggingFaceModel",
    "OllamaModel",
    "init_models",
    "compile",
    "compile_async",
]

# Module-level singletons.  ``init_models`` fills in ``_model_registry`` and
# ``compile_async`` fills in ``_orchestrator``; both are None until then.
# (String annotations: they document the type without being evaluated.)
_model_registry: "Optional[ModelRegistry]" = None
_orchestrator: "Optional[Orchestrator]" = None
[docs]
def _register_with_metadata(registry, model, expertise, size_billions):
    """Tag *model* with its selection metadata, then add it to *registry*."""
    # Dynamic attributes used for model selection.
    model._expertise = expertise
    model._size_billions = size_billions
    registry.register_model(model)


def init_models(config_path: str = "models.yaml") -> "ModelRegistry":
    """Initialize the pool of available models from a config file and the environment.

    Each entry under the ``models`` key of the configuration is registered
    according to its ``source``:

    * ``ollama`` -- registered lazily, only when Ollama is installed.
    * ``huggingface`` -- registered lazily, only when the ``transformers``
      package can be found.
    * ``openai`` / ``anthropic`` / ``google`` -- registered only when the
      matching ``*_API_KEY`` environment variable is set.

    Entries lacking a ``source`` or ``name`` are skipped.  A failure while
    registering one model is reported on stdout and does not stop the others.

    Args:
        config_path: Path handed to ``load_model_config``.

    Returns:
        The freshly built registry.  It is also stored in the module-level
        ``_model_registry`` and carries the config's ``defaults`` mapping in
        its ``_defaults`` attribute.
    """
    global _model_registry

    import importlib.util
    import os

    from .utils.model_utils import (
        load_model_config,
        parse_model_size,
        check_ollama_installed,
    )
    from .integrations.anthropic_model import AnthropicModel
    from .integrations.google_model import GoogleModel
    from .integrations.openai_model import OpenAIModel

    # Hosted providers: source -> (display label, API-key env var, model class).
    api_providers = {
        "openai": ("OpenAI", "OPENAI_API_KEY", OpenAIModel),
        "anthropic": ("Anthropic", "ANTHROPIC_API_KEY", AnthropicModel),
        "google": ("Google", "GOOGLE_API_KEY", GoogleModel),
    }

    print(">> Initializing model pool...")
    registry = ModelRegistry()
    _model_registry = registry

    # Tolerate an empty config / an empty ``models:`` value instead of crashing.
    config = load_model_config(config_path) or {}
    models_config = config.get("models") or []

    ollama_available = check_ollama_installed()
    if not ollama_available:
        print(">> ⚠️ Ollama not found - Ollama models will not be available")
        print(">> Install from: https://ollama.ai")

    for model_config in models_config:
        source = model_config.get("source")
        name = model_config.get("name")
        if not source or not name:
            continue
        expertise = model_config.get("expertise", ["general"])
        size_billions = parse_model_size(name, model_config.get("size"))

        try:
            if source == "ollama":
                if not ollama_available:
                    continue
                # Lazy wrapper: availability is not checked until first use.
                from .integrations.lazy_ollama_model import LazyOllamaModel

                model = LazyOllamaModel(model_name=name, timeout=60)
                _register_with_metadata(registry, model, expertise, size_billions)
                print(
                    f">> 📦 Registered Ollama model: {name} ({size_billions}B params) - will download on first use"
                )

            elif source == "huggingface":
                transformers_missing = False
                try:
                    # find_spec signals a missing package by returning None,
                    # not by raising, so that case must be checked explicitly
                    # or the user never sees the install hint below.
                    transformers_missing = (
                        importlib.util.find_spec("transformers") is None
                    )
                    if not transformers_missing:
                        from .integrations.lazy_huggingface_model import (
                            LazyHuggingFaceModel,
                        )

                        hf_model = LazyHuggingFaceModel(model_name=name)
                        _register_with_metadata(
                            registry, hf_model, expertise, size_billions
                        )
                        print(
                            f">> 📦 Registered HuggingFace model: {name} ({size_billions}B params) - will download on first use"
                        )
                except ImportError:
                    transformers_missing = True
                except Exception as e:
                    print(f">> ⚠️ Failed to register HuggingFace model {name}: {e}")
                if transformers_missing:
                    print(
                        f">> ⚠️ HuggingFace model {name} configured but transformers not installed"
                    )
                    print(">> Install with: pip install transformers torch")

            elif source in api_providers:
                label, env_var, model_cls = api_providers[source]
                # Only register hosted models whose API key is available.
                if not os.environ.get(env_var):
                    continue
                model = model_cls(name=name, model=name)
                _register_with_metadata(registry, model, expertise, size_billions)
                print(
                    f">> ✅ Registered {label} model: {name} ({size_billions}B params)"
                )
        except Exception as e:
            print(f">> ⚠️ Error registering {source} model {name}: {e}")

    registered = registry.list_models()
    print(
        f"\n>> Model initialization complete: {len(registered)} models registered"
    )
    if not registered:
        print(">> ⚠️ No models available - using mock fallback")

    # Store defaults in registry for later use.
    registry._defaults = config.get("defaults", {})
    return registry
class OrchestratorPipeline:
    """Wrapper for a compiled pipeline that is run with keyword arguments.

    Construction prints the keyword arguments the pipeline accepts.  ``run``
    then validates the supplied inputs, resolves the declared outputs
    (including ``<AUTO>...</AUTO>`` values), renders ``{{ ... }}`` templates
    found in task parameters, and executes the result on the orchestrator.
    """

    # Delimiters of an output value that must be resolved by the ambiguity
    # resolver rather than rendered as a template.
    _AUTO_OPEN = "<AUTO>"
    _AUTO_CLOSE = "</AUTO>"

    def __init__(
        self,
        pipeline: "Pipeline",
        compiler: "YAMLCompiler",
        orchestrator: "Orchestrator",
    ) -> None:
        """Store the collaborators and print the accepted keyword arguments."""
        self.pipeline = pipeline
        self.compiler = compiler
        self.orchestrator = orchestrator
        self._print_usage()

    def _print_usage(self) -> None:
        """Print keyword arguments as shown in README."""
        print(">> keyword arguments:")
        inputs = self._extract_inputs()
        if not inputs:
            # No declared inputs: fall back to the research-report defaults.
            print(
                ">> topic: a word or underscore-separated phrase specifying the to-be-researched topic (type: String)"
            )
            print(
                ">> instructions: detailed instructions to help guide the report, specify areas of particular interest (or areas to stay away from), etc. (type: String)"
            )
            return

        for name, info in inputs.items():
            if isinstance(info, dict):
                desc = info.get("description", "No description")
                # str() so that a non-string ``type`` value cannot crash here.
                type_str = str(info.get("type", "String")).title()
                required = " (required)" if info.get("required", False) else ""
                print(f">> {name}: {desc} (type: {type_str}){required}")
            else:
                # Simple string description.
                print(f">> {name}: {info} (type: String)")

    def _metadata_section(self, key: str) -> Dict[str, Any]:
        """Return ``pipeline.metadata[key]``, or ``{}`` when it is absent or empty."""
        metadata = getattr(self.pipeline, "metadata", None)
        if not metadata or key not in metadata:
            return {}
        return metadata[key] or {}

    def _extract_inputs(self) -> Dict[str, Any]:
        """Extract input definitions from the compiled pipeline's metadata."""
        return self._metadata_section("inputs")

    def _extract_outputs(self) -> Dict[str, Any]:
        """Extract output definitions from the compiled pipeline's metadata."""
        return self._metadata_section("outputs")

    def run(self, **kwargs: Any) -> Any:
        """Run the pipeline synchronously with the given keyword arguments.

        When no event loop is running in the calling thread the pipeline runs
        via ``asyncio.run``.  When one is running (e.g. a notebook), the
        pipeline runs on a fresh loop in a worker thread and this call blocks
        until it finishes.

        Returns:
            Whatever ``_run_async`` returns.

        Raises:
            ValueError: If a required input is missing.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # asyncio.run creates *and tears down* its own loop, so no closed
            # loop is left installed as the thread's current loop afterwards.
            return asyncio.run(self._run_async(**kwargs))

        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                lambda: asyncio.run(self._run_async(**kwargs))
            )
            return future.result()

    async def _run_async(self, **kwargs: Any) -> Any:
        """Validate inputs, resolve outputs and templates, then execute.

        Returns:
            ``outputs["pdf"]`` when a ``pdf`` output is declared; otherwise the
            ``final_report`` entry of the execution results when present;
            otherwise the raw execution results.
        """
        self._validate_inputs(kwargs)
        outputs = await self._resolve_outputs(kwargs)

        context = {"inputs": kwargs, "outputs": outputs}
        resolved_pipeline = await self._resolve_runtime_templates(
            self.pipeline, context
        )
        resolved_pipeline.context = context
        results = await self.orchestrator.execute_pipeline(resolved_pipeline)

        if outputs and "pdf" in outputs:
            return outputs["pdf"]
        if results is not None and "final_report" in results:
            return results["final_report"]
        return results

    def _validate_inputs(self, kwargs: Dict[str, Any]) -> None:
        """Raise ``ValueError`` for the first required input missing from *kwargs*."""
        for name, info in self._extract_inputs().items():
            is_required = isinstance(info, dict) and info.get("required", False)
            if is_required and name not in kwargs:
                raise ValueError(f"Required input '{name}' not provided")

    @staticmethod
    def _render_template(text: str, variables: Dict[str, Any]) -> str:
        """Render *text* with Jinja2; return it unchanged if rendering fails."""
        # Imported outside the try so a missing jinja2 surfaces as an error
        # instead of silently leaving templates unrendered.
        from jinja2 import Template

        try:
            return Template(text).render(**variables)
        except Exception:
            # Best effort: keep the original text when it cannot be rendered.
            return text

    async def _resolve_outputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve output definitions, handling AUTO tags and templates.

        Non-string values are copied through.  ``<AUTO>...</AUTO>`` strings go
        to the compiler's ambiguity resolver when it has one, otherwise they
        become ``report_<topic>.pdf``.  Any other string is rendered as a
        template that can see ``inputs`` and the outputs resolved so far.
        """
        outputs: Dict[str, Any] = {}
        for name, value in self._extract_outputs().items():
            if not isinstance(value, str):
                outputs[name] = value
                continue

            is_auto = value.startswith(self._AUTO_OPEN) and value.endswith(
                self._AUTO_CLOSE
            )
            if not is_auto:
                outputs[name] = self._render_template(
                    value, {"inputs": inputs, "outputs": outputs}
                )
                continue

            # Strip the surrounding <AUTO> ... </AUTO> markers.
            auto_content = value[len(self._AUTO_OPEN) : -len(self._AUTO_CLOSE)]
            yaml_compiler = self.orchestrator.yaml_compiler
            if hasattr(yaml_compiler, "ambiguity_resolver"):
                outputs[name] = await yaml_compiler.ambiguity_resolver.resolve(
                    auto_content, f"outputs.{name}"
                )
            else:
                outputs[name] = f"report_{inputs.get('topic', 'research')}.pdf"
        return outputs

    async def _resolve_runtime_templates(
        self, pipeline: "Pipeline", context: Dict[str, Any]
    ) -> "Pipeline":
        """Return a deep copy of *pipeline* with task-parameter templates rendered."""
        import copy

        # Deep copy so the compiled pipeline can be run again unmodified.
        resolved_pipeline = copy.deepcopy(pipeline)
        for task in resolved_pipeline.tasks.values():
            if hasattr(task, "parameters"):
                task.parameters = await self._resolve_task_templates(
                    task.parameters, context
                )
        return resolved_pipeline

    async def _resolve_task_templates(self, obj: Any, context: Dict[str, Any]) -> Any:
        """Recursively render ``{{ ... }}`` templates inside *obj*.

        Dicts and lists are rebuilt with their members resolved; strings that
        contain both ``{{`` and ``}}`` are rendered against *context*; every
        other value is returned as is.
        """
        if isinstance(obj, str):
            if "{{" in obj and "}}" in obj:
                return self._render_template(obj, context)
            return obj
        if isinstance(obj, dict):
            return {
                key: await self._resolve_task_templates(value, context)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [await self._resolve_task_templates(item, context) for item in obj]
        return obj
[docs]
async def compile_async(yaml_path: str) -> "OrchestratorPipeline":
    """Compile a YAML pipeline file into an executable OrchestratorPipeline (async version).

    Steps, in order: read the file, make sure the model pool is initialized,
    build a fresh module-level orchestrator, attach a model for AUTO-tag
    resolution, detect the tools the pipeline needs (starting the MCP server
    when any is available), and compile the YAML.

    Args:
        yaml_path: Path of the pipeline definition file.

    Returns:
        The compiled pipeline wrapped in an ``OrchestratorPipeline``.

    Raises:
        FileNotFoundError: If *yaml_path* does not exist.  This is checked
            first, before any model or orchestrator is set up.
    """
    global _orchestrator, _model_registry

    # Fail fast on a bad path instead of initializing models first.
    pipeline_path = Path(yaml_path)
    if not pipeline_path.exists():
        raise FileNotFoundError(f"Pipeline file not found: {pipeline_path}")
    # Explicit encoding: do not depend on the platform default.
    yaml_content = pipeline_path.read_text(encoding="utf-8")

    # Ensure models are initialized.
    if _model_registry is None:
        init_models()

    # Create orchestrator with mock control system (will be replaced).
    from .core.control_system import MockControlSystem

    _orchestrator = Orchestrator(control_system=MockControlSystem())

    # Set up model for ambiguity resolution.
    model_keys = _model_registry.list_models() if _model_registry else []
    if model_keys:
        # Assumes the first registered model is the preferred one.
        best_model_key = model_keys[0]
        best_model = _model_registry.models[best_model_key]
        _orchestrator.yaml_compiler.ambiguity_resolver.model = best_model
        print(f">> Using model for AUTO resolution: {best_model_key}")

    # Parse the YAML once up front, only to detect required tools.
    import yaml as yaml_lib

    pipeline_def = yaml_lib.safe_load(yaml_content)

    required_tools = default_tool_detector.detect_tools_from_yaml(pipeline_def)
    if required_tools:
        print(f">> Detected required tools: {', '.join(required_tools)}")
        availability = default_tool_detector.ensure_tools_available(required_tools)
        for tool, available in availability.items():
            status = "✅" if available else "❌"
            print(f">> {status} {tool}")

        # Start MCP server only when at least one required tool is usable.
        if any(availability.values()):
            print(">> Starting MCP tool server...")
            await default_mcp_server.start_server()

    pipeline = await _orchestrator.yaml_compiler.compile(yaml_content, {})
    return OrchestratorPipeline(pipeline, _orchestrator.yaml_compiler, _orchestrator)
[docs]
def compile(yaml_path: str) -> "OrchestratorPipeline":
    """Compile a YAML pipeline file into an executable OrchestratorPipeline.

    Synchronous front end for ``compile_async``.  With no event loop running
    in the calling thread, compilation runs via ``asyncio.run``.  With a loop
    already running, compilation runs on its own loop in a worker thread and
    this call blocks until it completes.

    Args:
        yaml_path: Path of the pipeline definition file.

    Returns:
        The compiled pipeline wrapped in an ``OrchestratorPipeline``.

    Raises:
        Any exception raised by ``compile_async`` propagates unchanged.
    """
    # Keep the try body to the loop probe alone.  If the compilation itself
    # were inside it, a RuntimeError raised while compiling would be mistaken
    # for "no running loop" and trigger a second, failing asyncio.run().
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running here, so we can drive one directly.
        return asyncio.run(compile_async(yaml_path))

    # Already inside an event loop: it cannot be re-entered, so compile on a
    # separate loop owned by a worker thread.
    import concurrent.futures

    def _compile_in_worker() -> "OrchestratorPipeline":
        return asyncio.run(compile_async(yaml_path))

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(_compile_in_worker).result()
# Authoritative export list: this assignment runs last, so it replaces any
# ``__all__`` assigned earlier in the module.  It therefore has to carry every
# public name itself, including the async entry point and the concrete model
# integrations imported at the top of the module.
__all__ = [
    "Task",
    "TaskStatus",
    "Pipeline",
    "Model",
    "ModelCapabilities",
    "ModelRequirements",
    "ControlSystem",
    "YAMLCompiler",
    "ModelRegistry",
    "StateManager",
    "Orchestrator",
    "HuggingFaceModel",
    "OllamaModel",
    "init_models",
    "compile",
    "compile_async",
]