Skip to content

Tasks

__all__ = ['BaseTask', 'BaseTextInputTask', 'BaseMultiTextInputTask', 'PromptTask', 'ActionsSubtask', 'ToolkitTask', 'TextSummaryTask', 'ToolTask', 'TextQueryTask', 'ExtractionTask', 'CsvExtractionTask', 'JsonExtractionTask', 'BaseImageGenerationTask', 'CodeExecutionTask', 'PromptImageGenerationTask', 'VariationImageGenerationTask', 'InpaintingImageGenerationTask', 'OutpaintingImageGenerationTask', 'ImageQueryTask', 'BaseAudioGenerationTask', 'TextToSpeechTask', 'StructureRunTask', 'AudioTranscriptionTask'] module-attribute

ActionsSubtask

Bases: BaseTextInputTask

Source code in griptape/tasks/actions_subtask.py
@define
class ActionsSubtask(BaseTextInputTask):
    """Subtask that parses an LLM response into a thought, tool actions, and/or an answer.

    Attached to a parent task (which must implement ActionsSubtaskOriginMixin),
    the subtask extracts "Thought:", "Actions:" (a JSON array), and "Answer:"
    sections from its input text, resolves each action to a tool on the origin
    task, validates the action input against the activity schema, and executes
    all actions concurrently in run().
    """

    @define(kw_only=True)
    class Action:
        """A single parsed action: which tool/activity to run and with what input."""

        tag: str = field()
        name: str = field()
        path: Optional[str] = field(default=None)
        input: dict = field()
        tool: Optional[BaseTool] = field(default=None)

    # Regexes for pulling the response sections out of the raw prompt text.
    THOUGHT_PATTERN = r"(?s)^Thought:\s*(.*?)$"
    ACTIONS_PATTERN = r"(?s)Actions:[^\[]*(\[.*\])"
    ANSWER_PATTERN = r"(?s)^Answer:\s?([\s\S]*)$"

    parent_task_id: Optional[str] = field(default=None, kw_only=True)
    thought: Optional[str] = field(default=None, kw_only=True)
    actions: list[Action] = field(factory=list, kw_only=True)

    _input: Optional[str | TextArtifact | Callable[[BaseTask], TextArtifact]] = field(default=None)
    # NOTE(review): plain annotated assignment — attrs' @define still collects
    # this as a field with default None; confirm that is intended vs. field().
    _memory: Optional[TaskMemory] = None

    @property
    def input(self) -> TextArtifact:
        """Return the subtask input, resolving callables and wrapping plain strings."""
        if isinstance(self._input, TextArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            return TextArtifact(self._input)

    @input.setter
    def input(self, value: str | TextArtifact | Callable[[BaseTask], TextArtifact]) -> None:
        self._input = value

    @property
    def origin_task(self) -> BaseTask:
        """Return the parent task this subtask is attached to.

        Raises:
            Exception: If the subtask has no parent task id set.
        """
        if self.parent_task_id:
            return self.structure.find_task(self.parent_task_id)
        else:
            raise Exception("ActionSubtask has no parent task.")

    @property
    def parents(self) -> list[BaseTask]:
        """Return parent subtasks, resolved through the origin task."""
        if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
            return [self.origin_task.find_subtask(parent_id) for parent_id in self.parent_ids]
        else:
            raise Exception("ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin.")

    @property
    def children(self) -> list[BaseTask]:
        """Return child subtasks, resolved through the origin task."""
        if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
            return [self.origin_task.find_subtask(child_id) for child_id in self.child_ids]
        else:
            raise Exception("ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin.")

    def attach_to(self, parent_task: BaseTask):
        """Bind this subtask to its parent task and parse the subtask input text."""
        self.parent_task_id = parent_task.id
        self.structure = parent_task.structure
        self.__init_from_prompt(self.input.to_text())

    def before_run(self) -> None:
        """Publish a StartActionsSubtaskEvent and log the subtask input."""
        self.structure.publish_event(
            StartActionsSubtaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
                subtask_parent_task_id=self.parent_task_id,
                subtask_thought=self.thought,
                subtask_actions=self.actions_to_dicts(),
            )
        )
        self.structure.logger.info(f"Subtask {self.id}\n{self.input.to_text()}")

    def run(self) -> BaseArtifact:
        """Execute the parsed actions (or surface parse errors) and return the output.

        Any exception is logged and converted to an ErrorArtifact rather than
        propagating.
        """
        try:
            if any(a.name == "error" for a in self.actions):
                # Parsing produced sentinel error actions: report them instead of executing.
                errors = [a.input["error"] for a in self.actions if a.name == "error"]

                self.output = ErrorArtifact("\n\n".join(errors))
            else:
                results = self.execute_actions(self.actions)

                actions_output = []
                for result in results:
                    tag, output = result
                    output.name = f"{tag} output"

                    actions_output.append(output)
                self.output = ListArtifact(actions_output)
        except Exception as e:
            self.structure.logger.error(f"Subtask {self.id}\n{e}", exc_info=True)

            self.output = ErrorArtifact(str(e), exception=e)

        # Previously this return lived in a `finally:` block, which swallows even
        # BaseExceptions such as KeyboardInterrupt (flake8-bugbear B012).
        return self.output if self.output is not None else ErrorArtifact("no tool output")

    def execute_actions(self, actions: list[Action]) -> list[tuple[str, BaseArtifact]]:
        """Run all actions concurrently on the task's executor.

        Returns (tag, output artifact) pairs. Results are keyed by action tag,
        so actions sharing a tag collapse to a single result.
        """
        futures_dict = {a.tag: self.futures_executor.submit(self.execute_action, a) for a in actions}
        results = utils.execute_futures_dict(futures_dict)

        return list(results.values())

    def execute_action(self, action: Action) -> tuple[str, BaseArtifact]:
        """Run one action against its tool; always returns (tag, artifact)."""
        if action.tool is not None:
            if action.path is not None:
                output = action.tool.execute(getattr(action.tool, action.path), self, action)
            else:
                output = ErrorArtifact("action path not found")
        else:
            output = ErrorArtifact("action name not found")

        return action.tag, output

    def after_run(self) -> None:
        """Publish a FinishActionsSubtaskEvent and log the subtask response."""
        response = self.output.to_text() if isinstance(self.output, BaseArtifact) else str(self.output)

        self.structure.publish_event(
            FinishActionsSubtaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
                subtask_parent_task_id=self.parent_task_id,
                subtask_thought=self.thought,
                subtask_actions=self.actions_to_dicts(),
            )
        )
        self.structure.logger.info(f"Subtask {self.id}\nResponse: {response}")

    def actions_to_dicts(self) -> list[dict]:
        """Serialize actions to dicts, omitting empty/None fields."""
        json_list = []

        for action in self.actions:
            json_dict = {}

            if action.tag:
                json_dict["tag"] = action.tag

            if action.name:
                json_dict["name"] = action.name

            if action.path:
                json_dict["path"] = action.path

            if action.input:
                json_dict["input"] = action.input

            json_list.append(json_dict)

        return json_list

    def actions_to_json(self) -> str:
        """Return the actions serialized as a JSON string."""
        return json.dumps(self.actions_to_dicts())

    def __init_from_prompt(self, value: str) -> None:
        """Populate thought, actions, and possibly output from the raw response text.

        Only the last match of each pattern is used.
        """
        thought_matches = re.findall(self.THOUGHT_PATTERN, value, re.MULTILINE)
        actions_matches = re.findall(self.ACTIONS_PATTERN, value, re.DOTALL)
        answer_matches = re.findall(self.ANSWER_PATTERN, value, re.MULTILINE)

        if self.thought is None and len(thought_matches) > 0:
            self.thought = thought_matches[-1]

        self.__parse_actions(actions_matches)

        # If there are no actions to take but an answer is provided, set the answer as the output.
        if len(self.actions) == 0 and self.output is None and len(answer_matches) > 0:
            self.output = TextArtifact(answer_matches[-1])

    def __parse_actions(self, actions_matches: list[str]) -> None:
        """Parse the last "Actions:" JSON match into Action objects on self.actions.

        Failures are converted into sentinel error actions (never raised), so
        the error can be fed back to the LLM on the next iteration.
        """
        if len(actions_matches) == 0:
            return

        try:
            data = actions_matches[-1]
            actions_list: list = json.loads(data, strict=False)

            if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
                self.origin_task.actions_schema().validate(actions_list)

            for action_object in actions_list:
                # Load action tag; throw exception if the key is not present
                action_tag = action_object["tag"]

                # Load action name; throw exception if the key is not present
                action_name = action_object["name"]

                # Load action method; throw exception if the key is not present
                action_path = action_object["path"]

                # Load optional input value; don't throw exceptions if key is not present
                if "input" in action_object:
                    # The schema library has a bug, where something like `Or(str, None)` doesn't get
                    # correctly translated into JSON schema. For some optional input fields LLMs sometimes
                    # still provide null value, which trips up the validator. The temporary solution that
                    # works is to strip all key-values where value is null.
                    action_input = remove_null_values_in_dict_recursively(action_object["input"])
                else:
                    action_input = {}

                # Load the action itself
                if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
                    tool = self.origin_task.find_tool(action_name)
                else:
                    raise Exception(
                        "ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin."
                    )

                new_action = ActionsSubtask.Action(
                    tag=action_tag, name=action_name, path=action_path, input=action_input, tool=tool
                )

                if new_action.tool:
                    if new_action.input:
                        self.__validate_action(new_action)

                # Don't forget to add it to the subtask actions list!
                self.actions.append(new_action)
        # NOTE(review): json.loads raises json.JSONDecodeError (a ValueError), not
        # SyntaxError, so this branch looks unreachable for JSON input — confirm.
        except SyntaxError as e:
            self.structure.logger.error(f"Subtask {self.origin_task.id}\nSyntax error: {e}")

            self.actions.append(self.__error_to_action(f"syntax error: {e}"))
        except schema.SchemaError as e:
            self.structure.logger.error(f"Subtask {self.origin_task.id}\nInvalid action JSON: {e}")

            self.actions.append(self.__error_to_action(f"Action JSON validation error: {e}"))
        except Exception as e:
            self.structure.logger.error(f"Subtask {self.origin_task.id}\nError parsing tool action: {e}")

            self.actions.append(self.__error_to_action(f"Action input parsing error: {e}"))

    def __error_to_action(self, error: str) -> Action:
        """Build a sentinel Action carrying an error message instead of a tool call."""
        return ActionsSubtask.Action(tag="error", name="error", input={"error": error})

    def __validate_action(self, action: Action) -> None:
        """Validate an action's input against its activity schema; record failures as error actions."""
        try:
            if action.path is not None:
                activity = getattr(action.tool, action.path)
            else:
                raise Exception("Action path not found.")

            if activity is not None:
                activity_schema = action.tool.activity_schema(activity)
            else:
                raise Exception("Activity not found.")

            if activity_schema:
                activity_schema.validate(action.input)
        except schema.SchemaError as e:
            self.structure.logger.error(f"Subtask {self.origin_task.id}\nInvalid activity input JSON: {e}")

            self.actions.append(self.__error_to_action(f"Activity input JSON validation error: {e}"))
ACTIONS_PATTERN = '(?s)Actions:[^\\[]*(\\[.*\\])' class-attribute instance-attribute

ANSWER_PATTERN = '(?s)^Answer:\\s?([\\s\\S]*)$' class-attribute instance-attribute

THOUGHT_PATTERN = '(?s)^Thought:\\s*(.*?)$' class-attribute instance-attribute

actions: list[Action] = field(factory=list, kw_only=True) class-attribute instance-attribute

children: list[BaseTask] property

input: TextArtifact property writable

origin_task: BaseTask property

parent_task_id: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

parents: list[BaseTask] property

thought: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

Action

Source code in griptape/tasks/actions_subtask.py
@define(kw_only=True)
class Action:
    # Tag used to correlate an action with its output artifact.
    tag: str = field()
    # Name of the tool to run; "error" is used as a sentinel for parse failures.
    name: str = field()
    # Activity (method) path on the tool; optional.
    path: Optional[str] = field(default=None)
    # Input payload passed to the activity.
    input: dict = field()
    # Resolved tool instance; None when the name could not be resolved.
    tool: Optional[BaseTool] = field(default=None)
input: dict = field() class-attribute instance-attribute
name: str = field() class-attribute instance-attribute
path: Optional[str] = field(default=None) class-attribute instance-attribute
tag: str = field() class-attribute instance-attribute
tool: Optional[BaseTool] = field(default=None) class-attribute instance-attribute

__error_to_action(error)

Source code in griptape/tasks/actions_subtask.py
def __error_to_action(self, error: str) -> Action:
    """Wrap an error message in a sentinel error Action."""
    payload = {"error": error}
    return ActionsSubtask.Action(tag="error", name="error", input=payload)

__init_from_prompt(value)

Source code in griptape/tasks/actions_subtask.py
def __init_from_prompt(self, value: str) -> None:
    """Populate thought, actions, and possibly output from the raw LLM response text.

    Only the last match of each pattern is used. If no actions were parsed
    and an "Answer:" section is present, the answer becomes the subtask output.
    """
    thought_matches = re.findall(self.THOUGHT_PATTERN, value, re.MULTILINE)
    actions_matches = re.findall(self.ACTIONS_PATTERN, value, re.DOTALL)
    answer_matches = re.findall(self.ANSWER_PATTERN, value, re.MULTILINE)

    # Keep a thought supplied at construction time; otherwise take the last match.
    if self.thought is None and len(thought_matches) > 0:
        self.thought = thought_matches[-1]

    self.__parse_actions(actions_matches)

    # If there are no actions to take but an answer is provided, set the answer as the output.
    if len(self.actions) == 0 and self.output is None and len(answer_matches) > 0:
        self.output = TextArtifact(answer_matches[-1])

__parse_actions(actions_matches)

Source code in griptape/tasks/actions_subtask.py
def __parse_actions(self, actions_matches: list[str]) -> None:
    """Parse the last "Actions:" JSON match into Action objects on self.actions.

    Any parsing or validation failure is converted into a sentinel error
    action (via __error_to_action) rather than raised, so the error can be
    fed back to the LLM on the next iteration.
    """
    if len(actions_matches) == 0:
        return

    try:
        data = actions_matches[-1]
        actions_list: list = json.loads(data, strict=False)

        if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
            self.origin_task.actions_schema().validate(actions_list)

        for action_object in actions_list:
            # Load action tag; throw exception if the key is not present
            action_tag = action_object["tag"]

            # Load action name; throw exception if the key is not present
            action_name = action_object["name"]

            # Load action method; throw exception if the key is not present
            action_path = action_object["path"]

            # Load optional input value; don't throw exceptions if key is not present
            if "input" in action_object:
                # The schema library has a bug, where something like `Or(str, None)` doesn't get
                # correctly translated into JSON schema. For some optional input fields LLMs sometimes
                # still provide null value, which trips up the validator. The temporary solution that
                # works is to strip all key-values where value is null.
                action_input = remove_null_values_in_dict_recursively(action_object["input"])
            else:
                action_input = {}

            # Load the action itself
            if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
                tool = self.origin_task.find_tool(action_name)
            else:
                raise Exception(
                    "ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin."
                )

            new_action = ActionsSubtask.Action(
                tag=action_tag, name=action_name, path=action_path, input=action_input, tool=tool
            )

            if new_action.tool:
                if new_action.input:
                    self.__validate_action(new_action)

            # Don't forget to add it to the subtask actions list!
            self.actions.append(new_action)
    # NOTE(review): json.loads raises json.JSONDecodeError (a ValueError), not
    # SyntaxError, so this branch looks unreachable for JSON input — confirm.
    except SyntaxError as e:
        self.structure.logger.error(f"Subtask {self.origin_task.id}\nSyntax error: {e}")

        self.actions.append(self.__error_to_action(f"syntax error: {e}"))
    except schema.SchemaError as e:
        self.structure.logger.error(f"Subtask {self.origin_task.id}\nInvalid action JSON: {e}")

        self.actions.append(self.__error_to_action(f"Action JSON validation error: {e}"))
    except Exception as e:
        self.structure.logger.error(f"Subtask {self.origin_task.id}\nError parsing tool action: {e}")

        self.actions.append(self.__error_to_action(f"Action input parsing error: {e}"))

__validate_action(action)

Source code in griptape/tasks/actions_subtask.py
def __validate_action(self, action: Action) -> None:
    """Validate the action's input against its activity schema.

    Schema failures are logged and recorded as sentinel error actions; other
    exceptions (missing path/activity) propagate to the caller.
    """
    try:
        if action.path is None:
            raise Exception("Action path not found.")
        activity = getattr(action.tool, action.path)

        if activity is None:
            raise Exception("Activity not found.")
        activity_schema = action.tool.activity_schema(activity)

        if activity_schema:
            activity_schema.validate(action.input)
    except schema.SchemaError as e:
        self.structure.logger.error(f"Subtask {self.origin_task.id}\nInvalid activity input JSON: {e}")

        self.actions.append(self.__error_to_action(f"Activity input JSON validation error: {e}"))

actions_to_dicts()

Source code in griptape/tasks/actions_subtask.py
def actions_to_dicts(self) -> list[dict]:
    """Serialize each action to a dict, leaving out empty/None fields."""
    serialized = []

    for action in self.actions:
        entry = {
            key: value
            for key, value in (
                ("tag", action.tag),
                ("name", action.name),
                ("path", action.path),
                ("input", action.input),
            )
            if value
        }
        serialized.append(entry)

    return serialized

actions_to_json()

Source code in griptape/tasks/actions_subtask.py
def actions_to_json(self) -> str:
    """Return the subtask's actions as a JSON string."""
    dicts = self.actions_to_dicts()
    return json.dumps(dicts)

after_run()

Source code in griptape/tasks/actions_subtask.py
def after_run(self) -> None:
    """Publish a FinishActionsSubtaskEvent and log the subtask response."""
    if isinstance(self.output, BaseArtifact):
        response = self.output.to_text()
    else:
        response = str(self.output)

    event = FinishActionsSubtaskEvent(
        task_id=self.id,
        task_parent_ids=self.parent_ids,
        task_child_ids=self.child_ids,
        task_input=self.input,
        task_output=self.output,
        subtask_parent_task_id=self.parent_task_id,
        subtask_thought=self.thought,
        subtask_actions=self.actions_to_dicts(),
    )
    self.structure.publish_event(event)
    self.structure.logger.info(f"Subtask {self.id}\nResponse: {response}")

attach_to(parent_task)

Source code in griptape/tasks/actions_subtask.py
def attach_to(self, parent_task: BaseTask):
    """Bind this subtask to its parent task and parse the subtask input text."""
    self.parent_task_id = parent_task.id
    self.structure = parent_task.structure
    # Parsing happens at attach time so actions/thought/answer are ready before run().
    self.__init_from_prompt(self.input.to_text())

before_run()

Source code in griptape/tasks/actions_subtask.py
def before_run(self) -> None:
    """Publish a StartActionsSubtaskEvent and log the subtask input."""
    event = StartActionsSubtaskEvent(
        task_id=self.id,
        task_parent_ids=self.parent_ids,
        task_child_ids=self.child_ids,
        task_input=self.input,
        task_output=self.output,
        subtask_parent_task_id=self.parent_task_id,
        subtask_thought=self.thought,
        subtask_actions=self.actions_to_dicts(),
    )
    self.structure.publish_event(event)
    self.structure.logger.info(f"Subtask {self.id}\n{self.input.to_text()}")

execute_action(action)

Source code in griptape/tasks/actions_subtask.py
def execute_action(self, action: Action) -> tuple[str, BaseArtifact]:
    """Run one action against its tool; always returns (tag, artifact)."""
    if action.tool is None:
        return action.tag, ErrorArtifact("action name not found")
    if action.path is None:
        return action.tag, ErrorArtifact("action path not found")

    activity = getattr(action.tool, action.path)
    return action.tag, action.tool.execute(activity, self, action)

execute_actions(actions)

Source code in griptape/tasks/actions_subtask.py
def execute_actions(self, actions: list[Action]) -> list[tuple[str, BaseArtifact]]:
    """Run all actions concurrently on the task's executor.

    Returns (tag, output artifact) pairs. Results are keyed by action tag, so
    actions sharing a tag collapse to a single result.
    """
    futures_dict = {a.tag: self.futures_executor.submit(self.execute_action, a) for a in actions}
    results = utils.execute_futures_dict(futures_dict)

    # list(...) instead of a pass-through comprehension (idiom; ruff PERF402).
    return list(results.values())

run()

Source code in griptape/tasks/actions_subtask.py
def run(self) -> BaseArtifact:
    """Execute the parsed actions (or surface parse errors) and return the output.

    Any exception is logged and converted to an ErrorArtifact rather than
    propagating.
    """
    try:
        if any(a.name == "error" for a in self.actions):
            # Parsing produced sentinel error actions: report them instead of executing.
            errors = [a.input["error"] for a in self.actions if a.name == "error"]

            self.output = ErrorArtifact("\n\n".join(errors))
        else:
            results = self.execute_actions(self.actions)

            actions_output = []
            for result in results:
                tag, output = result
                output.name = f"{tag} output"

                actions_output.append(output)
            self.output = ListArtifact(actions_output)
    except Exception as e:
        self.structure.logger.error(f"Subtask {self.id}\n{e}", exc_info=True)

        self.output = ErrorArtifact(str(e), exception=e)

    # Previously this return lived in a `finally:` block, which swallows even
    # BaseExceptions such as KeyboardInterrupt (flake8-bugbear B012).
    return self.output if self.output is not None else ErrorArtifact("no tool output")

AudioTranscriptionTask

Bases: RuleMixin, BaseTask, ABC

Source code in griptape/tasks/audio_transcription_task.py
@define
class AudioTranscriptionTask(RuleMixin, BaseTask, ABC):
    """Task that transcribes an AudioArtifact to text via an AudioTranscriptionEngine.

    The engine is created lazily from the structure's config when not
    supplied explicitly.
    """

    # Input may be given directly as an AudioArtifact or as a callable that
    # produces one from the task.
    _input: AudioArtifact | Callable[[BaseTask], AudioArtifact] = field()
    _audio_transcription_engine: AudioTranscriptionEngine = field(
        default=None, kw_only=True, alias="audio_transcription_engine"
    )

    @property
    def input(self) -> AudioArtifact:
        """Resolve and return the input audio artifact.

        Raises:
            ValueError: If the input is neither an AudioArtifact nor a callable.
        """
        if isinstance(self._input, AudioArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            raise ValueError("Input must be an AudioArtifact.")

    @input.setter
    def input(self, value: AudioArtifact | Callable[[BaseTask], AudioArtifact]) -> None:
        self._input = value

    @property
    def audio_transcription_engine(self) -> AudioTranscriptionEngine:
        """Return the transcription engine, building one from the structure config if unset.

        Raises:
            ValueError: If no engine is set and there is no structure to build one from.
        """
        if self._audio_transcription_engine is None:
            if self.structure is not None:
                self._audio_transcription_engine = AudioTranscriptionEngine(
                    audio_transcription_driver=self.structure.config.audio_transcription_driver
                )
            else:
                # Fixed copy-pasted message: this task transcribes audio, it does not generate it.
                raise ValueError("Audio Transcription Engine is not set.")
        return self._audio_transcription_engine

    @audio_transcription_engine.setter
    def audio_transcription_engine(self, value: AudioTranscriptionEngine) -> None:
        self._audio_transcription_engine = value

    def run(self) -> TextArtifact:
        """Transcribe the input audio and return the resulting text artifact."""
        return self.audio_transcription_engine.run(self.input)

audio_transcription_engine: AudioTranscriptionEngine property writable

input: AudioArtifact property writable

run()

Source code in griptape/tasks/audio_transcription_task.py
def run(self) -> TextArtifact:
    return self.audio_transcription_engine.run(self.input)

BaseAudioGenerationTask

Bases: BlobArtifactFileOutputMixin, RuleMixin, BaseTask, ABC

Source code in griptape/tasks/base_audio_generation_task.py
# Marker base class for audio generation tasks: combines file-output, rule,
# and base-task behavior but declares no members of its own.
@define
class BaseAudioGenerationTask(BlobArtifactFileOutputMixin, RuleMixin, BaseTask, ABC): ...

BaseImageGenerationTask

Bases: BlobArtifactFileOutputMixin, RuleMixin, BaseTask, ABC

Provides a base class for image generation-related tasks.

Attributes:

Name Type Description
negative_rulesets list[Ruleset]

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules list[Rule]

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir Optional[str]

If provided, the generated image will be written to disk in output_dir.

output_file Optional[str]

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/base_image_generation_task.py
@define
class BaseImageGenerationTask(BlobArtifactFileOutputMixin, RuleMixin, BaseTask, ABC):
    """Provides a base class for image generation-related tasks.

    Attributes:
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    NEGATIVE_RULESET_NAME = "Negative Ruleset"

    negative_rulesets: list[Ruleset] = field(factory=list, kw_only=True)
    negative_rules: list[Rule] = field(factory=list, kw_only=True)

    # The two validators enforce mutual exclusivity: at most one of
    # negative_rulesets / negative_rules may be non-empty.
    @negative_rulesets.validator  # pyright: ignore
    def validate_negative_rulesets(self, _, negative_rulesets: list[Ruleset]) -> None:
        if not negative_rulesets:
            return

        if self.negative_rules:
            raise ValueError("Can't have both negative_rulesets and negative_rules specified.")

    @negative_rules.validator  # pyright: ignore
    def validate_negative_rules(self, _, negative_rules: list[Rule]) -> None:
        if not negative_rules:
            return

        if self.negative_rulesets:
            raise ValueError("Can't have both negative_rules and negative_rulesets specified.")

    @property
    def all_negative_rulesets(self) -> list[Ruleset]:
        # Prefer explicit rulesets; otherwise wrap loose rules in a synthetic ruleset.
        task_rulesets = []
        if self.negative_rulesets:
            task_rulesets = self.negative_rulesets

        elif self.negative_rules:
            task_rulesets = [Ruleset(name=self.NEGATIVE_RULESET_NAME, rules=self.negative_rules)]

        return task_rulesets

    def _read_from_file(self, path: str) -> MediaArtifact:
        # Reads an image from disk, logging the absolute path being read.
        self.structure.logger.info(f"Reading image from {os.path.abspath(path)}")
        with open(path, "rb") as file:
            return ImageLoader().load(file.read())

NEGATIVE_RULESET_NAME = 'Negative Ruleset' class-attribute instance-attribute

all_negative_rulesets: list[Ruleset] property

negative_rules: list[Rule] = field(factory=list, kw_only=True) class-attribute instance-attribute

negative_rulesets: list[Ruleset] = field(factory=list, kw_only=True) class-attribute instance-attribute

validate_negative_rules(_, negative_rules)

Source code in griptape/tasks/base_image_generation_task.py
@negative_rules.validator  # pyright: ignore
def validate_negative_rules(self, _, negative_rules: list[Rule]) -> None:
    """Reject negative_rules when negative_rulesets is already set (mutually exclusive)."""
    if negative_rules and self.negative_rulesets:
        raise ValueError("Can't have both negative_rules and negative_rulesets specified.")

validate_negative_rulesets(_, negative_rulesets)

Source code in griptape/tasks/base_image_generation_task.py
@negative_rulesets.validator  # pyright: ignore
def validate_negative_rulesets(self, _, negative_rulesets: list[Ruleset]) -> None:
    """Reject negative_rulesets when negative_rules is already set (mutually exclusive)."""
    if negative_rulesets and self.negative_rules:
        raise ValueError("Can't have both negative_rulesets and negative_rules specified.")

BaseMultiTextInputTask

Bases: RuleMixin, BaseTask, ABC

Source code in griptape/tasks/base_multi_text_input_task.py
@define
class BaseMultiTextInputTask(RuleMixin, BaseTask, ABC):
    """Base class for tasks that accept several text inputs at once.

    Inputs may be strings (rendered as templates with the task's full
    context), TextArtifacts, or callables producing TextArtifacts; the input
    property normalizes them into a ListArtifact.
    """

    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: tuple[str, ...] | tuple[TextArtifact, ...] | tuple[Callable[[BaseTask], TextArtifact], ...] = field(
        default=Factory(lambda self: (self.DEFAULT_INPUT_TEMPLATE,), takes_self=True), alias="input"
    )

    @property
    def input(self) -> ListArtifact:
        """Normalize the raw inputs into a ListArtifact of TextArtifacts.

        NOTE(review): mixed-type tuples fall through to the template branch,
        where non-string elements are silently dropped — confirm intended.
        """
        if all(isinstance(elem, TextArtifact) for elem in self._input):
            return ListArtifact([artifact for artifact in self._input if isinstance(artifact, TextArtifact)])
        elif all(isinstance(elem, Callable) for elem in self._input):
            # `fn` (was `callable`) avoids shadowing the builtin.
            return ListArtifact([fn(self) for fn in self._input if isinstance(fn, Callable)])
        else:
            return ListArtifact(
                [
                    TextArtifact(J2().render_from_string(input_template, **self.full_context))
                    for input_template in self._input
                    if isinstance(input_template, str)
                ]
            )

    @input.setter
    def input(
        self, value: tuple[str, ...] | tuple[TextArtifact, ...] | tuple[Callable[[BaseTask], TextArtifact], ...]
    ) -> None:
        self._input = value

    def before_run(self) -> None:
        """Log the joined text of all inputs before the task runs."""
        super().before_run()

        # `artifact` (was `input`) avoids shadowing the builtin.
        joined_input = "\n".join([artifact.to_text() for artifact in self.input])
        self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nInput: {joined_input}")

    def after_run(self) -> None:
        """Log the task output after the task runs."""
        super().after_run()

        self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nOutput: {self.output.to_text()}")

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute

input: ListArtifact property writable

after_run()

Source code in griptape/tasks/base_multi_text_input_task.py
def after_run(self) -> None:
    """Log the task's output text once the run completes."""
    super().after_run()

    message = f"{self.__class__.__name__} {self.id}\nOutput: {self.output.to_text()}"
    self.structure.logger.info(message)

before_run()

Source code in griptape/tasks/base_multi_text_input_task.py
def before_run(self) -> None:
    """Log the joined text of all inputs before the task runs."""
    super().before_run()

    # `artifact` (was `input`) avoids shadowing the builtin.
    joined_input = "\n".join([artifact.to_text() for artifact in self.input])
    self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nInput: {joined_input}")

BaseTask

Bases: ABC

Source code in griptape/tasks/base_task.py
@define
class BaseTask(ABC):
    """Abstract base for all structure tasks.

    Tracks lifecycle state, parent/child wiring within a structure, and
    publishes start/finish events around `run`.
    """

    class State(Enum):
        PENDING = 1
        EXECUTING = 2
        FINISHED = 3

    id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True)
    state: State = field(default=State.PENDING, kw_only=True)
    parent_ids: list[str] = field(factory=list, kw_only=True)
    child_ids: list[str] = field(factory=list, kw_only=True)
    max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True)

    output: Optional[BaseArtifact] = field(default=None, init=False)
    structure: Optional[Structure] = field(default=None, init=False)
    context: dict[str, Any] = field(factory=dict, kw_only=True)
    futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True)

    @property
    @abstractmethod
    def input(self) -> BaseArtifact: ...

    @property
    def parents(self) -> list[BaseTask]:
        """Parent tasks resolved through the owning structure."""
        return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]

    @property
    def children(self) -> list[BaseTask]:
        """Child tasks resolved through the owning structure."""
        return [self.structure.find_task(child_id) for child_id in self.child_ids]

    @property
    def parent_outputs(self) -> dict[str, str]:
        """Map of parent task id to its output text ("" when no output yet)."""
        return {parent.id: parent.output.to_text() if parent.output else "" for parent in self.parents}

    @property
    def parents_output_text(self) -> str:
        """Newline-joined output text of all parents that produced output."""
        return "\n".join([parent.output.to_text() for parent in self.parents if parent.output])

    @property
    def meta_memories(self) -> list[BaseMetaEntry]:
        """Structure meta-memory entries, truncated to `max_meta_memory_entries`."""
        if self.structure and self.structure.meta_memory:
            if self.max_meta_memory_entries:
                return self.structure.meta_memory.entries[: self.max_meta_memory_entries]
            else:
                return self.structure.meta_memory.entries
        else:
            return []

    def __str__(self) -> str:
        # NOTE(review): raises AttributeError when the task has not produced
        # output yet (`self.output` is None) — confirm callers only stringify
        # finished tasks.
        return str(self.output.value)

    def add_parents(self, parents: list[str | BaseTask]) -> None:
        """Register multiple parents, each given by id or task instance."""
        for parent in parents:
            self.add_parent(parent)

    def add_parent(self, parent: str | BaseTask) -> None:
        """Register a parent by id or task instance; duplicates are ignored."""
        parent_id = parent if isinstance(parent, str) else parent.id

        if parent_id not in self.parent_ids:
            self.parent_ids.append(parent_id)

    def add_children(self, children: list[str | BaseTask]) -> None:
        """Register multiple children, each given by id or task instance."""
        for child in children:
            self.add_child(child)

    def add_child(self, child: str | BaseTask) -> None:
        """Register a child by id or task instance; duplicates are ignored."""
        child_id = child if isinstance(child, str) else child.id

        if child_id not in self.child_ids:
            self.child_ids.append(child_id)

    def preprocess(self, structure: Structure) -> BaseTask:
        """Attach this task to its owning structure; returns self for chaining."""
        self.structure = structure

        return self

    def is_pending(self) -> bool:
        return self.state == BaseTask.State.PENDING

    def is_finished(self) -> bool:
        return self.state == BaseTask.State.FINISHED

    def is_executing(self) -> bool:
        return self.state == BaseTask.State.EXECUTING

    def before_run(self) -> None:
        """Publish a StartTaskEvent when attached to a structure."""
        if self.structure:
            self.structure.publish_event(
                StartTaskEvent(
                    task_id=self.id,
                    task_parent_ids=self.parent_ids,
                    task_child_ids=self.child_ids,
                    task_input=self.input,
                    task_output=self.output,
                )
            )

    def after_run(self) -> None:
        """Publish a FinishTaskEvent when attached to a structure."""
        if self.structure:
            self.structure.publish_event(
                FinishTaskEvent(
                    task_id=self.id,
                    task_parent_ids=self.parent_ids,
                    task_child_ids=self.child_ids,
                    task_input=self.input,
                    task_output=self.output,
                )
            )

    def execute(self) -> Optional[BaseArtifact]:
        """Run the task's full lifecycle and return its output.

        Any Exception raised by the hooks or `run` is captured as an
        ErrorArtifact; the task always ends in the FINISHED state.
        """
        try:
            self.state = BaseTask.State.EXECUTING

            self.before_run()

            self.output = self.run()

            self.after_run()
        except Exception as e:
            # Bug fix: `structure` defaults to None until `preprocess` runs,
            # so guard the logger access (the event hooks guard the same way).
            if self.structure:
                self.structure.logger.error(f"{self.__class__.__name__} {self.id}\n{e}", exc_info=True)

            self.output = ErrorArtifact(str(e), exception=e)
        finally:
            self.state = BaseTask.State.FINISHED

        # Bug fix: returning from inside `finally` would silently swallow
        # BaseExceptions (e.g. KeyboardInterrupt) raised past the handler.
        return self.output

    def can_execute(self) -> bool:
        """True when this task is pending and every parent has finished."""
        return self.state == BaseTask.State.PENDING and all(parent.is_finished() for parent in self.parents)

    def reset(self) -> BaseTask:
        """Return the task to a pristine pending state; returns self."""
        self.state = BaseTask.State.PENDING
        self.output = None

        return self

    @abstractmethod
    def run(self) -> BaseArtifact: ...

    @property
    def full_context(self) -> dict[str, Any]:
        """Structure-provided context merged with (overridden by) this task's own context."""
        if self.structure:
            structure_context = self.structure.context(self)

            structure_context.update(self.context)

            return structure_context
        else:
            return {}

child_ids: list[str] = field(factory=list, kw_only=True) class-attribute instance-attribute

children: list[BaseTask] property

context: dict[str, Any] = field(factory=dict, kw_only=True) class-attribute instance-attribute

full_context: dict[str, Any] property

futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True) class-attribute instance-attribute

id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True) class-attribute instance-attribute

input: BaseArtifact abstractmethod property

max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True) class-attribute instance-attribute

meta_memories: list[BaseMetaEntry] property

output: Optional[BaseArtifact] = field(default=None, init=False) class-attribute instance-attribute

parent_ids: list[str] = field(factory=list, kw_only=True) class-attribute instance-attribute

parent_outputs: dict[str, str] property

parents: list[BaseTask] property

parents_output_text: str property

state: State = field(default=State.PENDING, kw_only=True) class-attribute instance-attribute

structure: Optional[Structure] = field(default=None, init=False) class-attribute instance-attribute

State

Bases: Enum

Source code in griptape/tasks/base_task.py
class State(Enum):
    PENDING = 1
    EXECUTING = 2
    FINISHED = 3
EXECUTING = 2 class-attribute instance-attribute
FINISHED = 3 class-attribute instance-attribute
PENDING = 1 class-attribute instance-attribute

__str__()

Source code in griptape/tasks/base_task.py
def __str__(self) -> str:
    return str(self.output.value)

add_child(child)

Source code in griptape/tasks/base_task.py
def add_child(self, child: str | BaseTask) -> None:
    child_id = child if isinstance(child, str) else child.id

    if child_id not in self.child_ids:
        self.child_ids.append(child_id)

add_children(children)

Source code in griptape/tasks/base_task.py
def add_children(self, children: list[str | BaseTask]) -> None:
    for child in children:
        self.add_child(child)

add_parent(parent)

Source code in griptape/tasks/base_task.py
def add_parent(self, parent: str | BaseTask) -> None:
    parent_id = parent if isinstance(parent, str) else parent.id

    if parent_id not in self.parent_ids:
        self.parent_ids.append(parent_id)

add_parents(parents)

Source code in griptape/tasks/base_task.py
def add_parents(self, parents: list[str | BaseTask]) -> None:
    for parent in parents:
        self.add_parent(parent)

after_run()

Source code in griptape/tasks/base_task.py
def after_run(self) -> None:
    if self.structure:
        self.structure.publish_event(
            FinishTaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
            )
        )

before_run()

Source code in griptape/tasks/base_task.py
def before_run(self) -> None:
    if self.structure:
        self.structure.publish_event(
            StartTaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
            )
        )

can_execute()

Source code in griptape/tasks/base_task.py
def can_execute(self) -> bool:
    return self.state == BaseTask.State.PENDING and all(parent.is_finished() for parent in self.parents)

execute()

Source code in griptape/tasks/base_task.py
def execute(self) -> Optional[BaseArtifact]:
    try:
        self.state = BaseTask.State.EXECUTING

        self.before_run()

        self.output = self.run()

        self.after_run()
    except Exception as e:
        self.structure.logger.error(f"{self.__class__.__name__} {self.id}\n{e}", exc_info=True)

        self.output = ErrorArtifact(str(e), exception=e)
    finally:
        self.state = BaseTask.State.FINISHED

        return self.output

is_executing()

Source code in griptape/tasks/base_task.py
def is_executing(self) -> bool:
    return self.state == BaseTask.State.EXECUTING

is_finished()

Source code in griptape/tasks/base_task.py
def is_finished(self) -> bool:
    return self.state == BaseTask.State.FINISHED

is_pending()

Source code in griptape/tasks/base_task.py
def is_pending(self) -> bool:
    return self.state == BaseTask.State.PENDING

preprocess(structure)

Source code in griptape/tasks/base_task.py
def preprocess(self, structure: Structure) -> BaseTask:
    self.structure = structure

    return self

reset()

Source code in griptape/tasks/base_task.py
def reset(self) -> BaseTask:
    self.state = BaseTask.State.PENDING
    self.output = None

    return self

run() abstractmethod

Source code in griptape/tasks/base_task.py
@abstractmethod
def run(self) -> BaseArtifact: ...

BaseTextInputTask

Bases: RuleMixin, BaseTask, ABC

Source code in griptape/tasks/base_text_input_task.py
@define
class BaseTextInputTask(RuleMixin, BaseTask, ABC):
    """Base class for tasks whose input is a single piece of text.

    The input may be a Jinja2 template string (the default renders the first
    positional run argument), a `TextArtifact`, or a callable that produces a
    `TextArtifact` from the task.
    """

    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: str | TextArtifact | Callable[[BaseTask], TextArtifact] = field(
        default=DEFAULT_INPUT_TEMPLATE, alias="input"
    )

    @property
    def input(self) -> TextArtifact:
        """Resolve `_input` into a `TextArtifact`, rendering template strings."""
        if isinstance(self._input, TextArtifact):
            return self._input
        if isinstance(self._input, Callable):
            return self._input(self)
        return TextArtifact(J2().render_from_string(self._input, **self.full_context))

    @input.setter
    def input(self, value: str | TextArtifact | Callable[[BaseTask], TextArtifact]) -> None:
        self._input = value

    def before_run(self) -> None:
        """Log the resolved input text before running."""
        super().before_run()

        input_text = self.input.to_text()
        self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nInput: {input_text}")

    def after_run(self) -> None:
        """Log the output text after running."""
        super().after_run()

        output_text = self.output.to_text()
        self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nOutput: {output_text}")

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute

input: TextArtifact property writable

after_run()

Source code in griptape/tasks/base_text_input_task.py
def after_run(self) -> None:
    super().after_run()

    self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nOutput: {self.output.to_text()}")

before_run()

Source code in griptape/tasks/base_text_input_task.py
def before_run(self) -> None:
    super().before_run()

    self.structure.logger.info(f"{self.__class__.__name__} {self.id}\nInput: {self.input.to_text()}")

CodeExecutionTask

Bases: BaseTextInputTask

Source code in griptape/tasks/code_execution_task.py
@define
class CodeExecutionTask(BaseTextInputTask):
    """Task whose body is an arbitrary user-supplied callable."""

    run_fn: Callable[[CodeExecutionTask], BaseArtifact] = field(kw_only=True)

    def run(self) -> BaseArtifact:
        """Invoke `run_fn`, converting any exception into an `ErrorArtifact`."""
        try:
            result = self.run_fn(self)
        except Exception as exc:
            return ErrorArtifact(f"error during Code Execution Task: {exc}")
        return result

run_fn: Callable[[CodeExecutionTask], BaseArtifact] = field(kw_only=True) class-attribute instance-attribute

run()

Source code in griptape/tasks/code_execution_task.py
def run(self) -> BaseArtifact:
    try:
        return self.run_fn(self)
    except Exception as e:
        return ErrorArtifact(f"error during Code Execution Task: {e}")

CsvExtractionTask

Bases: ExtractionTask

Source code in griptape/tasks/csv_extraction_task.py
@define
class CsvExtractionTask(ExtractionTask):
    """Extraction task wired to a `CsvExtractionEngine`."""

    _extraction_engine: CsvExtractionEngine = field(default=None, kw_only=True, alias="extraction_engine")

    @property
    def extraction_engine(self) -> CsvExtractionEngine:
        """Return the engine, lazily building one from the structure's config."""
        if self._extraction_engine is None:
            if self.structure is None:
                raise ValueError("Extraction Engine is not set.")
            self._extraction_engine = CsvExtractionEngine(prompt_driver=self.structure.config.prompt_driver)
        return self._extraction_engine

    @extraction_engine.setter
    def extraction_engine(self, value: CsvExtractionEngine) -> None:
        self._extraction_engine = value

extraction_engine: CsvExtractionEngine property writable

ExtractionTask

Bases: BaseTextInputTask

Source code in griptape/tasks/extraction_task.py
@define
class ExtractionTask(BaseTextInputTask):
    """Task that runs an extraction engine over the task's text input."""

    _extraction_engine: BaseExtractionEngine = field(kw_only=True, default=None, alias="extraction_engine")
    args: dict = field(kw_only=True)

    @property
    def extraction_engine(self) -> BaseExtractionEngine:
        """The engine that performs the extraction."""
        return self._extraction_engine

    def run(self) -> ListArtifact | ErrorArtifact:
        """Extract structured data from the resolved input text."""
        source_text = self.input.to_text()
        return self.extraction_engine.extract(source_text, rulesets=self.all_rulesets, **self.args)

args: dict = field(kw_only=True) class-attribute instance-attribute

extraction_engine: BaseExtractionEngine property

run()

Source code in griptape/tasks/extraction_task.py
def run(self) -> ListArtifact | ErrorArtifact:
    return self.extraction_engine.extract(self.input.to_text(), rulesets=self.all_rulesets, **self.args)

ImageQueryTask

Bases: BaseTask

A task that executes a natural language query on one or more input images. Accepts a text prompt and a list of images as input in one of the following formats: a tuple of (template string, list[ImageArtifact]); a tuple of (TextArtifact, list[ImageArtifact]); or a Callable that returns a tuple of (TextArtifact, list[ImageArtifact]).

Attributes:

Name Type Description
image_query_engine ImageQueryEngine

The engine used to execute the query.

Source code in griptape/tasks/image_query_task.py
@define
class ImageQueryTask(BaseTask):
    """A task that executes a natural language query on one or more input images. Accepts a text prompt and a list of
    images as input in one of the following formats:
    - tuple of (template string, list[ImageArtifact])
    - tuple of (TextArtifact, list[ImageArtifact])
    - Callable that returns a tuple of (TextArtifact, list[ImageArtifact])

    Attributes:
        image_query_engine: The engine used to execute the query.
    """

    _image_query_engine: ImageQueryEngine = field(default=None, kw_only=True, alias="image_query_engine")
    _input: (
        tuple[str, list[ImageArtifact]]
        | tuple[TextArtifact, list[ImageArtifact]]
        | Callable[[BaseTask], ListArtifact]
        | ListArtifact
    ) = field(default=None, alias="input")

    @property
    def input(self) -> ListArtifact:
        """Normalize `_input` into a `ListArtifact` of (query text, *images).

        Raises:
            ValueError: If `_input` is not one of the supported formats.
        """
        if isinstance(self._input, ListArtifact):
            return self._input
        elif isinstance(self._input, tuple):
            if isinstance(self._input[0], TextArtifact):
                query_text = self._input[0]
            else:
                # A plain string is treated as a Jinja2 template rendered with the task context.
                query_text = TextArtifact(J2().render_from_string(self._input[0], **self.full_context))

            return ListArtifact([query_text, *self._input[1]])
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            raise ValueError(
                "Input must be a tuple of a TextArtifact and a list of ImageArtifacts or a callable that "
                "returns a tuple of a TextArtifact and a list of ImageArtifacts."
            )

    @input.setter
    def input(
        self,
        value: (
            tuple[str, list[ImageArtifact]]
            | tuple[TextArtifact, list[ImageArtifact]]
            | Callable[[BaseTask], ListArtifact]
        ),
    ) -> None:
        self._input = value

    @property
    def image_query_engine(self) -> ImageQueryEngine:
        """Return the engine, lazily building one from the structure's config."""
        if self._image_query_engine is None:
            if self.structure is None:
                raise ValueError("Image Query Engine is not set.")
            self._image_query_engine = ImageQueryEngine(image_query_driver=self.structure.config.image_query_driver)
        return self._image_query_engine

    @image_query_engine.setter
    def image_query_engine(self, value: ImageQueryEngine) -> None:
        self._image_query_engine = value

    def run(self) -> TextArtifact:
        """Run the query text against the trailing image artifacts.

        Raises:
            ValueError: If any element after the query is not an ImageArtifact.
        """
        # Bug fix: resolve the `input` property exactly once — each access
        # re-renders templates / re-invokes a callable input. This also
        # removes a redundant second pass over the artifacts and stops
        # shadowing the `input` builtin.
        artifacts = self.input.value
        query, image_artifacts = artifacts[0], artifacts[1:]

        if not all(isinstance(artifact, ImageArtifact) for artifact in image_artifacts):
            raise ValueError("All inputs after the query must be ImageArtifacts.")

        self.output = self.image_query_engine.run(query.value, image_artifacts)

        return self.output

image_query_engine: ImageQueryEngine property writable

input: ListArtifact property writable

run()

Source code in griptape/tasks/image_query_task.py
def run(self) -> TextArtifact:
    query = self.input.value[0]

    if all([isinstance(input, ImageArtifact) for input in self.input.value[1:]]):
        image_artifacts = [input for input in self.input.value[1:] if isinstance(input, ImageArtifact)]
    else:
        raise ValueError("All inputs after the query must be ImageArtifacts.")

    self.output = self.image_query_engine.run(query.value, image_artifacts)

    return self.output

InpaintingImageGenerationTask

Bases: BaseImageGenerationTask

A task that modifies a select region within an image using a mask. Accepts a text prompt, an image, and a mask as input in one of the following formats: a tuple of (template string, ImageArtifact, ImageArtifact); a tuple of (TextArtifact, ImageArtifact, ImageArtifact); or a Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact).

Attributes:

Name Type Description
image_generation_engine InpaintingImageGenerationEngine

The engine used to generate the image.

negative_rulesets InpaintingImageGenerationEngine

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules InpaintingImageGenerationEngine

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir InpaintingImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file InpaintingImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/inpainting_image_generation_task.py
@define
class InpaintingImageGenerationTask(BaseImageGenerationTask):
    """A task that modifies a select region within an image using a mask. Accepts a text prompt, image, and mask as
    input in one of the following formats:
    - tuple of (template string, ImageArtifact, ImageArtifact)
    - tuple of (TextArtifact, ImageArtifact, ImageArtifact)
    - Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact)

    Attributes:
        image_generation_engine: The engine used to generate the image.
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    _image_generation_engine: InpaintingImageGenerationEngine = field(
        default=None, kw_only=True, alias="image_generation_engine"
    )
    _input: (
        tuple[str | TextArtifact, ImageArtifact, ImageArtifact] | Callable[[BaseTask], ListArtifact] | ListArtifact
    ) = field(default=None)

    @property
    def input(self) -> ListArtifact:
        """Normalize `_input` into a `ListArtifact` of (prompt, image, mask).

        Raises:
            ValueError: If `_input` is not one of the supported formats.
        """
        if isinstance(self._input, ListArtifact):
            return self._input
        elif isinstance(self._input, tuple):
            if isinstance(self._input[0], TextArtifact):
                input_text = self._input[0]
            else:
                # A plain string is treated as a Jinja2 template rendered with the task context.
                input_text = TextArtifact(J2().render_from_string(self._input[0], **self.full_context))

            return ListArtifact([input_text, self._input[1], self._input[2]])
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            raise ValueError("Input must be a tuple of (text, image, mask) or a callable that returns such a tuple.")

    @input.setter
    def input(
        self, value: tuple[str | TextArtifact, ImageArtifact, ImageArtifact] | Callable[[BaseTask], ListArtifact]
    ) -> None:
        self._input = value

    @property
    def image_generation_engine(self) -> InpaintingImageGenerationEngine:
        """Return the engine, lazily building one from the structure's config."""
        if self._image_generation_engine is None:
            if self.structure is None:
                raise ValueError("Image Generation Engine is not set.")
            self._image_generation_engine = InpaintingImageGenerationEngine(
                image_generation_driver=self.structure.config.image_generation_driver
            )
        return self._image_generation_engine

    @image_generation_engine.setter
    def image_generation_engine(self, value: InpaintingImageGenerationEngine) -> None:
        self._image_generation_engine = value

    def run(self) -> ImageArtifact:
        """Generate the inpainted image, optionally writing it to disk.

        Raises:
            ValueError: If the image or mask element is not an ImageArtifact.
        """
        # Bug fix: resolve the `input` property exactly once — each access
        # re-renders templates / re-invokes a callable input.
        resolved_input = self.input

        prompt_artifact = resolved_input[0]

        image_artifact = resolved_input[1]
        if not isinstance(image_artifact, ImageArtifact):
            raise ValueError("Image must be an ImageArtifact.")

        mask_artifact = resolved_input[2]
        if not isinstance(mask_artifact, ImageArtifact):
            raise ValueError("Mask must be an ImageArtifact.")

        output_image_artifact = self.image_generation_engine.run(
            prompts=[prompt_artifact.to_text()],
            image=image_artifact,
            mask=mask_artifact,
            rulesets=self.all_rulesets,
            negative_rulesets=self.negative_rulesets,
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_image_artifact)

        return output_image_artifact

image_generation_engine: InpaintingImageGenerationEngine property writable

input: ListArtifact property writable

run()

Source code in griptape/tasks/inpainting_image_generation_task.py
def run(self) -> ImageArtifact:
    prompt_artifact = self.input[0]

    image_artifact = self.input[1]
    if not isinstance(image_artifact, ImageArtifact):
        raise ValueError("Image must be an ImageArtifact.")

    mask_artifact = self.input[2]
    if not isinstance(mask_artifact, ImageArtifact):
        raise ValueError("Mask must be an ImageArtifact.")

    output_image_artifact = self.image_generation_engine.run(
        prompts=[prompt_artifact.to_text()],
        image=image_artifact,
        mask=mask_artifact,
        rulesets=self.all_rulesets,
        negative_rulesets=self.negative_rulesets,
    )

    if self.output_dir or self.output_file:
        self._write_to_file(output_image_artifact)

    return output_image_artifact

JsonExtractionTask

Bases: ExtractionTask

Source code in griptape/tasks/json_extraction_task.py
@define
class JsonExtractionTask(ExtractionTask):
    """Extraction task wired to a `JsonExtractionEngine`."""

    _extraction_engine: JsonExtractionEngine = field(default=None, kw_only=True, alias="extraction_engine")

    @property
    def extraction_engine(self) -> JsonExtractionEngine:
        """Return the engine, lazily building one from the structure's config."""
        if self._extraction_engine is None:
            if self.structure is None:
                raise ValueError("Extraction Engine is not set.")
            self._extraction_engine = JsonExtractionEngine(prompt_driver=self.structure.config.prompt_driver)
        return self._extraction_engine

    @extraction_engine.setter
    def extraction_engine(self, value: JsonExtractionEngine) -> None:
        self._extraction_engine = value

extraction_engine: JsonExtractionEngine property writable

OutpaintingImageGenerationTask

Bases: BaseImageGenerationTask

A task that modifies an image outside the bounds of a mask. Accepts a text prompt, an image, and a mask as input in one of the following formats: a tuple of (template string, ImageArtifact, ImageArtifact); a tuple of (TextArtifact, ImageArtifact, ImageArtifact); or a Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact).

Attributes:

Name Type Description
image_generation_engine OutpaintingImageGenerationEngine

The engine used to generate the image.

negative_rulesets OutpaintingImageGenerationEngine

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules OutpaintingImageGenerationEngine

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir OutpaintingImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file OutpaintingImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/outpainting_image_generation_task.py
@define
class OutpaintingImageGenerationTask(BaseImageGenerationTask):
    """A task that modifies an image outside the bounds of a mask. Accepts a text prompt, image, and mask as
    input in one of the following formats:
    - tuple of (template string, ImageArtifact, ImageArtifact)
    - tuple of (TextArtifact, ImageArtifact, ImageArtifact)
    - Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact)

    Attributes:
        image_generation_engine: The engine used to generate the image.
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    _image_generation_engine: OutpaintingImageGenerationEngine = field(
        default=None, kw_only=True, alias="image_generation_engine"
    )
    _input: (
        tuple[str | TextArtifact, ImageArtifact, ImageArtifact] | Callable[[BaseTask], ListArtifact] | ListArtifact
    ) = field(default=None)

    @property
    def input(self) -> ListArtifact:
        """Normalize `_input` into a `ListArtifact` of (prompt, image, mask).

        Raises:
            ValueError: If `_input` is not one of the supported formats.
        """
        if isinstance(self._input, ListArtifact):
            return self._input
        elif isinstance(self._input, tuple):
            if isinstance(self._input[0], TextArtifact):
                input_text = self._input[0]
            else:
                # A plain string is treated as a Jinja2 template rendered with the task context.
                input_text = TextArtifact(J2().render_from_string(self._input[0], **self.full_context))

            return ListArtifact([input_text, self._input[1], self._input[2]])
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            raise ValueError("Input must be a tuple of (text, image, mask) or a callable that returns such a tuple.")

    @input.setter
    def input(
        self, value: tuple[str | TextArtifact, ImageArtifact, ImageArtifact] | Callable[[BaseTask], ListArtifact]
    ) -> None:
        self._input = value

    @property
    def image_generation_engine(self) -> OutpaintingImageGenerationEngine:
        """Return the engine, lazily building one from the structure's config."""
        if self._image_generation_engine is None:
            if self.structure is None:
                raise ValueError("Image Generation Engine is not set.")
            self._image_generation_engine = OutpaintingImageGenerationEngine(
                image_generation_driver=self.structure.config.image_generation_driver
            )
        return self._image_generation_engine

    @image_generation_engine.setter
    def image_generation_engine(self, value: OutpaintingImageGenerationEngine) -> None:
        self._image_generation_engine = value

    def run(self) -> ImageArtifact:
        """Generate the outpainted image, optionally writing it to disk.

        Raises:
            ValueError: If the image or mask element is not an ImageArtifact.
        """
        # Bug fix: resolve the `input` property exactly once — each access
        # re-renders templates / re-invokes a callable input.
        resolved_input = self.input

        prompt_artifact = resolved_input[0]

        image_artifact = resolved_input[1]
        if not isinstance(image_artifact, ImageArtifact):
            raise ValueError("Image must be an ImageArtifact.")

        mask_artifact = resolved_input[2]
        if not isinstance(mask_artifact, ImageArtifact):
            raise ValueError("Mask must be an ImageArtifact.")

        output_image_artifact = self.image_generation_engine.run(
            prompts=[prompt_artifact.to_text()],
            image=image_artifact,
            mask=mask_artifact,
            rulesets=self.all_rulesets,
            negative_rulesets=self.negative_rulesets,
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_image_artifact)

        return output_image_artifact

image_generation_engine: OutpaintingImageGenerationEngine property writable

input: ListArtifact property writable

run()

Source code in griptape/tasks/outpainting_image_generation_task.py
def run(self) -> ImageArtifact:
    prompt_artifact = self.input[0]

    image_artifact = self.input[1]
    if not isinstance(image_artifact, ImageArtifact):
        raise ValueError("Image must be an ImageArtifact.")

    mask_artifact = self.input[2]
    if not isinstance(mask_artifact, ImageArtifact):
        raise ValueError("Mask must be an ImageArtifact.")

    output_image_artifact = self.image_generation_engine.run(
        prompts=[prompt_artifact.to_text()],
        image=image_artifact,
        mask=mask_artifact,
        rulesets=self.all_rulesets,
        negative_rulesets=self.negative_rulesets,
    )

    if self.output_dir or self.output_file:
        self._write_to_file(output_image_artifact)

    return output_image_artifact

PromptImageGenerationTask

Bases: BaseImageGenerationTask

Used to generate an image from a text prompt. Accepts the prompt as input in one of the following formats: a template string; a TextArtifact; or a Callable that returns a TextArtifact.

Attributes:

Name Type Description
image_generation_engine PromptImageGenerationEngine

The engine used to generate the image.

negative_rulesets PromptImageGenerationEngine

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules PromptImageGenerationEngine

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir PromptImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file PromptImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/prompt_image_generation_task.py
@define
class PromptImageGenerationTask(BaseImageGenerationTask):
    """Used to generate an image from a text prompt. Accepts prompt as input in one of the following formats:
    - template string
    - TextArtifact
    - Callable that returns a TextArtifact

    Attributes:
        image_generation_engine: The engine used to generate the image.
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    # Default Jinja2 template: render the first positional structure arg as the prompt.
    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: str | TextArtifact | Callable[[BaseTask], TextArtifact] = field(default=DEFAULT_INPUT_TEMPLATE)
    # Engine is lazily resolved from the structure's config when not supplied (see property below).
    _image_generation_engine: PromptImageGenerationEngine = field(
        default=None, kw_only=True, alias="image_generation_engine"
    )

    @property
    def input(self) -> TextArtifact:
        """Resolve the configured input into a TextArtifact.

        TextArtifacts pass through unchanged, callables are invoked with this
        task, and strings are rendered as Jinja2 templates against the task's
        full context.
        """
        if isinstance(self._input, TextArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            return TextArtifact(J2().render_from_string(self._input, **self.full_context))

    @input.setter
    def input(self, value: TextArtifact) -> None:
        self._input = value

    @property
    def image_generation_engine(self) -> PromptImageGenerationEngine:
        """Return the configured engine, falling back to one built from the structure's image generation driver."""
        if self._image_generation_engine is None:
            if self.structure is not None:
                self._image_generation_engine = PromptImageGenerationEngine(
                    image_generation_driver=self.structure.config.image_generation_driver
                )
            else:
                raise ValueError("Image Generation Engine is not set.")
        return self._image_generation_engine

    @image_generation_engine.setter
    def image_generation_engine(self, value: PromptImageGenerationEngine) -> None:
        self._image_generation_engine = value

    def run(self) -> ImageArtifact:
        """Generate an image from the rendered prompt and optionally persist it to disk."""
        image_artifact = self.image_generation_engine.run(
            prompts=[self.input.to_text()], rulesets=self.all_rulesets, negative_rulesets=self.negative_rulesets
        )

        # Write to disk only when an output destination was configured.
        if self.output_dir or self.output_file:
            self._write_to_file(image_artifact)

        return image_artifact

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute

image_generation_engine: PromptImageGenerationEngine property writable

input: TextArtifact property writable

run()

Source code in griptape/tasks/prompt_image_generation_task.py
def run(self) -> ImageArtifact:
    """Render the input prompt into an image and optionally write it to disk."""
    artifact = self.image_generation_engine.run(
        prompts=[self.input.to_text()],
        rulesets=self.all_rulesets,
        negative_rulesets=self.negative_rulesets,
    )

    if self.output_dir or self.output_file:
        self._write_to_file(artifact)

    return artifact

PromptTask

Bases: BaseTextInputTask

Source code in griptape/tasks/prompt_task.py
@define
class PromptTask(BaseTextInputTask):
    """Runs a single LLM prompt built from the task input, rulesets, and conversation memory."""

    _prompt_driver: Optional[BasePromptDriver] = field(default=None, kw_only=True, alias="prompt_driver")
    # Hook for customizing the system prompt; defaults to default_system_template_generator.
    generate_system_template: Callable[[PromptTask], str] = field(
        default=Factory(lambda self: self.default_system_template_generator, takes_self=True), kw_only=True
    )

    # Populated by run(); excluded from __init__.
    output: Optional[BaseArtifact] = field(default=None, init=False)

    @property
    def prompt_stack(self) -> PromptStack:
        """Assemble the system/user/assistant inputs (plus conversation memory) for the driver."""
        stack = PromptStack()
        memory = self.structure.conversation_memory

        stack.add_system_input(self.generate_system_template(self))

        stack.add_user_input(self.input.to_text())

        if self.output:
            stack.add_assistant_input(self.output.to_text())

        if memory:
            # inserting at index 1 to place memory right after system prompt
            memory.add_to_prompt_stack(stack, 1)

        return stack

    @property
    def prompt_driver(self) -> BasePromptDriver:
        """Return the configured driver, falling back to the structure's config; raises ValueError if neither is set."""
        if self._prompt_driver is None:
            if self.structure is not None:
                self._prompt_driver = self.structure.config.prompt_driver
            else:
                raise ValueError("Prompt Driver is not set")
        return self._prompt_driver

    def preprocess(self, structure: Structure) -> PromptTask:
        """Attach the structure to this task and propagate it to the prompt driver."""
        super().preprocess(structure)
        # NOTE(review): prompt_driver never returns None (it raises instead), so this check is always true here.
        if self.prompt_driver is not None:
            self.prompt_driver.structure = structure

        return self

    def default_system_template_generator(self, _: PromptTask) -> str:
        """Render the default system prompt including this task's rulesets."""
        return J2("tasks/prompt_task/system.j2").render(
            rulesets=J2("rulesets/rulesets.j2").render(rulesets=self.all_rulesets)
        )

    def run(self) -> BaseArtifact:
        """Execute the prompt driver against the prompt stack and store the result."""
        self.output = self.prompt_driver.run(self.prompt_stack)

        return self.output

generate_system_template: Callable[[PromptTask], str] = field(default=Factory(lambda self: self.default_system_template_generator, takes_self=True), kw_only=True) class-attribute instance-attribute

output: Optional[BaseArtifact] = field(default=None, init=False) class-attribute instance-attribute

prompt_driver: BasePromptDriver property

prompt_stack: PromptStack property

default_system_template_generator(_)

Source code in griptape/tasks/prompt_task.py
def default_system_template_generator(self, _: PromptTask) -> str:
    """Render the default system prompt, embedding this task's rulesets."""
    rendered_rulesets = J2("rulesets/rulesets.j2").render(rulesets=self.all_rulesets)
    return J2("tasks/prompt_task/system.j2").render(rulesets=rendered_rulesets)

preprocess(structure)

Source code in griptape/tasks/prompt_task.py
def preprocess(self, structure: Structure) -> PromptTask:
    """Attach *structure* to this task and propagate it to the prompt driver."""
    super().preprocess(structure)

    # NOTE(review): the prompt_driver property raises rather than returning None,
    # so this guard is effectively always taken once a structure is set.
    driver = self.prompt_driver
    if driver is not None:
        driver.structure = structure

    return self

run()

Source code in griptape/tasks/prompt_task.py
def run(self) -> BaseArtifact:
    """Execute the prompt driver against this task's prompt stack and cache the result."""
    result = self.prompt_driver.run(self.prompt_stack)
    self.output = result
    return result

StructureRunTask

Bases: BaseMultiTextInputTask

Task to run a Structure.

Attributes:

Name Type Description
driver BaseStructureRunDriver

Driver to run the Structure.

Source code in griptape/tasks/structure_run_task.py
@define
class StructureRunTask(BaseMultiTextInputTask):
    """Task to run a Structure.

    Attributes:
        driver: Driver to run the Structure.
    """

    driver: BaseStructureRunDriver = field(kw_only=True)

    def run(self) -> BaseArtifact:
        """Run the driver with this task's inputs unpacked as positional arguments."""
        return self.driver.run(*self.input)

driver: BaseStructureRunDriver = field(kw_only=True) class-attribute instance-attribute

run()

Source code in griptape/tasks/structure_run_task.py
def run(self) -> BaseArtifact:
    """Unpack this task's inputs as positional args and run the driver."""
    task_inputs = self.input
    return self.driver.run(*task_inputs)

TextQueryTask

Bases: BaseTextInputTask

Source code in griptape/tasks/text_query_task.py
@define
class TextQueryTask(BaseTextInputTask):
    """Queries a text query engine with the rendered task input."""

    _query_engine: BaseQueryEngine = field(kw_only=True, default=None, alias="query_engine")
    # Text loader; not referenced in this class body.
    loader: TextLoader = field(default=Factory(lambda: TextLoader()), kw_only=True)
    # Optional namespace passed through to the query engine.
    namespace: Optional[str] = field(default=None, kw_only=True)
    # Optional result-count limit passed through to the query engine.
    top_n: Optional[int] = field(default=None, kw_only=True)

    @property
    def query_engine(self) -> BaseQueryEngine:
        """Return the configured engine, falling back to a VectorQueryEngine built from the structure's config."""
        if self._query_engine is None:
            if self.structure is not None:
                self._query_engine = VectorQueryEngine(
                    prompt_driver=self.structure.config.prompt_driver,
                    vector_store_driver=self.structure.config.vector_store_driver,
                )
            else:
                raise ValueError("Query Engine is not set.")
        return self._query_engine

    @query_engine.setter
    def query_engine(self, value: BaseQueryEngine) -> None:
        self._query_engine = value

    def run(self) -> TextArtifact:
        """Query the engine with the input text, namespace, rulesets, and top_n."""
        return self.query_engine.query(
            self.input.to_text(), namespace=self.namespace, rulesets=self.all_rulesets, top_n=self.top_n
        )

loader: TextLoader = field(default=Factory(lambda: TextLoader()), kw_only=True) class-attribute instance-attribute

namespace: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

query_engine: BaseQueryEngine property writable

top_n: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

run()

Source code in griptape/tasks/text_query_task.py
def run(self) -> TextArtifact:
    """Query the engine with the rendered input text."""
    query_text = self.input.to_text()
    return self.query_engine.query(
        query_text, namespace=self.namespace, rulesets=self.all_rulesets, top_n=self.top_n
    )

TextSummaryTask

Bases: BaseTextInputTask

Source code in griptape/tasks/text_summary_task.py
@define
class TextSummaryTask(BaseTextInputTask):
    """Summarizes the task input text with a summary engine."""

    _summary_engine: Optional[BaseSummaryEngine] = field(default=None, alias="summary_engine")

    @property
    def summary_engine(self) -> Optional[BaseSummaryEngine]:
        """Return the configured engine, falling back to a PromptSummaryEngine from the structure's config.

        NOTE(review): despite the Optional annotation, this never returns None —
        it either resolves an engine or raises ValueError.
        """
        if self._summary_engine is None:
            if self.structure is not None:
                self._summary_engine = PromptSummaryEngine(prompt_driver=self.structure.config.prompt_driver)
            else:
                raise ValueError("Summary Engine is not set.")
        return self._summary_engine

    @summary_engine.setter
    def summary_engine(self, value: BaseSummaryEngine) -> None:
        self._summary_engine = value

    def run(self) -> TextArtifact:
        """Summarize the input text under this task's rulesets."""
        return TextArtifact(self.summary_engine.summarize_text(self.input.to_text(), rulesets=self.all_rulesets))

summary_engine: Optional[BaseSummaryEngine] property writable

run()

Source code in griptape/tasks/text_summary_task.py
def run(self) -> TextArtifact:
    """Summarize the input text under this task's rulesets."""
    summary = self.summary_engine.summarize_text(self.input.to_text(), rulesets=self.all_rulesets)
    return TextArtifact(summary)

TextToSpeechTask

Bases: BaseAudioGenerationTask

Source code in griptape/tasks/text_to_speech_task.py
@define
class TextToSpeechTask(BaseAudioGenerationTask):
    """Converts the rendered task input text into speech audio."""

    # Default Jinja2 template: render the first positional structure arg as the input.
    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: str | TextArtifact | Callable[[BaseTask], TextArtifact] = field(default=DEFAULT_INPUT_TEMPLATE)
    # Engine is lazily resolved from the structure's config when not supplied (see property below).
    _text_to_speech_engine: TextToSpeechEngine = field(default=None, kw_only=True, alias="text_to_speech_engine")

    @property
    def input(self) -> TextArtifact:
        """Resolve the configured input: TextArtifacts pass through, callables are invoked, strings are rendered as Jinja2 templates."""
        if isinstance(self._input, TextArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            return TextArtifact(J2().render_from_string(self._input, **self.full_context))

    @input.setter
    def input(self, value: TextArtifact) -> None:
        self._input = value

    @property
    def text_to_speech_engine(self) -> TextToSpeechEngine:
        """Return the configured engine, falling back to one built from the structure's text-to-speech driver."""
        if self._text_to_speech_engine is None:
            if self.structure is not None:
                self._text_to_speech_engine = TextToSpeechEngine(
                    text_to_speech_driver=self.structure.config.text_to_speech_driver
                )
            else:
                raise ValueError("Audio Generation Engine is not set.")
        return self._text_to_speech_engine

    @text_to_speech_engine.setter
    def text_to_speech_engine(self, value: TextToSpeechEngine) -> None:
        self._text_to_speech_engine = value

    def run(self) -> AudioArtifact:
        """Synthesize speech for the input text and optionally write it to disk."""
        audio_artifact = self.text_to_speech_engine.run(prompts=[self.input.to_text()], rulesets=self.all_rulesets)

        # Write to disk only when an output destination was configured.
        if self.output_dir or self.output_file:
            self._write_to_file(audio_artifact)

        return audio_artifact

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute

input: TextArtifact property writable

text_to_speech_engine: TextToSpeechEngine property writable

run()

Source code in griptape/tasks/text_to_speech_task.py
def run(self) -> AudioArtifact:
    """Synthesize speech for the input text and optionally persist it to disk."""
    result = self.text_to_speech_engine.run(prompts=[self.input.to_text()], rulesets=self.all_rulesets)

    if self.output_dir or self.output_file:
        self._write_to_file(result)

    return result

ToolTask

Bases: PromptTask, ActionsSubtaskOriginMixin

Source code in griptape/tasks/tool_task.py
@define
class ToolTask(PromptTask, ActionsSubtaskOriginMixin):
    """Prompts the LLM for a single JSON action and executes it with exactly one tool via one subtask."""

    # Captures the last brace-delimited JSON object in the LLM output.
    ACTION_PATTERN = r"(?s)[^{]*({.*})"

    tool: BaseTool = field(kw_only=True)
    subtask: Optional[ActionsSubtask] = field(default=None, kw_only=True)
    task_memory: Optional[TaskMemory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Apply construction-time task memory to the tool's defaults."""
        if self.task_memory is not None:
            self.set_default_tools_memory(self.task_memory)

    def preprocess(self, structure: Structure) -> ToolTask:
        """Attach the structure and inherit its task memory when none was set locally."""
        super().preprocess(structure)

        if self.task_memory is None and structure.task_memory is not None:
            self.set_default_tools_memory(structure.task_memory)

        return self

    def default_system_template_generator(self, _: PromptTask) -> str:
        """Render the ToolTask system prompt with rulesets, the tool's schema, and meta memory."""
        return J2("tasks/tool_task/system.j2").render(
            rulesets=J2("rulesets/rulesets.j2").render(rulesets=self.all_rulesets),
            action_schema=utils.minify_json(json.dumps(self.tool.schema())),
            meta_memory=J2("memory/meta/meta_memory.j2").render(meta_memories=self.meta_memories),
        )

    def actions_schema(self) -> Schema:
        """Build the actions schema for this task's single tool."""
        return self._actions_schema_for_tools([self.tool])

    def run(self) -> BaseArtifact:
        """Prompt once, parse the last JSON action from the output, and execute it via a subtask."""
        prompt_output = self.prompt_driver.run(prompt_stack=self.prompt_stack).to_text()
        action_matches = re.findall(self.ACTION_PATTERN, prompt_output, re.DOTALL)

        if action_matches:
            try:
                # Use the final match, presumably the model's definitive action.
                data = action_matches[-1]
                action_dict = json.loads(data)
                # Tag the action with the tool name so the subtask can route it.
                action_dict["tag"] = self.tool.name
                subtask_input = J2("tasks/tool_task/subtask.j2").render(action_json=json.dumps(action_dict))
                subtask = self.add_subtask(ActionsSubtask(subtask_input))

                subtask.before_run()
                subtask.run()
                subtask.after_run()

                if isinstance(subtask.output, ListArtifact):
                    # Single tool -> single action, so the first artifact is the tool's result.
                    self.output = subtask.output[0]
                else:
                    # NOTE(review): a non-list subtask output (e.g. an error) is discarded here — confirm intended.
                    self.output = InfoArtifact("No tool output")
            except Exception as e:
                self.output = ErrorArtifact(f"Error processing tool input: {e}", exception=e)
            return self.output
        else:
            return ErrorArtifact("No action found in prompt output.")

    def find_tool(self, tool_name: str) -> BaseTool:
        """Return this task's tool when the name matches; raises ValueError otherwise."""
        if self.tool.name == tool_name:
            return self.tool
        else:
            raise ValueError(f"Tool with name {tool_name} not found.")

    def find_memory(self, memory_name: str) -> TaskMemory:
        # ToolTask intentionally exposes no addressable Task Memory lookup.
        raise NotImplementedError("ToolTask does not support Task Memory.")

    def find_subtask(self, subtask_id: str) -> ActionsSubtask:
        """Return the current subtask when its id matches; raises ValueError otherwise."""
        if self.subtask and self.subtask.id == subtask_id:
            return self.subtask
        else:
            raise ValueError(f"Subtask with id {subtask_id} not found.")

    def add_subtask(self, subtask: ActionsSubtask) -> ActionsSubtask:
        """Register *subtask* as this task's single subtask and attach it."""
        self.subtask = subtask
        self.subtask.attach_to(self)

        return self.subtask

    def set_default_tools_memory(self, memory: TaskMemory) -> None:
        """Adopt *memory* and apply it as the tool's default input/output memory."""
        self.task_memory = memory

        if self.task_memory:
            if self.tool.input_memory is None:
                self.tool.input_memory = [self.task_memory]
            if self.tool.output_memory is None and self.tool.off_prompt:
                self.tool.output_memory = {getattr(a, "name"): [self.task_memory] for a in self.tool.activities()}

ACTION_PATTERN = '(?s)[^{]*({.*})' class-attribute instance-attribute

subtask: Optional[ActionsSubtask] = field(default=None, kw_only=True) class-attribute instance-attribute

task_memory: Optional[TaskMemory] = field(default=None, kw_only=True) class-attribute instance-attribute

tool: BaseTool = field(kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tasks/tool_task.py
def __attrs_post_init__(self) -> None:
    """Apply construction-time task memory to the tool's defaults."""
    if self.task_memory is None:
        return
    self.set_default_tools_memory(self.task_memory)

actions_schema()

Source code in griptape/tasks/tool_task.py
def actions_schema(self) -> Schema:
    """Build the actions schema for this task's single tool."""
    available_tools = [self.tool]
    return self._actions_schema_for_tools(available_tools)

add_subtask(subtask)

Source code in griptape/tasks/tool_task.py
def add_subtask(self, subtask: ActionsSubtask) -> ActionsSubtask:
    """Register *subtask* as this task's single subtask and attach it to this task."""
    self.subtask = subtask
    subtask.attach_to(self)
    return subtask

default_system_template_generator(_)

Source code in griptape/tasks/tool_task.py
def default_system_template_generator(self, _: PromptTask) -> str:
    """Render the ToolTask system prompt with rulesets, the tool's schema, and meta memory."""
    rendered_rulesets = J2("rulesets/rulesets.j2").render(rulesets=self.all_rulesets)
    schema_json = utils.minify_json(json.dumps(self.tool.schema()))
    rendered_meta = J2("memory/meta/meta_memory.j2").render(meta_memories=self.meta_memories)

    return J2("tasks/tool_task/system.j2").render(
        rulesets=rendered_rulesets, action_schema=schema_json, meta_memory=rendered_meta
    )

find_memory(memory_name)

Source code in griptape/tasks/tool_task.py
def find_memory(self, memory_name: str) -> TaskMemory:
    # ToolTask intentionally exposes no addressable Task Memory lookup; see ToolkitTask for the supported version.
    raise NotImplementedError("ToolTask does not support Task Memory.")

find_subtask(subtask_id)

Source code in griptape/tasks/tool_task.py
def find_subtask(self, subtask_id: str) -> ActionsSubtask:
    """Return the current subtask when its id matches; raise ValueError otherwise."""
    current = self.subtask
    if current and current.id == subtask_id:
        return current
    raise ValueError(f"Subtask with id {subtask_id} not found.")

find_tool(tool_name)

Source code in griptape/tasks/tool_task.py
def find_tool(self, tool_name: str) -> BaseTool:
    """Return this task's tool when the name matches; raise ValueError otherwise."""
    if self.tool.name != tool_name:
        raise ValueError(f"Tool with name {tool_name} not found.")
    return self.tool

preprocess(structure)

Source code in griptape/tasks/tool_task.py
def preprocess(self, structure: Structure) -> ToolTask:
    """Attach *structure* and inherit its task memory when none was set locally."""
    super().preprocess(structure)

    inherited_memory = structure.task_memory
    if self.task_memory is None and inherited_memory is not None:
        self.set_default_tools_memory(inherited_memory)

    return self

run()

Source code in griptape/tasks/tool_task.py
def run(self) -> BaseArtifact:
    """Prompt the LLM once, parse the last JSON action from its output, and execute it via a subtask."""
    prompt_output = self.prompt_driver.run(prompt_stack=self.prompt_stack).to_text()
    # ACTION_PATTERN captures brace-delimited JSON objects; the last match is used below.
    action_matches = re.findall(self.ACTION_PATTERN, prompt_output, re.DOTALL)

    if action_matches:
        try:
            # Use the final match, presumably the model's definitive action.
            data = action_matches[-1]
            action_dict = json.loads(data)
            # Tag the action with the tool name so the subtask can route it.
            action_dict["tag"] = self.tool.name
            subtask_input = J2("tasks/tool_task/subtask.j2").render(action_json=json.dumps(action_dict))
            subtask = self.add_subtask(ActionsSubtask(subtask_input))

            subtask.before_run()
            subtask.run()
            subtask.after_run()

            if isinstance(subtask.output, ListArtifact):
                # Single tool -> single action, so the first artifact is the tool's result.
                self.output = subtask.output[0]
            else:
                # NOTE(review): a non-list subtask output (e.g. an error) is discarded here — confirm intended.
                self.output = InfoArtifact("No tool output")
        except Exception as e:
            self.output = ErrorArtifact(f"Error processing tool input: {e}", exception=e)
        return self.output
    else:
        return ErrorArtifact("No action found in prompt output.")

set_default_tools_memory(memory)

Source code in griptape/tasks/tool_task.py
def set_default_tools_memory(self, memory: TaskMemory) -> None:
    """Adopt *memory* and apply it as the tool's default input/output memory."""
    self.task_memory = memory

    if not self.task_memory:
        return

    tool = self.tool
    if tool.input_memory is None:
        tool.input_memory = [self.task_memory]
    if tool.output_memory is None and tool.off_prompt:
        tool.output_memory = {getattr(a, "name"): [self.task_memory] for a in tool.activities()}

ToolkitTask

Bases: PromptTask, ActionsSubtaskOriginMixin

Source code in griptape/tasks/toolkit_task.py
@define
class ToolkitTask(PromptTask, ActionsSubtaskOriginMixin):
    """A ReAct-style task that lets the LLM chain tool actions through subtasks until it produces an answer.

    Attributes:
        tools: Tools available to the LLM; tool names must be unique within the task.
        max_subtasks: Maximum number of subtasks per run before the task fails with an error.
        task_memory: Optional Task Memory applied as the default input/output memory of the tools.
        subtasks: Subtasks spawned during the current run.
        generate_assistant_subtask_template: Renders a finished subtask as an assistant message.
        generate_user_subtask_template: Renders a finished subtask as a user message.
        response_stop_sequence: Stop sequence separating the model's reasoning from its final response.
    """

    DEFAULT_MAX_STEPS = 20
    # Stop sequence for chain-of-thought in the framework. Using this "token-like" string to make it more unique,
    # so that it doesn't trigger by accident.
    RESPONSE_STOP_SEQUENCE = "<|Response|>"

    tools: list[BaseTool] = field(factory=list, kw_only=True)
    max_subtasks: int = field(default=DEFAULT_MAX_STEPS, kw_only=True)
    task_memory: Optional[TaskMemory] = field(default=None, kw_only=True)
    subtasks: list[ActionsSubtask] = field(factory=list)
    generate_assistant_subtask_template: Callable[[ActionsSubtask], str] = field(
        default=Factory(lambda self: self.default_assistant_subtask_template_generator, takes_self=True), kw_only=True
    )
    generate_user_subtask_template: Callable[[ActionsSubtask], str] = field(
        default=Factory(lambda self: self.default_user_subtask_template_generator, takes_self=True), kw_only=True
    )
    response_stop_sequence: str = field(default=RESPONSE_STOP_SEQUENCE, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Apply construction-time task memory to the tools' defaults."""
        if self.task_memory:
            self.set_default_tools_memory(self.task_memory)

    @tools.validator  # pyright: ignore
    def validate_tools(self, _, tools: list[BaseTool]) -> None:
        """Reject tool lists containing duplicate tool names."""
        tool_names = [t.name for t in tools]

        if len(tool_names) > len(set(tool_names)):
            raise ValueError("tools names have to be unique in task")

    @property
    def tool_output_memory(self) -> list[TaskMemory]:
        """Return the distinct TaskMemory instances used by the tools' output memory, in first-seen order."""
        unique_memory_dict = {}

        for memories in [tool.output_memory for tool in self.tools if tool.output_memory]:
            for memory_list in memories.values():
                for memory in memory_list:
                    if memory.name not in unique_memory_dict:
                        unique_memory_dict[memory.name] = memory

        return list(unique_memory_dict.values())

    @property
    def prompt_stack(self) -> PromptStack:
        """Assemble the ReAct conversation: system prompt, user input, then one assistant/user pair per subtask."""
        stack = PromptStack()
        memory = self.structure.conversation_memory

        stack.add_system_input(self.generate_system_template(self))

        stack.add_user_input(self.input.to_text())

        if self.output:
            stack.add_assistant_input(self.output.to_text())
        else:
            for s in self.subtasks:
                stack.add_assistant_input(self.generate_assistant_subtask_template(s))
                stack.add_user_input(self.generate_user_subtask_template(s))

        if memory:
            # inserting at index 1 to place memory right after system prompt
            memory.add_to_prompt_stack(stack, 1)

        return stack

    def preprocess(self, structure: Structure) -> ToolkitTask:
        """Attach the structure and inherit its task memory when none was set locally."""
        super().preprocess(structure)

        if self.task_memory is None and structure.task_memory:
            self.set_default_tools_memory(structure.task_memory)

        return self

    def default_system_template_generator(self, _: PromptTask) -> str:
        """Render the ReAct system prompt with rulesets, tool names, the actions schema, and meta memory."""
        schema = self.actions_schema().json_schema("Actions Schema")
        schema["minItems"] = 1  # The `schema` library doesn't support `minItems` so we must add it manually.

        return J2("tasks/toolkit_task/system.j2").render(
            rulesets=J2("rulesets/rulesets.j2").render(rulesets=self.all_rulesets),
            action_names=str.join(", ", [tool.name for tool in self.tools]),
            actions_schema=utils.minify_json(json.dumps(schema)),
            meta_memory=J2("memory/meta/meta_memory.j2").render(meta_memories=self.meta_memories),
            stop_sequence=self.response_stop_sequence,
        )

    def default_assistant_subtask_template_generator(self, subtask: ActionsSubtask) -> str:
        """Render *subtask* as an assistant message."""
        return J2("tasks/toolkit_task/assistant_subtask.j2").render(
            stop_sequence=self.response_stop_sequence, subtask=subtask
        )

    def default_user_subtask_template_generator(self, subtask: ActionsSubtask) -> str:
        """Render *subtask* as a user message."""
        return J2("tasks/toolkit_task/user_subtask.j2").render(
            stop_sequence=self.response_stop_sequence, subtask=subtask
        )

    def actions_schema(self) -> Schema:
        """Build the combined actions schema for all of this task's tools."""
        return self._actions_schema_for_tools(self.tools)

    def set_default_tools_memory(self, memory: TaskMemory) -> None:
        """Adopt *memory* and apply it as the default input/output memory of each tool."""
        self.task_memory = memory

        for tool in self.tools:
            if self.task_memory:
                if tool.input_memory is None:
                    tool.input_memory = [self.task_memory]
                if tool.output_memory is None and tool.off_prompt:
                    tool.output_memory = {getattr(a, "name"): [self.task_memory] for a in tool.activities()}

    def run(self) -> BaseArtifact:
        """Drive the ReAct loop: prompt, parse/execute subtasks, repeat until an answer or the subtask limit."""
        from griptape.tasks import ActionsSubtask

        self.subtasks.clear()

        # Register the stop sequence exactly once; unconditionally extending here
        # accumulated duplicate stop sequences in the tokenizer across repeated runs.
        if self.response_stop_sequence not in self.prompt_driver.tokenizer.stop_sequences:
            self.prompt_driver.tokenizer.stop_sequences.append(self.response_stop_sequence)

        subtask = self.add_subtask(ActionsSubtask(self.prompt_driver.run(prompt_stack=self.prompt_stack).to_text()))

        while True:
            if subtask.output is None:
                if len(self.subtasks) >= self.max_subtasks:
                    subtask.output = ErrorArtifact(f"Exceeded tool limit of {self.max_subtasks} subtasks per task")
                elif not subtask.actions:
                    # handle case when the LLM failed to follow the ReAct prompt and didn't return a proper action
                    subtask.output = subtask.input
                else:
                    subtask.before_run()
                    subtask.run()
                    subtask.after_run()

                    subtask = self.add_subtask(
                        ActionsSubtask(self.prompt_driver.run(prompt_stack=self.prompt_stack).to_text())
                    )
            else:
                break

        self.output = subtask.output

        return self.output

    def find_subtask(self, subtask_id: str) -> ActionsSubtask:
        """Look up a spawned subtask by id; raises ValueError when absent."""
        for subtask in self.subtasks:
            if subtask.id == subtask_id:
                return subtask
        raise ValueError(f"Subtask with id {subtask_id} not found.")

    def add_subtask(self, subtask: ActionsSubtask) -> ActionsSubtask:
        """Append *subtask* to the chain and link it to the previous subtask."""
        subtask.attach_to(self)

        if len(self.subtasks) > 0:
            self.subtasks[-1].add_child(subtask)
            subtask.add_parent(self.subtasks[-1])

        self.subtasks.append(subtask)

        return subtask

    def find_tool(self, tool_name: str) -> BaseTool:
        """Look up one of this task's tools by name; raises ValueError when absent."""
        for tool in self.tools:
            if tool.name == tool_name:
                return tool
        raise ValueError(f"Tool with name {tool_name} not found.")

    def find_memory(self, memory_name: str) -> TaskMemory:
        """Look up a Task Memory by name among the tools' output memories; raises ValueError when absent."""
        for memory in self.tool_output_memory:
            if memory.name == memory_name:
                return memory
        raise ValueError(f"Memory with name {memory_name} not found.")

DEFAULT_MAX_STEPS = 20 class-attribute instance-attribute

RESPONSE_STOP_SEQUENCE = '<|Response|>' class-attribute instance-attribute

generate_assistant_subtask_template: Callable[[ActionsSubtask], str] = field(default=Factory(lambda self: self.default_assistant_subtask_template_generator, takes_self=True), kw_only=True) class-attribute instance-attribute

generate_user_subtask_template: Callable[[ActionsSubtask], str] = field(default=Factory(lambda self: self.default_user_subtask_template_generator, takes_self=True), kw_only=True) class-attribute instance-attribute

max_subtasks: int = field(default=DEFAULT_MAX_STEPS, kw_only=True) class-attribute instance-attribute

prompt_stack: PromptStack property

response_stop_sequence: str = field(default=RESPONSE_STOP_SEQUENCE, kw_only=True) class-attribute instance-attribute

subtasks: list[ActionsSubtask] = field(factory=list) class-attribute instance-attribute

task_memory: Optional[TaskMemory] = field(default=None, kw_only=True) class-attribute instance-attribute

tool_output_memory: list[TaskMemory] property

tools: list[BaseTool] = field(factory=list, kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tasks/toolkit_task.py
def __attrs_post_init__(self) -> None:
    """Apply construction-time task memory to the tools' defaults."""
    memory = self.task_memory
    if memory:
        self.set_default_tools_memory(memory)

actions_schema()

Source code in griptape/tasks/toolkit_task.py
def actions_schema(self) -> Schema:
    """Build the combined actions schema for all of this task's tools."""
    available_tools = self.tools
    return self._actions_schema_for_tools(available_tools)

add_subtask(subtask)

Source code in griptape/tasks/toolkit_task.py
def add_subtask(self, subtask: ActionsSubtask) -> ActionsSubtask:
    """Append *subtask* to the chain, linking it to the previous subtask."""
    subtask.attach_to(self)

    if self.subtasks:
        previous = self.subtasks[-1]
        previous.add_child(subtask)
        subtask.add_parent(previous)

    self.subtasks.append(subtask)

    return subtask

default_assistant_subtask_template_generator(subtask)

Source code in griptape/tasks/toolkit_task.py
def default_assistant_subtask_template_generator(self, subtask: ActionsSubtask) -> str:
    """Render *subtask* as an assistant message for the ReAct conversation."""
    template = J2("tasks/toolkit_task/assistant_subtask.j2")
    return template.render(stop_sequence=self.response_stop_sequence, subtask=subtask)

default_system_template_generator(_)

Source code in griptape/tasks/toolkit_task.py
def default_system_template_generator(self, _: PromptTask) -> str:
    """Render the ReAct system prompt with rulesets, tool names, the actions schema, and meta memory."""
    schema = self.actions_schema().json_schema("Actions Schema")
    schema["minItems"] = 1  # The `schema` library doesn't support `minItems` so we must add it manually.

    rendered_rulesets = J2("rulesets/rulesets.j2").render(rulesets=self.all_rulesets)
    tool_names = ", ".join([tool.name for tool in self.tools])
    rendered_meta = J2("memory/meta/meta_memory.j2").render(meta_memories=self.meta_memories)

    return J2("tasks/toolkit_task/system.j2").render(
        rulesets=rendered_rulesets,
        action_names=tool_names,
        actions_schema=utils.minify_json(json.dumps(schema)),
        meta_memory=rendered_meta,
        stop_sequence=self.response_stop_sequence,
    )

default_user_subtask_template_generator(subtask)

Source code in griptape/tasks/toolkit_task.py
def default_user_subtask_template_generator(self, subtask: ActionsSubtask) -> str:
    """Render *subtask* as a user message for the ReAct conversation."""
    template = J2("tasks/toolkit_task/user_subtask.j2")
    return template.render(stop_sequence=self.response_stop_sequence, subtask=subtask)

find_memory(memory_name)

Source code in griptape/tasks/toolkit_task.py
def find_memory(self, memory_name: str) -> TaskMemory:
    """Look up a Task Memory by name among the tools' output memories; raise ValueError when absent."""
    match = next((m for m in self.tool_output_memory if m.name == memory_name), None)
    if match is None:
        raise ValueError(f"Memory with name {memory_name} not found.")
    return match

find_subtask(subtask_id)

Source code in griptape/tasks/toolkit_task.py
def find_subtask(self, subtask_id: str) -> ActionsSubtask:
    """Look up a spawned subtask by id; raise ValueError when absent."""
    for candidate in self.subtasks:
        if candidate.id != subtask_id:
            continue
        return candidate
    raise ValueError(f"Subtask with id {subtask_id} not found.")

find_tool(tool_name)

Source code in griptape/tasks/toolkit_task.py
def find_tool(self, tool_name: str) -> BaseTool:
    """Look up one of this task's tools by name; raise ValueError when absent."""
    matches = [t for t in self.tools if t.name == tool_name]
    if matches:
        return matches[0]
    raise ValueError(f"Tool with name {tool_name} not found.")

preprocess(structure)

Source code in griptape/tasks/toolkit_task.py
def preprocess(self, structure: Structure) -> ToolkitTask:
    """Attach *structure* and fall back to its task memory when this task has none."""
    super().preprocess(structure)

    structure_memory = structure.task_memory
    if self.task_memory is None and structure_memory:
        self.set_default_tools_memory(structure_memory)

    return self

run()

Source code in griptape/tasks/toolkit_task.py
def run(self) -> BaseArtifact:
    """Drive the ReAct loop: prompt, parse/execute subtasks, repeat until an answer or the subtask limit."""
    from griptape.tasks import ActionsSubtask

    self.subtasks.clear()

    # Register the stop sequence exactly once; unconditionally extending here
    # accumulated duplicate stop sequences in the tokenizer across repeated runs.
    if self.response_stop_sequence not in self.prompt_driver.tokenizer.stop_sequences:
        self.prompt_driver.tokenizer.stop_sequences.append(self.response_stop_sequence)

    subtask = self.add_subtask(ActionsSubtask(self.prompt_driver.run(prompt_stack=self.prompt_stack).to_text()))

    while True:
        if subtask.output is None:
            if len(self.subtasks) >= self.max_subtasks:
                subtask.output = ErrorArtifact(f"Exceeded tool limit of {self.max_subtasks} subtasks per task")
            elif not subtask.actions:
                # handle case when the LLM failed to follow the ReAct prompt and didn't return a proper action
                subtask.output = subtask.input
            else:
                subtask.before_run()
                subtask.run()
                subtask.after_run()

                subtask = self.add_subtask(
                    ActionsSubtask(self.prompt_driver.run(prompt_stack=self.prompt_stack).to_text())
                )
        else:
            break

    self.output = subtask.output

    return self.output

set_default_tools_memory(memory)

Source code in griptape/tasks/toolkit_task.py
def set_default_tools_memory(self, memory: TaskMemory) -> None:
    """Adopt *memory* and apply it as the default input/output memory of every tool."""
    self.task_memory = memory

    if not self.task_memory:
        return

    for tool in self.tools:
        if tool.input_memory is None:
            tool.input_memory = [self.task_memory]
        if tool.output_memory is None and tool.off_prompt:
            tool.output_memory = {getattr(a, "name"): [self.task_memory] for a in tool.activities()}

validate_tools(_, tools)

Source code in griptape/tasks/toolkit_task.py
@tools.validator  # pyright: ignore
def validate_tools(self, _, tools: list[BaseTool]) -> None:
    """Reject tool lists containing duplicate tool names."""
    seen: set[str] = set()
    for tool in tools:
        if tool.name in seen:
            raise ValueError("tools names have to be unique in task")
        seen.add(tool.name)