orchestrator.Task

class orchestrator.Task(id, name, action, parameters=<factory>, dependencies=<factory>, status=TaskStatus.PENDING, result=None, error=None, metadata=<factory>, timeout=None, max_retries=3, retry_count=0, created_at=<factory>, started_at=None, completed_at=None)

Bases: object

Core task abstraction for the orchestrator.

A task represents a single unit of work in a pipeline, together with its dependencies, parameters, and execution metadata.

__init__(id, name, action, parameters=<factory>, dependencies=<factory>, status=TaskStatus.PENDING, result=None, error=None, metadata=<factory>, timeout=None, max_retries=3, retry_count=0, created_at=<factory>, started_at=None, completed_at=None)

Methods

__init__(id, name, action[, parameters, ...])

can_retry()

Check if task can be retried.

complete([result])

Mark task as completed.

fail(error)

Mark task as failed.

from_dict(data)

Create task from dictionary representation.

is_ready(completed_tasks)

Check if all dependencies are satisfied.

reset()

Reset task to pending state.

skip([reason])

Mark task as skipped.

start()

Mark task as started.

to_dict()

Convert task to dictionary representation.

Attributes

completed_at

error

execution_time

Get task execution time in seconds.

is_terminal

Check if task is in a terminal state.

max_retries

result

retry_count

started_at

status

timeout

id

name

action

parameters

dependencies

metadata

created_at

id: str
name: str
action: str
parameters: Dict[str, Any]
dependencies: List[str]
status: TaskStatus = 'pending'
result: Optional[Any] = None
error: Optional[Exception] = None
metadata: Dict[str, Any]
timeout: Optional[int] = None
max_retries: int = 3
retry_count: int = 0
created_at: float
started_at: Optional[float] = None
completed_at: Optional[float] = None
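
The `<factory>` defaults and the `__post_init__` hook suggest that Task is a dataclass. The following is a minimal stand-in sketch of the documented fields, not the library's source: `TaskSketch`, the single-member `TaskStatus` enum, the use of `time.time` as the `created_at` factory, and the example action names are all assumptions made for illustration.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class TaskStatus(Enum):
    # Only PENDING is confirmed by the documented default; the real enum has more members.
    PENDING = "pending"


@dataclass
class TaskSketch:
    # Required fields, in the documented order.
    id: str
    name: str
    action: str
    # "<factory>" defaults: every instance gets its own fresh container.
    parameters: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[Exception] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timeout: Optional[int] = None
    max_retries: int = 3
    retry_count: int = 0
    # Assumption: the timestamp factory is time.time (the field is a float).
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None


# Hypothetical task IDs and action names, for illustration only.
extract = TaskSketch(id="extract", name="Extract rows", action="run_query")
load = TaskSketch(id="load", name="Load rows", action="write_table",
                  dependencies=["extract"])
```

Because the container defaults come from factories, `extract.parameters` and `load.parameters` are separate dictionaries, so mutating one task's parameters does not leak into another.
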
__post_init__()

Validate task after initialization.

Return type:

None

is_ready(completed_tasks)

Check if all dependencies are satisfied.

Parameters:

completed_tasks (Set[str]) – Set of completed task IDs

Return type:

bool

Returns:

True if all dependencies are satisfied, False otherwise
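
The dependency check described above can be sketched as a subset test over task IDs. This is a standalone illustration, assuming that "satisfied" means every ID in `dependencies` is present in `completed_tasks`; the free function form is hypothetical, since the real method reads `self.dependencies`.

```python
from typing import Iterable, Set


def is_ready(dependencies: Iterable[str], completed_tasks: Set[str]) -> bool:
    """Return True when every dependency ID is in the completed set."""
    return all(dep in completed_tasks for dep in dependencies)


# A task with no dependencies is ready immediately.
no_deps = is_ready([], set())
# One of two dependencies is still outstanding.
partial = is_ready(["extract", "validate"], {"extract"})
# Extra completed tasks do not matter; only the listed dependencies do.
satisfied = is_ready(["extract", "validate"], {"extract", "validate", "audit"})
```
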

can_retry()

Check if task can be retried.

Return type:

bool
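
The page does not state the retry rule. A plausible reading of the `retry_count` and `max_retries` fields is a simple counter comparison, sketched below as a standalone function; the real method may also inspect `status`, so treat this as an assumption.

```python
def can_retry(retry_count: int, max_retries: int = 3) -> bool:
    """Assumed rule: a retry is allowed while the counter is below the limit."""
    return retry_count < max_retries


fresh = can_retry(retry_count=0)        # no attempts used yet
exhausted = can_retry(retry_count=3)    # default limit of 3 reached
disabled = can_retry(retry_count=0, max_retries=0)  # retries turned off
```
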

start()

Mark task as started.

Return type:

None

complete(result=None)

Mark task as completed.

Return type:

None

fail(error)

Mark task as failed.

Return type:

None

skip(reason='')

Mark task as skipped.

Return type:

None

reset()

Reset task to pending state.

Return type:

None
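
Taken together, `start`, `complete`, `fail`, `skip`, and `reset` describe a small state machine. The sketch below shows one way such transitions could work. Only the `pending` status value is confirmed by this page; the other `Status` members, the timestamp handling, and the fields cleared by `reset` are assumptions, and `LifecycleSketch` is a hypothetical stand-in rather than the real class.

```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class Status(Enum):
    PENDING = "pending"      # confirmed by the documented default
    RUNNING = "running"      # assumed
    COMPLETED = "completed"  # assumed
    FAILED = "failed"        # assumed
    SKIPPED = "skipped"      # assumed


@dataclass
class LifecycleSketch:
    status: Status = Status.PENDING
    result: Optional[Any] = None
    error: Optional[Exception] = None
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    def start(self) -> None:
        self.status = Status.RUNNING
        self.started_at = time.time()

    def complete(self, result: Any = None) -> None:
        self.status = Status.COMPLETED
        self.result = result
        self.completed_at = time.time()

    def fail(self, error: Exception) -> None:
        self.status = Status.FAILED
        self.error = error
        self.completed_at = time.time()

    def skip(self, reason: str = "") -> None:
        # Where the real class records `reason` is not documented; this sketch drops it.
        self.status = Status.SKIPPED
        self.completed_at = time.time()

    def reset(self) -> None:
        # Assumed: reset clears the outcome and both timestamps.
        self.status = Status.PENDING
        self.result = None
        self.error = None
        self.started_at = None
        self.completed_at = None


task = LifecycleSketch()
task.start()
task.fail(RuntimeError("upstream unavailable"))
failed_status = task.status
task.reset()
```
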

property execution_time: float | None

Get task execution time in seconds.
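
The `float | None` return type suggests that the property yields `None` until both timestamps exist. A minimal standalone sketch under that assumption follows; the real property may instead report elapsed time for a task that is still running.

```python
from typing import Optional


def execution_time(started_at: Optional[float],
                   completed_at: Optional[float]) -> Optional[float]:
    """Seconds between start and completion, or None if either is missing."""
    if started_at is None or completed_at is None:
        return None
    return completed_at - started_at


not_started = execution_time(None, None)
still_running = execution_time(10.0, None)
finished = execution_time(10.0, 12.5)
```
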

property is_terminal: bool

Check if task is in a terminal state.
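
This page does not list which states count as terminal. The sketch below assumes that completed, failed, and skipped tasks are terminal, with status names inferred from the method names; a real implementation might treat a failed task with retries remaining as non-terminal.

```python
from enum import Enum


class Status(Enum):
    PENDING = "pending"      # confirmed by the documented default
    RUNNING = "running"      # assumed
    COMPLETED = "completed"  # assumed
    FAILED = "failed"        # assumed
    SKIPPED = "skipped"      # assumed


# Assumed set of states from which no further work is scheduled.
TERMINAL_STATES = frozenset({Status.COMPLETED, Status.FAILED, Status.SKIPPED})


def is_terminal(status: Status) -> bool:
    return status in TERMINAL_STATES
```
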

to_dict()

Convert task to dictionary representation.

Return type:

Dict[str, Any]

classmethod from_dict(data)

Create task from dictionary representation.

Return type:

Task
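
`to_dict` and `from_dict` are presumably inverses, which makes a round trip a useful check. The sketch below shows the pattern on a reduced stand-in class; `MiniTask`, its field subset, and the choice to store the status by its string value are assumptions, not the library's serialization format.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, List


class Status(Enum):
    PENDING = "pending"


@dataclass
class MiniTask:
    id: str
    name: str
    action: str
    dependencies: List[str] = field(default_factory=list)
    status: Status = Status.PENDING

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        # Store the enum by value so the dictionary holds only plain types.
        data["status"] = self.status.value
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MiniTask":
        fields = dict(data)  # copy so the caller's dictionary is not mutated
        fields["status"] = Status(fields.get("status", "pending"))
        return cls(**fields)


# Hypothetical task, for illustration only.
original = MiniTask(id="load", name="Load rows", action="write_table",
                    dependencies=["extract"])
payload = original.to_dict()
restored = MiniTask.from_dict(payload)
```
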

__repr__()

String representation of task.

Return type:

str

__eq__(other)

Check equality based on task ID.

Return type:

bool

__hash__()

Hash based on task ID.

Return type:

int
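
Basing both equality and hashing on the task ID means two objects with the same ID are interchangeable in sets and as dictionary keys, even if their other fields differ. The sketch below illustrates that consequence with a hypothetical `IdKeyed` class; it is not the library's implementation.

```python
class IdKeyed:
    def __init__(self, id: str, name: str) -> None:
        self.id = id
        self.name = name

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, IdKeyed):
            return NotImplemented
        # Only the ID takes part in the comparison.
        return self.id == other.id

    def __hash__(self) -> int:
        # Must agree with __eq__: equal objects hash equally.
        return hash(self.id)


first = IdKeyed("extract", "Extract rows")
renamed = IdKeyed("extract", "Extract rows (v2)")
other = IdKeyed("load", "Load rows")

# The set keeps one entry per distinct ID.
unique = {first, renamed, other}
```

One practical consequence is that comparing two tasks with `==` does not detect differences in status, result, or parameters; compare `to_dict()` output when a field-by-field comparison is needed.
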
