Skip to content

tasks

__all__ = ['BaseTask', 'BaseTextInputTask', 'PromptTask', 'ActionsSubtask', 'ToolkitTask', 'TextSummaryTask', 'ToolTask', 'RagTask', 'ExtractionTask', 'BaseImageGenerationTask', 'CodeExecutionTask', 'PromptImageGenerationTask', 'VariationImageGenerationTask', 'InpaintingImageGenerationTask', 'OutpaintingImageGenerationTask', 'BaseAudioGenerationTask', 'TextToSpeechTask', 'StructureRunTask', 'AudioTranscriptionTask', 'AssistantTask', 'BranchTask'] module-attribute

ActionsSubtask

Bases: BaseTask

Source code in griptape/tasks/actions_subtask.py
@define
class ActionsSubtask(BaseTask):
    THOUGHT_PATTERN = r"(?s)^Thought:\s*(.*?)$"
    ACTIONS_PATTERN = r"(?s)Actions:[^\[]*(\[.*\])"
    ANSWER_PATTERN = r"(?s)^Answer:\s?([\s\S]*)$"

    thought: Optional[str] = field(default=None, kw_only=True)
    actions: list[ToolAction] = field(factory=list, kw_only=True)
    output: Optional[BaseArtifact] = field(default=None, init=False)
    _input: Union[str, list, tuple, BaseArtifact, Callable[[BaseTask], BaseArtifact]] = field(
        default=lambda task: task.full_context["args"][0] if task.full_context["args"] else TextArtifact(value=""),
        alias="input",
    )
    _memory: Optional[TaskMemory] = None
    _origin_task: Optional[BaseTask] = field(default=None, kw_only=True)

    @property
    def input(self) -> TextArtifact | ListArtifact:
        return self._process_task_input(self._input)

    @input.setter
    def input(self, value: str | list | tuple | BaseArtifact | Callable[[BaseTask], BaseArtifact]) -> None:
        self._input = value

    @property
    def origin_task(self) -> BaseTask:
        if self._origin_task is not None:
            return self._origin_task
        else:
            raise Exception("ActionSubtask has no origin task.")

    @property
    def parents(self) -> list[BaseTask]:
        if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
            return [self.origin_task.find_subtask(parent_id) for parent_id in self.parent_ids]
        else:
            raise Exception("ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin.")

    @property
    def children(self) -> list[BaseTask]:
        if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
            return [self.origin_task.find_subtask(child_id) for child_id in self.child_ids]
        else:
            raise Exception("ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin.")

    def add_child(self, child: BaseTask) -> BaseTask:
        if child.id not in self.child_ids:
            self.child_ids.append(child.id)
        return child

    def add_parent(self, parent: BaseTask) -> BaseTask:
        if parent.id not in self.parent_ids:
            self.parent_ids.append(parent.id)
        return parent

    def attach_to(self, parent_task: BaseTask) -> None:
        self._origin_task = parent_task
        self.structure = parent_task.structure

        try:
            if isinstance(self.input, TextArtifact):
                self.__init_from_prompt(self.input.to_text())
            else:
                self.__init_from_artifacts(self.input)

            structured_outputs = [a for a in self.actions if isinstance(a.tool, StructuredOutputTool)]
            if structured_outputs:
                output_values = [JsonArtifact(a.input["values"]) for a in structured_outputs]
                if len(structured_outputs) > 1:
                    self.output = ListArtifact(output_values)
                else:
                    self.output = output_values[0]
        except Exception as e:
            logger.error("Subtask %s\nError parsing tool action: %s", self.origin_task.id, e)

            self.output = ErrorArtifact(f"ToolAction input parsing error: {e}", exception=e)

    def before_run(self) -> None:
        EventBus.publish_event(
            StartActionsSubtaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
                subtask_parent_task_id=self.origin_task.id,
                subtask_thought=self.thought,
                subtask_actions=self.actions_to_dicts(),
            ),
        )

        parts = [
            f"Subtask {self.id}",
            *([f"\nThought: {self.thought}"] if self.thought else []),
            f"\nActions: {self.actions_to_json()}",
        ]
        logger.info("".join(parts))

    def try_run(self) -> BaseArtifact:
        try:
            if any(isinstance(a.output, ErrorArtifact) for a in self.actions):
                errors = [a.output.value for a in self.actions if isinstance(a.output, ErrorArtifact)]

                self.output = ErrorArtifact("\n\n".join(errors))
            else:
                results = self.run_actions(self.actions)

                actions_output = []
                for result in results:
                    tag, output = result
                    output.name = f"{tag} output"

                    actions_output.append(output)
                self.output = ListArtifact(actions_output)
        except Exception as e:
            logger.exception("Subtask %s\n%s", self.id, e)

            self.output = ErrorArtifact(str(e), exception=e)
        if self.output is not None:
            return self.output
        else:
            return ErrorArtifact("no tool output")

    def run_actions(self, actions: list[ToolAction]) -> list[tuple[str, BaseArtifact]]:
        with self.create_futures_executor() as futures_executor:
            return utils.execute_futures_list(
                [futures_executor.submit(with_contextvars(self.run_action), a) for a in actions]
            )

    def run_action(self, action: ToolAction) -> tuple[str, BaseArtifact]:
        if action.tool is not None:
            if action.path is not None:
                output = action.tool.run(getattr(action.tool, action.path), self, action)
            else:
                output = ErrorArtifact("action path not found")
        else:
            output = ErrorArtifact("action name not found")
        action.output = output

        return action.tag, output

    def after_run(self) -> None:
        response = self.output.to_text() if isinstance(self.output, BaseArtifact) else str(self.output)

        EventBus.publish_event(
            FinishActionsSubtaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
                subtask_parent_task_id=self.origin_task.id,
                subtask_thought=self.thought,
                subtask_actions=self.actions_to_dicts(),
            ),
        )
        logger.info("Subtask %s\nResponse: %s", self.id, response)

    def actions_to_dicts(self) -> list[dict]:
        json_list = []

        for action in self.actions:
            json_dict = {}

            if action.tag:
                json_dict["tag"] = action.tag

            if action.name:
                json_dict["name"] = action.name

            if action.path:
                json_dict["path"] = action.path

            if action.input:
                json_dict["input"] = action.input

            json_list.append(json_dict)

        return json_list

    def actions_to_json(self) -> str:
        return json.dumps(self.actions_to_dicts(), indent=2)

    def _process_task_input(
        self,
        task_input: Union[str, tuple, list, BaseArtifact, Callable[[BaseTask], BaseArtifact]],
    ) -> Union[TextArtifact, ListArtifact]:
        if isinstance(task_input, (TextArtifact, ListArtifact)):
            return task_input
        elif isinstance(task_input, ActionArtifact):
            return ListArtifact([task_input])
        elif isinstance(task_input, Callable):
            return self._process_task_input(task_input(self))
        elif isinstance(task_input, str):
            return self._process_task_input(TextArtifact(task_input))
        elif isinstance(task_input, (list, tuple)):
            return ListArtifact([self._process_task_input(elem) for elem in task_input])
        else:
            raise ValueError(f"Invalid input type: {type(task_input)} ")

    def __init_from_prompt(self, value: str) -> None:
        thought_matches = re.findall(self.THOUGHT_PATTERN, value, re.MULTILINE)
        actions_matches = re.findall(self.ACTIONS_PATTERN, value, re.DOTALL)
        answer_matches = re.findall(self.ANSWER_PATTERN, value, re.MULTILINE)

        self.actions = self.__parse_actions(actions_matches)

        if thought_matches:
            self.thought = thought_matches[-1]

        if not self.actions and self.output is None:
            if answer_matches:
                # A direct answer is provided, set it as the output.
                self.output = TextArtifact(answer_matches[-1])
            else:
                # The LLM failed to follow the ReAct prompt, set the LLM's raw response as the output.
                self.output = TextArtifact(value)

    def __init_from_artifacts(self, artifacts: ListArtifact) -> None:
        """Parses the input Artifacts to extract the thought and actions.

        Text Artifacts are used to extract the thought, and ToolAction Artifacts are used to extract the actions.

        Args:
            artifacts: The input Artifacts.

        Returns:
            None
        """
        self.actions = [
            self.__process_action_object(artifact.value.to_dict())
            for artifact in artifacts.value
            if isinstance(artifact, ActionArtifact)
        ]

        # When parsing from Artifacts we can't determine the thought unless there are also Actions
        if self.actions:
            thoughts = [artifact.value for artifact in artifacts.value if isinstance(artifact, TextArtifact)]
            if thoughts:
                self.thought = thoughts[0]
        else:
            if self.output is None:
                self.output = TextArtifact(artifacts.to_text())

    def __parse_actions(self, actions_matches: list[str]) -> list[ToolAction]:
        if len(actions_matches) == 0:
            return []
        try:
            data = actions_matches[-1]
            actions_list: list[dict] = json.loads(data, strict=False)

            return [self.__process_action_object(action_object) for action_object in actions_list]
        except json.JSONDecodeError as e:
            logger.exception("Subtask %s\nInvalid actions JSON: %s", self.origin_task.id, e)

            self.output = ErrorArtifact(f"Actions JSON decoding error: {e}", exception=e)

            return []

    def __process_action_object(self, action_object: dict) -> ToolAction:
        # Load action tag; throw exception if the key is not present
        action_tag = action_object["tag"]

        # Load action name; throw exception if the key is not present
        action_name = action_object["name"]

        # Load action method; throw exception if the key is not present
        action_path = action_object["path"]

        # Load optional input value; don't throw exceptions if key is not present
        if "input" in action_object:
            # Some LLMs don't support nested parameters and therefore won't generate "values".
            # So we need to manually add it here.
            if "values" not in action_object["input"]:
                action_object["input"] = {"values": action_object["input"]}

            # The schema library has a bug, where something like `Or(str, None)` doesn't get
            # correctly translated into JSON schema. For some optional input fields LLMs sometimes
            # still provide null value, which trips up the validator. The temporary solution that
            # works is to strip all key-values where value is null.
            action_input = remove_null_values_in_dict_recursively(action_object["input"])
        else:
            action_input = {}

        # Load the action itself
        if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
            tool = self.origin_task.find_tool(action_name)
        else:
            raise Exception("ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin.")

        action = ToolAction(tag=action_tag, name=action_name, path=action_path, input=action_input, tool=tool)

        self.__validate_action(action)

        return action

    def __validate_action(self, action: ToolAction) -> None:
        try:
            if action.tool is not None:
                if action.path is not None:
                    activity = getattr(action.tool, action.path)
                else:
                    raise Exception("ToolAction path not found.")

                if activity is not None:
                    activity_schema = action.tool.activity_schema(activity)
                else:
                    raise Exception("Activity not found.")

                if activity_schema is not None and action.input is not None:
                    activity_schema.validate(action.input)
        except schema.SchemaError as e:
            logger.exception("Subtask %s\nInvalid action JSON: %s", self.origin_task.id, e)

            action.output = ErrorArtifact(f"Activity input JSON validation error: {e}", exception=e)
        except SyntaxError as e:
            logger.exception("Subtask %s\nSyntax error: %s", self.origin_task.id, e)

            action.output = ErrorArtifact(f"Syntax error: {e}", exception=e)

ACTIONS_PATTERN = '(?s)Actions:[^\\[]*(\\[.*\\])' class-attribute instance-attribute

ANSWER_PATTERN = '(?s)^Answer:\\s?([\\s\\S]*)$' class-attribute instance-attribute

THOUGHT_PATTERN = '(?s)^Thought:\\s*(.*?)$' class-attribute instance-attribute

actions: list[ToolAction] = field(factory=list, kw_only=True) class-attribute instance-attribute

children: list[BaseTask] property

input: TextArtifact | ListArtifact property writable

origin_task: BaseTask property

output: Optional[BaseArtifact] = field(default=None, init=False) class-attribute instance-attribute

parents: list[BaseTask] property

thought: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

__init_from_artifacts(artifacts)

Parses the input Artifacts to extract the thought and actions.

Text Artifacts are used to extract the thought, and ToolAction Artifacts are used to extract the actions.

Parameters:

Name Type Description Default
artifacts ListArtifact

The input Artifacts.

required

Returns:

Type Description
None

None

Source code in griptape/tasks/actions_subtask.py
def __init_from_artifacts(self, artifacts: ListArtifact) -> None:
    """Parses the input Artifacts to extract the thought and actions.

    Text Artifacts are used to extract the thought, and ToolAction Artifacts are used to extract the actions.

    Args:
        artifacts: The input Artifacts.

    Returns:
        None
    """
    self.actions = [
        self.__process_action_object(artifact.value.to_dict())
        for artifact in artifacts.value
        if isinstance(artifact, ActionArtifact)
    ]

    # When parsing from Artifacts we can't determine the thought unless there are also Actions
    if self.actions:
        thoughts = [artifact.value for artifact in artifacts.value if isinstance(artifact, TextArtifact)]
        if thoughts:
            self.thought = thoughts[0]
    else:
        if self.output is None:
            self.output = TextArtifact(artifacts.to_text())

__init_from_prompt(value)

Source code in griptape/tasks/actions_subtask.py
def __init_from_prompt(self, value: str) -> None:
    thought_matches = re.findall(self.THOUGHT_PATTERN, value, re.MULTILINE)
    actions_matches = re.findall(self.ACTIONS_PATTERN, value, re.DOTALL)
    answer_matches = re.findall(self.ANSWER_PATTERN, value, re.MULTILINE)

    self.actions = self.__parse_actions(actions_matches)

    if thought_matches:
        self.thought = thought_matches[-1]

    if not self.actions and self.output is None:
        if answer_matches:
            # A direct answer is provided, set it as the output.
            self.output = TextArtifact(answer_matches[-1])
        else:
            # The LLM failed to follow the ReAct prompt, set the LLM's raw response as the output.
            self.output = TextArtifact(value)

__parse_actions(actions_matches)

Source code in griptape/tasks/actions_subtask.py
def __parse_actions(self, actions_matches: list[str]) -> list[ToolAction]:
    if len(actions_matches) == 0:
        return []
    try:
        data = actions_matches[-1]
        actions_list: list[dict] = json.loads(data, strict=False)

        return [self.__process_action_object(action_object) for action_object in actions_list]
    except json.JSONDecodeError as e:
        logger.exception("Subtask %s\nInvalid actions JSON: %s", self.origin_task.id, e)

        self.output = ErrorArtifact(f"Actions JSON decoding error: {e}", exception=e)

        return []

__process_action_object(action_object)

Source code in griptape/tasks/actions_subtask.py
def __process_action_object(self, action_object: dict) -> ToolAction:
    # Load action tag; throw exception if the key is not present
    action_tag = action_object["tag"]

    # Load action name; throw exception if the key is not present
    action_name = action_object["name"]

    # Load action method; throw exception if the key is not present
    action_path = action_object["path"]

    # Load optional input value; don't throw exceptions if key is not present
    if "input" in action_object:
        # Some LLMs don't support nested parameters and therefore won't generate "values".
        # So we need to manually add it here.
        if "values" not in action_object["input"]:
            action_object["input"] = {"values": action_object["input"]}

        # The schema library has a bug, where something like `Or(str, None)` doesn't get
        # correctly translated into JSON schema. For some optional input fields LLMs sometimes
        # still provide null value, which trips up the validator. The temporary solution that
        # works is to strip all key-values where value is null.
        action_input = remove_null_values_in_dict_recursively(action_object["input"])
    else:
        action_input = {}

    # Load the action itself
    if isinstance(self.origin_task, ActionsSubtaskOriginMixin):
        tool = self.origin_task.find_tool(action_name)
    else:
        raise Exception("ActionSubtask must be attached to a Task that implements ActionSubtaskOriginMixin.")

    action = ToolAction(tag=action_tag, name=action_name, path=action_path, input=action_input, tool=tool)

    self.__validate_action(action)

    return action

__validate_action(action)

Source code in griptape/tasks/actions_subtask.py
def __validate_action(self, action: ToolAction) -> None:
    try:
        if action.tool is not None:
            if action.path is not None:
                activity = getattr(action.tool, action.path)
            else:
                raise Exception("ToolAction path not found.")

            if activity is not None:
                activity_schema = action.tool.activity_schema(activity)
            else:
                raise Exception("Activity not found.")

            if activity_schema is not None and action.input is not None:
                activity_schema.validate(action.input)
    except schema.SchemaError as e:
        logger.exception("Subtask %s\nInvalid action JSON: %s", self.origin_task.id, e)

        action.output = ErrorArtifact(f"Activity input JSON validation error: {e}", exception=e)
    except SyntaxError as e:
        logger.exception("Subtask %s\nSyntax error: %s", self.origin_task.id, e)

        action.output = ErrorArtifact(f"Syntax error: {e}", exception=e)

actions_to_dicts()

Source code in griptape/tasks/actions_subtask.py
def actions_to_dicts(self) -> list[dict]:
    json_list = []

    for action in self.actions:
        json_dict = {}

        if action.tag:
            json_dict["tag"] = action.tag

        if action.name:
            json_dict["name"] = action.name

        if action.path:
            json_dict["path"] = action.path

        if action.input:
            json_dict["input"] = action.input

        json_list.append(json_dict)

    return json_list

actions_to_json()

Source code in griptape/tasks/actions_subtask.py
def actions_to_json(self) -> str:
    return json.dumps(self.actions_to_dicts(), indent=2)

add_child(child)

Source code in griptape/tasks/actions_subtask.py
def add_child(self, child: BaseTask) -> BaseTask:
    if child.id not in self.child_ids:
        self.child_ids.append(child.id)
    return child

add_parent(parent)

Source code in griptape/tasks/actions_subtask.py
def add_parent(self, parent: BaseTask) -> BaseTask:
    if parent.id not in self.parent_ids:
        self.parent_ids.append(parent.id)
    return parent

after_run()

Source code in griptape/tasks/actions_subtask.py
def after_run(self) -> None:
    response = self.output.to_text() if isinstance(self.output, BaseArtifact) else str(self.output)

    EventBus.publish_event(
        FinishActionsSubtaskEvent(
            task_id=self.id,
            task_parent_ids=self.parent_ids,
            task_child_ids=self.child_ids,
            task_input=self.input,
            task_output=self.output,
            subtask_parent_task_id=self.origin_task.id,
            subtask_thought=self.thought,
            subtask_actions=self.actions_to_dicts(),
        ),
    )
    logger.info("Subtask %s\nResponse: %s", self.id, response)

attach_to(parent_task)

Source code in griptape/tasks/actions_subtask.py
def attach_to(self, parent_task: BaseTask) -> None:
    self._origin_task = parent_task
    self.structure = parent_task.structure

    try:
        if isinstance(self.input, TextArtifact):
            self.__init_from_prompt(self.input.to_text())
        else:
            self.__init_from_artifacts(self.input)

        structured_outputs = [a for a in self.actions if isinstance(a.tool, StructuredOutputTool)]
        if structured_outputs:
            output_values = [JsonArtifact(a.input["values"]) for a in structured_outputs]
            if len(structured_outputs) > 1:
                self.output = ListArtifact(output_values)
            else:
                self.output = output_values[0]
    except Exception as e:
        logger.error("Subtask %s\nError parsing tool action: %s", self.origin_task.id, e)

        self.output = ErrorArtifact(f"ToolAction input parsing error: {e}", exception=e)

before_run()

Source code in griptape/tasks/actions_subtask.py
def before_run(self) -> None:
    EventBus.publish_event(
        StartActionsSubtaskEvent(
            task_id=self.id,
            task_parent_ids=self.parent_ids,
            task_child_ids=self.child_ids,
            task_input=self.input,
            task_output=self.output,
            subtask_parent_task_id=self.origin_task.id,
            subtask_thought=self.thought,
            subtask_actions=self.actions_to_dicts(),
        ),
    )

    parts = [
        f"Subtask {self.id}",
        *([f"\nThought: {self.thought}"] if self.thought else []),
        f"\nActions: {self.actions_to_json()}",
    ]
    logger.info("".join(parts))

run_action(action)

Source code in griptape/tasks/actions_subtask.py
def run_action(self, action: ToolAction) -> tuple[str, BaseArtifact]:
    if action.tool is not None:
        if action.path is not None:
            output = action.tool.run(getattr(action.tool, action.path), self, action)
        else:
            output = ErrorArtifact("action path not found")
    else:
        output = ErrorArtifact("action name not found")
    action.output = output

    return action.tag, output

run_actions(actions)

Source code in griptape/tasks/actions_subtask.py
def run_actions(self, actions: list[ToolAction]) -> list[tuple[str, BaseArtifact]]:
    with self.create_futures_executor() as futures_executor:
        return utils.execute_futures_list(
            [futures_executor.submit(with_contextvars(self.run_action), a) for a in actions]
        )

try_run()

Source code in griptape/tasks/actions_subtask.py
def try_run(self) -> BaseArtifact:
    try:
        if any(isinstance(a.output, ErrorArtifact) for a in self.actions):
            errors = [a.output.value for a in self.actions if isinstance(a.output, ErrorArtifact)]

            self.output = ErrorArtifact("\n\n".join(errors))
        else:
            results = self.run_actions(self.actions)

            actions_output = []
            for result in results:
                tag, output = result
                output.name = f"{tag} output"

                actions_output.append(output)
            self.output = ListArtifact(actions_output)
    except Exception as e:
        logger.exception("Subtask %s\n%s", self.id, e)

        self.output = ErrorArtifact(str(e), exception=e)
    if self.output is not None:
        return self.output
    else:
        return ErrorArtifact("no tool output")

AssistantTask

Bases: BaseTextInputTask

Task to run an Assistant.

Attributes:

Name Type Description
assistant_driver BaseAssistantDriver

Driver to run the Assistant.

Source code in griptape/tasks/assistant_task.py
@define
class AssistantTask(BaseTextInputTask):
    """Task to run an Assistant.

    Attributes:
        assistant_driver: Driver to run the Assistant.
    """

    assistant_driver: BaseAssistantDriver = field(kw_only=True)

    def try_run(self) -> BaseArtifact:
        return self.assistant_driver.run(self.input)

assistant_driver: BaseAssistantDriver = field(kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/assistant_task.py
def try_run(self) -> BaseArtifact:
    return self.assistant_driver.run(self.input)

AudioTranscriptionTask

Bases: BaseAudioInputTask

Source code in griptape/tasks/audio_transcription_task.py
@define
class AudioTranscriptionTask(BaseAudioInputTask):
    audio_transcription_driver: BaseAudioTranscriptionDriver = field(
        default=Factory(lambda: Defaults.drivers_config.audio_transcription_driver),
        kw_only=True,
    )

    def try_run(self) -> TextArtifact:
        return self.audio_transcription_driver.run(self.input)

audio_transcription_driver: BaseAudioTranscriptionDriver = field(default=Factory(lambda: Defaults.drivers_config.audio_transcription_driver), kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/audio_transcription_task.py
def try_run(self) -> TextArtifact:
    return self.audio_transcription_driver.run(self.input)

BaseAudioGenerationTask

Bases: ArtifactFileOutputMixin, RuleMixin, BaseTask, ABC

Source code in griptape/tasks/base_audio_generation_task.py
@define
class BaseAudioGenerationTask(ArtifactFileOutputMixin, RuleMixin, BaseTask, ABC):
    def before_run(self) -> None:
        super().before_run()

        logger.info("%s %s\nInput: %s", self.__class__.__name__, self.id, self.input.to_text())

    def after_run(self) -> None:
        super().after_run()

        logger.info(
            "%s %s\nOutput: %s",
            self.__class__.__name__,
            self.id,
            self.output.to_text() if self.output is not None else "",
        )

after_run()

Source code in griptape/tasks/base_audio_generation_task.py
def after_run(self) -> None:
    super().after_run()

    logger.info(
        "%s %s\nOutput: %s",
        self.__class__.__name__,
        self.id,
        self.output.to_text() if self.output is not None else "",
    )

before_run()

Source code in griptape/tasks/base_audio_generation_task.py
def before_run(self) -> None:
    super().before_run()

    logger.info("%s %s\nInput: %s", self.__class__.__name__, self.id, self.input.to_text())

BaseImageGenerationTask

Bases: ArtifactFileOutputMixin, RuleMixin, BaseTask, ABC

Provides a base class for image generation-related tasks.

Attributes:

Name Type Description
negative_rulesets list[Ruleset]

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules list[Rule]

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir list[Rule]

If provided, the generated image will be written to disk in output_dir.

output_file list[Rule]

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/base_image_generation_task.py
@define
class BaseImageGenerationTask(ArtifactFileOutputMixin, RuleMixin, BaseTask, ABC):
    """Provides a base class for image generation-related tasks.

    Attributes:
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    DEFAULT_NEGATIVE_RULESET_NAME = "Negative Ruleset"

    image_generation_driver: BaseImageGenerationDriver = field(
        default=Factory(lambda: Defaults.drivers_config.image_generation_driver),
        kw_only=True,
    )
    _negative_rulesets: list[Ruleset] = field(factory=list, kw_only=True, alias="negative_rulesets")
    negative_rules: list[Rule] = field(factory=list, kw_only=True)

    @property
    def negative_rulesets(self) -> list[Ruleset]:
        negative_rulesets = self._negative_rulesets

        if self.negative_rules:
            negative_rulesets.append(Ruleset(name=self.DEFAULT_NEGATIVE_RULESET_NAME, rules=self.negative_rules))

        return negative_rulesets

    def _read_from_file(self, path: str) -> ImageArtifact:
        logger.info("Reading image from %s", os.path.abspath(path))
        return ImageLoader().load(Path(path))

    def _get_prompts(self, prompt: str) -> list[str]:
        return [prompt, *[rule.value for ruleset in self.rulesets for rule in ruleset.rules]]

    def _get_negative_prompts(self) -> list[str]:
        return [rule.value for ruleset in self.negative_rulesets for rule in ruleset.rules]

DEFAULT_NEGATIVE_RULESET_NAME = 'Negative Ruleset' class-attribute instance-attribute

image_generation_driver: BaseImageGenerationDriver = field(default=Factory(lambda: Defaults.drivers_config.image_generation_driver), kw_only=True) class-attribute instance-attribute

negative_rules: list[Rule] = field(factory=list, kw_only=True) class-attribute instance-attribute

negative_rulesets: list[Ruleset] property

BaseTask

Bases: FuturesExecutorMixin, SerializableMixin, RunnableMixin['BaseTask'], ABC

Source code in griptape/tasks/base_task.py
@define
class BaseTask(FuturesExecutorMixin, SerializableMixin, RunnableMixin["BaseTask"], ABC):
    class State(Enum):
        PENDING = 1
        RUNNING = 2
        FINISHED = 3
        SKIPPED = 4

    id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={"serializable": True})
    state: State = field(default=State.PENDING, kw_only=True, metadata={"serializable": True})
    parent_ids: list[str] = field(factory=list, kw_only=True, metadata={"serializable": True})
    child_ids: list[str] = field(factory=list, kw_only=True, metadata={"serializable": True})
    max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True, metadata={"serializable": True})
    structure: Optional[Structure] = field(default=None, kw_only=True)

    output: Optional[BaseArtifact] = field(default=None, init=False)
    context: dict[str, Any] = field(factory=dict, kw_only=True, metadata={"serializable": True})

    def __rshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
        if isinstance(other, list):
            self.add_children(other)
        else:
            self.add_child(other)

        return other

    def __lshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
        if isinstance(other, list):
            self.add_parents(other)
        else:
            self.add_parent(other)

        return other

    def __attrs_post_init__(self) -> None:
        if self.structure is not None:
            self.structure.add_task(self)

    @property
    @abstractmethod
    def input(self) -> BaseArtifact: ...

    @property
    def parents(self) -> list[BaseTask]:
        if self.structure is not None:
            return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]
        raise ValueError("Structure must be set to access parents")

    @property
    def children(self) -> list[BaseTask]:
        if self.structure is not None:
            return [self.structure.find_task(child_id) for child_id in self.child_ids]
        raise ValueError("Structure must be set to access children")

    @property
    def parent_outputs(self) -> dict[str, BaseArtifact]:
        return {parent.id: parent.output for parent in self.parents if parent.output}

    @property
    def parents_output_text(self) -> str:
        return "\n".join([parent.output.to_text() for parent in self.parents if parent.output])

    @property
    def meta_memories(self) -> list[BaseMetaEntry]:
        if self.structure is not None and self.structure.meta_memory:
            if self.max_meta_memory_entries:
                return self.structure.meta_memory.entries[: self.max_meta_memory_entries]
            else:
                return self.structure.meta_memory.entries
        else:
            return []

    def __str__(self) -> str:
        return str(self.output.value) if self.output is not None else ""

    def add_parents(self, parents: list[BaseTask]) -> None:
        for parent in parents:
            self.add_parent(parent)

    def add_parent(self, parent: BaseTask) -> BaseTask:
        if parent.id not in self.parent_ids:
            self.parent_ids.append(parent.id)

        if self.id not in parent.child_ids:
            parent.child_ids.append(self.id)

        if self.structure is not None and parent not in self.structure.tasks:
            self.structure.add_task(parent)

        return self

    def add_children(self, children: list[BaseTask]) -> None:
        for child in children:
            self.add_child(child)

    def add_child(self, child: BaseTask) -> BaseTask:
        if child.id not in self.child_ids:
            self.child_ids.append(child.id)

        if self.id not in child.parent_ids:
            child.parent_ids.append(self.id)

        if self.structure is not None and child not in self.structure.tasks:
            self.structure.add_task(child)

        return self

    def preprocess(self, structure: Structure) -> BaseTask:
        self.structure = structure

        return self

    def is_pending(self) -> bool:
        return self.state == BaseTask.State.PENDING

    def is_finished(self) -> bool:
        return self.state == BaseTask.State.FINISHED

    def is_running(self) -> bool:
        return self.state == BaseTask.State.RUNNING

    def is_skipped(self) -> bool:
        return self.state == BaseTask.State.SKIPPED

    def before_run(self) -> None:
        super().before_run()
        if self.structure is not None:
            EventBus.publish_event(
                StartTaskEvent(
                    task_id=self.id,
                    task_parent_ids=self.parent_ids,
                    task_child_ids=self.child_ids,
                    task_input=self.input,
                    task_output=self.output,
                ),
            )

    def run(self) -> BaseArtifact:
        try:
            self.state = BaseTask.State.RUNNING

            self.before_run()

            self.output = self.try_run()

            self.after_run()
        except Exception as e:
            logger.exception("%s %s\n%s", self.__class__.__name__, self.id, e)

            self.output = ErrorArtifact(str(e), exception=e)
        finally:
            self.state = BaseTask.State.FINISHED

        return self.output

    def after_run(self) -> None:
        super().after_run()
        if self.structure is not None:
            EventBus.publish_event(
                FinishTaskEvent(
                    task_id=self.id,
                    task_parent_ids=self.parent_ids,
                    task_child_ids=self.child_ids,
                    task_input=self.input,
                    task_output=self.output,
                ),
            )

    def can_run(self) -> bool:
        # If this Task has been skipped or is not pending, it should not run
        if self.is_skipped() or not self.is_pending():
            return False

        # If this Task has parents, and _all_ of them are skipped, it should not run
        if self.parents and all(parent.is_skipped() for parent in self.parents):
            self.state = BaseTask.State.SKIPPED
            return False

        # If _all_ this Task's unskipped parents are finished, it should run
        unskipped_parents = [parent for parent in self.parents if not parent.is_skipped()]

        return all(parent.is_finished() for parent in unskipped_parents)

    def reset(self) -> BaseTask:
        self.state = BaseTask.State.PENDING
        self.output = None

        return self

    @abstractmethod
    def try_run(self) -> BaseArtifact: ...

    @property
    def full_context(self) -> dict[str, Any]:
        # Need to deep copy so that the serialized context doesn't contain non-serializable data
        context = deepcopy(self.context)
        if self.structure is not None:
            context.update(self.structure.context(self))

        return context

child_ids: list[str] = field(factory=list, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

children: list[BaseTask] property

context: dict[str, Any] = field(factory=dict, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

full_context: dict[str, Any] property

id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

input: BaseArtifact abstractmethod property

max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

meta_memories: list[BaseMetaEntry] property

output: Optional[BaseArtifact] = field(default=None, init=False) class-attribute instance-attribute

parent_ids: list[str] = field(factory=list, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

parent_outputs: dict[str, BaseArtifact] property

parents: list[BaseTask] property

parents_output_text: str property

state: State = field(default=State.PENDING, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

structure: Optional[Structure] = field(default=None, kw_only=True) class-attribute instance-attribute

State

Bases: Enum

Source code in griptape/tasks/base_task.py
class State(Enum):
    PENDING = 1
    RUNNING = 2
    FINISHED = 3
    SKIPPED = 4
FINISHED = 3 class-attribute instance-attribute
PENDING = 1 class-attribute instance-attribute
RUNNING = 2 class-attribute instance-attribute
SKIPPED = 4 class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tasks/base_task.py
def __attrs_post_init__(self) -> None:
    if self.structure is not None:
        self.structure.add_task(self)

__lshift__(other)

Source code in griptape/tasks/base_task.py
def __lshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
    if isinstance(other, list):
        self.add_parents(other)
    else:
        self.add_parent(other)

    return other

__rshift__(other)

Source code in griptape/tasks/base_task.py
def __rshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
    if isinstance(other, list):
        self.add_children(other)
    else:
        self.add_child(other)

    return other

__str__()

Source code in griptape/tasks/base_task.py
def __str__(self) -> str:
    return str(self.output.value) if self.output is not None else ""

add_child(child)

Source code in griptape/tasks/base_task.py
def add_child(self, child: BaseTask) -> BaseTask:
    if child.id not in self.child_ids:
        self.child_ids.append(child.id)

    if self.id not in child.parent_ids:
        child.parent_ids.append(self.id)

    if self.structure is not None and child not in self.structure.tasks:
        self.structure.add_task(child)

    return self

add_children(children)

Source code in griptape/tasks/base_task.py
def add_children(self, children: list[BaseTask]) -> None:
    for child in children:
        self.add_child(child)

add_parent(parent)

Source code in griptape/tasks/base_task.py
def add_parent(self, parent: BaseTask) -> BaseTask:
    if parent.id not in self.parent_ids:
        self.parent_ids.append(parent.id)

    if self.id not in parent.child_ids:
        parent.child_ids.append(self.id)

    if self.structure is not None and parent not in self.structure.tasks:
        self.structure.add_task(parent)

    return self

add_parents(parents)

Source code in griptape/tasks/base_task.py
def add_parents(self, parents: list[BaseTask]) -> None:
    for parent in parents:
        self.add_parent(parent)

after_run()

Source code in griptape/tasks/base_task.py
def after_run(self) -> None:
    super().after_run()
    if self.structure is not None:
        EventBus.publish_event(
            FinishTaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
            ),
        )

before_run()

Source code in griptape/tasks/base_task.py
def before_run(self) -> None:
    super().before_run()
    if self.structure is not None:
        EventBus.publish_event(
            StartTaskEvent(
                task_id=self.id,
                task_parent_ids=self.parent_ids,
                task_child_ids=self.child_ids,
                task_input=self.input,
                task_output=self.output,
            ),
        )

can_run()

Source code in griptape/tasks/base_task.py
def can_run(self) -> bool:
    # If this Task has been skipped or is not pending, it should not run
    if self.is_skipped() or not self.is_pending():
        return False

    # If this Task has parents, and _all_ of them are skipped, it should not run
    if self.parents and all(parent.is_skipped() for parent in self.parents):
        self.state = BaseTask.State.SKIPPED
        return False

    # If _all_ this Task's unskipped parents are finished, it should run
    unskipped_parents = [parent for parent in self.parents if not parent.is_skipped()]

    return all(parent.is_finished() for parent in unskipped_parents)

is_finished()

Source code in griptape/tasks/base_task.py
def is_finished(self) -> bool:
    return self.state == BaseTask.State.FINISHED

is_pending()

Source code in griptape/tasks/base_task.py
def is_pending(self) -> bool:
    return self.state == BaseTask.State.PENDING

is_running()

Source code in griptape/tasks/base_task.py
def is_running(self) -> bool:
    return self.state == BaseTask.State.RUNNING

is_skipped()

Source code in griptape/tasks/base_task.py
def is_skipped(self) -> bool:
    return self.state == BaseTask.State.SKIPPED

preprocess(structure)

Source code in griptape/tasks/base_task.py
def preprocess(self, structure: Structure) -> BaseTask:
    self.structure = structure

    return self

reset()

Source code in griptape/tasks/base_task.py
def reset(self) -> BaseTask:
    self.state = BaseTask.State.PENDING
    self.output = None

    return self

run()

Source code in griptape/tasks/base_task.py
def run(self) -> BaseArtifact:
    try:
        self.state = BaseTask.State.RUNNING

        self.before_run()

        self.output = self.try_run()

        self.after_run()
    except Exception as e:
        logger.exception("%s %s\n%s", self.__class__.__name__, self.id, e)

        self.output = ErrorArtifact(str(e), exception=e)
    finally:
        self.state = BaseTask.State.FINISHED

    return self.output

try_run() abstractmethod

Source code in griptape/tasks/base_task.py
@abstractmethod
def try_run(self) -> BaseArtifact: ...

BaseTextInputTask

Bases: RuleMixin, BaseTask, ABC

Source code in griptape/tasks/base_text_input_task.py
@define
class BaseTextInputTask(RuleMixin, BaseTask, ABC):
    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: Union[str, TextArtifact, Callable[[BaseTask], TextArtifact]] = field(
        default=DEFAULT_INPUT_TEMPLATE,
        alias="input",
    )

    @property
    def input(self) -> TextArtifact:
        if isinstance(self._input, TextArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            return TextArtifact(J2().render_from_string(self._input, **self.full_context))

    @input.setter
    def input(self, value: str | TextArtifact | Callable[[BaseTask], TextArtifact]) -> None:
        self._input = value

    def before_run(self) -> None:
        super().before_run()

        logger.info("%s %s\nInput: %s", self.__class__.__name__, self.id, self.input.to_text())

    def after_run(self) -> None:
        super().after_run()

        logger.info(
            "%s %s\nOutput: %s",
            self.__class__.__name__,
            self.id,
            self.output.to_text() if self.output is not None else "",
        )

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute

input: TextArtifact property writable

after_run()

Source code in griptape/tasks/base_text_input_task.py
def after_run(self) -> None:
    super().after_run()

    logger.info(
        "%s %s\nOutput: %s",
        self.__class__.__name__,
        self.id,
        self.output.to_text() if self.output is not None else "",
    )

before_run()

Source code in griptape/tasks/base_text_input_task.py
def before_run(self) -> None:
    super().before_run()

    logger.info("%s %s\nInput: %s", self.__class__.__name__, self.id, self.input.to_text())

BranchTask

Bases: CodeExecutionTask

Source code in griptape/tasks/branch_task.py
@define
class BranchTask(CodeExecutionTask):
    on_run: Callable[[BranchTask], Union[InfoArtifact, ListArtifact[InfoArtifact]]] = field(kw_only=True)

    def try_run(self) -> InfoArtifact | ListArtifact[InfoArtifact]:
        result = self.on_run(self)

        if isinstance(result, ListArtifact):
            branch_task_ids = {artifact.value for artifact in result}
        else:
            branch_task_ids = {result.value}

        if not all(branch_task_id in self.child_ids for branch_task_id in branch_task_ids):
            raise ValueError(f"Branch task returned invalid child task id {branch_task_ids}")

        if self.structure is not None:
            children_to_skip = [child for child in self.children if child.id not in branch_task_ids]
            for child in children_to_skip:
                child.state = BaseTask.State.SKIPPED

        return result

on_run: Callable[[BranchTask], Union[InfoArtifact, ListArtifact[InfoArtifact]]] = field(kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/branch_task.py
def try_run(self) -> InfoArtifact | ListArtifact[InfoArtifact]:
    result = self.on_run(self)

    if isinstance(result, ListArtifact):
        branch_task_ids = {artifact.value for artifact in result}
    else:
        branch_task_ids = {result.value}

    if not all(branch_task_id in self.child_ids for branch_task_id in branch_task_ids):
        raise ValueError(f"Branch task returned invalid child task id {branch_task_ids}")

    if self.structure is not None:
        children_to_skip = [child for child in self.children if child.id not in branch_task_ids]
        for child in children_to_skip:
            child.state = BaseTask.State.SKIPPED

    return result

CodeExecutionTask

Bases: BaseTextInputTask

Source code in griptape/tasks/code_execution_task.py
@define
class CodeExecutionTask(BaseTextInputTask):
    on_run: Callable[[CodeExecutionTask], BaseArtifact] = field(kw_only=True)

    def try_run(self) -> BaseArtifact:
        return self.on_run(self)

on_run: Callable[[CodeExecutionTask], BaseArtifact] = field(kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/code_execution_task.py
def try_run(self) -> BaseArtifact:
    return self.on_run(self)

ExtractionTask

Bases: BaseTextInputTask

Source code in griptape/tasks/extraction_task.py
@define
class ExtractionTask(BaseTextInputTask):
    extraction_engine: BaseExtractionEngine = field(kw_only=True)
    args: dict = field(kw_only=True, factory=dict)

    def try_run(self) -> ListArtifact | ErrorArtifact:
        return self.extraction_engine.extract_artifacts(ListArtifact([self.input]), rulesets=self.rulesets, **self.args)

args: dict = field(kw_only=True, factory=dict) class-attribute instance-attribute

extraction_engine: BaseExtractionEngine = field(kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/extraction_task.py
def try_run(self) -> ListArtifact | ErrorArtifact:
    return self.extraction_engine.extract_artifacts(ListArtifact([self.input]), rulesets=self.rulesets, **self.args)

InpaintingImageGenerationTask

Bases: BaseImageGenerationTask

A task that modifies a select region within an image using a mask.

Accepts a text prompt, image, and mask as input in one of the following formats: - tuple of (template string, ImageArtifact, ImageArtifact) - tuple of (TextArtifact, ImageArtifact, ImageArtifact) - Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact).

Attributes:

Name Type Description
image_generation_driver

The driver used to generate the image.

negative_rulesets

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir

If provided, the generated image will be written to disk in output_dir.

output_file

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/inpainting_image_generation_task.py
@define
class InpaintingImageGenerationTask(BaseImageGenerationTask):
    """A task that modifies a select region within an image using a mask.

    Accepts a text prompt, image, and mask as
    input in one of the following formats:
    - tuple of (template string, ImageArtifact, ImageArtifact)
    - tuple of (TextArtifact, ImageArtifact, ImageArtifact)
    - Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact).

    Attributes:
        image_generation_driver: The driver used to generate the image.
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    _input: Union[
        tuple[Union[str, TextArtifact], ImageArtifact, ImageArtifact], Callable[[BaseTask], ListArtifact], ListArtifact
    ] = field(default=None, alias="input")

    @property
    def input(self) -> ListArtifact:
        if isinstance(self._input, ListArtifact):
            return self._input
        elif isinstance(self._input, tuple):
            if isinstance(self._input[0], TextArtifact):
                input_text = self._input[0]
            else:
                input_text = TextArtifact(J2().render_from_string(self._input[0], **self.full_context))

            return ListArtifact([input_text, self._input[1], self._input[2]])
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            raise ValueError("Input must be a tuple of (text, image, mask) or a callable that returns such a tuple.")

    @input.setter
    def input(
        self,
        value: tuple[str | TextArtifact, ImageArtifact, ImageArtifact] | Callable[[BaseTask], ListArtifact],
    ) -> None:
        self._input = value

    def try_run(self) -> ImageArtifact:
        prompt_artifact = self.input[0]

        image_artifact = self.input[1]
        if not isinstance(image_artifact, ImageArtifact):
            raise ValueError("Image must be an ImageArtifact.")

        mask_artifact = self.input[2]
        if not isinstance(mask_artifact, ImageArtifact):
            raise ValueError("Mask must be an ImageArtifact.")

        output_image_artifact = self.image_generation_driver.run_image_inpainting(
            prompts=self._get_prompts(prompt_artifact.to_text()),
            negative_prompts=self._get_negative_prompts(),
            image=image_artifact,
            mask=mask_artifact,
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_image_artifact)

        return output_image_artifact

input: ListArtifact property writable

try_run()

Source code in griptape/tasks/inpainting_image_generation_task.py
def try_run(self) -> ImageArtifact:
    prompt_artifact = self.input[0]

    image_artifact = self.input[1]
    if not isinstance(image_artifact, ImageArtifact):
        raise ValueError("Image must be an ImageArtifact.")

    mask_artifact = self.input[2]
    if not isinstance(mask_artifact, ImageArtifact):
        raise ValueError("Mask must be an ImageArtifact.")

    output_image_artifact = self.image_generation_driver.run_image_inpainting(
        prompts=self._get_prompts(prompt_artifact.to_text()),
        negative_prompts=self._get_negative_prompts(),
        image=image_artifact,
        mask=mask_artifact,
    )

    if self.output_dir or self.output_file:
        self._write_to_file(output_image_artifact)

    return output_image_artifact

OutpaintingImageGenerationTask

Bases: BaseImageGenerationTask

A task that modifies an image outside the bounds of a mask.

Accepts a text prompt, image, and mask as input in one of the following formats: - tuple of (template string, ImageArtifact, ImageArtifact) - tuple of (TextArtifact, ImageArtifact, ImageArtifact) - Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact).

Attributes:

Name Type Description
image_generation_driver

The engine used to generate the image.

negative_rulesets

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir

If provided, the generated image will be written to disk in output_dir.

output_file

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/outpainting_image_generation_task.py
@define
class OutpaintingImageGenerationTask(BaseImageGenerationTask):
    """A task that modifies an image outside the bounds of a mask.

    Accepts a text prompt, image, and mask as
    input in one of the following formats:
    - tuple of (template string, ImageArtifact, ImageArtifact)
    - tuple of (TextArtifact, ImageArtifact, ImageArtifact)
    - Callable that returns a tuple of (TextArtifact, ImageArtifact, ImageArtifact).

    Attributes:
        image_generation_driver: The engine used to generate the image.
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    _input: Union[
        tuple[Union[str, TextArtifact], ImageArtifact, ImageArtifact], Callable[[BaseTask], ListArtifact], ListArtifact
    ] = field(default=None, alias="input")

    @property
    def input(self) -> ListArtifact:
        if isinstance(self._input, ListArtifact):
            return self._input
        elif isinstance(self._input, tuple):
            if isinstance(self._input[0], TextArtifact):
                input_text = self._input[0]
            else:
                input_text = TextArtifact(J2().render_from_string(self._input[0], **self.full_context))

            return ListArtifact([input_text, self._input[1], self._input[2]])
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            raise ValueError("Input must be a tuple of (text, image, mask) or a callable that returns such a tuple.")

    @input.setter
    def input(
        self,
        value: tuple[str | TextArtifact, ImageArtifact, ImageArtifact] | Callable[[BaseTask], ListArtifact],
    ) -> None:
        self._input = value

    def try_run(self) -> ImageArtifact:
        prompt_artifact = self.input[0]

        image_artifact = self.input[1]
        if not isinstance(image_artifact, ImageArtifact):
            raise ValueError("Image must be an ImageArtifact.")

        mask_artifact = self.input[2]
        if not isinstance(mask_artifact, ImageArtifact):
            raise ValueError("Mask must be an ImageArtifact.")

        output_image_artifact = self.image_generation_driver.run_image_outpainting(
            prompts=self._get_prompts(prompt_artifact.to_text()),
            negative_prompts=self._get_negative_prompts(),
            image=image_artifact,
            mask=mask_artifact,
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_image_artifact)

        return output_image_artifact

input: ListArtifact property writable

try_run()

Source code in griptape/tasks/outpainting_image_generation_task.py
def try_run(self) -> ImageArtifact:
    prompt_artifact = self.input[0]

    image_artifact = self.input[1]
    if not isinstance(image_artifact, ImageArtifact):
        raise ValueError("Image must be an ImageArtifact.")

    mask_artifact = self.input[2]
    if not isinstance(mask_artifact, ImageArtifact):
        raise ValueError("Mask must be an ImageArtifact.")

    output_image_artifact = self.image_generation_driver.run_image_outpainting(
        prompts=self._get_prompts(prompt_artifact.to_text()),
        negative_prompts=self._get_negative_prompts(),
        image=image_artifact,
        mask=mask_artifact,
    )

    if self.output_dir or self.output_file:
        self._write_to_file(output_image_artifact)

    return output_image_artifact

PromptImageGenerationTask

Bases: BaseImageGenerationTask

Used to generate an image from a text prompt.

Accepts prompt as input in one of the following formats: - template string - TextArtifact - Callable that returns a TextArtifact.

Attributes:

Name Type Description
image_generation_driver

The engine used to generate the image.

negative_rulesets

List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.

negative_rules

List of negatively-weighted rules applied to the text prompt, if supported by the driver.

output_dir

If provided, the generated image will be written to disk in output_dir.

output_file

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tasks/prompt_image_generation_task.py
@define
class PromptImageGenerationTask(BaseImageGenerationTask):
    """Used to generate an image from a text prompt.

    Accepts prompt as input in one of the following formats:
    - template string
    - TextArtifact
    - Callable that returns a TextArtifact.

    Attributes:
        image_generation_driver: The engine used to generate the image.
        negative_rulesets: List of negatively-weighted rulesets applied to the text prompt, if supported by the driver.
        negative_rules: List of negatively-weighted rules applied to the text prompt, if supported by the driver.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: Union[str, TextArtifact, Callable[[BaseTask], TextArtifact]] = field(
        default=DEFAULT_INPUT_TEMPLATE, alias="input"
    )

    @property
    def input(self) -> TextArtifact:
        if isinstance(self._input, TextArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            return TextArtifact(J2().render_from_string(self._input, **self.full_context))

    @input.setter
    def input(self, value: TextArtifact) -> None:
        self._input = value

    def try_run(self) -> ImageArtifact:
        image_artifact = self.image_generation_driver.run_text_to_image(
            prompts=self._get_prompts(self.input.to_text()),
            negative_prompts=self._get_negative_prompts(),
        )

        if self.output_dir or self.output_file:
            self._write_to_file(image_artifact)

        return image_artifact

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute

input: TextArtifact property writable

try_run()

Source code in griptape/tasks/prompt_image_generation_task.py
def try_run(self) -> ImageArtifact:
    image_artifact = self.image_generation_driver.run_text_to_image(
        prompts=self._get_prompts(self.input.to_text()),
        negative_prompts=self._get_negative_prompts(),
    )

    if self.output_dir or self.output_file:
        self._write_to_file(image_artifact)

    return image_artifact

PromptTask

Bases: BaseTask, RuleMixin, ActionsSubtaskOriginMixin

Source code in griptape/tasks/prompt_task.py
@define
class PromptTask(BaseTask, RuleMixin, ActionsSubtaskOriginMixin):
    DEFAULT_MAX_STEPS = 20
    # Stop sequence for chain-of-thought in the framework. Using this "token-like" string to make it more unique,
    # so that it doesn't trigger on accident.
    RESPONSE_STOP_SEQUENCE = "<|Response|>"
    prompt_driver: BasePromptDriver = field(
        default=Factory(lambda: Defaults.drivers_config.prompt_driver), kw_only=True, metadata={"serializable": True}
    )
    output_schema: Optional[Schema] = field(default=None, kw_only=True)
    generate_system_template: Callable[[PromptTask], str] = field(
        default=Factory(lambda self: self.default_generate_system_template, takes_self=True),
        kw_only=True,
    )
    _conversation_memory: Union[Optional[BaseConversationMemory], NothingType] = field(
        default=Factory(lambda: NOTHING), kw_only=True, alias="conversation_memory"
    )
    _input: Union[str, list, tuple, BaseArtifact, Callable[[BaseTask], BaseArtifact]] = field(
        default=lambda task: task.full_context["args"][0] if task.full_context["args"] else TextArtifact(value=""),
        alias="input",
    )
    tools: list[BaseTool] = field(factory=list, kw_only=True, metadata={"serializable": True})
    max_subtasks: int = field(default=DEFAULT_MAX_STEPS, kw_only=True, metadata={"serializable": True})
    task_memory: Optional[TaskMemory] = field(default=None, kw_only=True)
    subtasks: list[ActionsSubtask] = field(factory=list)
    generate_assistant_subtask_template: Callable[[ActionsSubtask], str] = field(
        default=Factory(lambda self: self.default_generate_assistant_subtask_template, takes_self=True),
        kw_only=True,
    )
    generate_user_subtask_template: Callable[[ActionsSubtask], str] = field(
        default=Factory(lambda self: self.default_generate_user_subtask_template, takes_self=True),
        kw_only=True,
    )
    response_stop_sequence: str = field(default=RESPONSE_STOP_SEQUENCE, kw_only=True)

    @property
    def rulesets(self) -> list:
        default_rules = self.rules
        rulesets = self._rulesets.copy()

        if self.structure is not None:
            if self.structure._rulesets:
                rulesets = self.structure._rulesets + self._rulesets
            if self.structure.rules:
                default_rules = self.structure.rules + self.rules

        if default_rules:
            rulesets.append(Ruleset(name=self.DEFAULT_RULESET_NAME, rules=default_rules))

        return rulesets

    @property
    def input(self) -> BaseArtifact:
        return self._process_task_input(self._input)

    @input.setter
    def input(self, value: str | list | tuple | BaseArtifact | Callable[[BaseTask], BaseArtifact]) -> None:
        self._input = value

    @property
    def conversation_memory(self) -> Optional[BaseConversationMemory]:
        if self._conversation_memory is NOTHING:
            if self.structure is None:
                return None
            else:
                return self.structure.conversation_memory
        else:
            return self._conversation_memory

    @conversation_memory.setter
    def conversation_memory(self, value: Optional[BaseConversationMemory]) -> None:
        self._conversation_memory = value

    @property
    def prompt_stack(self) -> PromptStack:
        stack = PromptStack(tools=self.tools, output_schema=self.output_schema)
        memory = self.conversation_memory

        system_template = self.generate_system_template(self)
        if system_template:
            stack.add_system_message(system_template)

        stack.add_user_message(self.input)

        if self.output:
            stack.add_assistant_message(self.output.to_text())
        else:
            self._add_subtasks_to_prompt_stack(stack)

        if memory is not None:
            # inserting at index 1 to place memory right after system prompt
            memory.add_to_prompt_stack(self.prompt_driver, stack, 1 if system_template else 0)

        return stack

    @property
    def tool_output_memory(self) -> list[TaskMemory]:
        unique_memory_dict = {}

        for memories in [tool.output_memory for tool in self.tools if tool.output_memory]:
            for memory_list in memories.values():
                for memory in memory_list:
                    if memory.name not in unique_memory_dict:
                        unique_memory_dict[memory.name] = memory

        return list(unique_memory_dict.values())

    @tools.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_tools(self, _: Attribute, tools: list[BaseTool]) -> None:
        tool_names = [t.name for t in tools]

        if len(tool_names) > len(set(tool_names)):
            raise ValueError("tools names have to be unique in task")

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()
        if self.task_memory:
            self.set_default_tools_memory(self.task_memory)

    output: Optional[BaseArtifact] = field(default=None, init=False)

    def before_run(self) -> None:
        super().before_run()

        logger.info("%s %s\nInput: %s", self.__class__.__name__, self.id, self.input.to_text())

    def after_run(self) -> None:
        super().after_run()

        logger.info(
            "%s %s\nOutput: %s",
            self.__class__.__name__,
            self.id,
            self.output.to_text() if self.output is not None else "",
        )
        conversation_memory = self.conversation_memory
        if (
            (self.structure is None or self.structure.conversation_memory_strategy == "per_task")
            and conversation_memory is not None
            and self.output is not None
        ):
            run = Run(input=self.input, output=self.output)

            conversation_memory.add_run(run)

    def try_run(self) -> BaseArtifact:
        from griptape.tasks import ActionsSubtask

        self.subtasks.clear()

        if self.response_stop_sequence not in self.prompt_driver.tokenizer.stop_sequences:
            self.prompt_driver.tokenizer.stop_sequences.extend([self.response_stop_sequence])

        result = self.prompt_driver.run(self.prompt_stack)
        if self.tools:
            subtask = self.add_subtask(ActionsSubtask(result.to_artifact()))

            while True:
                if subtask.output is None:
                    if len(self.subtasks) >= self.max_subtasks:
                        subtask.output = ErrorArtifact(f"Exceeded tool limit of {self.max_subtasks} subtasks per task")
                    else:
                        subtask.run()

                        result = self.prompt_driver.run(self.prompt_stack)
                        subtask = self.add_subtask(ActionsSubtask(result.to_artifact()))
                else:
                    break

            output = subtask.output
        else:
            output = result.to_artifact()

        if self.output_schema is not None and self.prompt_driver.structured_output_strategy in ("native", "rule"):
            return JsonArtifact(output.value)
        else:
            return output

    def preprocess(self, structure: Structure) -> BaseTask:
        super().preprocess(structure)

        if self.task_memory is None and structure.task_memory:
            self.set_default_tools_memory(structure.task_memory)

        return self

    def default_generate_system_template(self, _: PromptTask) -> str:
        schema = self.actions_schema().json_schema("Actions Schema")
        schema["minItems"] = 1  # The `schema` library doesn't support `minItems` so we must add it manually.

        return J2("tasks/prompt_task/system.j2").render(
            rulesets=J2("rulesets/rulesets.j2").render(rulesets=self.rulesets),
            action_names=str.join(", ", [tool.name for tool in self.tools]),
            actions_schema=utils.minify_json(json.dumps(schema)),
            meta_memory=J2("memory/meta/meta_memory.j2").render(meta_memories=self.meta_memories),
            use_native_tools=self.prompt_driver.use_native_tools,
            stop_sequence=self.response_stop_sequence,
        )

    def default_generate_assistant_subtask_template(self, subtask: ActionsSubtask) -> str:
        return J2("tasks/prompt_task/assistant_subtask.j2").render(
            stop_sequence=self.response_stop_sequence,
            subtask=subtask,
        )

    def default_generate_user_subtask_template(self, subtask: ActionsSubtask) -> str:
        return J2("tasks/prompt_task/user_subtask.j2").render(
            stop_sequence=self.response_stop_sequence,
            subtask=subtask,
        )

    def actions_schema(self) -> Schema:
        return self._actions_schema_for_tools(self.tools)

    def set_default_tools_memory(self, memory: TaskMemory) -> None:
        self.task_memory = memory

        for tool in self.tools:
            if self.task_memory:
                if tool.input_memory is None:
                    tool.input_memory = [self.task_memory]
                if tool.output_memory is None and tool.off_prompt:
                    tool.output_memory = {getattr(a, "name"): [self.task_memory] for a in tool.activities()}

    def find_subtask(self, subtask_id: str) -> ActionsSubtask:
        for subtask in self.subtasks:
            if subtask.id == subtask_id:
                return subtask
        raise ValueError(f"Subtask with id {subtask_id} not found.")

    def add_subtask(self, subtask: ActionsSubtask) -> ActionsSubtask:
        subtask.attach_to(self)
        subtask.structure = self.structure

        if len(self.subtasks) > 0:
            self.subtasks[-1].add_child(subtask)
            subtask.add_parent(self.subtasks[-1])

        self.subtasks.append(subtask)

        return subtask

    def find_tool(self, tool_name: str) -> BaseTool:
        for tool in self.tools:
            if tool.name == tool_name:
                return tool
        raise ValueError(f"Tool with name {tool_name} not found.")

    def find_memory(self, memory_name: str) -> TaskMemory:
        for memory in self.tool_output_memory:
            if memory.name == memory_name:
                return memory
        raise ValueError(f"Memory with name {memory_name} not found.")

    def _process_task_input(
        self,
        task_input: str | tuple | list | BaseArtifact | Callable[[BaseTask], BaseArtifact],
    ) -> BaseArtifact:
        if isinstance(task_input, TextArtifact):
            task_input.value = J2().render_from_string(task_input.value, **self.full_context)

            return task_input
        elif isinstance(task_input, Callable):
            return self._process_task_input(task_input(self))
        elif isinstance(task_input, ListArtifact):
            return ListArtifact([self._process_task_input(elem) for elem in task_input.value])
        elif isinstance(task_input, BaseArtifact):
            return task_input
        elif isinstance(task_input, (list, tuple)):
            return ListArtifact([self._process_task_input(elem) for elem in task_input])
        else:
            return self._process_task_input(TextArtifact(task_input))

    def _add_subtasks_to_prompt_stack(self, stack: PromptStack) -> None:
        for s in self.subtasks:
            if self.prompt_driver.use_native_tools:
                action_calls = [
                    ToolAction(name=action.name, path=action.path, tag=action.tag, input=action.input)
                    for action in s.actions
                ]
                action_results = [
                    ToolAction(
                        name=action.name,
                        path=action.path,
                        tag=action.tag,
                        output=action.output if action.output is not None else s.output,
                    )
                    for action in s.actions
                ]

                stack.add_assistant_message(
                    ListArtifact(
                        [
                            *([TextArtifact(s.thought)] if s.thought else []),
                            *[ActionArtifact(a) for a in action_calls],
                        ],
                    ),
                )
                stack.add_user_message(
                    ListArtifact(
                        [
                            *[ActionArtifact(a) for a in action_results],
                            *([] if s.output else [TextArtifact("Please keep going")]),
                        ],
                    ),
                )
            else:
                stack.add_assistant_message(self.generate_assistant_subtask_template(s))
                stack.add_user_message(self.generate_user_subtask_template(s))

DEFAULT_MAX_STEPS = 20 class-attribute instance-attribute

RESPONSE_STOP_SEQUENCE = '<|Response|>' class-attribute instance-attribute

conversation_memory: Optional[BaseConversationMemory] property writable

generate_assistant_subtask_template: Callable[[ActionsSubtask], str] = field(default=Factory(lambda self: self.default_generate_assistant_subtask_template, takes_self=True), kw_only=True) class-attribute instance-attribute

generate_system_template: Callable[[PromptTask], str] = field(default=Factory(lambda self: self.default_generate_system_template, takes_self=True), kw_only=True) class-attribute instance-attribute

generate_user_subtask_template: Callable[[ActionsSubtask], str] = field(default=Factory(lambda self: self.default_generate_user_subtask_template, takes_self=True), kw_only=True) class-attribute instance-attribute

input: BaseArtifact property writable

max_subtasks: int = field(default=DEFAULT_MAX_STEPS, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

output: Optional[BaseArtifact] = field(default=None, init=False) class-attribute instance-attribute

output_schema: Optional[Schema] = field(default=None, kw_only=True) class-attribute instance-attribute

prompt_driver: BasePromptDriver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

prompt_stack: PromptStack property

response_stop_sequence: str = field(default=RESPONSE_STOP_SEQUENCE, kw_only=True) class-attribute instance-attribute

rulesets: list property

subtasks: list[ActionsSubtask] = field(factory=list) class-attribute instance-attribute

task_memory: Optional[TaskMemory] = field(default=None, kw_only=True) class-attribute instance-attribute

tool_output_memory: list[TaskMemory] property

tools: list[BaseTool] = field(factory=list, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tasks/prompt_task.py
def __attrs_post_init__(self) -> None:
    super().__attrs_post_init__()
    if self.task_memory:
        self.set_default_tools_memory(self.task_memory)

actions_schema()

Source code in griptape/tasks/prompt_task.py
def actions_schema(self) -> Schema:
    return self._actions_schema_for_tools(self.tools)

add_subtask(subtask)

Source code in griptape/tasks/prompt_task.py
def add_subtask(self, subtask: ActionsSubtask) -> ActionsSubtask:
    subtask.attach_to(self)
    subtask.structure = self.structure

    if len(self.subtasks) > 0:
        self.subtasks[-1].add_child(subtask)
        subtask.add_parent(self.subtasks[-1])

    self.subtasks.append(subtask)

    return subtask

after_run()

Source code in griptape/tasks/prompt_task.py
def after_run(self) -> None:
    super().after_run()

    logger.info(
        "%s %s\nOutput: %s",
        self.__class__.__name__,
        self.id,
        self.output.to_text() if self.output is not None else "",
    )
    conversation_memory = self.conversation_memory
    if (
        (self.structure is None or self.structure.conversation_memory_strategy == "per_task")
        and conversation_memory is not None
        and self.output is not None
    ):
        run = Run(input=self.input, output=self.output)

        conversation_memory.add_run(run)

before_run()

Source code in griptape/tasks/prompt_task.py
def before_run(self) -> None:
    super().before_run()

    logger.info("%s %s\nInput: %s", self.__class__.__name__, self.id, self.input.to_text())

default_generate_assistant_subtask_template(subtask)

Source code in griptape/tasks/prompt_task.py
def default_generate_assistant_subtask_template(self, subtask: ActionsSubtask) -> str:
    return J2("tasks/prompt_task/assistant_subtask.j2").render(
        stop_sequence=self.response_stop_sequence,
        subtask=subtask,
    )

default_generate_system_template(_)

Source code in griptape/tasks/prompt_task.py
def default_generate_system_template(self, _: PromptTask) -> str:
    schema = self.actions_schema().json_schema("Actions Schema")
    schema["minItems"] = 1  # The `schema` library doesn't support `minItems` so we must add it manually.

    return J2("tasks/prompt_task/system.j2").render(
        rulesets=J2("rulesets/rulesets.j2").render(rulesets=self.rulesets),
        action_names=str.join(", ", [tool.name for tool in self.tools]),
        actions_schema=utils.minify_json(json.dumps(schema)),
        meta_memory=J2("memory/meta/meta_memory.j2").render(meta_memories=self.meta_memories),
        use_native_tools=self.prompt_driver.use_native_tools,
        stop_sequence=self.response_stop_sequence,
    )

default_generate_user_subtask_template(subtask)

Source code in griptape/tasks/prompt_task.py
def default_generate_user_subtask_template(self, subtask: ActionsSubtask) -> str:
    return J2("tasks/prompt_task/user_subtask.j2").render(
        stop_sequence=self.response_stop_sequence,
        subtask=subtask,
    )

find_memory(memory_name)

Source code in griptape/tasks/prompt_task.py
def find_memory(self, memory_name: str) -> TaskMemory:
    for memory in self.tool_output_memory:
        if memory.name == memory_name:
            return memory
    raise ValueError(f"Memory with name {memory_name} not found.")

find_subtask(subtask_id)

Source code in griptape/tasks/prompt_task.py
def find_subtask(self, subtask_id: str) -> ActionsSubtask:
    for subtask in self.subtasks:
        if subtask.id == subtask_id:
            return subtask
    raise ValueError(f"Subtask with id {subtask_id} not found.")

find_tool(tool_name)

Source code in griptape/tasks/prompt_task.py
def find_tool(self, tool_name: str) -> BaseTool:
    for tool in self.tools:
        if tool.name == tool_name:
            return tool
    raise ValueError(f"Tool with name {tool_name} not found.")

preprocess(structure)

Source code in griptape/tasks/prompt_task.py
def preprocess(self, structure: Structure) -> BaseTask:
    super().preprocess(structure)

    if self.task_memory is None and structure.task_memory:
        self.set_default_tools_memory(structure.task_memory)

    return self

set_default_tools_memory(memory)

Source code in griptape/tasks/prompt_task.py
def set_default_tools_memory(self, memory: TaskMemory) -> None:
    self.task_memory = memory

    for tool in self.tools:
        if self.task_memory:
            if tool.input_memory is None:
                tool.input_memory = [self.task_memory]
            if tool.output_memory is None and tool.off_prompt:
                tool.output_memory = {getattr(a, "name"): [self.task_memory] for a in tool.activities()}

try_run()

Source code in griptape/tasks/prompt_task.py
def try_run(self) -> BaseArtifact:
    from griptape.tasks import ActionsSubtask

    self.subtasks.clear()

    if self.response_stop_sequence not in self.prompt_driver.tokenizer.stop_sequences:
        self.prompt_driver.tokenizer.stop_sequences.extend([self.response_stop_sequence])

    result = self.prompt_driver.run(self.prompt_stack)
    if self.tools:
        subtask = self.add_subtask(ActionsSubtask(result.to_artifact()))

        while True:
            if subtask.output is None:
                if len(self.subtasks) >= self.max_subtasks:
                    subtask.output = ErrorArtifact(f"Exceeded tool limit of {self.max_subtasks} subtasks per task")
                else:
                    subtask.run()

                    result = self.prompt_driver.run(self.prompt_stack)
                    subtask = self.add_subtask(ActionsSubtask(result.to_artifact()))
            else:
                break

        output = subtask.output
    else:
        output = result.to_artifact()

    if self.output_schema is not None and self.prompt_driver.structured_output_strategy in ("native", "rule"):
        return JsonArtifact(output.value)
    else:
        return output

validate_tools(_, tools)

Source code in griptape/tasks/prompt_task.py
@tools.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_tools(self, _: Attribute, tools: list[BaseTool]) -> None:
    tool_names = [t.name for t in tools]

    if len(tool_names) > len(set(tool_names)):
        raise ValueError("tools names have to be unique in task")

RagTask

Bases: BaseTextInputTask

Source code in griptape/tasks/rag_task.py
@define
class RagTask(BaseTextInputTask):
    rag_engine: RagEngine = field(kw_only=True, default=Factory(lambda: RagEngine()))

    def try_run(self) -> BaseArtifact:
        outputs = self.rag_engine.process_query(self.input.to_text()).outputs

        if len(outputs) > 0:
            return ListArtifact(outputs)
        else:
            return ErrorArtifact("empty output")

rag_engine: RagEngine = field(kw_only=True, default=Factory(lambda: RagEngine())) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/rag_task.py
def try_run(self) -> BaseArtifact:
    outputs = self.rag_engine.process_query(self.input.to_text()).outputs

    if len(outputs) > 0:
        return ListArtifact(outputs)
    else:
        return ErrorArtifact("empty output")

StructureRunTask

Bases: PromptTask

Task to run a Structure.

Attributes:

Name Type Description
structure_run_driver BaseStructureRunDriver

Driver to run the Structure.

Source code in griptape/tasks/structure_run_task.py
@define
class StructureRunTask(PromptTask):
    """Task to run a Structure.

    Attributes:
        structure_run_driver: Driver to run the Structure.
    """

    structure_run_driver: BaseStructureRunDriver = field(kw_only=True)

    def try_run(self) -> BaseArtifact:
        if isinstance(self.input, ListArtifact):
            return self.structure_run_driver.run(*self.input.value)
        else:
            return self.structure_run_driver.run(self.input)

structure_run_driver: BaseStructureRunDriver = field(kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/structure_run_task.py
def try_run(self) -> BaseArtifact:
    if isinstance(self.input, ListArtifact):
        return self.structure_run_driver.run(*self.input.value)
    else:
        return self.structure_run_driver.run(self.input)

TextSummaryTask

Bases: BaseTextInputTask

Source code in griptape/tasks/text_summary_task.py
@define
class TextSummaryTask(BaseTextInputTask):
    summary_engine: BaseSummaryEngine = field(default=Factory(lambda: PromptSummaryEngine()), kw_only=True)

    def try_run(self) -> TextArtifact:
        return TextArtifact(self.summary_engine.summarize_text(self.input.to_text(), rulesets=self.rulesets))

summary_engine: BaseSummaryEngine = field(default=Factory(lambda: PromptSummaryEngine()), kw_only=True) class-attribute instance-attribute

try_run()

Source code in griptape/tasks/text_summary_task.py
def try_run(self) -> TextArtifact:
    return TextArtifact(self.summary_engine.summarize_text(self.input.to_text(), rulesets=self.rulesets))

TextToSpeechTask

Bases: BaseAudioGenerationTask

Source code in griptape/tasks/text_to_speech_task.py
@define
class TextToSpeechTask(BaseAudioGenerationTask):
    DEFAULT_INPUT_TEMPLATE = "{{ args[0] }}"

    _input: Union[str, TextArtifact, Callable[[BaseTask], TextArtifact]] = field(default=DEFAULT_INPUT_TEMPLATE)
    text_to_speech_driver: BaseTextToSpeechDriver = field(
        default=Factory(lambda: Defaults.drivers_config.text_to_speech_driver), kw_only=True
    )

    @property
    def input(self) -> TextArtifact:
        if isinstance(self._input, TextArtifact):
            return self._input
        elif isinstance(self._input, Callable):
            return self._input(self)
        else:
            return TextArtifact(J2().render_from_string(self._input, **self.full_context))

    @input.setter
    def input(self, value: TextArtifact) -> None:
        self._input = value

    def try_run(self) -> AudioArtifact:
        audio_artifact = self.text_to_speech_driver.run_text_to_audio(prompts=[self.input.to_text()])

        if self.output_dir or self.output_file:
            self._write_to_file(audio_artifact)

        return audio_artifact

DEFAULT_INPUT_TEMPLATE = '{{ args[0] }}' class-attribute instance-attribute