"""Core task abstraction for the orchestrator framework."""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set
class TaskStatus(Enum):
    """Execution status of a task.

    Each member's value is the lowercase string form of its name, which is
    the representation used when a status is looked up by value, e.g.
    ``TaskStatus("pending")``.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class Task:
    """
    Core task abstraction for the orchestrator.

    A task represents a single unit of work in a pipeline with dependencies,
    parameters, and execution metadata.

    Equality and hashing are based solely on ``id``: two tasks with the same
    ID compare equal regardless of their other fields.
    """

    id: str
    name: str
    action: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    # IDs of the tasks that must be completed before this one is ready.
    dependencies: List[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[Exception] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timeout: Optional[int] = None
    max_retries: int = 3
    # Incremented by fail() on every failure, including the first one.
    retry_count: int = 0
    # Timestamps are time.time() values (seconds since the epoch).
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    def __post_init__(self) -> None:
        """Validate task after initialization.

        Raises:
            ValueError: If ``id``, ``name`` or ``action`` is empty, if
                ``max_retries`` is negative, or if the task lists its own ID
                among its dependencies.
        """
        if not self.id:
            raise ValueError("Task ID cannot be empty")
        if not self.name:
            raise ValueError("Task name cannot be empty")
        if not self.action:
            raise ValueError("Task action cannot be empty")
        if self.max_retries < 0:
            raise ValueError("Max retries must be non-negative")

        # Validate dependencies
        if self.id in self.dependencies:
            raise ValueError(f"Task {self.id} cannot depend on itself")

    def is_ready(self, completed_tasks: Set[str]) -> bool:
        """
        Check if all dependencies are satisfied.

        Args:
            completed_tasks: Set of completed task IDs

        Returns:
            True if all dependencies are satisfied, False otherwise. A task
            without dependencies is always ready.
        """
        return all(dep in completed_tasks for dep in self.dependencies)

    def can_retry(self) -> bool:
        """Check if task can be retried.

        Returns:
            True if the task is in the FAILED state and ``retry_count`` is
            still below ``max_retries``.
        """
        return self.retry_count < self.max_retries and self.status == TaskStatus.FAILED

    def start(self) -> None:
        """Mark task as started and record the start time."""
        self.status = TaskStatus.RUNNING
        self.started_at = time.time()
        # A task restarted after fail() still carries the end timestamp of the
        # previous attempt; clear it so execution_time measures the current
        # run instead of yielding (old end - new start), a negative duration.
        self.completed_at = None

    def complete(self, result: Any = None) -> None:
        """Mark task as completed.

        Args:
            result: Value produced by the task; stored on ``result``.

        Any previously recorded error is cleared.
        """
        self.status = TaskStatus.COMPLETED
        self.result = result
        self.completed_at = time.time()
        self.error = None

    def fail(self, error: Exception) -> None:
        """Mark task as failed.

        Args:
            error: The exception to record on ``error``.

        ``retry_count`` is incremented on every call.
        """
        self.status = TaskStatus.FAILED
        self.error = error
        self.completed_at = time.time()
        self.retry_count += 1

    def skip(self, reason: str = "") -> None:
        """Mark task as skipped.

        Args:
            reason: Optional explanation; when non-empty it is stored in
                ``metadata["skip_reason"]``.
        """
        self.status = TaskStatus.SKIPPED
        self.completed_at = time.time()
        if reason:
            self.metadata["skip_reason"] = reason

    def reset(self) -> None:
        """Reset task to pending state.

        Clears the result, error and start/end timestamps. ``retry_count``
        and ``metadata`` are left untouched.
        """
        self.status = TaskStatus.PENDING
        self.result = None
        self.error = None
        self.started_at = None
        self.completed_at = None

    @property
    def execution_time(self) -> Optional[float]:
        """Get task execution time in seconds.

        Returns:
            None if the task has not been started. Otherwise the time between
            ``started_at`` and ``completed_at``, or between ``started_at`` and
            now when no end time has been recorded yet.
        """
        if self.started_at is None:
            return None
        # Explicit None check: a timestamp must not be judged by truthiness.
        end_time = self.completed_at if self.completed_at is not None else time.time()
        return end_time - self.started_at

    @property
    def is_terminal(self) -> bool:
        """Check if task is in a terminal state (completed, failed or skipped)."""
        return self.status in {
            TaskStatus.COMPLETED,
            TaskStatus.FAILED,
            TaskStatus.SKIPPED,
        }

    def to_dict(self) -> Dict[str, Any]:
        """Convert task to dictionary representation.

        The status is emitted as its string value and the error as
        ``str(error)`` (or None). ``parameters``, ``dependencies`` and
        ``metadata`` are shallow copies, so modifying the returned containers
        does not modify the task.

        Returns:
            A dictionary with one entry per field plus the computed
            ``execution_time``.
        """
        return {
            "id": self.id,
            "name": self.name,
            "action": self.action,
            "parameters": dict(self.parameters),
            "dependencies": list(self.dependencies),
            "status": self.status.value,
            "result": self.result,
            "error": str(self.error) if self.error else None,
            "metadata": dict(self.metadata),
            "timeout": self.timeout,
            "max_retries": self.max_retries,
            "retry_count": self.retry_count,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "execution_time": self.execution_time,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Task:
        """Create task from dictionary representation.

        Args:
            data: Mapping of field names to values, e.g. the output of
                ``to_dict``. It is not modified; nested containers in it are
                passed to the new task as-is.

        Returns:
            A new task built from ``data``.

        Raises:
            ValueError: If ``status`` is not a valid ``TaskStatus`` value, or
                if the resulting task fails validation.
            TypeError: If ``data`` contains keys that are not task fields.
        """
        # Work on a shallow copy so the caller's dictionary is left untouched.
        payload = dict(data)

        # Remove computed properties
        payload.pop("execution_time", None)

        # Convert status back to enum
        if "status" in payload:
            payload["status"] = TaskStatus(payload["status"])

        # Rebuild the error from its message; keep real exceptions unchanged.
        error = payload.get("error")
        if error and not isinstance(error, Exception):
            payload["error"] = Exception(error)

        return cls(**payload)

    def __repr__(self) -> str:
        """String representation of task."""
        return f"Task(id='{self.id}', name='{self.name}', status={self.status.value})"

    def __eq__(self, other: object) -> bool:
        """Check equality based on task ID."""
        if not isinstance(other, Task):
            return NotImplemented
        return self.id == other.id

    def __hash__(self) -> int:
        """Hash based on task ID."""
        return hash(self.id)