orchestrator.Pipeline

class orchestrator.Pipeline(id, name, tasks=<factory>, context=<factory>, metadata=<factory>, created_at=<factory>, version='1.0.0', description=None)[source]

Bases: object

Pipeline represents a collection of tasks with dependencies.

A pipeline manages the execution order of tasks based on their dependencies and provides methods for validation and execution planning.

__init__(id, name, tasks=<factory>, context=<factory>, metadata=<factory>, created_at=<factory>, version='1.0.0', description=None)

Methods

__init__(id, name[, tasks, context, ...])

add_task(task)

Add a task to the pipeline.

clear_tasks()

Clear all tasks from the pipeline.

from_dict(data)

Create pipeline from dictionary representation.

get_completed_tasks()

Get list of completed task IDs.

get_critical_path()

Get the critical path (longest path) through the pipeline.

get_dependencies(task_id)

Get dependencies for a task.

get_dependents(task_id)

Get tasks that depend on the given task.

get_execution_levels()

Get tasks grouped by execution level (parallel groups).

get_execution_order()

Get flat execution order of tasks.

get_execution_order_flat()

Get flat execution order of tasks.

get_failed_tasks()

Get list of failed task IDs.

get_progress()

Get pipeline execution progress.

get_ready_task_ids(completed_tasks)

Get task IDs that are ready to execute (legacy interface).

get_ready_tasks(completed_tasks)

Get tasks that are ready to execute.

get_running_tasks()

Get list of running task IDs.

get_status()

Get pipeline status summary.

get_task(task_id)

Get a task by ID.

get_task_safe(task_id)

Get a task by ID without raising exceptions.

get_task_strict(task_id)

Get a task by ID (legacy interface that raises exceptions).

has_task(task_id)

Check if pipeline has a task with given ID.

is_complete()

Check if all tasks are completed.

is_failed()

Check if any critical task has failed.

is_valid()

Check if pipeline is valid.

remove_task(task_id)

Remove a task from the pipeline and return it.

remove_task_strict(task_id)

Remove a task from the pipeline (legacy interface that raises exceptions).

reset()

Reset all tasks to pending state.

to_dict()

Convert pipeline to dictionary representation.

Attributes

description

task_count

Get number of tasks in pipeline.

version

id

name

tasks

context

metadata

created_at

id: str
name: str
tasks: Dict[str, Task]
context: Dict[str, Any]
metadata: Dict[str, Any]
created_at: float
version: str = '1.0.0'
description: Optional[str] = None
__post_init__()[source]

Validate pipeline after initialization.

Return type:

None

add_task(task)[source]

Add a task to the pipeline.

Parameters:

task (Task) – Task to add

Raises:

ValueError – If task with same ID already exists

Return type:

None

remove_task(task_id)[source]

Remove a task from the pipeline and return it.

Parameters:

task_id (str) – ID of task to remove

Return type:

Optional[Task]

Returns:

Removed task, or None if task doesn’t exist

Raises:

ValueError – If other tasks depend on it

remove_task_strict(task_id)[source]

Remove a task from the pipeline (legacy interface that raises exceptions).

Parameters:

task_id (str) – ID of task to remove

Return type:

Task

Returns:

Removed task

Raises:

ValueError – If task doesn’t exist or other tasks depend on it

get_task(task_id)[source]

Get a task by ID.

Parameters:

task_id (str) – Task ID

Return type:

Optional[Task]

Returns:

Task object, or None if not found

get_task_safe(task_id)[source]

Get a task by ID without raising exceptions.

Parameters:

task_id (str) – Task ID

Return type:

Optional[Task]

Returns:

Task object or None if not found

get_task_strict(task_id)[source]

Get a task by ID (legacy interface that raises exceptions).

Parameters:

task_id (str) – Task ID

Return type:

Task

Returns:

Task object

Raises:

ValueError – If task doesn’t exist

get_execution_order()[source]

Get flat execution order of tasks.

Return type:

List[str]

Returns:

List of task IDs in execution order

get_ready_tasks(completed_tasks)[source]

Get tasks that are ready to execute.

Parameters:

completed_tasks (Set[str]) – Set of completed task IDs

Return type:

List[Task]

Returns:

List of Task objects ready for execution

get_ready_task_ids(completed_tasks)[source]

Get task IDs that are ready to execute (legacy interface).

Parameters:

completed_tasks (Set[str]) – Set of completed task IDs

Return type:

List[str]

Returns:

List of task IDs ready for execution

get_failed_tasks()[source]

Get list of failed task IDs.

Return type:

List[str]

get_completed_tasks()[source]

Get list of completed task IDs.

Return type:

List[str]

get_running_tasks()[source]

Get list of running task IDs.

Return type:

List[str]

reset()[source]

Reset all tasks to pending state.

Return type:

None

is_complete()[source]

Check if all tasks are completed.

Return type:

bool

is_failed()[source]

Check if any critical task has failed.

Return type:

bool

get_progress()[source]

Get pipeline execution progress.

Return type:

Dict[str, int]

get_critical_path()[source]

Get the critical path (longest path) through the pipeline.

Return type:

List[str]

Returns:

List of task IDs in the critical path

to_dict()[source]

Convert pipeline to dictionary representation.

Return type:

Dict[str, Any]

classmethod from_dict(data)[source]

Create pipeline from dictionary representation.

Return type:

Pipeline

__repr__()[source]

String representation of pipeline.

Return type:

str

__len__()[source]

Number of tasks in pipeline.

Return type:

int

__contains__(task_id)[source]

Check if task exists in pipeline.

Return type:

bool

__iter__()[source]

Iterate over task IDs.

has_task(task_id)[source]

Check if pipeline has a task with given ID.

Parameters:

task_id (str) – Task ID to check

Return type:

bool

Returns:

True if task exists, False otherwise

get_execution_order_flat()[source]

Get flat execution order of tasks.

Return type:

List[str]

Returns:

List of task IDs in execution order

get_execution_levels()[source]

Get tasks grouped by execution level (parallel groups).

Return type:

List[List[str]]

Returns:

List of lists, where each inner list contains task IDs that can be executed in parallel at that level

get_dependencies(task_id)[source]

Get dependencies for a task.

Parameters:

task_id (str) – Task ID

Return type:

List[str]

Returns:

List of dependency task IDs

get_dependents(task_id)[source]

Get tasks that depend on the given task.

Parameters:

task_id (str) – Task ID

Return type:

List[str]

Returns:

List of dependent task IDs

is_valid()[source]

Check if pipeline is valid.

Return type:

bool

Returns:

True if valid, False otherwise

get_status()[source]

Get pipeline status summary.

Return type:

Dict[str, int]

Returns:

Dictionary with status counts

clear_tasks()[source]

Clear all tasks from the pipeline.

Return type:

None

property task_count: int

Get number of tasks in pipeline.

__eq__(other)[source]

Check equality based on pipeline ID.

Return type:

bool

__hash__()[source]

Hash based on pipeline ID.

Return type:

int

__init__(id, name, tasks=<factory>, context=<factory>, metadata=<factory>, created_at=<factory>, version='1.0.0', description=None)