Skip to content

Pipeline

Pipeline

Bases: Structure

Source code in griptape/structures/pipeline.py
@define
class Pipeline(Structure):
    """Structure whose tasks are linked into a chain and run one after another.

    ``add_task`` and ``insert_task`` maintain the chain through each task's
    ``parent_ids`` / ``child_ids``; ``try_run`` starts at ``input_task`` and
    follows the first child of every task.
    """

    def add_task(self, task: BaseTask) -> BaseTask:
        """Append ``task`` to the end of the pipeline and return it.

        ``task.preprocess(self)`` is called first. If the pipeline already has
        an ``output_task``, that task becomes the parent of ``task``.
        """
        task.preprocess(self)

        tail = self.output_task
        if tail:
            task.parent_ids.append(tail.id)
            tail.child_ids.append(task.id)

        self.tasks.append(task)

        return task

    def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
        """Insert ``task`` directly after ``parent_task`` and return it.

        If ``parent_task`` already has a child, ``task`` is spliced in between
        the two: the old child is re-parented onto ``task``.

        Raises:
            ValueError: if ``parent_task`` is not in ``self.tasks``. Nothing is
                modified in that case.
        """
        # Resolve the position before touching any links: list.index raises
        # ValueError for an unknown parent, and failing here keeps the
        # parent/child ids of every task untouched.
        parent_index = self.tasks.index(parent_task)

        task.preprocess(self)

        if parent_task.children:
            child_task = parent_task.children[0]

            # Hook the old child below the new task ...
            task.child_ids.append(child_task.id)
            child_task.parent_ids.append(task.id)

            # ... and cut the direct parent -> child link.
            child_task.parent_ids.remove(parent_task.id)
            parent_task.child_ids.remove(child_task.id)

        task.parent_ids.append(parent_task.id)
        parent_task.child_ids.append(task.id)

        self.tasks.insert(parent_index + 1, task)

        return task

    def try_run(self, *args) -> Pipeline:
        """Reset every task, run the chain from ``input_task`` and return ``self``.

        ``args`` are stored on ``self._execution_args``. When conversation
        memory is configured, the input/output text of the run is recorded in
        it; nothing is recorded if the pipeline has no tasks or the output task
        has no output.
        """
        self._execution_args = args

        for task in self.tasks:
            task.reset()

        self.__run_from_task(self.input_task)

        if not self.conversation_memory:
            return self

        input_task = self.input_task
        output_task = self.output_task

        # An empty pipeline, or an output task that never produced anything
        # (presumably when an earlier task stopped the run -- confirm against
        # BaseTask.reset), has nothing to record; previously this raised
        # AttributeError on None.
        if input_task is None or output_task is None or output_task.output is None:
            return self

        task_input = input_task.input
        if isinstance(task_input, tuple):
            # Only the first element of a tuple input is recorded.
            input_text = task_input[0].to_text()
        else:
            input_text = task_input.to_text()

        run = Run(input=input_text, output=output_task.output.to_text())

        self.conversation_memory.add_run(run)

        return self

    def context(self, task: BaseTask) -> dict[str, Any]:
        """Return the base context for ``task`` extended with its neighbours.

        Adds ``parent_output`` (text of the first parent's output, or ``None``),
        ``parent`` (first parent or ``None``) and ``child`` (first child or
        ``None``) to the dict returned by the superclass.
        """
        context = super().context(task)

        parents = task.parents
        children = task.children
        parent = parents[0] if parents else None

        context.update(
            {
                "parent_output": parent.output.to_text() if parents and parent.output else None,
                "parent": parent,
                "child": children[0] if children else None,
            }
        )

        return context

    def __run_from_task(self, task: Optional[BaseTask]) -> None:
        """Execute ``task`` and then each following first child in turn.

        Stops when there is no further task or as soon as a task's
        ``execute()`` returns an ``ErrorArtifact``.
        """
        # Walk the chain with a loop rather than recursion so that a long
        # pipeline cannot exhaust the interpreter's recursion limit.
        while task is not None:
            if isinstance(task.execute(), ErrorArtifact):
                return

            task = next(iter(task.children), None)

__run_from_task(task)

Source code in griptape/structures/pipeline.py
def __run_from_task(self, task: Optional[BaseTask]) -> None:
    """Execute ``task`` and then each following first child in turn.

    Stops when there is no further task or as soon as a task's ``execute()``
    returns an ``ErrorArtifact``.
    """
    # Walk the chain with a loop rather than recursion so that a long
    # pipeline cannot exhaust the interpreter's recursion limit.
    while task is not None:
        if isinstance(task.execute(), ErrorArtifact):
            return

        task = next(iter(task.children), None)

add_task(task)

Source code in griptape/structures/pipeline.py
def add_task(self, task: BaseTask) -> BaseTask:
    """Append ``task`` to the end of the pipeline and return it.

    ``task.preprocess(self)`` is called first. If the pipeline already has an
    ``output_task``, that task becomes the parent of ``task``.
    """
    task.preprocess(self)

    tail = self.output_task
    if tail:
        # Link both directions between the current last task and the new one.
        task.parent_ids.append(tail.id)
        tail.child_ids.append(task.id)

    self.tasks.append(task)
    return task

context(task)

Source code in griptape/structures/pipeline.py
def context(self, task: BaseTask) -> dict[str, Any]:
    """Return the base context for ``task`` extended with its neighbours.

    Adds ``parent_output`` (text of the first parent's output, or ``None``),
    ``parent`` (first parent or ``None``) and ``child`` (first child or
    ``None``) to the dict returned by the superclass.
    """
    merged = super().context(task)

    parents = task.parents
    children = task.children
    first_parent = parents[0] if parents else None

    if parents and first_parent.output:
        parent_text = first_parent.output.to_text()
    else:
        parent_text = None

    merged["parent_output"] = parent_text
    merged["parent"] = first_parent
    merged["child"] = children[0] if children else None

    return merged

insert_task(parent_task, task)

Source code in griptape/structures/pipeline.py
def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
    """Insert ``task`` directly after ``parent_task`` and return it.

    If ``parent_task`` already has a child, ``task`` is spliced in between the
    two: the old child is re-parented onto ``task``.

    Raises:
        ValueError: if ``parent_task`` is not in ``self.tasks``. Nothing is
            modified in that case.
    """
    # Resolve the position before touching any links: list.index raises
    # ValueError for an unknown parent, and failing here keeps the
    # parent/child ids of every task untouched.
    parent_index = self.tasks.index(parent_task)

    task.preprocess(self)

    if parent_task.children:
        child_task = parent_task.children[0]

        # Hook the old child below the new task ...
        task.child_ids.append(child_task.id)
        child_task.parent_ids.append(task.id)

        # ... and cut the direct parent -> child link.
        child_task.parent_ids.remove(parent_task.id)
        parent_task.child_ids.remove(child_task.id)

    task.parent_ids.append(parent_task.id)
    parent_task.child_ids.append(task.id)

    self.tasks.insert(parent_index + 1, task)

    return task

try_run(*args)

Source code in griptape/structures/pipeline.py
def try_run(self, *args) -> Pipeline:
    """Reset every task, run the chain from ``input_task`` and return ``self``.

    ``args`` are stored on ``self._execution_args``. When conversation memory
    is configured, the input/output text of the run is recorded in it; nothing
    is recorded if the pipeline has no tasks or the output task has no output.
    """
    self._execution_args = args

    for task in self.tasks:
        task.reset()

    self.__run_from_task(self.input_task)

    if not self.conversation_memory:
        return self

    input_task = self.input_task
    output_task = self.output_task

    # An empty pipeline, or an output task that never produced anything
    # (presumably when an earlier task stopped the run -- confirm against
    # BaseTask.reset), has nothing to record; previously this raised
    # AttributeError on None.
    if input_task is None or output_task is None or output_task.output is None:
        return self

    task_input = input_task.input
    if isinstance(task_input, tuple):
        # Only the first element of a tuple input is recorded.
        input_text = task_input[0].to_text()
    else:
        input_text = task_input.to_text()

    run = Run(input=input_text, output=output_task.output.to_text())

    self.conversation_memory.add_run(run)

    return self