Skip to content

tools

__all__ = ['AudioTranscriptionTool', 'BaseImageGenerationTool', 'BaseTool', 'CalculatorTool', 'ComputerTool', 'DateTimeTool', 'EmailTool', 'ExtractionTool', 'FileManagerTool', 'GriptapeCloudToolTool', 'ImageQueryTool', 'InpaintingImageGenerationTool', 'OutpaintingImageGenerationTool', 'PromptImageGenerationTool', 'PromptSummaryTool', 'QueryTool', 'RagTool', 'RestApiTool', 'SqlTool', 'StructureRunTool', 'StructuredOutputTool', 'TextToSpeechTool', 'VariationImageGenerationTool', 'VectorStoreTool', 'WebScraperTool', 'WebSearchTool'] module-attribute

AudioTranscriptionTool

Bases: BaseTool

A tool that can be used to generate transcriptions from input audio.

Source code in griptape/tools/audio_transcription/tool.py
@define
class AudioTranscriptionTool(BaseTool):
    """A tool that can be used to generate transcriptions from input audio.

    Attributes:
        audio_transcription_driver: Driver whose ``run`` method receives the audio artifact.
        audio_loader: Loader whose ``load`` method is given the on-disk path. Defaults to a new ``AudioLoader``.
    """

    audio_transcription_driver: BaseAudioTranscriptionDriver = field(kw_only=True)
    audio_loader: AudioLoader = field(default=Factory(lambda: AudioLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to generate transcriptions of audio files on disk.",
            "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
        },
    )
    def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Load the audio file at ``params["values"]["path"]`` and pass it to the transcription driver."""
        audio_path = params["values"]["path"]

        audio_artifact = self.audio_loader.load(audio_path)

        return self.audio_transcription_driver.run(audio_artifact)

    @activity(
        config={
            "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
            # The keys are declared at the top level because the method body reads them
            # directly from params["values"]; an extra wrapping level would never match.
            "schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str}),
        },
    )
    def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        """Look up an audio artifact in input memory and pass it to the transcription driver.

        Reads ``memory_name``, ``artifact_namespace`` and ``artifact_name`` from ``params["values"]``.

        Returns:
            The driver's result, or an ``ErrorArtifact`` when no input memory has the requested name.
        """
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]

        if memory is None:
            return ErrorArtifact("memory not found")

        audio_artifact = cast(
            "AudioArtifact",
            load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
        )

        return self.audio_transcription_driver.run(audio_artifact)

audio_loader = field(default=Factory(lambda: AudioLoader()), kw_only=True) class-attribute instance-attribute

audio_transcription_driver = field(kw_only=True) class-attribute instance-attribute

transcribe_audio_from_disk(params)

Source code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate transcriptions of audio files on disk.",
        "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
    },
)
def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Load the audio file named by ``params["values"]["path"]`` and run the transcription driver on it."""
    loaded_audio = self.audio_loader.load(params["values"]["path"])
    return self.audio_transcription_driver.run(loaded_audio)

transcribe_audio_from_memory(params)

Source code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
        # The keys are declared at the top level because the body reads them directly
        # from params["values"]; an extra wrapping level would never match.
        "schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str}),
    },
)
def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    """Look up an audio artifact in input memory and pass it to the transcription driver.

    Reads ``memory_name``, ``artifact_namespace`` and ``artifact_name`` from ``params["values"]``.

    Returns:
        The driver's result, or an ``ErrorArtifact`` when no input memory has the requested name.
    """
    values = params["values"]
    memory = self.find_input_memory(values["memory_name"])
    artifact_namespace = values["artifact_namespace"]
    artifact_name = values["artifact_name"]

    if memory is None:
        return ErrorArtifact("memory not found")

    audio_artifact = cast(
        "AudioArtifact",
        load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
    )

    return self.audio_transcription_driver.run(audio_artifact)

BaseImageGenerationTool

Bases: ArtifactFileOutputMixin, BaseTool

A base class for tools that generate images from text prompts.

Source code in griptape/tools/base_image_generation_tool.py
@define
class BaseImageGenerationTool(ArtifactFileOutputMixin, BaseTool):
    """A base class for tools that generate images from text prompts.

    It defines the two description strings below and nothing else.
    """

    # Description text for a prompt listing what the image should contain.
    PROMPT_DESCRIPTION = "Features and qualities to include in the generated image, descriptive and succinct."
    # Description text for a prompt listing what the image should not contain.
    NEGATIVE_PROMPT_DESCRIPTION = (
        "Features and qualities to avoid in the generated image. "
        "Affirmatively describe what to avoid, for example: "
        "to avoid the color red, include 'red' rather than 'no red'."
    )

NEGATIVE_PROMPT_DESCRIPTION = "Features and qualities to avoid in the generated image. Affirmatively describe what to avoid, for example: to avoid the color red, include 'red' rather than 'no red'." class-attribute instance-attribute

PROMPT_DESCRIPTION = 'Features and qualities to include in the generated image, descriptive and succinct.' class-attribute instance-attribute

BaseTool

Bases: ActivityMixin, SerializableMixin, RunnableMixin['BaseTool'], ABC

Abstract class for all tools to inherit from.

Attributes:

Name Type Description
name str

Tool name.

input_memory Optional[list[TaskMemory]]

TaskMemory available in tool activities. Gets automatically set if None.

output_memory Optional[dict[str, list[TaskMemory]]]

TaskMemory that activities write to by default. Gets automatically set if None.

install_dependencies_on_init bool

Determines whether dependencies from the tool requirements.txt file are installed in init.

dependencies_install_directory Optional[str]

Custom dependency install directory.

verbose bool

Determines whether tool operations (such as dependency installation) should be verbose.

off_prompt bool

Determines whether tool activity output goes to the output memory.

Source code in griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, SerializableMixin, RunnableMixin["BaseTool"], ABC):
    """Abstract class for all tools to inherit from.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to by default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(
        default=Factory(lambda self: self.__class__.__name__, takes_self=True),
        kw_only=True,
        metadata={"serializable": True},
    )
    input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True, metadata={"serializable": True})
    output_memory: Optional[dict[str, list[TaskMemory]]] = field(
        default=None, kw_only=True, metadata={"serializable": True}
    )
    install_dependencies_on_init: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    dependencies_install_directory: Optional[str] = field(default=None, kw_only=True, metadata={"serializable": True})
    verbose: bool = field(default=False, kw_only=True, metadata={"serializable": True})
    off_prompt: bool = field(default=False, kw_only=True, metadata={"serializable": True})

    def __attrs_post_init__(self) -> None:
        """Install the tool's requirements when enabled, present, and not already met."""
        if (
            self.install_dependencies_on_init
            and self.has_requirements
            and not self.are_requirements_met(self.requirements_path)
        ):
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
        """Check that every key names an existing activity and maps to a list of uniquely named memories.

        Raises:
            ValueError: If an activity does not exist, its memory list is None, or memory names repeat.
        """
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")
                if memory_list is None:
                    raise ValueError(f"memory list for activity '{activity_name}' can't be None")

                output_memory_names = [memory.name for memory in memory_list]

                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def requirements_path(self) -> str:
        """Path of ``REQUIREMENTS_FILE`` inside the directory of the file defining this class."""
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def abs_file_path(self) -> str:
        """Absolute path of the file that defines this tool's class."""
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self) -> str:
        """Directory containing ``abs_file_path``."""
        return os.path.dirname(self.abs_file_path)

    @property
    def has_requirements(self) -> bool:
        """Whether ``requirements_path`` exists on disk."""
        return os.path.exists(self.requirements_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        """Return a JSON schema that accepts any one of this tool's activity schemas."""
        full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} ToolAction Schema")

    def activity_schemas(self) -> list[Schema]:
        """Build one ``Schema`` per activity with ``name``, ``path`` and ``input`` entries."""
        schemas = []

        for activity in self.activities():
            schema_dict: dict[Literal | schema.Optional, Any] = {
                Literal("name"): self.name,
                Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
            }

            activity_schema = self.activity_schema(activity)
            # If no schema is defined, we just make `input` optional instead of omitting it.
            # This works better with lower-end models that may accidentally pass in an empty dict.
            if activity_schema is None:
                schema_dict[schema.Optional("input")] = {}
            else:
                schema_dict[Literal("input")] = activity_schema.schema

            schemas.append(Schema(schema_dict))

        return schemas

    def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
        """Run ``before_run``, ``try_run`` and ``after_run`` in order.

        Any exception raised along the way is logged at debug level and returned as an ``ErrorArtifact``.
        """
        try:
            output = self.before_run(activity, subtask, action)

            output = self.try_run(activity, subtask, action, output)

            output = self.after_run(activity, subtask, action, output)
        except Exception as e:
            logging.debug(traceback.format_exc())
            output = ErrorArtifact(str(e), exception=e)

        return output

    def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
        """Invoke the parent ``before_run`` hook and return the action's input."""
        super().before_run()

        return action.input

    @observable(tags=["Tool.run()"])
    def try_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: Optional[dict],
    ) -> BaseArtifact:
        """Call the activity with a deep copy of ``value`` and make sure the result is an artifact.

        A non-artifact result is wrapped in an ``InfoArtifact``; ``None`` becomes a fixed placeholder message.
        """
        activity_result = activity(deepcopy(value))

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            if activity_result is None:
                result = InfoArtifact("Tool returned an empty value")
            else:
                result = InfoArtifact(activity_result)

        return result

    def after_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: BaseArtifact,
    ) -> BaseArtifact:
        """Invoke the parent ``after_run`` hook, then pass ``value`` through the activity's output memories.

        Each memory's ``process_output`` result becomes the input of the next one.
        """
        super().after_run()

        if self.output_memory:
            # output_memory may list only some activities; an unlisted one has nothing to
            # process, so fall back to an empty list instead of raising KeyError.
            output_memories = self.output_memory.get(getattr(activity, "name")) or []
            for memory in output_memories:
                value = memory.process_output(activity, subtask, value)

        return value

    def validate(self) -> bool:
        """Return True when the requirements file exists.

        Raises:
            Exception: If ``requirements_path`` does not exist.
        """
        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")
        return True

    def tool_dir(self) -> str:
        """Return the absolute directory of the file that defines this tool's class."""
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        """Run ``pip install -r`` on the requirements file from within ``tool_dir()``.

        Args:
            env: Environment for the pip subprocess. ``None`` or an empty dict results in an empty environment.
        """
        env = env or {}

        # Use the class constant so the installed file is the one `requirements_path` checks.
        command = [sys.executable, "-m", "pip", "install", "-r", self.REQUIREMENTS_FILE]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        # check=False: a failing pip run does not raise here.
        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
            check=False,
        )

    def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
        """Return the first input memory whose ``name`` equals ``memory_name``, or None."""
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        return None

    def to_native_tool_name(self, activity: Callable) -> str:
        """Converts a Tool's name and an Activity into a native tool name.

        The native tool name is a combination of the Tool's name and the Activity's name.
        The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

        Args:
            activity: Activity to convert

        Returns:
            str: Native tool name.

        Raises:
            ValueError: If either name contains a disallowed character.
        """
        tool_name = self.name
        # fullmatch, not match + "$": "$" also matches just before a trailing newline.
        if re.fullmatch(r"[a-zA-Z0-9]+", tool_name) is None:
            raise ValueError("Tool name can only contain letters and numbers.")

        activity_name = self.activity_name(activity)
        if re.fullmatch(r"[a-zA-Z0-9_]+", activity_name) is None:
            raise ValueError("Activity name can only contain letters, numbers, and underscores.")

        return f"{tool_name}_{activity_name}"

    def are_requirements_met(self, requirements_path: str) -> bool:
        """Report whether every entry of the requirements file resolves to an installed distribution.

        Blank lines and ``#`` comment lines are skipped. Every other line is looked up as
        written (minus surrounding whitespace) with ``importlib.metadata.version``.

        Args:
            requirements_path: Path of the requirements file to read.

        Returns:
            False as soon as one entry is not found, True otherwise.
        """
        stripped_lines = (line.strip() for line in Path(requirements_path).read_text().splitlines())
        requirements = [line for line in stripped_lines if line and not line.startswith("#")]

        try:
            for requirement in requirements:
                importlib.metadata.version(requirement)
            return True
        except importlib.metadata.PackageNotFoundError:
            return False

REQUIREMENTS_FILE = 'requirements.txt' class-attribute instance-attribute

abs_dir_path property

abs_file_path property

dependencies_install_directory = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

has_requirements property

input_memory = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

install_dependencies_on_init = field(default=True, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

name = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

off_prompt = field(default=False, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

output_memory = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

requirements_path property

verbose = field(default=False, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    """Install the tool's requirements when enabled, present, and not already met."""
    if not self.install_dependencies_on_init:
        return
    if not self.has_requirements:
        return
    if self.are_requirements_met(self.requirements_path):
        return

    self.install_dependencies(os.environ.copy())

activity_schemas()

Source code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]:
    """Build one ``Schema`` per activity with ``name``, ``path`` and ``input`` entries."""
    collected = []

    for current in self.activities():
        entry: dict[Literal | schema.Optional, Any] = {
            Literal("name"): self.name,
            Literal("path", description=self.activity_description(current)): self.activity_name(current),
        }

        declared = self.activity_schema(current)
        if declared is not None:
            entry[Literal("input")] = declared.schema
        else:
            # With no schema defined, `input` is made optional rather than omitted.
            # This works better with lower-end models that may accidentally pass in an empty dict.
            entry[schema.Optional("input")] = {}

        collected.append(Schema(entry))

    return collected

after_run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
def after_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: BaseArtifact,
) -> BaseArtifact:
    """Invoke the parent ``after_run`` hook, then pass ``value`` through the activity's output memories.

    Each memory's ``process_output`` result becomes the input of the next one.
    When no output memory applies to the activity, ``value`` is returned unchanged.
    """
    super().after_run()

    if self.output_memory:
        # output_memory may list only some activities; an unlisted one has nothing to
        # process, so fall back to an empty list instead of raising KeyError.
        output_memories = self.output_memory.get(getattr(activity, "name")) or []
        for memory in output_memories:
            value = memory.process_output(activity, subtask, value)

    return value

are_requirements_met(requirements_path)

Source code in griptape/tools/base_tool.py
def are_requirements_met(self, requirements_path: str) -> bool:
    """Report whether every entry of the requirements file resolves to an installed distribution.

    Blank lines and ``#`` comment lines are skipped. Every other line is looked up as
    written (minus surrounding whitespace) with ``importlib.metadata.version``.

    Args:
        requirements_path: Path of the requirements file to read.

    Returns:
        False as soon as one entry is not found, True otherwise.
    """
    stripped_lines = (line.strip() for line in Path(requirements_path).read_text().splitlines())
    requirements = [line for line in stripped_lines if line and not line.startswith("#")]

    try:
        for requirement in requirements:
            importlib.metadata.version(requirement)
        return True
    except importlib.metadata.PackageNotFoundError:
        return False

before_run(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
    """Invoke the parent ``before_run`` hook and hand back the action's input."""
    super().before_run()
    tool_input = action.input
    return tool_input

find_input_memory(memory_name)

Source code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
    """Return the first input memory whose ``name`` equals ``memory_name``, or None if there is none."""
    for candidate in self.input_memory or ():
        if candidate.name == memory_name:
            return candidate
    return None

install_dependencies(env=None)

Source code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    """Run ``pip install -r requirements.txt`` from within ``tool_dir()``.

    ``-U`` is passed when no install directory is configured; otherwise ``-t <directory>``.
    pip's output is discarded unless ``verbose`` is set, and a failing run does not raise.

    Args:
        env: Environment for the pip subprocess. ``None`` or an empty dict results in an empty environment.
    """
    target_directory = self.dependencies_install_directory
    mode_arguments = ["-U"] if target_directory is None else ["-t", target_directory]
    pip_command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt", *mode_arguments]
    output_stream = None if self.verbose else subprocess.DEVNULL

    subprocess.run(
        pip_command,
        env=env or {},
        cwd=self.tool_dir(),
        stdout=output_stream,
        stderr=output_stream,
        check=False,
    )

run(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
    """Run ``before_run``, ``try_run`` and ``after_run`` in order and return the final result.

    Any exception raised along the way is logged at debug level and returned as an ``ErrorArtifact``.
    """
    try:
        tool_input = self.before_run(activity, subtask, action)
        raw_result = self.try_run(activity, subtask, action, tool_input)
        return self.after_run(activity, subtask, action, raw_result)
    except Exception as error:
        logging.debug(traceback.format_exc())
        return ErrorArtifact(str(error), exception=error)

schema()

Source code in griptape/tools/base_tool.py
def schema(self) -> dict:
    """Return a JSON schema that accepts any one of this tool's activity schemas."""
    alternatives = self.activity_schemas()
    combined = Schema(Or(*alternatives), description=f"{self.name} action schema.")
    return combined.json_schema(f"{self.name} ToolAction Schema")

to_native_tool_name(activity)

Converts a Tool's name and an Activity into a native tool name.

The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

Parameters:

Name Type Description Default
activity Callable

Activity to convert

required

Returns:

Name Type Description
str str

Native tool name.

Source code in griptape/tools/base_tool.py
def to_native_tool_name(self, activity: Callable) -> str:
    """Converts a Tool's name and an Activity into a native tool name.

    The native tool name is a combination of the Tool's name and the Activity's name.
    The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

    Args:
        activity: Activity to convert

    Returns:
        str: Native tool name.

    Raises:
        ValueError: If either name is empty or contains a disallowed character.
    """
    tool_name = self.name
    # fullmatch, not match + "$": "$" also matches just before a trailing newline,
    # which would let a name such as "Tool\n" through.
    if re.fullmatch(r"[a-zA-Z0-9]+", tool_name) is None:
        raise ValueError("Tool name can only contain letters and numbers.")

    activity_name = self.activity_name(activity)
    if re.fullmatch(r"[a-zA-Z0-9_]+", activity_name) is None:
        raise ValueError("Activity name can only contain letters, numbers, and underscores.")

    return f"{tool_name}_{activity_name}"

tool_dir()

Source code in griptape/tools/base_tool.py
def tool_dir(self) -> str:
    """Return the absolute directory of the file that defines this object's class."""
    defining_file = os.path.abspath(inspect.getfile(self.__class__))
    return os.path.dirname(defining_file)

try_run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
@observable(tags=["Tool.run()"])
def try_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: Optional[dict],
) -> BaseArtifact:
    """Call the activity with a deep copy of ``value`` and make sure the result is an artifact.

    A non-artifact result is wrapped in an ``InfoArtifact``; ``None`` becomes a fixed placeholder message.
    """
    outcome = activity(deepcopy(value))

    if isinstance(outcome, BaseArtifact):
        return outcome

    logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

    return InfoArtifact("Tool returned an empty value" if outcome is None else outcome)

validate()

Source code in griptape/tools/base_tool.py
def validate(self) -> bool:
    """Return True when the requirements file exists.

    Raises:
        Exception: If ``requirements_path`` does not exist.
    """
    if os.path.exists(self.requirements_path):
        return True
    raise Exception(f"{self.REQUIREMENTS_FILE} not found")

validate_output_memory(_, output_memory)

Source code in griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
    """Check that every key names an existing activity and maps to a list of uniquely named memories.

    Raises:
        ValueError: If an activity does not exist, its memory list is None, or memory names repeat.
    """
    if not output_memory:
        return

    for activity_name, memory_list in output_memory.items():
        if not self.find_activity(activity_name):
            raise ValueError(f"activity {activity_name} doesn't exist")
        if memory_list is None:
            raise ValueError(f"memory list for activity '{activity_name}' can't be None")

        seen_names = [memory.name for memory in memory_list]

        # A set drops duplicates, so a shorter set means at least one name repeats.
        if len(set(seen_names)) < len(seen_names):
            raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

CalculatorTool

Bases: BaseTool

Source code in griptape/tools/calculator/tool.py
class CalculatorTool(BaseTool):
    """Tool with a single activity that evaluates an expression string with ``numexpr``."""

    @activity(
        config={
            "description": "Can be used for computing simple numerical or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. "
                        "Don't use variables. Don't use any imports or external libraries",
                    ): str,
                },
            ),
        },
    )
    def calculate(self, params: dict) -> BaseArtifact:
        """Evaluate ``params["values"]["expression"]`` and wrap the result in a ``TextArtifact``.

        Any exception raised while reading, evaluating or wrapping is returned as an ``ErrorArtifact``.
        """
        import numexpr  # pyright: ignore[reportMissingImports]

        try:
            evaluated = numexpr.evaluate(params["values"]["expression"])
            return TextArtifact(evaluated)
        except Exception as error:
            return ErrorArtifact(f"error calculating: {error}")

calculate(params)

Source code in griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for computing simple numerical or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. "
                    "Don't use variables. Don't use any imports or external libraries",
                ): str,
            },
        ),
    },
)
def calculate(self, params: dict) -> BaseArtifact:
    """Evaluate ``params["values"]["expression"]`` and wrap the result in a ``TextArtifact``.

    Any exception raised while reading, evaluating or wrapping is returned as an ``ErrorArtifact``.
    """
    import numexpr  # pyright: ignore[reportMissingImports]

    try:
        evaluated = numexpr.evaluate(params["values"]["expression"])
        return TextArtifact(evaluated)
    except Exception as error:
        return ErrorArtifact(f"error calculating: {error}")

ComputerTool

Bases: BaseTool

Source code in griptape/tools/computer/tool.py
@define
class ComputerTool(BaseTool):
    """Tool that runs Python code and shell commands inside a Docker container.

    Attributes:
        local_workdir: Host directory bound into the container; a temporary directory is created when unset.
        container_workdir: Path inside the container that ``local_workdir`` is bound to.
        env_vars: Environment variables passed to the container.
        dockerfile_path: Dockerfile copied into the image build context.
        requirements_txt_path: Requirements file copied into the image build context.
        docker_client: Docker client; created from the environment by default.
    """

    local_workdir: Optional[str] = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True),
        kw_only=True,
    )

    _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Run the base initialisation, then make sure a local working directory exists."""
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            self._tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self._tempdir.name

    @docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
        """Reject a falsy client, which is what ``default_docker_client`` returns on failure."""
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        """Install Python requirements, then remove any existing container and rebuild the image."""
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename",
                        description="name of the file to put the Python code in before executing it",
                    ): str,
                },
            ),
        },
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        """Write ``code`` to ``filename`` and execute it in the container."""
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        },
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        """Execute the given shell command in the container."""
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        """Run ``command`` in a new container and return its output.

        Returns:
            An ``ErrorArtifact`` holding stderr when stderr is non-empty, otherwise a
            ``TextArtifact`` holding stdout. Any exception is returned as an ``ErrorArtifact``.
        """
        from docker.models.containers import Container

        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

            container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
                stdout=True,
                stderr=True,
                detach=True,
            )

            if not isinstance(container, Container):
                return ErrorArtifact("error running container")

            try:
                container.wait()

                stderr = container.logs(stdout=False, stderr=True).decode().strip()
                stdout = container.logs(stdout=True, stderr=False).decode().strip()
            finally:
                # The container name is derived from the tool name and reused on every run.
                # Remove the container even when waiting or reading logs fails, so it is
                # not left behind under that name.
                container.remove(force=True)

            if stderr:
                return ErrorArtifact(stderr)
            return TextArtifact(stdout)
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        """Write ``code`` to ``filename`` in the local working directory and run it with ``python`` in the container."""
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            Path(local_file_path).write_text(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> Optional[DockerClient]:
        """Create a Docker client from the environment; log the exception and return None on failure."""
        import docker

        try:
            return docker.from_env()
        except Exception as e:
            logging.exception(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        """Return the snake-cased tool name suffixed with ``_image``."""
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        """Return the snake-cased tool name suffixed with ``_container``."""
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        """Force-remove the container called ``name``; do nothing when it does not exist."""
        from docker.errors import NotFound
        from docker.models.containers import Container

        try:
            existing_container = self.docker_client.containers.get(name)
            if isinstance(existing_container, Container):
                existing_container.remove(force=True)

                logging.info("Removed existing container %s", name)
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        """Build the tool's image from a temporary context holding the Dockerfile and requirements file."""
        with tempfile.TemporaryDirectory() as temp_dir:
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            if isinstance(image, tuple):
                logging.info("Built image: %s", image[0].short_id)

    def dependencies(self) -> list[str]:
        """Return every line of the requirements file with surrounding whitespace removed."""
        with open(self.requirements_txt_path) as file:
            return [line.strip() for line in file]

    def __del__(self) -> None:
        """Clean up the temporary working directory, if this instance created one."""
        # `_tempdir` is never assigned when __init__ aborts before reaching it, so read it defensively.
        tempdir = getattr(self, "_tempdir", None)
        if tempdir:
            tempdir.cleanup()

_tempdir = field(default=None, kw_only=True) class-attribute instance-attribute

container_workdir = field(default='/griptape', kw_only=True) class-attribute instance-attribute

docker_client = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) class-attribute instance-attribute

dockerfile_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True) class-attribute instance-attribute

env_vars = field(factory=dict, kw_only=True) class-attribute instance-attribute

local_workdir = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_txt_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    """Make sure a local work directory exists once the instance is built.

    When ``local_workdir`` is unset, a temporary directory is created, kept on
    ``_tempdir``, and its path becomes ``local_workdir``. Otherwise the
    configured directory is created, including any missing parents.
    """
    super().__attrs_post_init__()

    if not self.local_workdir:
        self._tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self._tempdir.name
        return

    Path(self.local_workdir).mkdir(parents=True, exist_ok=True)

__del__()

Source code in griptape/tools/computer/tool.py
def __del__(self) -> None:
    """Clean up the temporary directory held in ``_tempdir``, if there is one.

    A finalizer can run on an instance whose initialisation did not finish,
    in which case ``_tempdir`` may never have been assigned. The attribute is
    therefore read with a default so the finalizer itself never raises
    ``AttributeError``.
    """
    tempdir = getattr(self, "_tempdir", None)
    if tempdir:
        tempdir.cleanup()

build_image(tool)

Source code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    """Build the Docker image for ``tool`` from a throwaway build context.

    The Dockerfile and the requirements file are copied into a temporary
    directory, which is handed to the Docker client as the build path and
    discarded once the build call returns.

    Args:
        tool: Tool whose ``image_name`` is used to tag the built image.
    """
    with tempfile.TemporaryDirectory() as build_context:
        for resource_path in (self.dockerfile_path, self.requirements_txt_path):
            shutil.copy(resource_path, build_context)

        build_result = self.docker_client.images.build(
            path=build_context,
            tag=self.image_name(tool),
            rm=True,
            forcerm=True,
        )

        # Only a tuple result is reported; its first item provides the image id.
        if isinstance(build_result, tuple):
            logging.info("Built image: %s", build_result[0].short_id)

container_name(tool)

Source code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    """Return the container name for ``tool``: its snake-cased name plus ``_container``."""
    import stringcase  # pyright: ignore[reportMissingImports]

    snake_name = stringcase.snakecase(tool.name)

    return "{}_container".format(snake_name)

default_docker_client()

Source code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]:
    """Create a Docker client with ``docker.from_env()``.

    Returns:
        The client, or ``None`` when creating it raises; the exception is
        logged in that case rather than propagated.
    """
    import docker

    try:
        client = docker.from_env()
    except Exception as e:
        logging.exception(e)
        client = None

    return client

dependencies()

Source code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    """Return every line of the requirements file, stripped of surrounding whitespace.

    Returns:
        One entry per line of ``self.requirements_txt_path``, in file order.
        Blank lines are kept as empty strings.
    """
    requirements: list[str] = []
    with open(self.requirements_txt_path) as requirements_file:
        for raw_line in requirements_file:
            requirements.append(raw_line.strip())
    return requirements

execute_code(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename",
                    description="name of the file to put the Python code in before executing it",
                ): str,
            },
        ),
    },
)
def execute_code(self, params: dict) -> BaseArtifact:
    """Activity entry point: run the supplied Python code inside the container.

    Args:
        params: Activity parameters; ``params["values"]`` must contain
            ``code`` and ``filename``.

    Returns:
        Whatever ``execute_code_in_container`` returns for that file and code.
    """
    values = params["values"]
    code, filename = values["code"], values["filename"]

    return self.execute_code_in_container(filename, code)

execute_code_in_container(filename, code)

Source code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    """Write ``code`` to ``filename`` in the local work directory and run it with ``python``.

    Args:
        filename: Name of the file to create, relative to the work directory.
        code: Python source written to that file.

    Returns:
        The result of ``execute_command_in_container`` for
        ``python <container_workdir>/<filename>``, or an ``ErrorArtifact`` when
        the filename escapes the work directory or any step raises.
    """
    container_file_path = os.path.join(self.container_workdir, filename)

    if self.local_workdir:
        tempdir = None
        local_workdir = self.local_workdir
    else:
        tempdir = tempfile.TemporaryDirectory()
        local_workdir = tempdir.name

    local_file_path = os.path.join(local_workdir, filename)

    try:
        # `filename` arrives through the activity parameters, so it is not
        # trusted: "../x" or an absolute path would otherwise make the write
        # below land outside the work directory on the host.
        workdir_root = os.path.realpath(local_workdir)
        resolved_file_path = os.path.realpath(local_file_path)
        if os.path.commonpath([workdir_root, resolved_file_path]) != workdir_root:
            return ErrorArtifact(f"error executing code: filename {filename!r} is outside of the working directory")

        Path(local_file_path).write_text(code)

        return self.execute_command_in_container(f"python {container_file_path}")
    except Exception as e:
        return ErrorArtifact(f"error executing code: {e}")
    finally:
        # Only a directory created by this call is cleaned up here.
        if tempdir:
            tempdir.cleanup()

execute_command(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    },
)
def execute_command(self, params: dict) -> BaseArtifact:
    """Activity entry point: run a shell command inside the container.

    Args:
        params: Activity parameters; ``params["values"]["command"]`` is the
            command to run.

    Returns:
        Whatever ``execute_command_in_container`` returns for that command.
    """
    shell_command = params["values"]["command"]

    return self.execute_command_in_container(shell_command)

execute_command_in_container(command)

Source code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    """Run ``command`` in a fresh detached container and collect its output.

    The local work directory, when set, is bind-mounted read-write at
    ``container_workdir``. The container is always removed afterwards, even
    when waiting for it or reading its logs fails; otherwise the leftover
    named container would make the next run fail on a name conflict.

    Args:
        command: Command passed to the container.

    Returns:
        A ``TextArtifact`` with the stripped stdout, or an ``ErrorArtifact``
        when stderr is non-empty, the run does not yield a ``Container``, or
        any step raises.
    """
    from docker.models.containers import Container

    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

        container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
            stdout=True,
            stderr=True,
            detach=True,
        )

        if not isinstance(container, Container):
            return ErrorArtifact("error running container")

        try:
            container.wait()

            stderr = container.logs(stdout=False, stderr=True).decode().strip()
            stdout = container.logs(stdout=True, stderr=False).decode().strip()
        finally:
            # Forced removal also covers a container that is still running
            # because `wait()` failed.
            container.remove(force=True)

        # Any stderr output is reported as an error.
        if stderr:
            return ErrorArtifact(stderr)
        return TextArtifact(stdout)
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")

image_name(tool)

Source code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    """Return the image name for ``tool``: its snake-cased name plus ``_image``."""
    import stringcase  # pyright: ignore[reportMissingImports]

    snake_name = stringcase.snakecase(tool.name)

    return "{}_image".format(snake_name)

install_dependencies(env=None)

Source code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    """Install the base dependencies, then rebuild this tool's Docker image.

    Any existing container with this tool's container name is removed before
    the image is built.

    Args:
        env: Optional environment mapping forwarded to the parent implementation.
    """
    super().install_dependencies(env)

    stale_container_name = self.container_name(self)
    self.remove_existing_container(stale_container_name)
    self.build_image(self)

remove_existing_container(name)

Source code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    """Force-remove the container called ``name`` if it exists.

    A missing container is not an error: ``NotFound`` from the Docker client
    is ignored.

    Args:
        name: Name of the container to look up and remove.
    """
    from docker.errors import NotFound
    from docker.models.containers import Container

    try:
        stale = self.docker_client.containers.get(name)
        if not isinstance(stale, Container):
            return

        stale.remove(force=True)

        logging.info("Removed existing container %s", name)
    except NotFound:
        return

validate_docker_client(_, docker_client)

Source code in griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
    """Validator for ``docker_client``: reject a missing (falsy) client.

    Raises:
        ValueError: If ``docker_client`` is falsy.
    """
    if docker_client:
        return

    raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

DateTimeTool

Bases: BaseTool

Source code in griptape/tools/date_time/tool.py
@define
class DateTimeTool(BaseTool):
    """Tool exposing date and time helpers as activities.

    Attributes:
        denylist: Activity names listed here by default; contains `get_relative_datetime`.
    """

    denylist: list[str] = field(default=Factory(lambda: ["get_relative_datetime"]), kw_only=True)

    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self) -> BaseArtifact:
        """Return the current local date and time as text."""
        return TextArtifact(str(datetime.now()))

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "relative_date_string",
                        description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str,
                },
            ),
        },
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        """Parse a relative date string with `dateparser` and return the result as text.

        Args:
            params: Activity parameters; `params["values"]["relative_date_string"]` is the string to parse.

        Returns:
            A `TextArtifact` with the parsed datetime, or an `ErrorArtifact` when the string
            cannot be parsed or parsing raises.
        """
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            return ErrorArtifact("invalid date string")
        except Exception as e:
            # The message names this activity (it previously said "current datetime").
            return ErrorArtifact(f"error getting relative datetime: {e}")

    @activity(
        config={
            "description": "Can be used to add a timedelta to a datetime.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "timedelta_kwargs",
                        description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}',
                    ): dict,
                    schema.Optional(
                        schema.Literal(
                            "iso_datetime",
                            description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.',
                        )
                    ): str,
                },
            ),
        },
    )
    def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact:
        """Add a timedelta to a datetime and return the result in ISO format.

        Args:
            timedelta_kwargs: Keyword arguments passed to `timedelta`.
            iso_datetime: ISO 8601 datetime string; the current datetime is used when omitted.

        Returns:
            A `TextArtifact` with the shifted datetime in ISO format.
        """
        if iso_datetime is None:
            iso_datetime = datetime.now().isoformat()
        return TextArtifact((datetime.fromisoformat(iso_datetime) + timedelta(**timedelta_kwargs)).isoformat())

    @activity(
        config={
            "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "start_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"',
                    ): str,
                    schema.Literal(
                        "end_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"',
                    ): str,
                }
            ),
        }
    )
    def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact:
        """Return `end_datetime - start_datetime` as text.

        Args:
            start_datetime: ISO 8601 datetime string.
            end_datetime: ISO 8601 datetime string.

        Returns:
            A `TextArtifact` with the string form of the resulting timedelta.
        """
        return TextArtifact(str(datetime.fromisoformat(end_datetime) - datetime.fromisoformat(start_datetime)))

denylist = field(default=Factory(lambda: ['get_relative_datetime']), kw_only=True) class-attribute instance-attribute

add_timedelta(timedelta_kwargs, iso_datetime=None)

Source code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to add a timedelta to a datetime.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "timedelta_kwargs",
                    description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}',
                ): dict,
                schema.Optional(
                    schema.Literal(
                        "iso_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.',
                    )
                ): str,
            },
        ),
    },
)
def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact:
    """Shift a datetime by a timedelta and return the result in ISO format.

    Args:
        timedelta_kwargs: Keyword arguments passed to `timedelta`.
        iso_datetime: ISO 8601 datetime string; the current datetime is used when omitted.

    Returns:
        A `TextArtifact` with the shifted datetime in ISO format.
    """
    base_iso = datetime.now().isoformat() if iso_datetime is None else iso_datetime
    base = datetime.fromisoformat(base_iso)
    shifted = base + timedelta(**timedelta_kwargs)

    return TextArtifact(shifted.isoformat())

get_current_datetime()

Source code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self) -> BaseArtifact:
    """Return the current local date and time as text."""
    now = datetime.now()

    return TextArtifact(str(now))

get_datetime_diff(start_datetime, end_datetime)

Source code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "start_datetime",
                    description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"',
                ): str,
                schema.Literal(
                    "end_datetime",
                    description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"',
                ): str,
            }
        ),
    }
)
def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact:
    """Return `end_datetime - start_datetime` as text.

    Args:
        start_datetime: ISO 8601 datetime string.
        end_datetime: ISO 8601 datetime string.

    Returns:
        A `TextArtifact` with the string form of the resulting timedelta.
    """
    # The end value is parsed first, as in the one-line form of this expression.
    end = datetime.fromisoformat(end_datetime)
    start = datetime.fromisoformat(start_datetime)

    return TextArtifact(str(end - start))

get_relative_datetime(params)

Source code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "relative_date_string",
                    description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str,
            },
        ),
    },
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    """Parse a relative date string with `dateparser` and return the result as text.

    Args:
        params: Activity parameters; `params["values"]["relative_date_string"]` is the string to parse.

    Returns:
        A `TextArtifact` with the parsed datetime, or an `ErrorArtifact` when the string
        cannot be parsed or parsing raises.
    """
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        return ErrorArtifact("invalid date string")
    except Exception as e:
        # The message names this activity (it previously said "current datetime").
        return ErrorArtifact(f"error getting relative datetime: {e}")

EmailTool

Bases: BaseTool

Tool for working with email.

Attributes:

Name Type Description
username Optional[str]

Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com.

password Optional[str]

Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password.

email_max_retrieve_count Optional[int]

Used to limit the number of messages retrieved during any given activities.

smtp_host Optional[str]

Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity.

smtp_port Optional[int]

Port of the SMTP server. Example: 465. Required when using the send activity.

smtp_use_ssl bool

Whether to use SSL when sending email via the SMTP protocol.

smtp_user Optional[str]

Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity.

smtp_password Optional[str]

Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity.

imap_url Optional[str]

Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity.

imap_user Optional[str]

Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity.

imap_password Optional[str]

Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity.

mailboxes Optional[dict[str, Optional[str]]]

Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}

email_loader EmailLoader

Instance of EmailLoader.

Source code in griptape/tools/email/tool.py
@define
class EmailTool(BaseTool):
    """Send email over SMTP and retrieve email over IMAP.

    Attributes:
        username: Account name/email address shared by SMTP sending and IMAP retrieval.
            Example: bender@futurama.com.
        password: Password shared by SMTP sending and IMAP retrieval. For gmail this is an App Password.
        email_max_retrieve_count: Upper bound on the number of messages retrieved by an activity.
        smtp_host: Hostname or URL of the SMTP server. Example: smtp.gmail.com. Required by the `send` activity.
        smtp_port: Port of the SMTP server. Example: 465. Required by the `send` activity.
        smtp_use_ssl: Whether SSL is used when sending email over SMTP.
        smtp_user: SMTP-only override of `username`. Required by the `send` activity.
        smtp_password: SMTP-only override of `password`. Required by the `send` activity.
        imap_url: Hostname or URL of the IMAP server. Example: imap.gmail.com. Required by the `retrieve` activity.
        imap_user: IMAP-only override of `username`. Required by the `retrieve` activity.
        imap_password: IMAP-only override of `password`. Required by the `retrieve` activity.
        mailboxes: Descriptions of the mailboxes available over IMAP. Required by the `retrieve` activity.
            Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
        email_loader: Instance of `EmailLoader`.
    """

    username: Optional[str] = field(default=None, kw_only=True)
    password: Optional[str] = field(default=None, kw_only=True)
    email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True)
    smtp_host: Optional[str] = field(default=None, kw_only=True)
    smtp_port: Optional[int] = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: Optional[str] = field(default=None, kw_only=True)
    imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: Optional[dict[str, Optional[str]]] = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                },
            ),
        },
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load emails through `email_loader` using the query in `params["values"]`.

        Returns an `ErrorArtifact` when `mailboxes` is not configured. A `max_count` in the
        query takes precedence over `email_max_retrieve_count`.
        """
        if self.mailboxes is None:
            return ErrorArtifact("mailboxes is required")

        query_values = params["values"]

        requested_count = query_values.get("max_count")
        if requested_count is None:
            limit = self.email_max_retrieve_count
        else:
            limit = int(requested_count)

        query = EmailLoader.EmailQuery(
            label=query_values["label"],
            key=query_values.get("key"),
            search_criteria=query_values.get("search_criteria"),
            max_count=limit,
        )

        return self.email_loader.load(query)

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                },
            ),
        },
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Send a plain-text email built from `params["values"]` (`to`, `subject`, `body`).

        Returns an `ErrorArtifact` when an SMTP setting is missing or sending raises;
        otherwise an `InfoArtifact` confirming the send.
        """
        values = params["values"]

        sender = self.smtp_user
        secret = self.smtp_password
        host = self.smtp_host
        port = self.smtp_port

        if sender is None:
            return ErrorArtifact("smtp_user is required")
        if secret is None:
            return ErrorArtifact("smtp_password is required")
        if host is None:
            return ErrorArtifact("smtp_host is required")
        if port is None:
            return ErrorArtifact("smtp_port is required")

        message = MIMEText(values["body"])
        message["Subject"] = values["subject"]
        message["From"] = sender
        message["To"] = values["to"]

        try:
            with self._create_smtp_client(host, port) as smtp:
                smtp.login(sender, secret)
                smtp.sendmail(message["From"], [message["To"]], message.as_string())
                return InfoArtifact("email was successfully sent")
        except Exception as e:
            logging.exception(e)
            return ErrorArtifact(f"error sending email: {e}")

    def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
        """Create an SMTP client for the host and port, using SSL when `smtp_use_ssl` is set."""
        client_cls = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP

        return client_cls(smtp_host, smtp_port)

email_loader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute

email_max_retrieve_count = field(default=None, kw_only=True) class-attribute instance-attribute

imap_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

imap_url = field(default=None, kw_only=True) class-attribute instance-attribute

imap_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

mailboxes = field(default=None, kw_only=True) class-attribute instance-attribute

password = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_host = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

smtp_port = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_use_ssl = field(default=True, kw_only=True) class-attribute instance-attribute

smtp_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

username = field(default=None, kw_only=True) class-attribute instance-attribute

_create_smtp_client(smtp_host, smtp_port)

Source code in griptape/tools/email/tool.py
def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
    """Create an SMTP client for the host and port, using SSL when `smtp_use_ssl` is set."""
    client_cls = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP

    return client_cls(smtp_host, smtp_port)

retrieve(params)

Source code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            },
        ),
    },
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load emails through `email_loader` using the query in `params["values"]`.

    Returns an `ErrorArtifact` when `mailboxes` is not configured. A `max_count` in the
    query takes precedence over `email_max_retrieve_count`.
    """
    if self.mailboxes is None:
        return ErrorArtifact("mailboxes is required")

    query_values = params["values"]

    requested_count = query_values.get("max_count")
    if requested_count is None:
        limit = self.email_max_retrieve_count
    else:
        limit = int(requested_count)

    query = EmailLoader.EmailQuery(
        label=query_values["label"],
        key=query_values.get("key"),
        search_criteria=query_values.get("search_criteria"),
        max_count=limit,
    )

    return self.email_loader.load(query)

send(params)

Source code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            },
        ),
    },
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Send a plain-text email built from `params["values"]` (`to`, `subject`, `body`).

    Returns an `ErrorArtifact` when an SMTP setting is missing or sending raises;
    otherwise an `InfoArtifact` confirming the send.
    """
    values = params["values"]

    sender = self.smtp_user
    secret = self.smtp_password
    host = self.smtp_host
    port = self.smtp_port

    if sender is None:
        return ErrorArtifact("smtp_user is required")
    if secret is None:
        return ErrorArtifact("smtp_password is required")
    if host is None:
        return ErrorArtifact("smtp_host is required")
    if port is None:
        return ErrorArtifact("smtp_port is required")

    message = MIMEText(values["body"])
    message["Subject"] = values["subject"]
    message["From"] = sender
    message["To"] = values["to"]

    try:
        with self._create_smtp_client(host, port) as smtp:
            smtp.login(sender, secret)
            smtp.sendmail(message["From"], [message["To"]], message.as_string())
            return InfoArtifact("email was successfully sent")
    except Exception as e:
        logging.exception(e)
        return ErrorArtifact(f"error sending email: {e}")

ExtractionTool

Bases: BaseTool, RuleMixin

Tool for using an Extraction Engine.

Attributes:

Name Type Description
extraction_engine BaseExtractionEngine

ExtractionEngine.

Source code in griptape/tools/extraction/tool.py
@define(kw_only=True)
class ExtractionTool(BaseTool, RuleMixin):
    """Tool that runs an Extraction Engine over text or stored artifacts.

    Attributes:
        extraction_engine: `ExtractionEngine`.
    """

    extraction_engine: BaseExtractionEngine = field()

    @activity(
        config={
            "description": "Can be used extract structured text from data.",
            "schema": Schema(
                {
                    Literal("data"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        """Run the extraction engine on `params["values"]["data"]`.

        `data` is either a plain string, or a mapping naming a memory and an artifact
        namespace to load artifacts from. Returns an `ErrorArtifact` when the named
        memory cannot be found.
        """
        data = params["values"]["data"]

        # Inline text is wrapped in a single-item list and extracted directly.
        if isinstance(data, str):
            return self.extraction_engine.extract_artifacts(ListArtifact([TextArtifact(data)]))

        memory = self.find_input_memory(data["memory_name"])
        artifact_namespace = data["artifact_namespace"]

        if memory is None:
            return ErrorArtifact("memory not found")

        return self.extraction_engine.extract_artifacts(memory.load_artifacts(artifact_namespace))

extraction_engine = field() class-attribute instance-attribute

extract(params)

Source code in griptape/tools/extraction/tool.py
@activity(
    config={
        "description": "Can be used extract structured text from data.",
        "schema": Schema(
            {
                Literal("data"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    """Run the extraction engine on `params["values"]["data"]`.

    `data` is either a plain string, or a mapping naming a memory and an artifact
    namespace to load artifacts from. Returns an `ErrorArtifact` when the named
    memory cannot be found.
    """
    data = params["values"]["data"]

    # Inline text is wrapped in a single-item list and extracted directly.
    if isinstance(data, str):
        return self.extraction_engine.extract_artifacts(ListArtifact([TextArtifact(data)]))

    memory = self.find_input_memory(data["memory_name"])
    artifact_namespace = data["artifact_namespace"]

    if memory is None:
        return ErrorArtifact("memory not found")

    return self.extraction_engine.extract_artifacts(memory.load_artifacts(artifact_namespace))

FileManagerTool

Bases: BaseTool

FileManagerTool is a tool that can be used to list, load, and save files.

Attributes:

Name Type Description
file_manager_driver BaseFileManagerDriver

File Manager Driver to use to list, load, and save files.

Source code in griptape/tools/file_manager/tool.py
@define
class FileManagerTool(BaseTool):
    """FileManagerTool is a tool that can be used to list, load, and save files.

    Attributes:
        file_manager_driver: File Manager Driver to use to list, load, and save files.
        loaders: Loaders keyed by MIME type prefix; the first key that prefixes a file's
            MIME type selects the loader used for that file.
    """

    file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)

    # Insertion order matters: lookup takes the first matching prefix, so specific
    # keys such as "text/csv" must come before generic ones such as "text".
    loaders: dict[str, loaders.BaseLoader] = field(
        default=Factory(
            lambda self: {
                "application/pdf": loaders.PdfLoader(file_manager_driver=self.file_manager_driver),
                "text/csv": loaders.CsvLoader(file_manager_driver=self.file_manager_driver),
                "text": loaders.TextLoader(file_manager_driver=self.file_manager_driver),
                "image": loaders.ImageLoader(file_manager_driver=self.file_manager_driver),
                "application/octet-stream": BlobLoader(file_manager_driver=self.file_manager_driver),
            },
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to list files on disk",
            "schema": Schema(
                {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
            ),
        },
    )
    def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Return the driver's listing of the files at ``params["values"]["path"]``."""
        path = params["values"]["path"]
        return self.file_manager_driver.list_files(path)

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): Schema([str]),
                },
            ),
        },
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load every path in ``params["values"]["paths"]`` with the loader matching its MIME type.

        Returns:
            A ``ListArtifact`` of the loaded artifacts (loaders that return a
            ``ListArtifact`` are flattened into it), or an ``ErrorArtifact`` when
            no registered loader matches a file's MIME type.
        """
        paths = params["values"]["paths"]
        artifacts = []

        for path in paths:
            # Fetch the file to try and determine the appropriate loader
            abs_path = os.path.join(self.file_manager_driver.workdir, path)
            mime_type = get_mime_type(abs_path)
            # A default is supplied so an unmatched MIME type is reported explicitly
            # instead of leaking a bare StopIteration out of the activity.
            loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)), None)
            if loader is None:
                return ErrorArtifact(f"Failed to load file '{path}' - no loader found for MIME type '{mime_type}'")

            artifact = loader.load(path)
            if isinstance(artifact, ListArtifact):
                artifacts.extend(artifact.value)
            else:
                artifacts.append(artifact)
        return ListArtifact(artifacts)

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                },
            ),
        },
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Write every artifact stored under a memory namespace to files in ``dir_name``.

        When the namespace holds more than one artifact, each file name is
        prefixed with the artifact's name so the files do not overwrite each other.

        Returns:
            An ``InfoArtifact`` on success, or an ``ErrorArtifact`` when the memory is
            missing, the namespace is empty, or saving a file fails.
        """
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]
        memory_name = params["values"]["memory_name"]
        artifact_namespace = params["values"]["artifact_namespace"]

        memory = self.find_input_memory(memory_name)
        if not memory:
            return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

        list_artifact = memory.load_artifacts(artifact_namespace)

        if len(list_artifact) == 0:
            return ErrorArtifact(
                f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
            )

        for artifact in list_artifact.value:
            formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
            try:
                # str/bytes payloads are written as-is; anything else is rendered to text first
                value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
                self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
            except FileNotFoundError:
                return ErrorArtifact("Path not found")
            except IsADirectoryError:
                return ErrorArtifact("Path is a directory")
            except NotADirectoryError:
                return ErrorArtifact("Not a directory")
            except Exception as e:
                return ErrorArtifact(f"Failed to save file: {e!s}")

        return InfoArtifact("Successfully saved memory artifacts to disk")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                },
            ),
        },
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Save ``params["values"]["content"]`` at ``params["values"]["path"]`` via the driver."""
        path = params["values"]["path"]
        content = params["values"]["content"]
        return self.file_manager_driver.save_file(path, content)

file_manager_driver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute

loaders = field(default=Factory(lambda self: {'application/pdf': loaders.PdfLoader(file_manager_driver=self.file_manager_driver), 'text/csv': loaders.CsvLoader(file_manager_driver=self.file_manager_driver), 'text': loaders.TextLoader(file_manager_driver=self.file_manager_driver), 'image': loaders.ImageLoader(file_manager_driver=self.file_manager_driver), 'application/octet-stream': BlobLoader(file_manager_driver=self.file_manager_driver)}, takes_self=True), kw_only=True) class-attribute instance-attribute

list_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to list files on disk",
        "schema": Schema(
            {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
        ),
    },
)
def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Return the file manager driver's listing for the requested path."""
    target_path = params["values"]["path"]
    listing = self.file_manager_driver.list_files(target_path)
    return listing

load_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): Schema([str]),
            },
        ),
    },
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load every path in ``params["values"]["paths"]`` with the loader matching its MIME type.

    Loaders are looked up in ``self.loaders`` by MIME type prefix; the first
    key that prefixes the file's MIME type wins.

    Returns:
        A ``ListArtifact`` of the loaded artifacts (loaders that return a
        ``ListArtifact`` are flattened into it), or an ``ErrorArtifact`` when
        no registered loader matches a file's MIME type.
    """
    paths = params["values"]["paths"]
    artifacts = []

    for path in paths:
        # Fetch the file to try and determine the appropriate loader
        abs_path = os.path.join(self.file_manager_driver.workdir, path)
        mime_type = get_mime_type(abs_path)
        # A default is supplied so an unmatched MIME type is reported explicitly
        # instead of leaking a bare StopIteration out of the activity.
        loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)), None)
        if loader is None:
            return ErrorArtifact(f"Failed to load file '{path}' - no loader found for MIME type '{mime_type}'")

        artifact = loader.load(path)
        if isinstance(artifact, ListArtifact):
            artifacts.extend(artifact.value)
        else:
            artifacts.append(artifact)
    return ListArtifact(artifacts)

save_content_to_file(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            },
        ),
    },
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Hand the supplied content to the file manager driver for saving at the given path."""
    destination = params["values"]["path"]
    text = params["values"]["content"]
    outcome = self.file_manager_driver.save_file(destination, text)
    return outcome

save_memory_artifacts_to_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            },
        ),
    },
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Write every artifact stored under a memory namespace to files in ``dir_name``.

    When the namespace holds more than one artifact, each file name is
    prefixed with the artifact's name so the files do not overwrite each other.

    Returns:
        An ``InfoArtifact`` on success, or an ``ErrorArtifact`` when the memory is
        missing, the namespace is empty, or saving a file fails.
    """
    dir_name = params["values"]["dir_name"]
    file_name = params["values"]["file_name"]
    memory_name = params["values"]["memory_name"]
    artifact_namespace = params["values"]["artifact_namespace"]

    memory = self.find_input_memory(memory_name)
    if not memory:
        return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

    list_artifact = memory.load_artifacts(artifact_namespace)

    if len(list_artifact) == 0:
        return ErrorArtifact(
            f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
        )

    for artifact in list_artifact.value:
        formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
        try:
            # str/bytes payloads are written as-is; anything else is rendered to text first
            value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
            self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
        except FileNotFoundError:
            return ErrorArtifact("Path not found")
        except IsADirectoryError:
            return ErrorArtifact("Path is a directory")
        except NotADirectoryError:
            return ErrorArtifact("Not a directory")
        except Exception as e:
            # This is the save path, so the message must not claim a load failure.
            return ErrorArtifact(f"Failed to save file: {e!s}")

    return InfoArtifact("Successfully saved memory artifacts to disk")

GriptapeCloudToolTool

Bases: BaseGriptapeCloudTool

Runs a Griptape Cloud hosted Tool.

Attributes:

Name Type Description
tool_id str

The ID of the tool to run.

Source code in griptape/tools/griptape_cloud_tool/tool.py
@define()
class GriptapeCloudToolTool(BaseGriptapeCloudTool):
    """Runs a Griptape Cloud hosted Tool.

    Attributes:
        tool_id: The ID of the tool to run.
    """

    tool_id: str = field(kw_only=True)

    def __attrs_post_init__(self) -> None:
        """attrs post-init hook: sets up the tool's activities via ``_init_activities``."""
        self._init_activities()

    def _init_activities(self) -> None:
        """Fetches the tool's schema and binds one handler method per activity onto this instance."""
        schema = self._get_schema()
        tool_name, activity_schemas = self._parse_schema(schema)

        # Only adopt the schema's title when the name still equals the class name
        if self.name == self.__class__.__name__:
            self.name = tool_name

        for activity_name, (description, activity_schema) in activity_schemas.items():
            activity_handler = self._create_activity_handler(activity_name, description, activity_schema)

            setattr(self, activity_name, MethodType(activity_handler, self))

    def _get_schema(self) -> dict:
        """Fetches the tool's OpenAPI document from ``/api/tools/{tool_id}/openapi``.

        Raises:
            RuntimeError: If the response body is not a JSON object, or if it
                contains both an ``error`` and a ``tool_run_id`` key.
        """
        response = requests.get(urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi"), headers=self.headers)

        response.raise_for_status()
        schema = response.json()

        if not isinstance(schema, dict):
            raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {schema}")

        if "error" in schema and "tool_run_id" in schema:
            raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {schema['error']}")

        return schema

    def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
        """Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas."""
        activities = {}

        name = schema.get("info", {}).get("title")

        for path, path_info in schema.get("paths", {}).items():
            # Only paths under /activities describe runnable activities
            if not path.startswith("/activities"):
                continue
            for method, method_info in path_info.items():
                if "post" in method.lower():
                    activity_name = method_info["operationId"]
                    description = method_info.get("description", "")

                    activity_schema = self.__extract_schema_from_ref(
                        schema,
                        method_info.get("requestBody", {})
                        .get("content", {})
                        .get("application/json", {})
                        .get("schema", {}),
                    )

                    activities[activity_name] = (description, activity_schema)

        return name, activities

    def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
        """Extracts a schema from a $ref if present, resolving it into native schema properties."""
        if "$ref" in schema_ref:
            # Resolve the reference and retrieve the schema data
            ref_path = schema_ref["$ref"].split("/")[-1]
            schema_data = schema["components"]["schemas"].get(ref_path, {})
        else:
            # Use the provided schema directly if no $ref is found
            schema_data = schema_ref

        # The required list is the same for every property, so look it up once
        required = schema_data.get("required", [])

        # Convert the schema_data dictionary into a Schema with its properties
        properties = {}
        for prop, prop_info in schema_data.get("properties", {}).items():
            prop_type = prop_info.get("type", "string")
            prop_description = prop_info.get("description", "")
            schema_prop = Literal(prop, description=prop_description)

            if prop not in required:
                schema_prop = SchemaOptional(schema_prop)

            properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info)

        return Schema(properties)

    def _map_openapi_type_to_python(
        self, openapi_type: str, schema_info: Optional[dict] = None
    ) -> type | list[type] | Or:
        """Maps OpenAPI types to native Python types.

        Args:
            openapi_type: The OpenAPI ``type`` value of the property.
            schema_info: The property's full OpenAPI schema, consulted for ``items`` and ``enum``.

        Returns:
            A Python type, a one-element list describing an array's item type, the
            list of allowed values for an array of enum items, or an ``Or`` of the
            allowed values for an enum. Unknown types map to ``str``.
        """
        type_mapping = {
            "string": str,
            "integer": int,
            "boolean": bool,
            "number": float,
            "object": dict,
            # An array with no usable item schema is still a list, not a string
            "array": list,
        }

        if openapi_type == "array" and schema_info is not None and "items" in schema_info:
            items_info = schema_info["items"]
            enum = items_info.get("enum")
            if enum:
                return enum
            items_type = items_info.get("type", "string")
            # Forward the item schema so nested arrays keep their own item types
            return [self._map_openapi_type_to_python(items_type, items_info)]  # pyright: ignore[reportReturnType]
        if schema_info is not None and schema_info.get("enum"):
            return Or(*schema_info["enum"])
        return type_mapping.get(openapi_type, str)

    def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
        """Creates an activity handler method for the tool."""

        @activity(config={"name": activity_name, "description": description, "schema": activity_schema})
        def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
            return self._run_activity(activity_name, values)

        return activity_handler

    def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
        """Runs an activity on the tool with the provided parameters."""
        url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")

        response = requests.post(url, json=params, headers=self.headers)

        response.raise_for_status()

        try:
            return BaseArtifact.from_dict(response.json())
        except ValueError:
            # Fall back to the raw response text when the ValueError path is hit
            return TextArtifact(response.text)

tool_id = field(kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/griptape_cloud_tool/tool.py
def __attrs_post_init__(self) -> None:
    """attrs post-init hook: sets up the tool's activities via ``_init_activities``."""
    self._init_activities()

__extract_schema_from_ref(schema, schema_ref)

Extracts a schema from a $ref if present, resolving it into native schema properties.

Source code in griptape/tools/griptape_cloud_tool/tool.py
def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
    """Build a ``Schema`` from an OpenAPI schema object, following its ``$ref`` when it has one.

    Args:
        schema: The full OpenAPI document, used to look up ``components.schemas``.
        schema_ref: The schema object to convert; may be a ``$ref`` pointer.

    Returns:
        A ``Schema`` with one key per property; properties missing from the
        ``required`` list are wrapped in ``SchemaOptional``.
    """
    if "$ref" not in schema_ref:
        # No pointer to follow: the object already carries its own properties
        resolved = schema_ref
    else:
        # Only the final path segment names the component
        component_name = schema_ref["$ref"].rsplit("/", 1)[-1]
        resolved = schema["components"]["schemas"].get(component_name, {})

    required_props = resolved.get("required", [])

    converted = {}
    for prop_name, prop_spec in resolved.get("properties", {}).items():
        openapi_type = prop_spec.get("type", "string")
        key = Literal(prop_name, description=prop_spec.get("description", ""))
        if prop_name not in required_props:
            key = SchemaOptional(key)

        converted[key] = self._map_openapi_type_to_python(openapi_type, prop_spec)

    return Schema(converted)

_create_activity_handler(activity_name, description, activity_schema)

Creates an activity handler method for the tool.

Source code in griptape/tools/griptape_cloud_tool/tool.py
def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
    """Build the activity-decorated function that forwards calls for one named activity.

    The returned function is unbound; it takes the tool instance as its first
    argument and delegates to ``_run_activity`` with the captured activity name.
    """
    handler_config = {"name": activity_name, "description": description, "schema": activity_schema}

    @activity(config=handler_config)
    def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
        # activity_name is captured from the enclosing call
        return self._run_activity(activity_name, values)

    return activity_handler

_get_schema()

Source code in griptape/tools/griptape_cloud_tool/tool.py
def _get_schema(self) -> dict:
    """Fetch the tool's OpenAPI document from ``/api/tools/{tool_id}/openapi``.

    Raises:
        RuntimeError: If the response body is not a JSON object, or if it
            contains both an ``error`` and a ``tool_run_id`` key.
    """
    schema_url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi")
    response = requests.get(schema_url, headers=self.headers)
    response.raise_for_status()

    payload = response.json()
    if not isinstance(payload, dict):
        raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {payload}")

    # Both keys present together is treated as a failed retrieval
    if all(key in payload for key in ("error", "tool_run_id")):
        raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {payload['error']}")

    return payload

_init_activities()

Source code in griptape/tools/griptape_cloud_tool/tool.py
def _init_activities(self) -> None:
    """Fetch the tool's schema and bind one handler method per activity onto this instance."""
    parsed_name, parsed_activities = self._parse_schema(self._get_schema())

    # Only adopt the schema's title when the name still equals the class name
    if self.name == self.__class__.__name__:
        self.name = parsed_name

    for activity_name, details in parsed_activities.items():
        description, activity_schema = details
        handler = self._create_activity_handler(activity_name, description, activity_schema)
        bound_handler = MethodType(handler, self)
        setattr(self, activity_name, bound_handler)

_map_openapi_type_to_python(openapi_type, schema_info=None)

Maps OpenAPI types to native Python types.

Source code in griptape/tools/griptape_cloud_tool/tool.py
def _map_openapi_type_to_python(
    self, openapi_type: str, schema_info: Optional[dict] = None
) -> type | list[type] | Or:
    """Maps OpenAPI types to native Python types.

    Args:
        openapi_type: The OpenAPI ``type`` value of the property.
        schema_info: The property's full OpenAPI schema, consulted for ``items`` and ``enum``.

    Returns:
        A Python type, a one-element list describing an array's item type, the
        list of allowed values for an array of enum items, or an ``Or`` of the
        allowed values for an enum. Unknown types map to ``str``.
    """
    type_mapping = {
        "string": str,
        "integer": int,
        "boolean": bool,
        "number": float,
        "object": dict,
        # An array with no usable item schema is still a list, not a string
        "array": list,
    }

    if openapi_type == "array" and schema_info is not None and "items" in schema_info:
        items_info = schema_info["items"]
        enum = items_info.get("enum")
        if enum:
            return enum
        items_type = items_info.get("type", "string")
        # Forward the item schema so nested arrays keep their own item types
        return [self._map_openapi_type_to_python(items_type, items_info)]  # pyright: ignore[reportReturnType]
    if schema_info is not None and schema_info.get("enum"):
        return Or(*schema_info["enum"])
    return type_mapping.get(openapi_type, str)

_parse_schema(schema)

Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas.

Source code in griptape/tools/griptape_cloud_tool/tool.py
def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
    """Collect the tool title and every POST activity declared in an OpenAPI document.

    Returns:
        A ``(title, activities)`` pair where ``activities`` maps each operation id
        to its ``(description, schema)``. Only paths starting with ``/activities``
        are considered.
    """
    title = schema.get("info", {}).get("title")
    found = {}

    for route, operations in schema.get("paths", {}).items():
        if not route.startswith("/activities"):
            continue

        for verb, operation in operations.items():
            if "post" not in verb.lower():
                continue

            operation_id = operation["operationId"]
            summary = operation.get("description", "")

            # Walk down to the JSON request body schema, tolerating missing levels
            request_body = operation.get("requestBody", {})
            json_content = request_body.get("content", {}).get("application/json", {})
            body_schema = self.__extract_schema_from_ref(schema, json_content.get("schema", {}))

            found[operation_id] = (summary, body_schema)

    return title, found

_run_activity(activity_name, params)

Runs an activity on the tool with the provided parameters.

Source code in griptape/tools/griptape_cloud_tool/tool.py
def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
    """POST the parameters to the tool's activity endpoint and turn the reply into an artifact.

    Returns:
        The artifact built by ``BaseArtifact.from_dict`` from the JSON reply, or a
        ``TextArtifact`` of the raw response text when a ``ValueError`` is raised
        while doing so.
    """
    endpoint = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")
    reply = requests.post(endpoint, json=params, headers=self.headers)
    reply.raise_for_status()

    try:
        artifact = BaseArtifact.from_dict(reply.json())
    except ValueError:
        artifact = TextArtifact(reply.text)
    return artifact

ImageQueryTool

Bases: BaseTool

Source code in griptape/tools/image_query/tool.py
@define
class ImageQueryTool(BaseTool):
    """A tool that answers questions about images using a prompt driver.

    Attributes:
        prompt_driver: The prompt driver the query and images are sent to.
        image_loader: The loader used to turn paths or blob data into image artifacts.
    """

    prompt_driver: BasePromptDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images on disk.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
                },
            ),
        },
    )
    def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Load the images at the given paths and ask the prompt driver the query about them."""
        question = params["values"]["query"]
        paths = params["values"]["image_paths"]

        loaded_images = [self.image_loader.load(path) for path in paths]

        # The query goes first, followed by every image
        prompt_stack = PromptStack.from_artifact(ListArtifact([TextArtifact(question), *loaded_images]))
        answer = self.prompt_driver.run(prompt_stack).to_artifact()
        return cast("TextArtifact", answer)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images in memory.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_artifacts", description="Image artifact memory references."): [
                        {"image_artifact_namespace": str, "image_artifact_name": str},
                    ],
                    "memory_name": str,
                },
            ),
        },
    )
    def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        """Resolve image references from memory and ask the prompt driver the query about them.

        Returns:
            The prompt driver's answer, or an ``ErrorArtifact`` when the memory is
            not found or a reference fails with a non-``ValueError`` exception.
        """
        question = params["values"]["query"]
        references = params["values"]["image_artifacts"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        resolved_images = []
        for reference in references:
            try:
                resolved_images.append(
                    cast(
                        "ImageArtifact",
                        load_artifact_from_memory(
                            memory,
                            reference["image_artifact_namespace"],
                            reference["image_artifact_name"],
                            ImageArtifact,
                        ),
                    )
                )
            except ValueError:
                # Not loadable as an ImageArtifact: retry as a BlobArtifact and
                # let the image loader convert its bytes.
                fallback_blob = load_artifact_from_memory(
                    memory,
                    reference["image_artifact_namespace"],
                    reference["image_artifact_name"],
                    BlobArtifact,
                )
                resolved_images.append(self.image_loader.load(fallback_blob.value))
            except Exception as e:
                return ErrorArtifact(str(e))

        prompt_stack = PromptStack.from_artifact(ListArtifact([TextArtifact(question), *resolved_images]))
        answer = self.prompt_driver.run(prompt_stack).to_artifact()
        return cast("TextArtifact", answer)

image_loader = field(default=Factory(lambda: ImageLoader()), kw_only=True) class-attribute instance-attribute

prompt_driver = field(kw_only=True) class-attribute instance-attribute

query_image_from_disk(params)

Source code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images on disk.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
            },
        ),
    },
)
def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Load the images at the given paths and ask the prompt driver the query about them."""
    question = params["values"]["query"]
    paths = params["values"]["image_paths"]

    loaded_images = [self.image_loader.load(path) for path in paths]

    # The query goes first, followed by every image
    prompt_stack = PromptStack.from_artifact(ListArtifact([TextArtifact(question), *loaded_images]))
    answer = self.prompt_driver.run(prompt_stack).to_artifact()
    return cast("TextArtifact", answer)

query_images_from_memory(params)

Source code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images in memory.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_artifacts", description="Image artifact memory references."): [
                    {"image_artifact_namespace": str, "image_artifact_name": str},
                ],
                "memory_name": str,
            },
        ),
    },
)
def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    """Resolve image references from memory and ask the prompt driver the query about them.

    Returns:
        The prompt driver's answer, or an ``ErrorArtifact`` when the memory is
        not found or a reference fails with a non-``ValueError`` exception.
    """
    question = params["values"]["query"]
    references = params["values"]["image_artifacts"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    resolved_images = []
    for reference in references:
        try:
            resolved_images.append(
                cast(
                    "ImageArtifact",
                    load_artifact_from_memory(
                        memory,
                        reference["image_artifact_namespace"],
                        reference["image_artifact_name"],
                        ImageArtifact,
                    ),
                )
            )
        except ValueError:
            # Not loadable as an ImageArtifact: retry as a BlobArtifact and
            # let the image loader convert its bytes.
            fallback_blob = load_artifact_from_memory(
                memory,
                reference["image_artifact_namespace"],
                reference["image_artifact_name"],
                BlobArtifact,
            )
            resolved_images.append(self.image_loader.load(fallback_blob.value))
        except Exception as e:
            return ErrorArtifact(str(e))

    prompt_stack = PromptStack.from_artifact(ListArtifact([TextArtifact(question), *resolved_images]))
    answer = self.prompt_driver.run(prompt_stack).to_artifact()
    return cast("TextArtifact", answer)

InpaintingImageGenerationTool

Bases: BaseImageGenerationTool

A tool that can be used to generate prompted inpaintings of an image.

Attributes:

Name Type Description
image_generation_driver BaseImageGenerationDriver

The image generation driver used to generate the image.

output_dir Optional[str]

If provided, the generated image will be written to disk in output_dir.

output_file Optional[str]

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/inpainting_image_generation/tool.py
@define
class InpaintingImageGenerationTool(BaseImageGenerationTool):
    """Tool that regenerates the masked area of an image according to a text prompt.

    The base image and mask can be read from files on disk or fetched from an
    input memory; both activities funnel into `_generate_inpainting`.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: Loader used to read the image and mask files from disk.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Inpaint an image using an image file and a mask file loaded from disk.

        Args:
            params: Activity parameters; `params["values"]` carries `prompt`,
                `negative_prompt`, `image_file` and `mask_file`.

        Returns:
            The artifact returned by `_generate_inpainting`.
        """
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]
        image_path = values["image_file"]
        mask_path = values["mask_file"]

        base_image = cast("ImageArtifact", self.image_loader.load(image_path))
        mask_image = cast("ImageArtifact", self.image_loader.load(mask_path))

        return self._generate_inpainting(prompt, negative_prompt, base_image, mask_image)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Inpaint an image using image and mask artifacts stored in an input memory.

        Args:
            params: Activity parameters; `params["values"]` carries the prompts, the
                `memory_name`, and the namespace/name pair of both artifacts.

        Returns:
            The artifact returned by `_generate_inpainting`, or an `ErrorArtifact` when
            the memory cannot be found or loading an artifact raises `ValueError`.
        """
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]
        image_ref = (values["image_artifact_namespace"], values["image_artifact_name"])
        mask_ref = (values["mask_artifact_namespace"], values["mask_artifact_name"])
        memory = self.find_input_memory(values["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            base_image = load_artifact_from_memory(memory, *image_ref, ImageArtifact)
            mask_image = load_artifact_from_memory(memory, *mask_ref, ImageArtifact)
        except ValueError as error:
            return ErrorArtifact(str(error))

        return self._generate_inpainting(
            prompt,
            negative_prompt,
            cast("ImageArtifact", base_image),
            cast("ImageArtifact", mask_image),
        )

    def _generate_inpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact:
        """Call the driver's inpainting routine and optionally write the result to disk.

        Args:
            prompt: Prompt passed to the driver as a single-element list.
            negative_prompt: Negative prompt passed to the driver as a single-element list.
            image_artifact: The base image.
            mask_artifact: The mask image.

        Returns:
            The artifact produced by the image generation driver.
        """
        driver = self.image_generation_driver
        result = driver.run_image_inpainting(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
            image=image_artifact,
            mask=mask_artifact,
        )

        # Only write to disk when an output location has been configured.
        wants_file = self.output_dir or self.output_file
        if wants_file:
            self._write_to_file(result)

        return result

image_generation_driver = field(kw_only=True) class-attribute instance-attribute

image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

_generate_inpainting(prompt, negative_prompt, image_artifact, mask_artifact)

Source code in griptape/tools/inpainting_image_generation/tool.py
def _generate_inpainting(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
) -> ImageArtifact:
    """Call the driver's inpainting routine and optionally write the result to disk.

    Args:
        prompt: Prompt passed to the driver as a single-element list.
        negative_prompt: Negative prompt passed to the driver as a single-element list.
        image_artifact: The base image.
        mask_artifact: The mask image.

    Returns:
        The artifact produced by the image generation driver.
    """
    driver = self.image_generation_driver
    result = driver.run_image_inpainting(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
        mask=mask_artifact,
    )

    # Only write to disk when an output location has been configured.
    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(result)

    return result

image_inpainting_from_file(params)

Source code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Inpaint an image using an image file and a mask file loaded from disk.

    Args:
        params: Activity parameters; `params["values"]` carries `prompt`,
            `negative_prompt`, `image_file` and `mask_file`.

    Returns:
        The artifact returned by `_generate_inpainting`.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_path = values["image_file"]
    mask_path = values["mask_file"]

    base_image = cast("ImageArtifact", self.image_loader.load(image_path))
    mask_image = cast("ImageArtifact", self.image_loader.load(mask_path))

    return self._generate_inpainting(prompt, negative_prompt, base_image, mask_image)

image_inpainting_from_memory(params)

Source code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "image_artifact_namespace": str,
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Inpaint an image using image and mask artifacts stored in an input memory.

    Args:
        params: Activity parameters; `params["values"]` carries the prompts, the
            `memory_name`, and the namespace/name pair of both artifacts.

    Returns:
        The artifact returned by `_generate_inpainting`, or an `ErrorArtifact` when
        the memory cannot be found or loading an artifact raises `ValueError`.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_ref = (values["image_artifact_namespace"], values["image_artifact_name"])
    mask_ref = (values["mask_artifact_namespace"], values["mask_artifact_name"])
    memory = self.find_input_memory(values["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        base_image = load_artifact_from_memory(memory, *image_ref, ImageArtifact)
        mask_image = load_artifact_from_memory(memory, *mask_ref, ImageArtifact)
    except ValueError as error:
        return ErrorArtifact(str(error))

    return self._generate_inpainting(
        prompt,
        negative_prompt,
        cast("ImageArtifact", base_image),
        cast("ImageArtifact", mask_image),
    )

OutpaintingImageGenerationTool

Bases: BaseImageGenerationTool

A tool that can be used to generate prompted outpaintings of an image.

Attributes:

Name Type Description
image_generation_driver BaseImageGenerationDriver

The image generation driver used to generate the image.

output_dir Optional[str]

If provided, the generated image will be written to disk in output_dir.

output_file Optional[str]

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/outpainting_image_generation/tool.py
@define
class OutpaintingImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted outpaintings of an image.

    The base image and mask can be read from files on disk or fetched from an
    input memory; both activities funnel into `_generate_outpainting`.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: Loader used to read the image and mask files from disk.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Outpaint an image using an image file and a mask file loaded from disk.

        Args:
            params: Activity parameters; `params["values"]` carries `prompt`,
                `negative_prompt`, `image_file` and `mask_file`.

        Returns:
            The artifact returned by `_generate_outpainting`.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        # Narrow the loader results for the type checker, matching the in-memory path below.
        return self._generate_outpainting(
            prompt, negative_prompt, cast("ImageArtifact", input_artifact), cast("ImageArtifact", mask_artifact)
        )

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    # The artifact names are read by the method body, so they must be
                    # declared here; otherwise a schema-valid call fails with KeyError.
                    "image_artifact_namespace": str,
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Outpaint an image using image and mask artifacts stored in an input memory.

        Args:
            params: Activity parameters; `params["values"]` carries the prompts, the
                `memory_name`, and the namespace/name pair of both artifacts.

        Returns:
            The artifact returned by `_generate_outpainting`, or an `ErrorArtifact` when
            the memory cannot be found or loading an artifact raises `ValueError`.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_namespace,
                image_artifact_name,
                ImageArtifact,
            )
            mask_artifact = load_artifact_from_memory(
                memory,
                mask_artifact_namespace,
                mask_artifact_name,
                ImageArtifact,
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_outpainting(
            prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
        )

    def _generate_outpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact | ErrorArtifact:
        """Call the driver's outpainting routine and optionally write the result to disk.

        Args:
            prompt: Prompt passed to the driver as a single-element list.
            negative_prompt: Negative prompt passed to the driver as a single-element list.
            image_artifact: The base image.
            mask_artifact: The mask image.

        Returns:
            The artifact produced by the image generation driver.
        """
        output_artifact = self.image_generation_driver.run_image_outpainting(
            prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact
        )

        # Only write to disk when an output location has been configured.
        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

image_generation_driver = field(kw_only=True) class-attribute instance-attribute

image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

_generate_outpainting(prompt, negative_prompt, image_artifact, mask_artifact)

Source code in griptape/tools/outpainting_image_generation/tool.py
def _generate_outpainting(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
) -> ImageArtifact | ErrorArtifact:
    """Call the driver's outpainting routine and optionally write the result to disk.

    Args:
        prompt: Prompt passed to the driver as a single-element list.
        negative_prompt: Negative prompt passed to the driver as a single-element list.
        image_artifact: The base image.
        mask_artifact: The mask image.

    Returns:
        The artifact produced by the image generation driver.
    """
    driver = self.image_generation_driver
    result = driver.run_image_outpainting(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
        mask=mask_artifact,
    )

    # Only write to disk when an output location has been configured.
    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(result)

    return result

image_outpainting_from_file(params)

Source code in griptape/tools/outpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image outside a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Outpaint an image using an image file and a mask file loaded from disk.

    Args:
        params: Activity parameters; `params["values"]` carries `prompt`,
            `negative_prompt`, `image_file` and `mask_file`.

    Returns:
        The artifact returned by `_generate_outpainting`.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_path = values["image_file"]
    mask_path = values["mask_file"]

    base_image = self.image_loader.load(image_path)
    mask_image = self.image_loader.load(mask_path)

    return self._generate_outpainting(prompt, negative_prompt, base_image, mask_image)

image_outpainting_from_memory(params)

Source code in griptape/tools/outpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                # The artifact names are read by the method body, so they must be
                # declared here; otherwise a schema-valid call fails with KeyError.
                "image_artifact_namespace": str,
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Outpaint an image using image and mask artifacts stored in an input memory.

    Args:
        params: Activity parameters; `params["values"]` carries the prompts, the
            `memory_name`, and the namespace/name pair of both artifacts.

    Returns:
        The artifact returned by `_generate_outpainting`, or an `ErrorArtifact` when
        the memory cannot be found or loading an artifact raises `ValueError`.
    """
    prompt = params["values"]["prompt"]
    negative_prompt = params["values"]["negative_prompt"]
    image_artifact_namespace = params["values"]["image_artifact_namespace"]
    image_artifact_name = params["values"]["image_artifact_name"]
    mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
    mask_artifact_name = params["values"]["mask_artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(
            memory,
            image_artifact_namespace,
            image_artifact_name,
            ImageArtifact,
        )
        mask_artifact = load_artifact_from_memory(
            memory,
            mask_artifact_namespace,
            mask_artifact_name,
            ImageArtifact,
        )
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_outpainting(
        prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
    )

PromptImageGenerationTool

Bases: BaseImageGenerationTool

A tool that can be used to generate an image from a text prompt.

Attributes:

Name Type Description
image_generation_driver BaseImageGenerationDriver

The image generation driver used to generate the image.

output_dir Optional[str]

If provided, the generated image will be written to disk in output_dir.

output_file Optional[str]

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/prompt_image_generation/tool.py
@define
class PromptImageGenerationTool(BaseImageGenerationTool):
    """Tool that turns a text prompt (plus a negative prompt) into an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Generates an image from text prompts.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                }
            ),
        },
    )
    def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Generate an image from the supplied prompts.

        Args:
            params: Activity parameters; `params["values"]` carries `prompt` and
                `negative_prompt`.

        Returns:
            The artifact produced by the image generation driver.
        """
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]

        driver = self.image_generation_driver
        result = driver.run_text_to_image(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
        )

        # Only write to disk when an output location has been configured.
        wants_file = self.output_dir or self.output_file
        if wants_file:
            self._write_to_file(result)

        return result

image_generation_driver = field(kw_only=True) class-attribute instance-attribute

generate_image(params)

Source code in griptape/tools/prompt_image_generation/tool.py
@activity(
    config={
        "description": "Generates an image from text prompts.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
            }
        ),
    },
)
def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Generate an image from the supplied prompts.

    Args:
        params: Activity parameters; `params["values"]` carries `prompt` and
            `negative_prompt`.

    Returns:
        The artifact produced by the image generation driver.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]

    driver = self.image_generation_driver
    result = driver.run_text_to_image(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
    )

    # Only write to disk when an output location has been configured.
    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(result)

    return result

PromptSummaryTool

Bases: BaseTool, RuleMixin

Tool for using a Prompt Summary Engine.

Attributes:

Name Type Description
prompt_summary_engine PromptSummaryEngine

The PromptSummaryEngine used to summarize artifacts.

Source code in griptape/tools/prompt_summary/tool.py
@define(kw_only=True)
class PromptSummaryTool(BaseTool, RuleMixin):
    """Tool that summarizes text, or artifacts held in an input memory.

    Attributes:
        prompt_summary_engine: The `PromptSummaryEngine` whose `summarize_artifacts`
            is called; a fresh engine is created by default.
    """

    prompt_summary_engine: PromptSummaryEngine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine()))

    @activity(
        config={
            "description": "Can be used to summarize text content.",
            "schema": Schema(
                {
                    Literal("summary"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def summarize(self, params: dict) -> BaseArtifact:
        """Summarize inline text or the artifacts stored under a memory namespace.

        Args:
            params: Activity parameters; `params["values"]["summary"]` is either the
                text itself or a mapping with `memory_name` and `artifact_namespace`.

        Returns:
            The engine's summary, or an `ErrorArtifact` when the memory is not found.
        """
        summary = params["values"]["summary"]

        if not isinstance(summary, str):
            # A memory reference: resolve it and load everything in the namespace.
            memory = self.find_input_memory(summary["memory_name"])
            namespace = summary["artifact_namespace"]

            if memory is None:
                return ErrorArtifact("memory not found")

            artifacts = memory.load_artifacts(namespace)
        else:
            artifacts = ListArtifact([TextArtifact(summary)])

        return self.prompt_summary_engine.summarize_artifacts(artifacts, rulesets=self.rulesets)

prompt_summary_engine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine())) class-attribute instance-attribute

summarize(params)

Source code in griptape/tools/prompt_summary/tool.py
@activity(
    config={
        "description": "Can be used to summarize text content.",
        "schema": Schema(
            {
                Literal("summary"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def summarize(self, params: dict) -> BaseArtifact:
    """Summarize inline text or the artifacts stored under a memory namespace.

    Args:
        params: Activity parameters; `params["values"]["summary"]` is either the
            text itself or a mapping with `memory_name` and `artifact_namespace`.

    Returns:
        The engine's summary, or an `ErrorArtifact` when the memory is not found.
    """
    summary = params["values"]["summary"]

    if not isinstance(summary, str):
        # A memory reference: resolve it and load everything in the namespace.
        memory = self.find_input_memory(summary["memory_name"])
        namespace = summary["artifact_namespace"]

        if memory is None:
            return ErrorArtifact("memory not found")

        artifacts = memory.load_artifacts(namespace)
    else:
        artifacts = ListArtifact([TextArtifact(summary)])

    return self.prompt_summary_engine.summarize_artifacts(artifacts, rulesets=self.rulesets)

QueryTool

Bases: BaseTool, RuleMixin

Tool for performing a query against data.

Source code in griptape/tools/query/tool.py
@define(kw_only=True)
class QueryTool(BaseTool, RuleMixin):
    """Tool that answers a natural language query over text content.

    Attributes:
        prompt_driver: Prompt driver handed to the response module; defaults to
            `Defaults.drivers_config.prompt_driver`.
        _rag_engine: RAG engine used to process queries. By default it has a single
            response stage containing one `PromptResponseRagModule` built from this
            tool's `prompt_driver` and `rulesets`.
    """

    prompt_driver: BasePromptDriver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver))

    _rag_engine: RagEngine = field(
        default=Factory(
            lambda self: RagEngine(
                response_stage=ResponseRagStage(
                    response_modules=[
                        PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)
                    ],
                ),
            ),
            takes_self=True,
        ),
        alias="_rag_engine",
    )

    @activity(
        config={
            "description": "Can be used to search through textual content.",
            "schema": Schema(
                {
                    Literal("query", description="A natural language search query"): str,
                    Literal("content"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def query(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Run a query against inline text or text artifacts stored in an input memory.

        Args:
            params: Activity parameters; `params["values"]` carries `query` and
                `content`, where `content` is either the text itself or a mapping with
                `memory_name` and `artifact_namespace`.

        Returns:
            A `ListArtifact` of the engine outputs, or an `ErrorArtifact` when the
            memory is not found or the engine produced no output.
        """
        values = params["values"]
        query = values["query"]
        content = values["content"]

        if not isinstance(content, str):
            # A memory reference: resolve it and keep only the text artifacts.
            memory = self.find_input_memory(content["memory_name"])
            namespace = content["artifact_namespace"]

            if memory is None:
                return ErrorArtifact("memory not found")

            loaded = memory.load_artifacts(namespace)
            text_chunks = [item for item in loaded if isinstance(item, TextArtifact)]
        else:
            text_chunks = [TextArtifact(content)]

        context = RagContext(query=query, text_chunks=text_chunks)
        outputs = self._rag_engine.process(context).outputs

        if len(outputs) == 0:
            return ErrorArtifact("query output is empty")
        return ListArtifact(outputs)

_rag_engine = field(default=Factory(lambda self: RagEngine(response_stage=ResponseRagStage(response_modules=[PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)])), takes_self=True), alias='_rag_engine') class-attribute instance-attribute

prompt_driver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver)) class-attribute instance-attribute

query(params)

Source code in griptape/tools/query/tool.py
@activity(
    config={
        "description": "Can be used to search through textual content.",
        "schema": Schema(
            {
                Literal("query", description="A natural language search query"): str,
                Literal("content"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def query(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Run a query against inline text or text artifacts stored in an input memory.

    Args:
        params: Activity parameters; `params["values"]` carries `query` and
            `content`, where `content` is either the text itself or a mapping with
            `memory_name` and `artifact_namespace`.

    Returns:
        A `ListArtifact` of the engine outputs, or an `ErrorArtifact` when the
        memory is not found or the engine produced no output.
    """
    values = params["values"]
    query = values["query"]
    content = values["content"]

    if not isinstance(content, str):
        # A memory reference: resolve it and keep only the text artifacts.
        memory = self.find_input_memory(content["memory_name"])
        namespace = content["artifact_namespace"]

        if memory is None:
            return ErrorArtifact("memory not found")

        loaded = memory.load_artifacts(namespace)
        text_chunks = [item for item in loaded if isinstance(item, TextArtifact)]
    else:
        text_chunks = [TextArtifact(content)]

    context = RagContext(query=query, text_chunks=text_chunks)
    outputs = self._rag_engine.process(context).outputs

    if len(outputs) == 0:
        return ErrorArtifact("query output is empty")
    return ListArtifact(outputs)

RagTool

Bases: BaseTool

Tool for querying a RAG engine.

Attributes:

Name Type Description
description str

LLM-friendly RAG engine description.

rag_engine RagEngine

The RagEngine used to process search queries.

Source code in griptape/tools/rag/tool.py
@define(kw_only=True)
class RagTool(BaseTool):
    """Tool that forwards a natural language query to a RAG engine.

    Attributes:
        description: LLM-friendly RAG engine description; used as the description of
            the `search` activity.
        rag_engine: The `RagEngine` whose `process_query` is called.
    """

    description: str = field()
    rag_engine: RagEngine = field()

    @activity(
        config={
            "description": "{{ _self.description }}",
            "schema": Schema({Literal("query", description="A natural language search query"): str}),
        },
    )
    def search(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Query the RAG engine and return its outputs as one flat list.

        Args:
            params: Activity parameters; `params["values"]["query"]` is the query.

        Returns:
            A `ListArtifact` of the outputs, with any nested `ListArtifact` expanded
            into its items, or an `ErrorArtifact` when there are no outputs or the
            engine raises.
        """
        query = params["values"]["query"]

        try:
            results = self.rag_engine.process_query(query).outputs

            # Expand nested list artifacts so the caller gets a single flat list.
            flattened = [
                item
                for result in results
                for item in (result.value if isinstance(result, ListArtifact) else [result])
            ]

            if len(flattened) == 0:
                return ErrorArtifact("query output is empty")
            return ListArtifact(flattened)

        except Exception as e:
            return ErrorArtifact(f"error querying: {e}")

description = field() class-attribute instance-attribute

rag_engine = field() class-attribute instance-attribute

search(params)

Source code in griptape/tools/rag/tool.py
@activity(
    config={
        "description": "{{ _self.description }}",
        "schema": Schema({Literal("query", description="A natural language search query"): str}),
    },
)
def search(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Query the RAG engine and return its outputs as one flat list.

    Args:
        params: Activity parameters; `params["values"]["query"]` is the query.

    Returns:
        A `ListArtifact` of the outputs, with any nested `ListArtifact` expanded
        into its items, or an `ErrorArtifact` when there are no outputs or the
        engine raises.
    """
    query = params["values"]["query"]

    try:
        results = self.rag_engine.process_query(query).outputs

        # Expand nested list artifacts so the caller gets a single flat list.
        flattened = [
            item
            for result in results
            for item in (result.value if isinstance(result, ListArtifact) else [result])
        ]

        if len(flattened) == 0:
            return ErrorArtifact("query output is empty")
        return ListArtifact(flattened)

    except Exception as e:
        return ErrorArtifact(f"error querying: {e}")

RestApiTool

Bases: BaseTool

A tool for making REST API requests.

Attributes:

Name Type Description
base_url str

The base url that will be used for the request.

path Optional[str]

The resource path that will be appended to base_url.

description str

A description of what the REST API does.

request_body_schema Optional[str]

A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests.

request_query_params_schema Optional[str]

A JSON schema string describing the available query parameters.

request_path_params_schema Optional[str]

A JSON schema string describing the available path parameters. The schema must describe an array of string values.

response_body_schema Optional[str]

A JSON schema string describing the response body.

request_headers Optional[dict[str, str]]

Headers to include in the requests.

Source code in griptape/tools/rest_api/tool.py
@define
class RestApiTool(BaseTool):
    """A tool for making REST API requests.

    Attributes:
        base_url: The base url that will be used for the request.
        path: The resource path that will be appended to base_url.
        description: A description of what the REST API does.
        request_body_schema: A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests.
        request_query_params_schema: A JSON schema string describing the available query parameters.
        request_path_params_schema: A JSON schema string describing the available path parameters. The schema must describe an array of string values.
        response_body_schema: A JSON schema string describing the response body.
        request_headers: Headers to include in the requests.
    """

    base_url: str = field(kw_only=True)
    path: Optional[str] = field(default=None, kw_only=True)
    description: str = field(kw_only=True)
    request_path_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_query_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_body_schema: Optional[str] = field(default=None, kw_only=True)
    response_body_schema: Optional[str] = field(default=None, kw_only=True)
    request_headers: Optional[dict[str, str]] = field(default=None, kw_only=True)

    @property
    def full_url(self) -> str:
        """Return base_url with path appended (no path parameters)."""
        return self._build_url(self.base_url, path=self.path)

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        },
    )
    def put(self, params: dict) -> BaseArtifact:
        """Send a PUT request with a JSON body to the tool's url.

        Args:
            params: Activity parameters; the JSON body is read from ``params["values"]["body"]``.

        Returns:
            A TextArtifact holding the response text, or an ErrorArtifact when
            the request raises a ``requests`` exception.
        """
        from requests import exceptions, put

        values = params["values"]
        base_url = self.base_url
        path = self.path
        body = values["body"]
        url = self._build_url(base_url, path=path)

        try:
            response = put(url, json=body, timeout=30, headers=self.request_headers)

            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema(
                {
                    Literal("path_params", description="The request path parameters."): Schema([str]),
                    Literal("body", description="The request body."): dict,
                },
            ),
        },
    )
    def patch(self, params: dict) -> BaseArtifact:
        """Send a PATCH request with a JSON body to the url extended by path parameters.

        Args:
            params: Activity parameters; ``params["values"]`` must contain
                ``body`` and ``path_params``.

        Returns:
            A TextArtifact holding the response text, or an ErrorArtifact when
            the request raises a ``requests`` exception.
        """
        from requests import exceptions, patch

        values = params["values"]
        base_url = self.base_url
        path = self.path
        body = values["body"]
        path_params = values["path_params"]
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = patch(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        },
    )
    def post(self, params: dict) -> BaseArtifact:
        """Send a POST request with a JSON body to the tool's url.

        Args:
            params: Activity parameters; the JSON body is read from ``params["values"]["body"]``.

        Returns:
            A TextArtifact holding the response text, or an ErrorArtifact when
            the request raises a ``requests`` exception.
        """
        from requests import exceptions, post

        values = params["values"]
        base_url = self.base_url
        path = self.path
        url = self._build_url(base_url, path=path)
        body = values["body"]

        try:
            response = post(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": schema.Optional(
                Schema(
                    {
                        schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                        schema.Optional(Literal("path_params", description="The request path parameters.")): Schema(
                            [str]
                        ),
                    },
                ),
            ),
        },
    )
    def get(self, params: dict) -> BaseArtifact:
        """Send a GET request, optionally with query and path parameters.

        Args:
            params: Activity parameters; ``params["values"]`` may be empty or
                contain ``query_params`` and/or ``path_params``.

        Returns:
            A TextArtifact holding the response text, or an ErrorArtifact when
            the request raises a ``requests`` exception.
        """
        from requests import exceptions, get

        values = params["values"]
        base_url = self.base_url
        path = self.path

        query_params = {}
        path_params = []
        # The whole schema is optional, so ``values`` may be falsy here.
        if values:
            query_params = values.get("query_params", {})
            path_params = values.get("path_params", [])
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = get(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
                """,
            ),
            "schema": Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]),
                },
            ),
        },
    )
    def delete(self, params: dict) -> BaseArtifact:
        """Send a DELETE request, optionally with query and path parameters.

        Args:
            params: Activity parameters; ``params["values"]`` may contain
                ``query_params`` and/or ``path_params``.

        Returns:
            A TextArtifact holding the response text, or an ErrorArtifact when
            the request raises a ``requests`` exception.
        """
        from requests import delete, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path

        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str:
        """Append the resource path and path parameters to base_url.

        Segments are joined with plain string concatenation rather than
        ``urljoin``: ``urljoin`` resolves a relative reference against the
        base, which replaces the last path segment of a base such as
        ``http://host/api/v1`` instead of appending to it.

        Args:
            base_url: The url to extend; surrounding slashes are stripped.
            path: Optional resource path; surrounding slashes are stripped.
            path_params: Optional values appended as further path segments,
                each converted with ``str``.

        Returns:
            The combined url, or the stripped base_url when nothing is appended.
        """
        segments = []

        if path:
            resource = path.strip("/")
            # A path made only of slashes contributes no segment.
            if resource:
                segments.append(resource)

        if path_params:
            segments.extend(map(str, path_params))

        root = base_url.strip("/")
        if not segments:
            return root

        return f"{root}/{'/'.join(segments)}"

base_url = field(kw_only=True) class-attribute instance-attribute

description = field(kw_only=True) class-attribute instance-attribute

full_url property

path = field(default=None, kw_only=True) class-attribute instance-attribute

request_body_schema = field(default=None, kw_only=True) class-attribute instance-attribute

request_headers = field(default=None, kw_only=True) class-attribute instance-attribute

request_path_params_schema = field(default=None, kw_only=True) class-attribute instance-attribute

request_query_params_schema = field(default=None, kw_only=True) class-attribute instance-attribute

response_body_schema = field(default=None, kw_only=True) class-attribute instance-attribute

_build_url(base_url, path=None, path_params=None)

Source code in griptape/tools/rest_api/tool.py
def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str:
    """Append the resource path and path parameters to base_url.

    Segments are joined with plain string concatenation rather than
    ``urljoin``: ``urljoin`` resolves a relative reference against the base,
    which replaces the last path segment of a base such as
    ``http://host/api/v1`` instead of appending to it.

    Args:
        base_url: The url to extend; surrounding slashes are stripped.
        path: Optional resource path; surrounding slashes are stripped.
        path_params: Optional values appended as further path segments,
            each converted with ``str``.

    Returns:
        The combined url, or the stripped base_url when nothing is appended.
    """
    segments = []

    if path:
        resource = path.strip("/")
        # A path made only of slashes contributes no segment.
        if resource:
            segments.append(resource)

    if path_params:
        segments.extend(map(str, path_params))

    root = base_url.strip("/")
    if not segments:
        return root

    return f"{root}/{'/'.join(segments)}"

delete(params)

Source code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
            """,
        ),
        "schema": Schema(
            {
                schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]),
            },
        ),
    },
)
def delete(self, params: dict) -> BaseArtifact:
    """Send a DELETE request, optionally with query and path parameters.

    Args:
        params: Activity parameters; ``params["values"]`` may contain
            ``query_params`` and/or ``path_params``.

    Returns:
        A TextArtifact holding the response text, or an ErrorArtifact when
        the request raises a ``requests`` exception.
    """
    from requests import delete, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path

    query_params = values.get("query_params", {})
    path_params = values.get("path_params", [])
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

get(params)

Source code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": schema.Optional(
            Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): Schema(
                        [str]
                    ),
                },
            ),
        ),
    },
)
def get(self, params: dict) -> BaseArtifact:
    """Send a GET request, optionally with query and path parameters.

    Args:
        params: Activity parameters; ``params["values"]`` may be empty or
            contain ``query_params`` and/or ``path_params``.

    Returns:
        A TextArtifact holding the response text, or an ErrorArtifact when
        the request raises a ``requests`` exception.
    """
    from requests import exceptions, get

    values = params["values"]
    base_url = self.base_url
    path = self.path

    query_params = {}
    path_params = []
    # The whole schema is optional, so ``values`` may be falsy here.
    if values:
        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        response = get(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

patch(params)

Source code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema(
            {
                Literal("path_params", description="The request path parameters."): Schema([str]),
                Literal("body", description="The request body."): dict,
            },
        ),
    },
)
def patch(self, params: dict) -> BaseArtifact:
    """Send a PATCH request with a JSON body to the url extended by path parameters.

    Args:
        params: Activity parameters; ``params["values"]`` must contain
            ``body`` and ``path_params``.

    Returns:
        A TextArtifact holding the response text, or an ErrorArtifact when
        the request raises a ``requests`` exception.
    """
    from requests import exceptions, patch

    values = params["values"]
    base_url = self.base_url
    path = self.path
    body = values["body"]
    path_params = values["path_params"]
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        response = patch(url, json=body, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

post(params)

Source code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    },
)
def post(self, params: dict) -> BaseArtifact:
    """Send a POST request carrying a JSON body to the tool's url.

    Args:
        params: Activity parameters; the JSON body is read from ``params["values"]["body"]``.

    Returns:
        A TextArtifact with the response text, or an ErrorArtifact when the
        request raises a ``requests`` exception.
    """
    from requests import exceptions, post

    values = params["values"]
    target = self._build_url(self.base_url, path=self.path)
    payload = values["body"]

    try:
        reply = post(target, json=payload, timeout=30, headers=self.request_headers)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
    else:
        return TextArtifact(reply.text)

put(params)

Source code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    },
)
def put(self, params: dict) -> BaseArtifact:
    """Send a PUT request carrying a JSON body to the tool's url.

    Args:
        params: Activity parameters; the JSON body is read from ``params["values"]["body"]``.

    Returns:
        A TextArtifact with the response text, or an ErrorArtifact when the
        request raises a ``requests`` exception.
    """
    from requests import exceptions, put

    payload = params["values"]["body"]
    target = self._build_url(self.base_url, path=self.path)

    try:
        reply = put(target, json=payload, timeout=30, headers=self.request_headers)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
    else:
        return TextArtifact(reply.text)

SqlTool

Bases: BaseTool

Source code in griptape/tools/sql/tool.py
@define
class SqlTool(BaseTool):
    """Tool that executes SQL SELECT queries against a single table.

    Attributes:
        sql_loader: Loader used to run queries; its ``sql_driver`` supplies the table schema.
        schema_name: Optional schema name prefixed to the table name.
        table_name: Name of the table the tool is described as querying.
        table_description: Optional description added to the activity description.
        engine_name: Optional SQL engine name added to the activity description.
    """

    sql_loader: SqlLoader = field(kw_only=True)
    schema_name: Optional[str] = field(default=None, kw_only=True)
    table_name: str = field(kw_only=True)
    table_description: Optional[str] = field(default=None, kw_only=True)
    engine_name: Optional[str] = field(default=None, kw_only=True)

    @property
    def full_table_name(self) -> str:
        """Return the table name, qualified with the schema name when one is set."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    @property
    def table_schema(self) -> Optional[str]:
        """Return the table schema as reported by the loader's SQL driver."""
        driver = self.sql_loader.sql_driver
        return driver.get_table_schema(self.full_table_name, schema=self.schema_name)

    @activity(
        config={
            "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
            "in table {{ _self.full_table_name }}. "
            "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
            "the original question. "
            "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
            "to get better results. "
            "You can use JOINs if more tables are available in other tools.\n"
            "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
            "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
            "schema": Schema({"sql_query": str}),
        },
    )
    def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        """Run the SQL query found in ``params["values"]["sql_query"]``.

        Returns:
            The loaded rows when there are any, an InfoArtifact when the result
            is empty, or an ErrorArtifact when reading the query or loading fails.
        """
        try:
            rows = self.sql_loader.load(params["values"]["sql_query"])
        except Exception as e:
            return ErrorArtifact(f"error executing query: {e}")

        return rows if len(rows) > 0 else InfoArtifact("No results found")

engine_name = field(default=None, kw_only=True) class-attribute instance-attribute

full_table_name property

schema_name = field(default=None, kw_only=True) class-attribute instance-attribute

sql_loader = field(kw_only=True) class-attribute instance-attribute

table_description = field(default=None, kw_only=True) class-attribute instance-attribute

table_name = field(kw_only=True) class-attribute instance-attribute

table_schema property

execute_query(params)

Source code in griptape/tools/sql/tool.py
@activity(
    config={
        "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
        "in table {{ _self.full_table_name }}. "
        "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
        "the original question. "
        "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
        "to get better results. "
        "You can use JOINs if more tables are available in other tools.\n"
        "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
        "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
        "schema": Schema({"sql_query": str}),
    },
)
def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    """Run the SQL query found in ``params["values"]["sql_query"]``.

    Returns:
        The loaded rows when there are any, an InfoArtifact when the result
        is empty, or an ErrorArtifact when reading the query or loading fails.
    """
    try:
        rows = self.sql_loader.load(params["values"]["sql_query"])
    except Exception as e:
        return ErrorArtifact(f"error executing query: {e}")

    return rows if len(rows) > 0 else InfoArtifact("No results found")

StructureRunTool

Bases: BaseTool

Tool for running a Structure.

Attributes:

Name Type Description
description str

A description of what the Structure does.

structure_run_driver BaseStructureRunDriver

Driver to run the Structure.

Source code in griptape/tools/structure_run/tool.py
@define
class StructureRunTool(BaseTool):
    """Tool that hands its arguments to a Structure through a run driver.

    Attributes:
        description: A description of what the Structure does.
        structure_run_driver: Driver to run the Structure.
    """

    description: str = field(kw_only=True)
    structure_run_driver: BaseStructureRunDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to run a Griptape Structure with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema(
                        [str]
                    )
                },
            ),
        },
    )
    def run_structure(self, params: dict) -> BaseArtifact:
        """Run the Structure with the strings in ``params["values"]["args"]``.

        Each string is wrapped in a TextArtifact and passed positionally to
        the driver's ``run``; the driver's result is returned unchanged.
        """
        text_args = [TextArtifact(value) for value in params["values"]["args"]]

        return self.structure_run_driver.run(*text_args)

description = field(kw_only=True) class-attribute instance-attribute

structure_run_driver = field(kw_only=True) class-attribute instance-attribute

run_structure(params)

Source code in griptape/tools/structure_run/tool.py
@activity(
    config={
        "description": "Can be used to run a Griptape Structure with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema(
                    [str]
                )
            },
        ),
    },
)
def run_structure(self, params: dict) -> BaseArtifact:
    """Run the Structure with the strings in ``params["values"]["args"]``.

    Each string is wrapped in a TextArtifact and passed positionally to the
    driver's ``run``; the driver's result is returned unchanged.
    """
    text_args = [TextArtifact(value) for value in params["values"]["args"]]

    return self.structure_run_driver.run(*text_args)

StructuredOutputTool

Bases: BaseTool

Source code in griptape/tools/structured_output/tool.py
@define
class StructuredOutputTool(BaseTool):
    """Tool whose single activity returns its input values as a JsonArtifact.

    Attributes:
        output_schema: Schema (or pydantic model type) used as the activity's input schema.
    """

    output_schema: Union[Schema, type[BaseModel]] = field(kw_only=True)

    @activity(
        config={
            "description": "Used to provide the final response which ends this conversation.",
            "schema": lambda _self: _self.output_schema,
        }
    )
    def provide_output(self, params: dict) -> BaseArtifact:
        """Wrap ``params["values"]`` in a JsonArtifact and return it."""
        values = params["values"]
        return JsonArtifact(values)

output_schema = field(kw_only=True) class-attribute instance-attribute

provide_output(params)

Source code in griptape/tools/structured_output/tool.py
@activity(
    config={
        "description": "Used to provide the final response which ends this conversation.",
        "schema": lambda _self: _self.output_schema,
    }
)
def provide_output(self, params: dict) -> BaseArtifact:
    """Wrap ``params["values"]`` in a JsonArtifact and return it."""
    values = params["values"]
    return JsonArtifact(values)

TextToSpeechTool

Bases: ArtifactFileOutputMixin, BaseTool

A tool that can be used to generate speech from input text.

Attributes:

Name Type Description
text_to_speech_driver BaseTextToSpeechDriver

The text to audio generation driver used to generate the speech audio.

output_dir Optional[str]

If provided, the generated audio will be written to disk in output_dir.

output_file Optional[str]

If provided, the generated audio will be written to disk as output_file.

Source code in griptape/tools/text_to_speech/tool.py
@define
class TextToSpeechTool(ArtifactFileOutputMixin, BaseTool):
    """Tool that turns input text into speech audio.

    Attributes:
        text_to_speech_driver: The text to audio generation driver used to generate the speech audio.
        output_dir: If provided, the generated audio will be written to disk in output_dir.
        output_file: If provided, the generated audio will be written to disk as output_file.
    """

    text_to_speech_driver: BaseTextToSpeechDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to generate speech from the provided input text.",
            "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}),
        },
    )
    def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact:
        """Generate speech for ``params["values"]["text"]``.

        The driver's output is written to disk first when ``output_dir`` or
        ``output_file`` is set, then returned.
        """
        speech = self.text_to_speech_driver.run_text_to_audio(prompts=[params["values"]["text"]])

        wants_file = self.output_dir or self.output_file
        if wants_file:
            self._write_to_file(speech)

        return speech

text_to_speech_driver = field(kw_only=True) class-attribute instance-attribute

text_to_speech(params)

Source code in griptape/tools/text_to_speech/tool.py
@activity(
    config={
        "description": "Can be used to generate speech from the provided input text.",
        "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}),
    },
)
def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact:
    """Generate speech for ``params["values"]["text"]``.

    The driver's output is written to disk first when ``output_dir`` or
    ``output_file`` is set, then returned.
    """
    speech = self.text_to_speech_driver.run_text_to_audio(prompts=[params["values"]["text"]])

    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(speech)

    return speech

VariationImageGenerationTool

Bases: BaseImageGenerationTool

A tool that can be used to generate prompted variations of an image.

Attributes:

Name Type Description
image_generation_driver BaseImageGenerationDriver

The image generation driver used to generate the image.

output_dir Optional[str]

If provided, the generated image will be written to disk in output_dir.

output_file Optional[str]

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/variation_image_generation/tool.py
@define
class VariationImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted variations of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: The loader used to read the input image file from disk.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    # A factory gives every tool its own loader; a plain ``default=ImageLoader()``
    # would be built once at class-definition time and shared by all instances.
    image_loader: ImageLoader = field(factory=ImageLoader, kw_only=True)

    @activity(
        config={
            "description": "Generates a variation of a given input image file.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                },
            ),
        },
    )
    def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Generate a variation of an image loaded from a file path.

        Args:
            params: Activity parameters; ``params["values"]`` must contain
                ``prompt``, ``negative_prompt`` and ``image_file``.

        Returns:
            The artifact produced by the image generation driver.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]

        image_artifact = self.image_loader.load(image_file)

        return self._generate_variation(prompt, negative_prompt, image_artifact)

    @activity(
        config={
            "description": "Generates a variation of a given input image artifact in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                    "artifact_name": str,
                },
            ),
        },
    )
    def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Generate a variation of an image artifact held in an input memory.

        Args:
            params: Activity parameters; ``params["values"]`` must contain
                ``prompt``, ``negative_prompt``, ``memory_name``,
                ``artifact_namespace`` and ``artifact_name``.

        Returns:
            The artifact produced by the image generation driver, or an
            ErrorArtifact when the memory is not found or loading the
            artifact raises ValueError.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(memory, artifact_namespace, artifact_name, ImageArtifact)
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_variation(prompt, negative_prompt, cast("ImageArtifact", image_artifact))

    def _generate_variation(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact
    ) -> ImageArtifact | ErrorArtifact:
        """Run the driver's image variation and optionally persist the result.

        The output is written to disk when ``output_dir`` or ``output_file``
        is set, then returned.
        """
        output_artifact = self.image_generation_driver.run_image_variation(
            prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

image_generation_driver = field(kw_only=True) class-attribute instance-attribute

image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

_generate_variation(prompt, negative_prompt, image_artifact)

Source code in griptape/tools/variation_image_generation/tool.py
def _generate_variation(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact
) -> ImageArtifact | ErrorArtifact:
    """Run the driver's image variation and optionally persist the result.

    The output is written to disk when ``output_dir`` or ``output_file`` is
    set, then returned.
    """
    variation = self.image_generation_driver.run_image_variation(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
    )

    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(variation)

    return variation

image_variation_from_file(params)

Source code in griptape/tools/variation_image_generation/tool.py
@activity(
    config={
        "description": "Generates a variation of a given input image file.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
            },
        ),
    },
)
def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Generate a variation of an image loaded from a file path.

    Args:
        params: Activity parameters; ``params["values"]`` must contain
            ``prompt``, ``negative_prompt`` and ``image_file``.

    Returns:
        The artifact produced by the image generation driver.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]

    source_image = self.image_loader.load(values["image_file"])

    return self._generate_variation(prompt, negative_prompt, source_image)

image_variation_from_memory(params)

Source code in griptape/tools/variation_image_generation/tool.py
@activity(
    config={
        "description": "Generates a variation of a given input image artifact in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "artifact_namespace": str,
                "artifact_name": str,
            },
        ),
    },
)
def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Generate a variation of an image artifact looked up in an input memory.

    Args:
        params: Activity input; `params["values"]` must hold `prompt`, `negative_prompt`,
            `artifact_namespace`, `artifact_name` and `memory_name`.

    Returns:
        The result of `self._generate_variation`, or an `ErrorArtifact` when the memory
        cannot be found or loading the artifact raises `ValueError`.
    """
    inputs = params["values"]
    prompt_text = inputs["prompt"]
    negative_text = inputs["negative_prompt"]
    namespace = inputs["artifact_namespace"]
    name = inputs["artifact_name"]

    source_memory = self.find_input_memory(inputs["memory_name"])
    if source_memory is None:
        return ErrorArtifact("memory not found")

    try:
        loaded = load_artifact_from_memory(source_memory, namespace, name, ImageArtifact)
    except ValueError as err:
        # Loading failures are reported as an artifact instead of propagating.
        return ErrorArtifact(str(err))
    else:
        return self._generate_variation(prompt_text, negative_text, cast("ImageArtifact", loaded))

VectorStoreTool

Bases: BaseTool

A tool for querying a vector database.

Attributes:

Name Type Description
description str

LLM-friendly vector DB description.

vector_store_driver BaseVectorStoreDriver

The `BaseVectorStoreDriver` instance that search queries are run against.

query_params dict[str, Any]

Optional dictionary of vector store driver query parameters.

process_query_output Callable[[list[Entry]], BaseArtifact]

Optional callable for processing the `Entry` objects returned by the vector store driver query.

Source code in griptape/tools/vector_store/tool.py
@define(kw_only=True)
class VectorStoreTool(BaseTool):
    """Tool exposing a vector store query as a `search` activity.

    Attributes:
        description: LLM-friendly vector DB description.
        vector_store_driver: `BaseVectorStoreDriver` that search queries are sent to.
        query_params: Optional dictionary of keyword arguments added to every driver query.
        process_query_output: Optional callable turning the driver's query output `Entry`s into an artifact.
    """

    DEFAULT_TOP_N = 5

    description: str = field()
    vector_store_driver: BaseVectorStoreDriver = field()
    query_params: dict[str, Any] = field(factory=dict)
    process_query_output: Callable[[list[BaseVectorStoreDriver.Entry]], BaseArtifact] = field(
        default=Factory(lambda: lambda es: ListArtifact([e.to_artifact() for e in es])),
    )

    @activity(
        config={
            "description": "Can be used to search a database with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A natural language search query to run against the vector database",
                    ): str,
                },
            ),
        },
    )
    def search(self, params: dict) -> BaseArtifact:
        """Query the vector store and post-process the entries it returns.

        Args:
            params: Activity input; the query text is read from `params["values"]["query"]`.

        Returns:
            The output of `process_query_output`, or an `ErrorArtifact` when the
            query or the post-processing raises.
        """
        search_text = params["values"]["query"]

        try:
            post_process = self.process_query_output
            entries = self.vector_store_driver.query(search_text, **self.query_params)
            return post_process(entries)
        except Exception as err:
            return ErrorArtifact(f"error querying vector store: {err}")

DEFAULT_TOP_N = 5 class-attribute instance-attribute

description = field() class-attribute instance-attribute

process_query_output = field(default=Factory(lambda: lambda es: ListArtifact([e.to_artifact() for e in es]))) class-attribute instance-attribute

query_params = field(factory=dict) class-attribute instance-attribute

vector_store_driver = field() class-attribute instance-attribute

search(params)

Source code in griptape/tools/vector_store/tool.py
@activity(
    config={
        "description": "Can be used to search a database with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A natural language search query to run against the vector database",
                ): str,
            },
        ),
    },
)
def search(self, params: dict) -> BaseArtifact:
    """Query the vector store and post-process the entries it returns.

    Args:
        params: Activity input; the query text is read from `params["values"]["query"]`.

    Returns:
        The output of `self.process_query_output`, or an `ErrorArtifact` when the
        query or the post-processing raises.
    """
    search_text = params["values"]["query"]

    try:
        post_process = self.process_query_output
        entries = self.vector_store_driver.query(search_text, **self.query_params)
        return post_process(entries)
    except Exception as err:
        return ErrorArtifact(f"error querying vector store: {err}")

WebScraperTool

Bases: BaseTool

Source code in griptape/tools/web_scraper/tool.py
@define
class WebScraperTool(BaseTool):
    """Tool that loads a web page and returns its content split into chunks.

    Attributes:
        web_loader: Loader used to fetch the page; defaults to a new `WebLoader`.
        text_chunker: Chunker applied to the loaded page; defaults to `TextChunker(max_tokens=400)`.
    """

    web_loader: WebLoader = field(default=Factory(lambda: WebLoader()), kw_only=True)
    text_chunker: TextChunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True)

    @activity(
        config={
            "description": "Can be used to browse a web page and load its content",
            "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
        },
    )
    def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load the page at the requested URL and chunk what the loader returns.

        Args:
            params: Activity input; the URL is read from `params["values"]["url"]`.

        Returns:
            A `ListArtifact` of the chunks, or an `ErrorArtifact` when loading or chunking raises.
        """
        target_url = params["values"]["url"]

        try:
            page = self.web_loader.load(target_url)
            pieces = self.text_chunker.chunk(page)
        except Exception as err:
            return ErrorArtifact(f"Error getting page content: {err!s}")
        else:
            return ListArtifact(pieces)

text_chunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True) class-attribute instance-attribute

web_loader = field(default=Factory(lambda: WebLoader()), kw_only=True) class-attribute instance-attribute

get_content(params)

Source code in griptape/tools/web_scraper/tool.py
@activity(
    config={
        "description": "Can be used to browse a web page and load its content",
        "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
    },
)
def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load the page at the requested URL and chunk what the loader returns.

    Args:
        params: Activity input; the URL is read from `params["values"]["url"]`.

    Returns:
        A `ListArtifact` of the chunks, or an `ErrorArtifact` when loading or chunking raises.
    """
    target_url = params["values"]["url"]

    try:
        page = self.web_loader.load(target_url)
        pieces = self.text_chunker.chunk(page)
        # Building the artifact stays inside the try so its failures are reported too.
        return ListArtifact(pieces)
    except Exception as err:
        return ErrorArtifact(f"Error getting page content: {err!s}")

WebSearchTool

Bases: BaseTool

Source code in griptape/tools/web_search/tool.py
@define
class WebSearchTool(BaseTool):
    """Tool that runs web searches through a web search driver.

    Attributes:
        web_search_driver: Driver whose `search` method is called with the query.
    """

    web_search_driver: BaseWebSearchDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                    ): str,
                },
            ),
        },
    )
    def search(self, values: dict) -> ListArtifact | ErrorArtifact:
        """Search the web for `values["query"]` using the configured driver.

        Args:
            values: Activity input values. Must contain `query`; every other entry is
                forwarded to the driver as a keyword argument. The dict is not modified.

        Returns:
            Whatever the driver's `search` call returns, or an `ErrorArtifact` when it raises.

        Raises:
            KeyError: If `values` has no `query` entry.
        """
        # The query is passed to the driver's `search` call positionally, so it must not
        # also be forwarded as a keyword argument. Pop it from a shallow copy so the
        # caller's dict keeps its `query` entry.
        extra_kwargs = dict(values)
        query = extra_kwargs.pop("query")

        try:
            return self.web_search_driver.search(query, **extra_kwargs)
        except Exception as e:
            return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")

web_search_driver = field(kw_only=True) class-attribute instance-attribute

search(values)

Source code in griptape/tools/web_search/tool.py
@activity(
    config={
        "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                ): str,
            },
        ),
    },
)
def search(self, values: dict) -> ListArtifact | ErrorArtifact:
    """Search the web for `values["query"]` using the configured driver.

    Args:
        values: Activity input values. Must contain `query`; every other entry is
            forwarded to the driver as a keyword argument. The dict is not modified.

    Returns:
        Whatever the driver's `search` call returns, or an `ErrorArtifact` when it raises.

    Raises:
        KeyError: If `values` has no `query` entry.
    """
    # The query is passed to the driver's `search` call positionally, so it must not
    # also be forwarded as a keyword argument. Pop it from a shallow copy so the
    # caller's dict keeps its `query` entry.
    extra_kwargs = dict(values)
    query = extra_kwargs.pop("query")

    try:
        return self.web_search_driver.search(query, **extra_kwargs)
    except Exception as e:
        return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")