Skip to content

structures

__all__ = ['Structure', 'Agent', 'Pipeline', 'Workflow'] module-attribute

Agent

Bases: Structure

Source code in griptape/structures/agent.py
@define
class Agent(Structure):
    """Structure that holds exactly one task.

    When no task is supplied at construction time, a ``PromptTask`` is built
    from ``input``, ``prompt_driver``, ``tools``, ``output_schema`` and
    ``max_meta_memory_entries`` (see ``_init_task``).
    """

    # Default input: the first execution argument from the task's context, or
    # an empty TextArtifact when there are no arguments.
    input: Union[str, list, tuple, BaseArtifact, Callable[[BaseTask], BaseArtifact]] = field(
        default=lambda task: task.full_context["args"][0] if task.full_context["args"] else TextArtifact(value=""),
    )
    # ``None`` means "not provided"; ``_init_task`` fills both from
    # ``Defaults.drivers_config`` when it has to build the default task.
    stream: Optional[bool] = field(default=None, kw_only=True)
    prompt_driver: Optional[BasePromptDriver] = field(default=None, kw_only=True)
    output_schema: Optional[Schema] = field(default=None, kw_only=True)
    tools: list[BaseTool] = field(factory=list, kw_only=True)
    max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True)
    # Must stay falsy for an Agent; enforced by ``validate_fail_fast``.
    fail_fast: bool = field(default=False, kw_only=True)
    _tasks: list[Union[BaseTask, list[BaseTask]]] = field(
        factory=list, kw_only=True, alias="tasks", metadata={"serializable": True}
    )

    @fail_fast.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_fail_fast(self, _: Attribute, fail_fast: bool) -> None:  # noqa: FBT001
        """Reject a truthy ``fail_fast``.

        Raises:
            ValueError: If ``fail_fast`` is truthy.
        """
        if fail_fast:
            raise ValueError("Agents cannot fail fast, as they can only have 1 task.")

    @prompt_driver.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_prompt_driver(self, _: Attribute, prompt_driver: Optional[BasePromptDriver]) -> None:  # noqa: FBT001
        """Warn when both ``prompt_driver`` and ``stream`` were provided."""
        if prompt_driver is not None and self.stream is not None:
            warnings.warn(
                "`Agent.prompt_driver` is set, but `Agent.stream` was provided. `Agent.stream` will be ignored. This will be an error in the future.",
                UserWarning,
                stacklevel=2,
            )

    @_tasks.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_tasks(self, _: Attribute, tasks: list) -> None:
        """Warn when both ``tasks`` and ``prompt_driver`` were provided."""
        if tasks and self.prompt_driver is not None:
            warnings.warn(
                "`Agent.tasks` is set, but `Agent.prompt_driver` was provided. `Agent.prompt_driver` will be ignored. This will be an error in the future.",
                UserWarning,
                stacklevel=2,
            )

    def __attrs_post_init__(self) -> None:
        """Run the parent initialisation, then build the default task if none was given."""
        super().__attrs_post_init__()

        if not self.tasks:
            self._init_task()

    @property
    def task(self) -> BaseTask:
        """The first (and, for an Agent, only) task."""
        return self.tasks[0]

    def add_task(self, task: BaseTask) -> BaseTask:
        """Replace the current task with ``task`` and return ``task``."""
        self._tasks.clear()

        task.preprocess(self)

        self._tasks.append(task)

        return task

    def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]:
        """Add at most one task.

        Raises:
            ValueError: If more than one positional task is passed.
        """
        if len(tasks) > 1:
            raise ValueError("Agents can only have one task.")
        return super().add_tasks(*tasks)

    @observable
    def try_run(self, *args) -> Agent:
        """Run the single task and return this agent."""
        self.task.run()

        return self

    def _init_task(self) -> None:
        """Build the default ``PromptTask`` and install it as the agent's task."""
        # Validators are disabled around both assignments; presumably so the
        # "both provided" warnings above do not fire for values that are only
        # being filled in from defaults.
        if self.stream is None:
            with validators.disabled():
                self.stream = Defaults.drivers_config.prompt_driver.stream

        if self.prompt_driver is None:
            with validators.disabled():
                self.prompt_driver = evolve(Defaults.drivers_config.prompt_driver, stream=self.stream)

        task = PromptTask(
            self.input,
            prompt_driver=self.prompt_driver,
            tools=self.tools,
            output_schema=self.output_schema,
            max_meta_memory_entries=self.max_meta_memory_entries,
        )

        self.add_task(task)

fail_fast: bool = field(default=False, kw_only=True) class-attribute instance-attribute

input: Union[str, list, tuple, BaseArtifact, Callable[[BaseTask], BaseArtifact]] = field(default=lambda task: task.full_context['args'][0] if task.full_context['args'] else TextArtifact(value='')) class-attribute instance-attribute

max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True) class-attribute instance-attribute

output_schema: Optional[Schema] = field(default=None, kw_only=True) class-attribute instance-attribute

prompt_driver: BasePromptDriver = field(default=None, kw_only=True) class-attribute instance-attribute

stream: bool = field(default=None, kw_only=True) class-attribute instance-attribute

task: BaseTask property

tools: list[BaseTool] = field(factory=list, kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/structures/agent.py
def __attrs_post_init__(self) -> None:
    """Run the parent initialisation, then build the default task if none was given."""
    super().__attrs_post_init__()

    if not self.tasks:
        self._init_task()

add_task(task)

Source code in griptape/structures/agent.py
def add_task(self, task: BaseTask) -> BaseTask:
    """Replace whatever task is currently stored with ``task`` and return ``task``."""
    # Drop the previous task first; only one task is kept at a time.
    del self._tasks[:]
    task.preprocess(self)
    self._tasks.append(task)
    return task

add_tasks(*tasks)

Source code in griptape/structures/agent.py
def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]:
    """Add at most one task, delegating to the parent implementation.

    Raises:
        ValueError: If more than one positional task is passed.
    """
    if len(tasks) <= 1:
        return super().add_tasks(*tasks)
    raise ValueError("Agents can only have one task.")

try_run(*args)

Source code in griptape/structures/agent.py
@observable
def try_run(self, *args) -> Agent:
    """Run the agent's single task and return the agent itself."""
    only_task = self.task
    only_task.run()
    return self

validate_fail_fast(_, fail_fast)

Source code in griptape/structures/agent.py
@fail_fast.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_fail_fast(self, _: Attribute, fail_fast: bool) -> None:  # noqa: FBT001
    """Reject a truthy ``fail_fast``.

    Raises:
        ValueError: If ``fail_fast`` is truthy.
    """
    if not fail_fast:
        return
    raise ValueError("Agents cannot fail fast, as they can only have 1 task.")

validate_prompt_driver(_, prompt_driver)

Source code in griptape/structures/agent.py
@prompt_driver.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_prompt_driver(self, _: Attribute, prompt_driver: Optional[BasePromptDriver]) -> None:  # noqa: FBT001
    """Warn when both ``prompt_driver`` and ``stream`` were provided."""
    if prompt_driver is None or self.stream is None:
        return

    # The warning is issued from this frame so ``stacklevel=2`` keeps pointing at the same caller.
    warnings.warn(
        "`Agent.prompt_driver` is set, but `Agent.stream` was provided. `Agent.stream` will be ignored. This will be an error in the future.",
        UserWarning,
        stacklevel=2,
    )

validate_tasks(_, tasks)

Source code in griptape/structures/agent.py
@_tasks.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_tasks(self, _: Attribute, tasks: list) -> None:
    """Warn when both ``tasks`` and ``prompt_driver`` were provided."""
    if not tasks or self.prompt_driver is None:
        return

    # The warning is issued from this frame so ``stacklevel=2`` keeps pointing at the same caller.
    warnings.warn(
        "`Agent.tasks` is set, but `Agent.prompt_driver` was provided. `Agent.prompt_driver` will be ignored. This will be an error in the future.",
        UserWarning,
        stacklevel=2,
    )

Pipeline

Bases: Structure

Source code in griptape/structures/pipeline.py
@define
class Pipeline(Structure):
    """Structure that runs its tasks one after another, following each task's first child."""

    def add_task(self, task: BaseTask) -> BaseTask:
        """Append ``task`` to the end of the pipeline.

        If a task with the same id is already present, that task is returned
        and nothing changes. Otherwise the current output task (if any)
        becomes the parent of ``task``.
        """
        if (existing_task := self.try_find_task(task.id)) is not None:
            return existing_task

        task.preprocess(self)

        if self.output_task:
            self.output_task.child_ids.append(task.id)
            task.parent_ids.append(self.output_task.id)

        self._tasks.append(task)

        return task

    def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
        """Insert ``task`` directly after ``parent_task``.

        The first child of ``parent_task`` (if any) becomes the child of
        ``task``.

        Raises:
            ValueError: If ``parent_task`` is not part of this pipeline. The
                check happens before any link is rewired, so a failed insert
                leaves every task untouched.
        """
        # Look the parent up first: raising after the links were rewired
        # would leave the parent/child graph half-updated.
        parent_index = self.tasks.index(parent_task)

        task.preprocess(self)

        if parent_task.children:
            child_task = parent_task.children[0]

            task.child_ids.append(child_task.id)
            child_task.parent_ids.append(task.id)

            child_task.parent_ids.remove(parent_task.id)
            parent_task.child_ids.remove(child_task.id)

        task.parent_ids.append(parent_task.id)
        parent_task.child_ids.append(task.id)

        self._tasks.insert(parent_index + 1, task)

        return task

    @observable
    def try_run(self, *args) -> Pipeline:
        """Run the pipeline starting at the input task and return the pipeline."""
        self.__run_from_task(self.input_task)

        return self

    def context(self, task: BaseTask) -> dict[str, Any]:
        """Extend the base context with the first parent/child of ``task`` and all task outputs."""
        context = super().context(task)

        context.update(
            {
                "parent_output": task.parents[0].output if task.parents else None,
                "task_outputs": self.task_outputs,
                "parent": task.parents[0] if task.parents else None,
                "child": task.children[0] if task.children else None,
            },
        )

        return context

    def __run_from_task(self, task: Optional[BaseTask]) -> None:
        """Run ``task`` and then each successive first child.

        Stops at the end of the chain, or as soon as a task returns an
        ``ErrorArtifact`` while ``fail_fast`` is set. Implemented as a loop so
        that long pipelines cannot hit the interpreter's recursion limit.
        """
        while task is not None:
            if isinstance(task.run(), ErrorArtifact) and self.fail_fast:
                return
            task = next(iter(task.children), None)

__run_from_task(task)

Source code in griptape/structures/pipeline.py
def __run_from_task(self, task: Optional[BaseTask]) -> None:
    """Run ``task`` and then each successive first child.

    Stops at the end of the chain, or as soon as a task returns an
    ``ErrorArtifact`` while ``fail_fast`` is set. Implemented as a loop so
    that long pipelines cannot hit the interpreter's recursion limit.
    """
    while task is not None:
        if isinstance(task.run(), ErrorArtifact) and self.fail_fast:
            return
        task = next(iter(task.children), None)

add_task(task)

Source code in griptape/structures/pipeline.py
def add_task(self, task: BaseTask) -> BaseTask:
    """Append ``task`` to the end of the pipeline.

    A task whose id is already present is not added again; the stored task
    is returned instead. Otherwise the current output task (if any) is linked
    as the parent of ``task``.
    """
    already_present = self.try_find_task(task.id)
    if already_present is not None:
        return already_present

    task.preprocess(self)

    tail = self.output_task
    if tail:
        tail.child_ids.append(task.id)
        task.parent_ids.append(tail.id)

    self._tasks.append(task)
    return task

context(task)

Source code in griptape/structures/pipeline.py
def context(self, task: BaseTask) -> dict[str, Any]:
    """Extend the base context with the first parent/child of ``task`` and all task outputs."""
    ctx = super().context(task)

    parents, children = task.parents, task.children

    ctx["parent_output"] = parents[0].output if parents else None
    ctx["task_outputs"] = self.task_outputs
    ctx["parent"] = parents[0] if parents else None
    ctx["child"] = children[0] if children else None

    return ctx

insert_task(parent_task, task)

Source code in griptape/structures/pipeline.py
def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
    """Insert ``task`` directly after ``parent_task``.

    The first child of ``parent_task`` (if any) becomes the child of ``task``.

    Raises:
        ValueError: If ``parent_task`` is not part of this pipeline. The check
            happens before any link is rewired, so a failed insert leaves
            every task untouched.
    """
    # Look the parent up first: raising after the links were rewired would
    # leave the parent/child graph half-updated.
    parent_index = self.tasks.index(parent_task)

    task.preprocess(self)

    if parent_task.children:
        child_task = parent_task.children[0]

        task.child_ids.append(child_task.id)
        child_task.parent_ids.append(task.id)

        child_task.parent_ids.remove(parent_task.id)
        parent_task.child_ids.remove(child_task.id)

    task.parent_ids.append(parent_task.id)
    parent_task.child_ids.append(task.id)

    self._tasks.insert(parent_index + 1, task)

    return task

try_run(*args)

Source code in griptape/structures/pipeline.py
@observable
def try_run(self, *args) -> Pipeline:
    """Run the pipeline starting at the input task and return the pipeline."""
    first_task = self.input_task
    self.__run_from_task(first_task)
    return self

Structure

Bases: RuleMixin, SerializableMixin, RunnableMixin['Structure'], ABC

Source code in griptape/structures/structure.py
@define
class Structure(RuleMixin, SerializableMixin, RunnableMixin["Structure"], ABC):
    """Abstract base for task containers.

    Subclasses decide how tasks are stored and linked (``add_task``) and how
    they are executed (``try_run``).
    """

    id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={"serializable": True})
    # May contain nested lists; the ``tasks`` property exposes the flattened view.
    _tasks: list[Union[BaseTask, list[BaseTask]]] = field(
        factory=list, kw_only=True, alias="tasks", metadata={"serializable": True}
    )
    conversation_memory: Optional[BaseConversationMemory] = field(
        default=Factory(lambda: ConversationMemory()),
        kw_only=True,
        metadata={"serializable": True},
    )
    conversation_memory_strategy: Literal["per_structure", "per_task"] = field(
        default="per_structure", kw_only=True, metadata={"serializable": True}
    )
    task_memory: TaskMemory = field(
        default=Factory(lambda self: TaskMemory(), takes_self=True),
        kw_only=True,
    )
    meta_memory: MetaMemory = field(default=Factory(lambda: MetaMemory()), kw_only=True)
    fail_fast: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    # Positional arguments of the most recent ``run`` call (set in ``before_run``).
    _execution_args: tuple = ()
    # Receives events while ``run_stream`` is consuming a run.
    _event_queue: Queue[BaseEvent] = field(default=Factory(lambda: Queue()), init=False)

    def __attrs_post_init__(self) -> None:
        """Re-add the tasks passed to the constructor through ``add_tasks``."""
        tasks = self._tasks.copy()
        self._tasks.clear()
        self.add_tasks(*tasks)

    def __add__(self, other: BaseTask | list[BaseTask | list[BaseTask]]) -> list[BaseTask]:
        """Add one task or a list of tasks via ``structure + other``; returns the added tasks."""
        return self.add_tasks(*other) if isinstance(other, list) else self.add_tasks(other)

    @property
    def tasks(self) -> list[BaseTask]:
        """Flattened list of tasks (one level of nested lists is expanded)."""
        tasks = []

        for task in self._tasks:
            if isinstance(task, list):
                tasks.extend(task)
            else:
                tasks.append(task)
        return tasks

    @property
    def execution_args(self) -> tuple:
        """Positional arguments of the most recent run."""
        return self._execution_args

    @property
    def input_task(self) -> Optional[BaseTask]:
        """First task, or ``None`` when there are no tasks."""
        return self.tasks[0] if self.tasks else None

    @property
    def output_task(self) -> Optional[BaseTask]:
        """Last task, or ``None`` when there are no tasks."""
        return self.tasks[-1] if self.tasks else None

    @property
    def output(self) -> BaseArtifact:
        """Output of the output task.

        Raises:
            ValueError: If there is no output task, or it has no output yet.
        """
        if self.output_task is None:
            raise ValueError("Structure has no output Task. Add a Task to the Structure to generate output.")
        if self.output_task.output is None:
            raise ValueError("Structure's output Task has no output. Run the Structure to generate output.")
        return self.output_task.output

    @property
    def task_outputs(self) -> dict[str, Optional[BaseArtifact]]:
        """Mapping of task id to that task's output."""
        return {task.id: task.output for task in self.tasks}

    @property
    def finished_tasks(self) -> list[BaseTask]:
        """Tasks whose ``is_finished()`` is truthy."""
        return [s for s in self.tasks if s.is_finished()]

    def is_finished(self) -> bool:
        """Return ``True`` when no task can run."""
        return all(not s.can_run() for s in self.tasks)

    def is_running(self) -> bool:
        """Return ``True`` when at least one task reports that it is running."""
        # Test the result of ``is_running()`` itself rather than the
        # truthiness of the task object that passed the filter.
        return any(s.is_running() for s in self.tasks)

    def find_task(self, task_id: str) -> BaseTask:
        """Return the task with ``task_id``.

        Raises:
            ValueError: If no such task exists.
        """
        if (task := self.try_find_task(task_id)) is not None:
            return task
        raise ValueError(f"Task with id {task_id} doesn't exist.")

    def try_find_task(self, task_id: str) -> Optional[BaseTask]:
        """Return the first task with ``task_id``, or ``None``."""
        for task in self.tasks:
            if task.id == task_id:
                return task
        return None

    def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]:
        """Add every task, recursing into lists; returns the results of ``add_task`` in order."""
        added_tasks = []
        for task in tasks:
            if isinstance(task, list):
                added_tasks.extend(self.add_tasks(*task))
            else:
                added_tasks.append(self.add_task(task))
        return added_tasks

    def context(self, task: BaseTask) -> dict[str, Any]:
        """Base context: the execution arguments and this structure."""
        return {"args": self.execution_args, "structure": self}

    def resolve_relationships(self) -> None:
        """Make every parent/child link bidirectional.

        Raises:
            ValueError: If two tasks share an id, or a referenced parent or
                child id does not belong to any task.
        """
        task_by_id = {}
        for task in self.tasks:
            if task.id in task_by_id:
                raise ValueError(f"Duplicate task with id {task.id} found.")
            task_by_id[task.id] = task

        for task in self.tasks:
            # Ensure parents include this task as a child
            for parent_id in task.parent_ids:
                if parent_id not in task_by_id:
                    raise ValueError(f"Task with id {parent_id} doesn't exist.")
                parent = task_by_id[parent_id]
                if task.id not in parent.child_ids:
                    parent.child_ids.append(task.id)

            # Ensure children include this task as a parent
            for child_id in task.child_ids:
                if child_id not in task_by_id:
                    raise ValueError(f"Task with id {child_id} doesn't exist.")
                child = task_by_id[child_id]
                if task.id not in child.parent_ids:
                    child.parent_ids.append(task.id)

    @observable
    def before_run(self, args: Any) -> None:
        """Store ``args``, reset every task, publish the start event and resolve relationships."""
        super().before_run(args)
        self._execution_args = args

        for task in self.tasks:
            task.reset()

        if self.input_task is not None:
            EventBus.publish_event(
                StartStructureRunEvent(
                    structure_id=self.id,
                    input_task_input=self.input_task.input,
                    input_task_output=self.input_task.output,
                ),
            )

        self.resolve_relationships()

    @observable
    def after_run(self) -> None:
        """Record the run in conversation memory (per-structure strategy) and publish the finish event.

        Nothing is recorded or published when there is no output task.
        """
        super().after_run()

        if self.output_task is not None:
            if (
                self.conversation_memory_strategy == "per_structure"
                and self.conversation_memory is not None
                and self.input_task is not None
                and self.output_task.output is not None
            ):
                run = Run(input=self.input_task.input, output=self.output_task.output)

                self.conversation_memory.add_run(run)

            EventBus.publish_event(
                FinishStructureRunEvent(
                    structure_id=self.id,
                    output_task_input=self.output_task.input,
                    output_task_output=self.output_task.output,
                ),
                flush=True,
            )

    @abstractmethod
    def add_task(self, task: BaseTask) -> BaseTask: ...

    @observable
    def run(self, *args) -> Structure:
        """Run ``before_run``, ``try_run`` and ``after_run``; returns the result of ``try_run``."""
        self.before_run(args)

        result = self.try_run(*args)

        self.after_run()

        return result

    @observable
    def run_stream(self, *args, event_types: Optional[list[type[BaseEvent]]] = None) -> Iterator[BaseEvent]:
        """Run the structure on a worker thread and yield its events as they arrive.

        Args:
            *args: Forwarded to ``run``.
            event_types: Event types to listen for; defaults to every
                ``BaseEvent``. ``FinishStructureRunEvent`` is always listened
                for because it marks the end of the stream.

        Yields:
            Each received event up to, but not including, this structure's
            ``FinishStructureRunEvent``.
        """
        from queue import Empty  # local import: only this method needs it

        if event_types is None:
            event_types = [BaseEvent]
        elif FinishStructureRunEvent not in event_types:
            event_types = [*event_types, FinishStructureRunEvent]

        with EventListener(self._event_queue.put, event_types=event_types):
            t = Thread(target=with_contextvars(self.run), args=args)
            t.start()

            while True:
                try:
                    # Poll with a timeout: if the run raises, or the structure
                    # has no tasks, no finish event is ever published and a
                    # plain blocking ``get()`` would never return.
                    event = self._event_queue.get(timeout=0.1)
                except Empty:
                    if t.is_alive():
                        continue
                    # The worker is gone; pick up anything it queued just before exiting.
                    try:
                        event = self._event_queue.get_nowait()
                    except Empty:
                        break
                if isinstance(event, FinishStructureRunEvent) and event.structure_id == self.id:
                    break
                yield event
            t.join()

    @abstractmethod
    def try_run(self, *args) -> Structure: ...

conversation_memory: Optional[BaseConversationMemory] = field(default=Factory(lambda: ConversationMemory()), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

conversation_memory_strategy: Literal['per_structure', 'per_task'] = field(default='per_structure', kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

execution_args: tuple property

fail_fast: bool = field(default=True, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

finished_tasks: list[BaseTask] property

id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

input_task: Optional[BaseTask] property

meta_memory: MetaMemory = field(default=Factory(lambda: MetaMemory()), kw_only=True) class-attribute instance-attribute

output: BaseArtifact property

output_task: Optional[BaseTask] property

task_memory: TaskMemory = field(default=Factory(lambda self: TaskMemory(), takes_self=True), kw_only=True) class-attribute instance-attribute

task_outputs: dict[str, Optional[BaseArtifact]] property

tasks: list[BaseTask] property

__add__(other)

Source code in griptape/structures/structure.py
def __add__(self, other: BaseTask | list[BaseTask | list[BaseTask]]) -> list[BaseTask]:
    """Add one task or a list of tasks via ``structure + other``; returns the added tasks."""
    if isinstance(other, list):
        return self.add_tasks(*other)
    return self.add_tasks(other)

__attrs_post_init__()

Source code in griptape/structures/structure.py
def __attrs_post_init__(self) -> None:
    """Re-add the tasks given to the constructor through ``add_tasks``."""
    # Snapshot first: the stored list is emptied before the tasks are re-added.
    pending = list(self._tasks)
    del self._tasks[:]
    self.add_tasks(*pending)

add_task(task) abstractmethod

Source code in griptape/structures/structure.py
@abstractmethod
def add_task(self, task: BaseTask) -> BaseTask:
    """Add ``task`` to the structure; concrete subclasses define how it is stored and linked."""

add_tasks(*tasks)

Source code in griptape/structures/structure.py
def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]:
    """Add every task, recursing into lists; returns the results of ``add_task`` in order."""
    collected = []
    for entry in tasks:
        if not isinstance(entry, list):
            collected.append(self.add_task(entry))
            continue
        # Nested lists go back through ``self.add_tasks`` so subclass overrides still apply.
        collected += self.add_tasks(*entry)
    return collected

after_run()

Source code in griptape/structures/structure.py
@observable
def after_run(self) -> None:
    """Record the run in conversation memory (per-structure strategy) and publish the finish event.

    Nothing is recorded or published when there is no output task.
    """
    super().after_run()

    if self.output_task is None:
        return

    should_record = (
        self.conversation_memory_strategy == "per_structure"
        and self.conversation_memory is not None
        and self.input_task is not None
        and self.output_task.output is not None
    )
    if should_record:
        self.conversation_memory.add_run(Run(input=self.input_task.input, output=self.output_task.output))

    finish_event = FinishStructureRunEvent(
        structure_id=self.id,
        output_task_input=self.output_task.input,
        output_task_output=self.output_task.output,
    )
    EventBus.publish_event(finish_event, flush=True)

before_run(args)

Source code in griptape/structures/structure.py
@observable
def before_run(self, args: Any) -> None:
    """Store ``args``, reset every task, publish the start event and resolve relationships."""
    super().before_run(args)
    self._execution_args = args

    for task in self.tasks:
        task.reset()

    if self.input_task is not None:
        start_event = StartStructureRunEvent(
            structure_id=self.id,
            input_task_input=self.input_task.input,
            input_task_output=self.input_task.output,
        )
        EventBus.publish_event(start_event)

    self.resolve_relationships()

context(task)

Source code in griptape/structures/structure.py
def context(self, task: BaseTask) -> dict[str, Any]:
    """Base context: the execution arguments and this structure (``task`` is not used here)."""
    return dict(args=self.execution_args, structure=self)

find_task(task_id)

Source code in griptape/structures/structure.py
def find_task(self, task_id: str) -> BaseTask:
    """Return the task with ``task_id``.

    Raises:
        ValueError: If ``try_find_task`` returns ``None`` for ``task_id``.
    """
    found = self.try_find_task(task_id)
    if found is None:
        raise ValueError(f"Task with id {task_id} doesn't exist.")
    return found

is_finished()

Source code in griptape/structures/structure.py
def is_finished(self) -> bool:
    """Return ``True`` when no task can run (also ``True`` when there are no tasks)."""
    return not any(candidate.can_run() for candidate in self.tasks)

is_running()

Source code in griptape/structures/structure.py
def is_running(self) -> bool:
    """Return ``True`` when at least one task reports that it is running."""
    # Test the result of ``is_running()`` itself rather than the truthiness
    # of the task object that passed the filter.
    return any(s.is_running() for s in self.tasks)

resolve_relationships()

Source code in griptape/structures/structure.py
def resolve_relationships(self) -> None:
    """Make every parent/child link bidirectional.

    Raises:
        ValueError: If two tasks share an id, or a referenced parent or child
            id does not belong to any task.
    """
    by_id = {}
    for candidate in self.tasks:
        if candidate.id in by_id:
            raise ValueError(f"Duplicate task with id {candidate.id} found.")
        by_id[candidate.id] = candidate

    def lookup(task_id):
        # Resolve an id to its task, failing loudly for dangling references.
        if task_id not in by_id:
            raise ValueError(f"Task with id {task_id} doesn't exist.")
        return by_id[task_id]

    for current in self.tasks:
        # A task listed as our parent must list us as its child.
        for parent_id in current.parent_ids:
            parent = lookup(parent_id)
            if current.id not in parent.child_ids:
                parent.child_ids.append(current.id)

        # A task listed as our child must list us as its parent.
        for child_id in current.child_ids:
            child = lookup(child_id)
            if current.id not in child.parent_ids:
                child.parent_ids.append(current.id)

run(*args)

Source code in griptape/structures/structure.py
@observable
def run(self, *args) -> Structure:
    """Run ``before_run``, ``try_run`` and ``after_run``; returns the result of ``try_run``."""
    self.before_run(args)
    outcome = self.try_run(*args)
    self.after_run()
    return outcome

run_stream(*args, event_types=None)

Source code in griptape/structures/structure.py
@observable
def run_stream(self, *args, event_types: Optional[list[type[BaseEvent]]] = None) -> Iterator[BaseEvent]:
    """Run the structure on a worker thread and yield its events as they arrive.

    Args:
        *args: Forwarded to ``run``.
        event_types: Event types to listen for; defaults to every
            ``BaseEvent``. ``FinishStructureRunEvent`` is always listened for
            because it marks the end of the stream.

    Yields:
        Each received event up to, but not including, this structure's
        ``FinishStructureRunEvent``.
    """
    from queue import Empty  # local import: only this function needs it

    if event_types is None:
        event_types = [BaseEvent]
    elif FinishStructureRunEvent not in event_types:
        event_types = [*event_types, FinishStructureRunEvent]

    with EventListener(self._event_queue.put, event_types=event_types):
        t = Thread(target=with_contextvars(self.run), args=args)
        t.start()

        while True:
            try:
                # Poll with a timeout: if the run raises, or the structure has
                # no tasks, no finish event is ever published and a plain
                # blocking ``get()`` would never return.
                event = self._event_queue.get(timeout=0.1)
            except Empty:
                if t.is_alive():
                    continue
                # The worker is gone; pick up anything it queued just before exiting.
                try:
                    event = self._event_queue.get_nowait()
                except Empty:
                    break
            if isinstance(event, FinishStructureRunEvent) and event.structure_id == self.id:
                break
            yield event
        t.join()

try_find_task(task_id)

Source code in griptape/structures/structure.py
def try_find_task(self, task_id: str) -> Optional[BaseTask]:
    """Return the first task whose id equals ``task_id``, or ``None`` when there is none."""
    matches = (candidate for candidate in self.tasks if candidate.id == task_id)
    return next(matches, None)

try_run(*args) abstractmethod

Source code in griptape/structures/structure.py
@abstractmethod
def try_run(self, *args) -> Structure:
    """Execute the structure's tasks; concrete subclasses define the execution strategy."""

Workflow

Bases: Structure, FuturesExecutorMixin

Source code in griptape/structures/workflow.py
@define
class Workflow(Structure, FuturesExecutorMixin):
    """Structure whose tasks form a parent/child graph and are submitted to a futures executor."""

    @property
    def input_task(self) -> Optional[BaseTask]:
        """First task in topological order, or ``None`` when there are no tasks."""
        return self.order_tasks()[0] if self.tasks else None

    @property
    def output_task(self) -> Optional[BaseTask]:
        """Last task in topological order, or ``None`` when there are no tasks."""
        return self.order_tasks()[-1] if self.tasks else None

    @property
    def input_tasks(self) -> list[BaseTask]:
        """Tasks without parents."""
        return [task for task in self.tasks if not task.parents]

    @property
    def output_tasks(self) -> list[BaseTask]:
        """Tasks without children."""
        return [task for task in self.tasks if not task.children]

    @property
    def outputs(self) -> list[BaseArtifact]:
        """Outputs of the output tasks, skipping those that have no output."""
        return [task.output for task in self.output_tasks if task.output is not None]

    def add_task(self, task: BaseTask) -> BaseTask:
        """Add ``task`` unless a task with the same id exists, in which case that task is returned."""
        if (existing_task := self.try_find_task(task.id)) is not None:
            return existing_task

        task.preprocess(self)

        self._tasks.append(task)

        return task

    def insert_tasks(
        self,
        parent_tasks: BaseTask | list[BaseTask],
        tasks: BaseTask | list[BaseTask],
        child_tasks: BaseTask | list[BaseTask],
        *,
        preserve_relationship: bool = False,
    ) -> list[BaseTask]:
        """Insert tasks between parent and child tasks in the workflow.

        Args:
            parent_tasks: The tasks that will be the parents of the new tasks.
            tasks: The tasks to insert between the parent and child tasks.
            child_tasks: The tasks that will be the children of the new tasks.
            preserve_relationship: Whether to preserve the parent/child relationship when inserting between parent and child tasks.

        Returns:
            The inserted tasks, as a list.
        """
        if not isinstance(parent_tasks, list):
            parent_tasks = [parent_tasks]
        if not isinstance(tasks, list):
            tasks = [tasks]
        if not isinstance(child_tasks, list):
            child_tasks = [child_tasks]

        for task in tasks:
            self.insert_task(parent_tasks, task, child_tasks, preserve_relationship=preserve_relationship)

        return tasks

    def insert_task(
        self,
        parent_tasks: list[BaseTask],
        task: BaseTask,
        child_tasks: list[BaseTask],
        *,
        preserve_relationship: bool = False,
    ) -> BaseTask:
        """Insert one task between ``parent_tasks`` and ``child_tasks``.

        Args:
            parent_tasks: The tasks that will be the parents of ``task``.
            task: The task to insert.
            child_tasks: The tasks that will be the children of ``task``.
            preserve_relationship: When false, existing direct links between
                ``parent_tasks`` and ``child_tasks`` are removed.

        Returns:
            The inserted task.

        Raises:
            ValueError: If a parent task is not part of the workflow.
        """
        task.preprocess(self)

        self.__link_task_to_children(task, child_tasks)

        if not preserve_relationship:
            self.__remove_old_parent_child_relationships(parent_tasks, child_tasks)

        last_parent_index = self.__link_task_to_parents(task, parent_tasks)

        # Insert the new task once, just after the last parent task
        self._tasks.insert(last_parent_index + 1, task)

        return task

    @observable
    def try_run(self, *args) -> Workflow:
        """Repeatedly submit every runnable task until the workflow is finished.

        The loop also stops once a completed task returned an
        ``ErrorArtifact`` while ``fail_fast`` is set.
        """
        exit_loop = False

        with self.create_futures_executor() as futures_executor:
            while not self.is_finished() and not exit_loop:
                futures_list = {}
                ordered_tasks = self.order_tasks()

                for task in ordered_tasks:
                    if task.can_run():
                        future = futures_executor.submit(with_contextvars(task.run))
                        futures_list[future] = task

                # Wait for all tasks to complete
                for future in futures.as_completed(futures_list):
                    if isinstance(future.result(), ErrorArtifact) and self.fail_fast:
                        exit_loop = True

                        break

            return self

    def context(self, task: BaseTask) -> dict[str, Any]:
        """Extend the base context with task outputs and the parents/children of ``task``."""
        context = super().context(task)

        context.update(
            {
                "task_outputs": self.task_outputs,
                "parent_outputs": task.parent_outputs,
                "parents_output_text": task.parents_output_text,
                "parents": {parent.id: parent for parent in task.parents},
                "children": {child.id: child for child in task.children},
            },
        )

        return context

    def to_graph(self) -> dict[str, set[str]]:
        """Map each task id to the ids of the tasks that list it as a child."""
        tasks = self.tasks
        graph: dict[str, set[str]] = {task.id: set() for task in tasks}

        # One pass over the child links instead of comparing every pair of tasks.
        for task in tasks:
            for child_id in task.child_ids:
                if child_id in graph:
                    graph[child_id].add(task.id)

        return graph

    def order_tasks(self) -> list[BaseTask]:
        """Return the tasks in the static topological order of ``to_graph()``."""
        return [self.find_task(task_id) for task_id in TopologicalSorter(self.to_graph()).static_order()]

    def __link_task_to_children(self, task: BaseTask, child_tasks: list[BaseTask]) -> None:
        """Link ``task`` as a parent of every task in ``child_tasks`` (no duplicate links)."""
        for child_task in child_tasks:
            # Link the new task to the child task
            if child_task.id not in task.child_ids:
                task.child_ids.append(child_task.id)
            if task.id not in child_task.parent_ids:
                child_task.parent_ids.append(task.id)

    def __remove_old_parent_child_relationships(
        self,
        parent_tasks: list[BaseTask],
        child_tasks: list[BaseTask],
    ) -> None:
        """Remove any direct link between each parent task and each child task."""
        for parent_task in parent_tasks:
            for child_task in child_tasks:
                # Remove the old parent/child relationship
                if child_task.id in parent_task.child_ids:
                    parent_task.child_ids.remove(child_task.id)
                if parent_task.id in child_task.parent_ids:
                    child_task.parent_ids.remove(parent_task.id)

    def __link_task_to_parents(self, task: BaseTask, parent_tasks: list[BaseTask]) -> int:
        """Link ``task`` as a child of every parent task.

        Returns:
            The highest index of a parent task in ``self.tasks``, or ``-1``
            when ``parent_tasks`` is empty.

        Raises:
            ValueError: If a parent task is not part of the workflow. All
                parents are checked before any link is created.
        """
        workflow_tasks = self.tasks

        # Validate every parent up front so an unknown parent cannot leave
        # earlier parents already linked to ``task``.
        parent_indices = []
        for parent_task in parent_tasks:
            try:
                parent_indices.append(workflow_tasks.index(parent_task))
            except ValueError as exc:
                raise ValueError(f"Parent task {parent_task.id} not found in workflow.") from exc

        for parent_task in parent_tasks:
            # Link the new task to the parent task
            if parent_task.id not in task.parent_ids:
                task.parent_ids.append(parent_task.id)
            if task.id not in parent_task.child_ids:
                parent_task.child_ids.append(task.id)

        return max(parent_indices, default=-1)

input_task: Optional[BaseTask] property

input_tasks: list[BaseTask] property

output_task: Optional[BaseTask] property

output_tasks: list[BaseTask] property

outputs: list[BaseArtifact] property

__link_task_to_children(task, child_tasks)

Source code in griptape/structures/workflow.py
def __link_task_to_children(self, task: BaseTask, child_tasks: list[BaseTask]) -> None:
    """Link ``task`` as a parent of every task in ``child_tasks`` without creating duplicate links."""
    new_parent_id = task.id
    for child in child_tasks:
        # Forward link: new task -> child.
        if child.id not in task.child_ids:
            task.child_ids.append(child.id)
        # Back link: child -> new task.
        if new_parent_id not in child.parent_ids:
            child.parent_ids.append(new_parent_id)

__link_task_to_parents(task, parent_tasks)

Source code in griptape/structures/workflow.py
def __link_task_to_parents(self, task: BaseTask, parent_tasks: list[BaseTask]) -> int:
    """Link `task` beneath each parent and report where the last parent sits in the workflow.

    Every parent is looked up in `self.tasks` before any link is written, so a missing
    parent raises without leaving `task` or the other parents partially linked.

    Args:
        task: The task to attach beneath the parents.
        parent_tasks: The tasks that become parents of `task`.

    Returns:
        The largest index of any parent within `self.tasks`, or -1 when `parent_tasks` is empty.

    Raises:
        ValueError: If a parent task is not present in `self.tasks`.
    """
    workflow_tasks = self.tasks
    parents = list(parent_tasks)

    # Validate first: resolve every parent's position before mutating any relationship.
    parent_indices = []
    for parent_task in parents:
        try:
            parent_indices.append(workflow_tasks.index(parent_task))
        except ValueError as exc:
            raise ValueError(f"Parent task {parent_task.id} not found in workflow.") from exc

    for parent_task in parents:
        # Link the new task to the parent task
        if parent_task.id not in task.parent_ids:
            task.parent_ids.append(parent_task.id)
        if task.id not in parent_task.child_ids:
            parent_task.child_ids.append(task.id)

    return max(parent_indices, default=-1)

__remove_old_parent_child_relationships(parent_tasks, child_tasks)

Source code in griptape/structures/workflow.py
def __remove_old_parent_child_relationships(
    self,
    parent_tasks: list[BaseTask],
    child_tasks: list[BaseTask],
) -> None:
    """Detach every given parent from every given child, in both directions.

    Args:
        parent_tasks: Tasks whose `child_ids` should stop referencing the children.
        child_tasks: Tasks whose `parent_ids` should stop referencing the parents.
    """
    pairs = ((parent, child) for parent in parent_tasks for child in child_tasks)
    for parent, child in pairs:
        # Pairs that are not currently linked are left untouched.
        if child.id in parent.child_ids:
            parent.child_ids.remove(child.id)
        if parent.id in child.parent_ids:
            child.parent_ids.remove(parent.id)

add_task(task)

Source code in griptape/structures/workflow.py
def add_task(self, task: BaseTask) -> BaseTask:
    """Append `task` to the workflow unless a task with the same id is already registered.

    Args:
        task: The task to add.

    Returns:
        The task returned by `try_find_task(task.id)` when one exists, otherwise `task` itself.
    """
    already_added = self.try_find_task(task.id)
    if already_added is None:
        # New task: preprocess it against this workflow before storing it.
        task.preprocess(self)
        self._tasks.append(task)
        return task

    return already_added

context(task)

Source code in griptape/structures/workflow.py
def context(self, task: BaseTask) -> dict[str, Any]:
    """Extend the parent class's context for `task` with workflow-specific entries.

    Args:
        task: The task whose context is being built.

    Returns:
        The dict from `super().context(task)` with `task_outputs`, `parent_outputs`,
        `parents_output_text`, `parents` and `children` added; the last two are keyed by task id.
    """
    workflow_entries = {
        "task_outputs": self.task_outputs,
        "parent_outputs": task.parent_outputs,
        "parents_output_text": task.parents_output_text,
        "parents": {parent_task.id: parent_task for parent_task in task.parents},
        "children": {child_task.id: child_task for child_task in task.children},
    }

    task_context = super().context(task)
    task_context.update(workflow_entries)

    return task_context

insert_task(parent_tasks, task, child_tasks, *, preserve_relationship=False)

Source code in griptape/structures/workflow.py
def insert_task(
    self,
    parent_tasks: list[BaseTask],
    task: BaseTask,
    child_tasks: list[BaseTask],
    *,
    preserve_relationship: bool = False,
) -> BaseTask:
    """Insert a single task between the given parent and child tasks.

    Args:
        parent_tasks: The tasks that will be the parents of the new task.
        task: The task to insert.
        child_tasks: The tasks that will be the children of the new task.
        preserve_relationship: When False, the existing direct links between `parent_tasks`
            and `child_tasks` are removed.

    Returns:
        The inserted task.
    """
    task.preprocess(self)

    # NOTE: children are linked (and old parent/child links dropped) before the parents are
    # looked up, so an error raised while linking parents leaves those earlier changes in place.
    self.__link_task_to_children(task, child_tasks)

    if not preserve_relationship:
        self.__remove_old_parent_child_relationships(parent_tasks, child_tasks)

    last_parent_index = self.__link_task_to_parents(task, parent_tasks)

    # Insert the new task once, just after the last parent task
    self._tasks.insert(last_parent_index + 1, task)

    return task

insert_tasks(parent_tasks, tasks, child_tasks, *, preserve_relationship=False)

Insert tasks between parent and child tasks in the workflow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| parent_tasks | BaseTask \| list[BaseTask] | The tasks that will be the parents of the new tasks. | required |
| tasks | BaseTask \| list[BaseTask] | The tasks to insert between the parent and child tasks. | required |
| child_tasks | BaseTask \| list[BaseTask] | The tasks that will be the children of the new tasks. | required |
| preserve_relationship | bool | Whether to preserve the parent/child relationship when inserting between parent and child tasks. | False |
Source code in griptape/structures/workflow.py
def insert_tasks(
    self,
    parent_tasks: BaseTask | list[BaseTask],
    tasks: BaseTask | list[BaseTask],
    child_tasks: BaseTask | list[BaseTask],
    *,
    preserve_relationship: bool = False,
) -> list[BaseTask]:
    """Insert one or more tasks between parent and child tasks in the workflow.

    Each argument may be a single task or a list of tasks; a value that is not a list is
    wrapped in a one-element list.

    Args:
        parent_tasks: The tasks that will be the parents of the new tasks.
        tasks: The tasks to insert between the parent and child tasks.
        child_tasks: The tasks that will be the children of the new tasks.
        preserve_relationship: Whether to preserve the parent/child relationship when inserting between parent and child tasks.

    Returns:
        The inserted tasks as a list (the `tasks` argument itself when it was already a list).
    """
    parents = parent_tasks if isinstance(parent_tasks, list) else [parent_tasks]
    new_tasks = tasks if isinstance(tasks, list) else [tasks]
    children = child_tasks if isinstance(child_tasks, list) else [child_tasks]

    # Every new task is inserted against the same parents and children.
    for new_task in new_tasks:
        self.insert_task(parents, new_task, children, preserve_relationship=preserve_relationship)

    return new_tasks

order_tasks()

Source code in griptape/structures/workflow.py
def order_tasks(self) -> list[BaseTask]:
    """Return the workflow's tasks in a topological order of the graph from `to_graph()`.

    Returns:
        The tasks looked up with `find_task`, in the order produced by
        `TopologicalSorter.static_order()`.
    """
    sorter = TopologicalSorter(self.to_graph())
    ordered_tasks = []
    for task_id in sorter.static_order():
        ordered_tasks.append(self.find_task(task_id))
    return ordered_tasks

to_graph()

Source code in griptape/structures/workflow.py
def to_graph(self) -> dict[str, set[str]]:
    """Map each task id to the ids of the workflow tasks that list it as a child.

    Returns:
        A dict keyed by task id, in `self.tasks` order. Each value is the set of ids of
        tasks in the workflow whose `child_ids` contain that key. Child ids that do not
        belong to a task in the workflow are ignored.
    """
    tasks = self.tasks
    graph: dict[str, set[str]] = {task.id: set() for task in tasks}

    # Walk each task's child ids once instead of comparing every pair of tasks.
    for parent_task in tasks:
        for child_id in parent_task.child_ids:
            if child_id in graph:
                graph[child_id].add(parent_task.id)

    return graph

try_run(*args)

Source code in griptape/structures/workflow.py
@observable
def try_run(self, *args) -> Workflow:
    """Run the workflow's tasks in rounds until it is finished or a fail-fast error stops it.

    Each round submits every task whose `can_run()` is true, in `order_tasks()` order, to the
    futures executor and then consumes the results as they complete.

    Args:
        *args: Not referenced in this method body.

    Returns:
        This workflow.
    """
    stop_requested = False

    with self.create_futures_executor() as executor:
        while not (self.is_finished() or stop_requested):
            # Submit the currently runnable tasks, keyed by their future.
            running = {
                executor.submit(with_contextvars(task.run)): task
                for task in self.order_tasks()
                if task.can_run()
            }

            # Consume results as they complete; `result()` is read for every future so a
            # raised exception propagates regardless of `fail_fast`.
            for finished in futures.as_completed(running):
                if isinstance(finished.result(), ErrorArtifact) and self.fail_fast:
                    stop_requested = True

                    break

        return self