Skip to content

Structures

__all__ = ['Structure', 'Agent', 'Pipeline', 'Workflow'] module-attribute

Agent

Bases: Structure

Source code in griptape/structures/agent.py
@define
class Agent(Structure):
    input_template: str = field(default=PromptTask.DEFAULT_INPUT_TEMPLATE)
    tools: list[BaseTool] = field(factory=list, kw_only=True)
    max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True)

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()
        if len(self.tasks) == 0:
            if self.tools:
                task = ToolkitTask(
                    self.input_template, tools=self.tools, max_meta_memory_entries=self.max_meta_memory_entries
                )
            else:
                task = PromptTask(self.input_template, max_meta_memory_entries=self.max_meta_memory_entries)

            self.add_task(task)

    @property
    def task(self) -> BaseTask:
        return self.tasks[0]

    def add_task(self, task: BaseTask) -> BaseTask:
        self.tasks.clear()

        task.preprocess(self)

        self.tasks.append(task)

        return task

    def add_tasks(self, *tasks: BaseTask) -> list[BaseTask]:
        if len(tasks) > 1:
            raise ValueError("Agents can only have one task.")
        return super().add_tasks(*tasks)

    def try_run(self, *args) -> Agent:
        self._execution_args = args

        self.task.reset()

        self.task.execute()

        if self.conversation_memory:
            if isinstance(self.task.input, tuple):
                input_text = self.task.input[0].to_text()
            else:
                input_text = self.task.input.to_text()

            run = Run(input=input_text, output=self.task.output.to_text())

            self.conversation_memory.add_run(run)

        return self

input_template: str = field(default=PromptTask.DEFAULT_INPUT_TEMPLATE) class-attribute instance-attribute

max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True) class-attribute instance-attribute

task: BaseTask property

tools: list[BaseTool] = field(factory=list, kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/structures/agent.py
def __attrs_post_init__(self) -> None:
    super().__attrs_post_init__()
    if len(self.tasks) == 0:
        if self.tools:
            task = ToolkitTask(
                self.input_template, tools=self.tools, max_meta_memory_entries=self.max_meta_memory_entries
            )
        else:
            task = PromptTask(self.input_template, max_meta_memory_entries=self.max_meta_memory_entries)

        self.add_task(task)

add_task(task)

Source code in griptape/structures/agent.py
def add_task(self, task: BaseTask) -> BaseTask:
    self.tasks.clear()

    task.preprocess(self)

    self.tasks.append(task)

    return task

add_tasks(*tasks)

Source code in griptape/structures/agent.py
def add_tasks(self, *tasks: BaseTask) -> list[BaseTask]:
    if len(tasks) > 1:
        raise ValueError("Agents can only have one task.")
    return super().add_tasks(*tasks)

try_run(*args)

Source code in griptape/structures/agent.py
def try_run(self, *args) -> Agent:
    self._execution_args = args

    self.task.reset()

    self.task.execute()

    if self.conversation_memory:
        if isinstance(self.task.input, tuple):
            input_text = self.task.input[0].to_text()
        else:
            input_text = self.task.input.to_text()

        run = Run(input=input_text, output=self.task.output.to_text())

        self.conversation_memory.add_run(run)

    return self

Pipeline

Bases: Structure

Source code in griptape/structures/pipeline.py
@define
class Pipeline(Structure):
    def add_task(self, task: BaseTask) -> BaseTask:
        task.preprocess(self)

        if self.output_task:
            self.output_task.child_ids.append(task.id)
            task.parent_ids.append(self.output_task.id)

        self.tasks.append(task)

        return task

    def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
        task.preprocess(self)

        if parent_task.children:
            child_task = parent_task.children[0]

            task.child_ids.append(child_task.id)
            child_task.parent_ids.append(task.id)

            child_task.parent_ids.remove(parent_task.id)
            parent_task.child_ids.remove(child_task.id)

        task.parent_ids.append(parent_task.id)
        parent_task.child_ids.append(task.id)

        parent_index = self.tasks.index(parent_task)
        self.tasks.insert(parent_index + 1, task)

        return task

    def try_run(self, *args) -> Pipeline:
        self._execution_args = args

        [task.reset() for task in self.tasks]

        self.__run_from_task(self.input_task)

        if self.conversation_memory:
            if isinstance(self.input_task.input, tuple):
                input_text = self.input_task.input[0].to_text()
            else:
                input_text = self.input_task.input.to_text()

            run = Run(input=input_text, output=self.output_task.output.to_text())

            self.conversation_memory.add_run(run)

        return self

    def context(self, task: BaseTask) -> dict[str, Any]:
        context = super().context(task)

        context.update(
            {
                "parent_output": task.parents[0].output.to_text() if task.parents and task.parents[0].output else None,
                "parent": task.parents[0] if task.parents else None,
                "child": task.children[0] if task.children else None,
            }
        )

        return context

    def __run_from_task(self, task: Optional[BaseTask]) -> None:
        if task is None:
            return
        else:
            if isinstance(task.execute(), ErrorArtifact):
                return
            else:
                self.__run_from_task(next(iter(task.children), None))

__run_from_task(task)

Source code in griptape/structures/pipeline.py
def __run_from_task(self, task: Optional[BaseTask]) -> None:
    if task is None:
        return
    else:
        if isinstance(task.execute(), ErrorArtifact):
            return
        else:
            self.__run_from_task(next(iter(task.children), None))

add_task(task)

Source code in griptape/structures/pipeline.py
def add_task(self, task: BaseTask) -> BaseTask:
    task.preprocess(self)

    if self.output_task:
        self.output_task.child_ids.append(task.id)
        task.parent_ids.append(self.output_task.id)

    self.tasks.append(task)

    return task

context(task)

Source code in griptape/structures/pipeline.py
def context(self, task: BaseTask) -> dict[str, Any]:
    context = super().context(task)

    context.update(
        {
            "parent_output": task.parents[0].output.to_text() if task.parents and task.parents[0].output else None,
            "parent": task.parents[0] if task.parents else None,
            "child": task.children[0] if task.children else None,
        }
    )

    return context

insert_task(parent_task, task)

Source code in griptape/structures/pipeline.py
def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
    task.preprocess(self)

    if parent_task.children:
        child_task = parent_task.children[0]

        task.child_ids.append(child_task.id)
        child_task.parent_ids.append(task.id)

        child_task.parent_ids.remove(parent_task.id)
        parent_task.child_ids.remove(child_task.id)

    task.parent_ids.append(parent_task.id)
    parent_task.child_ids.append(task.id)

    parent_index = self.tasks.index(parent_task)
    self.tasks.insert(parent_index + 1, task)

    return task

try_run(*args)

Source code in griptape/structures/pipeline.py
def try_run(self, *args) -> Pipeline:
    self._execution_args = args

    [task.reset() for task in self.tasks]

    self.__run_from_task(self.input_task)

    if self.conversation_memory:
        if isinstance(self.input_task.input, tuple):
            input_text = self.input_task.input[0].to_text()
        else:
            input_text = self.input_task.input.to_text()

        run = Run(input=input_text, output=self.output_task.output.to_text())

        self.conversation_memory.add_run(run)

    return self

Structure

Bases: ABC

Source code in griptape/structures/structure.py
@define
class Structure(ABC):
    LOGGER_NAME = "griptape"

    id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True)
    stream: Optional[bool] = field(default=None, kw_only=True)
    prompt_driver: Optional[BasePromptDriver] = field(default=None)
    embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True)
    config: BaseStructureConfig = field(
        default=Factory(lambda self: self.default_config, takes_self=True), kw_only=True
    )
    rulesets: list[Ruleset] = field(factory=list, kw_only=True)
    rules: list[Rule] = field(factory=list, kw_only=True)
    tasks: list[BaseTask] = field(factory=list, kw_only=True)
    custom_logger: Optional[Logger] = field(default=None, kw_only=True)
    logger_level: int = field(default=logging.INFO, kw_only=True)
    event_listeners: list[EventListener] = field(factory=list, kw_only=True)
    conversation_memory: Optional[BaseConversationMemory] = field(
        default=Factory(
            lambda self: ConversationMemory(driver=self.config.global_drivers.conversation_memory_driver),
            takes_self=True,
        ),
        kw_only=True,
    )
    task_memory: Optional[TaskMemory] = field(
        default=Factory(lambda self: self.default_task_memory, takes_self=True), kw_only=True
    )
    meta_memory: MetaMemory = field(default=Factory(lambda: MetaMemory()), kw_only=True)
    _execution_args: tuple = ()
    _logger: Optional[Logger] = None

    @rulesets.validator  # pyright: ignore
    def validate_rulesets(self, _, rulesets: list[Ruleset]) -> None:
        if not rulesets:
            return

        if self.rules:
            raise ValueError("can't have both rulesets and rules specified")

    @rules.validator  # pyright: ignore
    def validate_rules(self, _, rules: list[Rule]) -> None:
        if not rules:
            return

        if self.rulesets:
            raise ValueError("can't have both rules and rulesets specified")

    def __attrs_post_init__(self) -> None:
        if self.conversation_memory:
            self.conversation_memory.structure = self

        tasks = self.tasks.copy()
        self.tasks.clear()
        self.add_tasks(*tasks)

    def __add__(self, other: BaseTask | list[BaseTask]) -> list[BaseTask]:
        return self.add_tasks(*other) if isinstance(other, list) else self + [other]

    @prompt_driver.validator  # pyright: ignore
    def validate_prompt_driver(self, attribute, value):
        if value is not None:
            deprecation_warn(f"`{attribute.name}` is deprecated, use `config.global_drivers.prompt_driver` instead.")

    @embedding_driver.validator  # pyright: ignore
    def validate_embedding_driver(self, attribute, value):
        if value is not None:
            deprecation_warn(f"`{attribute.name}` is deprecated, use `config.global_drivers.embedding_driver` instead.")

    @stream.validator  # pyright: ignore
    def validate_stream(self, attribute, value):
        if value is not None:
            deprecation_warn(
                f"`{attribute.name}` is deprecated, use `config.global_drivers.prompt_driver.stream` instead."
            )

    @property
    def execution_args(self) -> tuple:
        return self._execution_args

    @property
    def logger(self) -> Logger:
        if self.custom_logger:
            return self.custom_logger
        else:
            if self._logger is None:
                self._logger = logging.getLogger(self.LOGGER_NAME)

                self._logger.propagate = False
                self._logger.level = self.logger_level

                self._logger.handlers = [RichHandler(show_time=True, show_path=False)]
            return self._logger

    @property
    def input_task(self) -> Optional[BaseTask]:
        return self.tasks[0] if self.tasks else None

    @property
    def output_task(self) -> Optional[BaseTask]:
        return self.tasks[-1] if self.tasks else None

    @property
    def finished_tasks(self) -> list[BaseTask]:
        return [s for s in self.tasks if s.is_finished()]

    @property
    def default_config(self) -> BaseStructureConfig:
        if self.prompt_driver is not None or self.embedding_driver is not None:
            config = StructureConfig()

            if self.prompt_driver is None:
                prompt_driver = OpenAiChatPromptDriver(model="gpt-4")
            else:
                prompt_driver = self.prompt_driver

            if self.embedding_driver is None:
                embedding_driver = OpenAiEmbeddingDriver()
            else:
                embedding_driver = self.embedding_driver

            if self.stream is not None:
                prompt_driver.stream = self.stream

            vector_store_driver = LocalVectorStoreDriver(embedding_driver=embedding_driver)

            config.global_drivers.prompt_driver = prompt_driver
            config.global_drivers.vector_store_driver = vector_store_driver
            config.global_drivers.embedding_driver = embedding_driver

            config.task_memory.query_engine.prompt_driver = prompt_driver
            config.task_memory.query_engine.vector_store_driver = vector_store_driver
            config.task_memory.summary_engine.prompt_driver = prompt_driver
            config.task_memory.extraction_engine.csv.prompt_driver = prompt_driver
            config.task_memory.extraction_engine.json.prompt_driver = prompt_driver
        else:
            config = OpenAiStructureConfig()

        return config

    @property
    def default_task_memory(self) -> TaskMemory:
        global_drivers = self.config.global_drivers
        task_memory = self.config.task_memory

        return TaskMemory(
            artifact_storages={
                TextArtifact: TextArtifactStorage(
                    query_engine=VectorQueryEngine(
                        prompt_driver=(
                            global_drivers.prompt_driver
                            if isinstance(task_memory.query_engine.prompt_driver, DummyPromptDriver)
                            else task_memory.query_engine.prompt_driver
                        ),
                        vector_store_driver=(
                            global_drivers.vector_store_driver
                            if isinstance(task_memory.query_engine.prompt_driver, DummyVectorStoreDriver)
                            else task_memory.query_engine.vector_store_driver
                        ),
                    ),
                    summary_engine=PromptSummaryEngine(
                        prompt_driver=(
                            global_drivers.prompt_driver
                            if isinstance(task_memory.summary_engine.prompt_driver, DummyPromptDriver)
                            else task_memory.summary_engine.prompt_driver
                        )
                    ),
                    csv_extraction_engine=CsvExtractionEngine(
                        prompt_driver=(
                            global_drivers.prompt_driver
                            if isinstance(task_memory.extraction_engine.csv.prompt_driver, DummyPromptDriver)
                            else task_memory.extraction_engine.csv.prompt_driver
                        )
                    ),
                    json_extraction_engine=JsonExtractionEngine(
                        prompt_driver=(
                            global_drivers.prompt_driver
                            if isinstance(task_memory.extraction_engine.json.prompt_driver, DummyPromptDriver)
                            else task_memory.extraction_engine.json.prompt_driver
                        )
                    ),
                ),
                BlobArtifact: BlobArtifactStorage(),
            }
        )

    def is_finished(self) -> bool:
        return all(s.is_finished() for s in self.tasks)

    def is_executing(self) -> bool:
        return any(s for s in self.tasks if s.is_executing())

    def find_task(self, task_id: str) -> BaseTask:
        for task in self.tasks:
            if task.id == task_id:
                return task
        raise ValueError(f"Task with id {task_id} doesn't exist.")

    def add_tasks(self, *tasks: BaseTask) -> list[BaseTask]:
        return [self.add_task(s) for s in tasks]

    def add_event_listener(self, event_listener: EventListener) -> EventListener:
        if event_listener not in self.event_listeners:
            self.event_listeners.append(event_listener)

        return event_listener

    def remove_event_listener(self, event_listener: EventListener) -> None:
        if event_listener in self.event_listeners:
            self.event_listeners.remove(event_listener)
        else:
            raise ValueError("Event Listener not found.")

    def publish_event(self, event: BaseEvent) -> None:
        for event_listener in self.event_listeners:
            event_listener.publish_event(event)

    def context(self, task: BaseTask) -> dict[str, Any]:
        return {"args": self.execution_args, "structure": self}

    def before_run(self) -> None:
        self.publish_event(
            StartStructureRunEvent(
                structure_id=self.id, input_task_input=self.input_task.input, input_task_output=self.input_task.output
            )
        )

    def after_run(self) -> None:
        self.publish_event(
            FinishStructureRunEvent(
                structure_id=self.id,
                output_task_input=self.output_task.input,
                output_task_output=self.output_task.output,
            )
        )

    @abstractmethod
    def add_task(self, task: BaseTask) -> BaseTask:
        ...

    def run(self, *args) -> Structure:
        self.before_run()

        result = self.try_run(*args)

        self.after_run()

        return result

    @abstractmethod
    def try_run(self, *args) -> Structure:
        ...

LOGGER_NAME = 'griptape' class-attribute instance-attribute

config: BaseStructureConfig = field(default=Factory(lambda self: self.default_config, takes_self=True), kw_only=True) class-attribute instance-attribute

conversation_memory: Optional[BaseConversationMemory] = field(default=Factory(lambda self: ConversationMemory(driver=self.config.global_drivers.conversation_memory_driver), takes_self=True), kw_only=True) class-attribute instance-attribute

custom_logger: Optional[Logger] = field(default=None, kw_only=True) class-attribute instance-attribute

default_config: BaseStructureConfig property

default_task_memory: TaskMemory property

embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

event_listeners: list[EventListener] = field(factory=list, kw_only=True) class-attribute instance-attribute

execution_args: tuple property

finished_tasks: list[BaseTask] property

id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True) class-attribute instance-attribute

input_task: Optional[BaseTask] property

logger: Logger property

logger_level: int = field(default=logging.INFO, kw_only=True) class-attribute instance-attribute

meta_memory: MetaMemory = field(default=Factory(lambda: MetaMemory()), kw_only=True) class-attribute instance-attribute

output_task: Optional[BaseTask] property

prompt_driver: Optional[BasePromptDriver] = field(default=None) class-attribute instance-attribute

rules: list[Rule] = field(factory=list, kw_only=True) class-attribute instance-attribute

rulesets: list[Ruleset] = field(factory=list, kw_only=True) class-attribute instance-attribute

stream: Optional[bool] = field(default=None, kw_only=True) class-attribute instance-attribute

task_memory: Optional[TaskMemory] = field(default=Factory(lambda self: self.default_task_memory, takes_self=True), kw_only=True) class-attribute instance-attribute

tasks: list[BaseTask] = field(factory=list, kw_only=True) class-attribute instance-attribute

__add__(other)

Source code in griptape/structures/structure.py
def __add__(self, other: BaseTask | list[BaseTask]) -> list[BaseTask]:
    return self.add_tasks(*other) if isinstance(other, list) else self + [other]

__attrs_post_init__()

Source code in griptape/structures/structure.py
def __attrs_post_init__(self) -> None:
    if self.conversation_memory:
        self.conversation_memory.structure = self

    tasks = self.tasks.copy()
    self.tasks.clear()
    self.add_tasks(*tasks)

add_event_listener(event_listener)

Source code in griptape/structures/structure.py
def add_event_listener(self, event_listener: EventListener) -> EventListener:
    if event_listener not in self.event_listeners:
        self.event_listeners.append(event_listener)

    return event_listener

add_task(task) abstractmethod

Source code in griptape/structures/structure.py
@abstractmethod
def add_task(self, task: BaseTask) -> BaseTask:
    ...

add_tasks(*tasks)

Source code in griptape/structures/structure.py
def add_tasks(self, *tasks: BaseTask) -> list[BaseTask]:
    return [self.add_task(s) for s in tasks]

after_run()

Source code in griptape/structures/structure.py
def after_run(self) -> None:
    self.publish_event(
        FinishStructureRunEvent(
            structure_id=self.id,
            output_task_input=self.output_task.input,
            output_task_output=self.output_task.output,
        )
    )

before_run()

Source code in griptape/structures/structure.py
def before_run(self) -> None:
    self.publish_event(
        StartStructureRunEvent(
            structure_id=self.id, input_task_input=self.input_task.input, input_task_output=self.input_task.output
        )
    )

context(task)

Source code in griptape/structures/structure.py
def context(self, task: BaseTask) -> dict[str, Any]:
    return {"args": self.execution_args, "structure": self}

find_task(task_id)

Source code in griptape/structures/structure.py
def find_task(self, task_id: str) -> BaseTask:
    for task in self.tasks:
        if task.id == task_id:
            return task
    raise ValueError(f"Task with id {task_id} doesn't exist.")

is_executing()

Source code in griptape/structures/structure.py
def is_executing(self) -> bool:
    return any(s for s in self.tasks if s.is_executing())

is_finished()

Source code in griptape/structures/structure.py
def is_finished(self) -> bool:
    return all(s.is_finished() for s in self.tasks)

publish_event(event)

Source code in griptape/structures/structure.py
def publish_event(self, event: BaseEvent) -> None:
    for event_listener in self.event_listeners:
        event_listener.publish_event(event)

remove_event_listener(event_listener)

Source code in griptape/structures/structure.py
def remove_event_listener(self, event_listener: EventListener) -> None:
    if event_listener in self.event_listeners:
        self.event_listeners.remove(event_listener)
    else:
        raise ValueError("Event Listener not found.")

run(*args)

Source code in griptape/structures/structure.py
def run(self, *args) -> Structure:
    self.before_run()

    result = self.try_run(*args)

    self.after_run()

    return result

try_run(*args) abstractmethod

Source code in griptape/structures/structure.py
@abstractmethod
def try_run(self, *args) -> Structure:
    ...

validate_embedding_driver(attribute, value)

Source code in griptape/structures/structure.py
@embedding_driver.validator  # pyright: ignore
def validate_embedding_driver(self, attribute, value):
    if value is not None:
        deprecation_warn(f"`{attribute.name}` is deprecated, use `config.global_drivers.embedding_driver` instead.")

validate_prompt_driver(attribute, value)

Source code in griptape/structures/structure.py
@prompt_driver.validator  # pyright: ignore
def validate_prompt_driver(self, attribute, value):
    if value is not None:
        deprecation_warn(f"`{attribute.name}` is deprecated, use `config.global_drivers.prompt_driver` instead.")

validate_rules(_, rules)

Source code in griptape/structures/structure.py
@rules.validator  # pyright: ignore
def validate_rules(self, _, rules: list[Rule]) -> None:
    if not rules:
        return

    if self.rulesets:
        raise ValueError("can't have both rules and rulesets specified")

validate_rulesets(_, rulesets)

Source code in griptape/structures/structure.py
@rulesets.validator  # pyright: ignore
def validate_rulesets(self, _, rulesets: list[Ruleset]) -> None:
    if not rulesets:
        return

    if self.rules:
        raise ValueError("can't have both rulesets and rules specified")

validate_stream(attribute, value)

Source code in griptape/structures/structure.py
@stream.validator  # pyright: ignore
def validate_stream(self, attribute, value):
    if value is not None:
        deprecation_warn(
            f"`{attribute.name}` is deprecated, use `config.global_drivers.prompt_driver.stream` instead."
        )

Workflow

Bases: Structure

Source code in griptape/structures/workflow.py
@define
class Workflow(Structure):
    futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True)

    def add_task(self, task: BaseTask) -> BaseTask:
        task.preprocess(self)

        if self.output_task:
            self.output_task.child_ids.append(task.id)
            task.parent_ids.append(self.output_task.id)

        self.tasks.append(task)

        return task

    def insert_tasks(
        self,
        parent_tasks: BaseTask | list[BaseTask],
        tasks: BaseTask | list[BaseTask],
        child_tasks: BaseTask | list[BaseTask],
        preserve_relationship: bool = False,
    ) -> list[BaseTask]:
        """Insert tasks between parent and child tasks in the workflow.

        Args:
            parent_tasks: The tasks that will be the parents of the new tasks.
            tasks: The tasks to insert between the parent and child tasks.
            child_tasks: The tasks that will be the children of the new tasks.
            preserve_relationship: Whether to preserve the parent/child relationship when inserting between parent and child tasks.
        """

        if not isinstance(parent_tasks, list):
            parent_tasks = [parent_tasks]
        if not isinstance(tasks, list):
            tasks = [tasks]
        if not isinstance(child_tasks, list):
            child_tasks = [child_tasks]

        for task in tasks:
            self.insert_task(parent_tasks, task, child_tasks, preserve_relationship)

        return tasks

    def insert_task(
        self,
        parent_tasks: list[BaseTask],
        task: BaseTask,
        child_tasks: list[BaseTask],
        preserve_relationship: bool = False,
    ) -> BaseTask:
        task.preprocess(self)

        for child_task in child_tasks:
            # Link the new task to the child task
            if child_task.id not in task.child_ids:
                task.child_ids.append(child_task.id)
            if task.id not in child_task.parent_ids:
                child_task.parent_ids.append(task.id)

        if not preserve_relationship:
            for parent_task in parent_tasks:
                for child_task in child_tasks:
                    # Remove the old parent/child relationship
                    if child_task.id in parent_task.child_ids:
                        parent_task.child_ids.remove(child_task.id)
                    if parent_task.id in child_task.parent_ids:
                        child_task.parent_ids.remove(parent_task.id)

        for parent_task in parent_tasks:
            # Link the new task to the parent task
            if parent_task.id not in task.parent_ids:
                task.parent_ids.append(parent_task.id)
            if task.id not in parent_task.child_ids:
                parent_task.child_ids.append(task.id)

            parent_index = self.tasks.index(parent_task)
            self.tasks.insert(parent_index + 1, task)

        return task

    def try_run(self, *args) -> Workflow:
        self._execution_args = args
        ordered_tasks = self.order_tasks()
        exit_loop = False

        while not self.is_finished() and not exit_loop:
            futures_list = {}

            for task in ordered_tasks:
                if task.can_execute():
                    future = self.futures_executor.submit(task.execute)
                    futures_list[future] = task

            # Wait for all tasks to complete
            for future in futures.as_completed(futures_list):
                if isinstance(future.result(), ErrorArtifact):
                    exit_loop = True

                    break

        if self.conversation_memory:
            if isinstance(self.input_task.input, tuple):
                input_text = self.input_task.input[0].to_text()
            else:
                input_text = self.input_task.input.to_text()

            run = Run(input=input_text, output=self.output_task.output.to_text())

            self.conversation_memory.add_run(run)

        return self

    def context(self, task: BaseTask) -> dict[str, Any]:
        context = super().context(task)

        context.update(
            {
                "parent_outputs": {
                    parent.id: parent.output.to_text() if parent.output else "" for parent in task.parents
                },
                "parents": {parent.id: parent for parent in task.parents},
                "children": {child.id: child for child in task.children},
            }
        )

        return context

    def to_graph(self) -> dict[str, set[str]]:
        graph: dict[str, set[str]] = {}

        for key_task in self.tasks:
            graph[key_task.id] = set()

            for value_task in self.tasks:
                if key_task.id in value_task.child_ids:
                    graph[key_task.id].add(value_task.id)

        return graph

    def order_tasks(self) -> list[BaseTask]:
        return [self.find_task(task_id) for task_id in TopologicalSorter(self.to_graph()).static_order()]

futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True) class-attribute instance-attribute

add_task(task)

Source code in griptape/structures/workflow.py
def add_task(self, task: BaseTask) -> BaseTask:
    task.preprocess(self)

    if self.output_task:
        self.output_task.child_ids.append(task.id)
        task.parent_ids.append(self.output_task.id)

    self.tasks.append(task)

    return task

context(task)

Source code in griptape/structures/workflow.py
def context(self, task: BaseTask) -> dict[str, Any]:
    context = super().context(task)

    context.update(
        {
            "parent_outputs": {
                parent.id: parent.output.to_text() if parent.output else "" for parent in task.parents
            },
            "parents": {parent.id: parent for parent in task.parents},
            "children": {child.id: child for child in task.children},
        }
    )

    return context

insert_task(parent_tasks, task, child_tasks, preserve_relationship=False)

Source code in griptape/structures/workflow.py
def insert_task(
    self,
    parent_tasks: list[BaseTask],
    task: BaseTask,
    child_tasks: list[BaseTask],
    preserve_relationship: bool = False,
) -> BaseTask:
    task.preprocess(self)

    for child_task in child_tasks:
        # Link the new task to the child task
        if child_task.id not in task.child_ids:
            task.child_ids.append(child_task.id)
        if task.id not in child_task.parent_ids:
            child_task.parent_ids.append(task.id)

    if not preserve_relationship:
        for parent_task in parent_tasks:
            for child_task in child_tasks:
                # Remove the old parent/child relationship
                if child_task.id in parent_task.child_ids:
                    parent_task.child_ids.remove(child_task.id)
                if parent_task.id in child_task.parent_ids:
                    child_task.parent_ids.remove(parent_task.id)

    for parent_task in parent_tasks:
        # Link the new task to the parent task
        if parent_task.id not in task.parent_ids:
            task.parent_ids.append(parent_task.id)
        if task.id not in parent_task.child_ids:
            parent_task.child_ids.append(task.id)

        parent_index = self.tasks.index(parent_task)
        self.tasks.insert(parent_index + 1, task)

    return task

insert_tasks(parent_tasks, tasks, child_tasks, preserve_relationship=False)

Insert tasks between parent and child tasks in the workflow.

Parameters:

Name Type Description Default
parent_tasks BaseTask | list[BaseTask]

The tasks that will be the parents of the new tasks.

required
tasks BaseTask | list[BaseTask]

The tasks to insert between the parent and child tasks.

required
child_tasks BaseTask | list[BaseTask]

The tasks that will be the children of the new tasks.

required
preserve_relationship bool

Whether to preserve the parent/child relationship when inserting between parent and child tasks.

False
Source code in griptape/structures/workflow.py
def insert_tasks(
    self,
    parent_tasks: BaseTask | list[BaseTask],
    tasks: BaseTask | list[BaseTask],
    child_tasks: BaseTask | list[BaseTask],
    preserve_relationship: bool = False,
) -> list[BaseTask]:
    """Insert tasks between parent and child tasks in the workflow.

    Args:
        parent_tasks: The tasks that will be the parents of the new tasks.
        tasks: The tasks to insert between the parent and child tasks.
        child_tasks: The tasks that will be the children of the new tasks.
        preserve_relationship: Whether to preserve the parent/child relationship when inserting between parent and child tasks.
    """

    if not isinstance(parent_tasks, list):
        parent_tasks = [parent_tasks]
    if not isinstance(tasks, list):
        tasks = [tasks]
    if not isinstance(child_tasks, list):
        child_tasks = [child_tasks]

    for task in tasks:
        self.insert_task(parent_tasks, task, child_tasks, preserve_relationship)

    return tasks

order_tasks()

Source code in griptape/structures/workflow.py
def order_tasks(self) -> list[BaseTask]:
    return [self.find_task(task_id) for task_id in TopologicalSorter(self.to_graph()).static_order()]

to_graph()

Source code in griptape/structures/workflow.py
def to_graph(self) -> dict[str, set[str]]:
    graph: dict[str, set[str]] = {}

    for key_task in self.tasks:
        graph[key_task.id] = set()

        for value_task in self.tasks:
            if key_task.id in value_task.child_ids:
                graph[key_task.id].add(value_task.id)

    return graph

try_run(*args)

Source code in griptape/structures/workflow.py
def try_run(self, *args) -> Workflow:
    self._execution_args = args
    ordered_tasks = self.order_tasks()
    exit_loop = False

    while not self.is_finished() and not exit_loop:
        futures_list = {}

        for task in ordered_tasks:
            if task.can_execute():
                future = self.futures_executor.submit(task.execute)
                futures_list[future] = task

        # Wait for all tasks to complete
        for future in futures.as_completed(futures_list):
            if isinstance(future.result(), ErrorArtifact):
                exit_loop = True

                break

    if self.conversation_memory:
        if isinstance(self.input_task.input, tuple):
            input_text = self.input_task.input[0].to_text()
        else:
            input_text = self.input_task.input.to_text()

        run = Run(input=input_text, output=self.output_task.output.to_text())

        self.conversation_memory.add_run(run)

    return self