Skip to content

tools

__all__ = ['BaseTool', 'BaseImageGenerationTool', 'CalculatorTool', 'WebSearchTool', 'WebScraperTool', 'SqlTool', 'EmailTool', 'RestApiTool', 'FileManagerTool', 'VectorStoreTool', 'DateTimeTool', 'ComputerTool', 'PromptImageGenerationTool', 'VariationImageGenerationTool', 'InpaintingImageGenerationTool', 'OutpaintingImageGenerationTool', 'GriptapeCloudToolTool', 'StructureRunTool', 'ImageQueryTool', 'RagTool', 'TextToSpeechTool', 'AudioTranscriptionTool', 'ExtractionTool', 'PromptSummaryTool', 'QueryTool', 'StructuredOutputTool'] module-attribute

AudioTranscriptionTool

Bases: BaseTool

A tool that can be used to generate transcriptions from input audio.

Source code in griptape/tools/audio_transcription/tool.py
@define
class AudioTranscriptionTool(BaseTool):
    """A tool that can be used to generate transcriptions from input audio."""

    audio_transcription_driver: BaseAudioTranscriptionDriver = field(kw_only=True)
    audio_loader: AudioLoader = field(default=Factory(lambda: AudioLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to generate transcriptions of audio files on disk.",
            "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
        },
    )
    def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        audio_path = params["values"]["path"]

        audio_artifact = self.audio_loader.load(audio_path)

        return self.audio_transcription_driver.run(audio_artifact)

    @activity(
        config={
            "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
            "schema": Schema({"schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str})}),
        },
    )
    def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]

        if memory is None:
            return ErrorArtifact("memory not found")

        audio_artifact = cast(
            AudioArtifact,
            load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
        )

        return self.audio_transcription_driver.run(audio_artifact)

audio_loader: AudioLoader = field(default=Factory(lambda: AudioLoader()), kw_only=True) class-attribute instance-attribute

audio_transcription_driver: BaseAudioTranscriptionDriver = field(kw_only=True) class-attribute instance-attribute

transcribe_audio_from_disk(params)

Source code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate transcriptions of audio files on disk.",
        "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
    },
)
def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    audio_path = params["values"]["path"]

    audio_artifact = self.audio_loader.load(audio_path)

    return self.audio_transcription_driver.run(audio_artifact)

transcribe_audio_from_memory(params)

Source code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
        "schema": Schema({"schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str})}),
    },
)
def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]
    artifact_name = params["values"]["artifact_name"]

    if memory is None:
        return ErrorArtifact("memory not found")

    audio_artifact = cast(
        AudioArtifact,
        load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
    )

    return self.audio_transcription_driver.run(audio_artifact)

BaseImageGenerationTool

Bases: ArtifactFileOutputMixin, BaseTool

A base class for tools that generate images from text prompts.

Source code in griptape/tools/base_image_generation_tool.py
@define
class BaseImageGenerationTool(ArtifactFileOutputMixin, BaseTool):
    """A base class for tools that generate images from text prompts."""

    PROMPT_DESCRIPTION = "Features and qualities to include in the generated image, descriptive and succinct."
    NEGATIVE_PROMPT_DESCRIPTION = (
        "Features and qualities to avoid in the generated image. Affirmatively describe "
        "what to avoid, for example: to avoid the color red, include 'red' "
        "rather than 'no red'."
    )

NEGATIVE_PROMPT_DESCRIPTION = "Features and qualities to avoid in the generated image. Affirmatively describe what to avoid, for example: to avoid the color red, include 'red' rather than 'no red'." class-attribute instance-attribute

PROMPT_DESCRIPTION = 'Features and qualities to include in the generated image, descriptive and succinct.' class-attribute instance-attribute

BaseTool

Bases: ActivityMixin, SerializableMixin, RunnableMixin['BaseTool'], ABC

Abstract class for all tools to inherit from for.

Attributes:

Name Type Description
name str

Tool name.

input_memory Optional[list[TaskMemory]]

TaskMemory available in tool activities. Gets automatically set if None.

output_memory Optional[dict[str, list[TaskMemory]]]

TaskMemory that activities write to be default. Gets automatically set if None.

install_dependencies_on_init bool

Determines whether dependencies from the tool requirements.txt file are installed in init.

dependencies_install_directory Optional[str]

Custom dependency install directory.

verbose bool

Determines whether tool operations (such as dependency installation) should be verbose.

off_prompt bool

Determines whether tool activity output goes to the output memory.

Source code in griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, SerializableMixin, RunnableMixin["BaseTool"], ABC):
    """Abstract class for all tools to inherit from for.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to be default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(
        default=Factory(lambda self: self.__class__.__name__, takes_self=True),
        kw_only=True,
        metadata={"serializable": True},
    )
    input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True, metadata={"serializable": True})
    output_memory: Optional[dict[str, list[TaskMemory]]] = field(
        default=None, kw_only=True, metadata={"serializable": True}
    )
    install_dependencies_on_init: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    dependencies_install_directory: Optional[str] = field(default=None, kw_only=True, metadata={"serializable": True})
    verbose: bool = field(default=False, kw_only=True, metadata={"serializable": True})
    off_prompt: bool = field(default=False, kw_only=True, metadata={"serializable": True})

    def __attrs_post_init__(self) -> None:
        if (
            self.install_dependencies_on_init
            and self.has_requirements
            and not self.are_requirements_met(self.requirements_path)
        ):
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")
                if memory_list is None:
                    raise ValueError(f"memory list for activity '{activity_name}' can't be None")

                output_memory_names = [memory.name for memory in memory_list]

                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def requirements_path(self) -> str:
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def abs_file_path(self) -> str:
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self) -> str:
        return os.path.dirname(self.abs_file_path)

    @property
    def has_requirements(self) -> bool:
        return os.path.exists(self.requirements_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} ToolAction Schema")

    def activity_schemas(self) -> list[Schema]:
        schemas = []

        for activity in self.activities():
            schema_dict: dict[Literal | schema.Optional, Any] = {
                Literal("name"): self.name,
                Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
            }

            activity_schema = self.activity_schema(activity)
            # If no schema is defined, we just make `input` optional instead of omitting it.
            # This works better with lower-end models that may accidentally pass in an empty dict.
            if activity_schema is None:
                schema_dict[schema.Optional("input")] = {}
            else:
                schema_dict[Literal("input")] = activity_schema.schema

            schemas.append(Schema(schema_dict))

        return schemas

    def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
        try:
            output = self.before_run(activity, subtask, action)

            output = self.try_run(activity, subtask, action, output)

            output = self.after_run(activity, subtask, action, output)
        except Exception as e:
            logging.error(traceback.format_exc())
            output = ErrorArtifact(str(e), exception=e)

        return output

    def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
        super().before_run()

        return action.input

    @observable(tags=["Tool.run()"])
    def try_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: Optional[dict],
    ) -> BaseArtifact:
        activity_result = activity(deepcopy(value))

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            if activity_result is None:
                result = InfoArtifact("Tool returned an empty value")
            else:
                result = InfoArtifact(activity_result)

        return result

    def after_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: BaseArtifact,
    ) -> BaseArtifact:
        super().after_run()

        if self.output_memory:
            output_memories = self.output_memory[getattr(activity, "name")] or []
            for memory in output_memories:
                value = memory.process_output(activity, subtask, value)

            return value
        else:
            return value

    def validate(self) -> bool:
        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")
        return True

    def tool_dir(self) -> str:
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        env = env or {}

        command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
        )

    def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        else:
            return None

    def to_native_tool_name(self, activity: Callable) -> str:
        """Converts a Tool's name and an Activity into to a native tool name.

        The native tool name is a combination of the Tool's name and the Activity's name.
        The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

        Args:
            activity: Activity to convert

        Returns:
            str: Native tool name.
        """
        tool_name = self.name
        if re.match(r"^[a-zA-Z0-9]+$", tool_name) is None:
            raise ValueError("Tool name can only contain letters and numbers.")

        activity_name = self.activity_name(activity)
        if re.match(r"^[a-zA-Z0-9_]+$", activity_name) is None:
            raise ValueError("Activity name can only contain letters, numbers, and underscores.")

        return f"{tool_name}_{activity_name}"

    def are_requirements_met(self, requirements_path: str) -> bool:
        requirements = Path(requirements_path).read_text().splitlines()

        try:
            for requirement in requirements:
                importlib.metadata.version(requirement)
            return True
        except importlib.metadata.PackageNotFoundError:
            return False

REQUIREMENTS_FILE = 'requirements.txt' class-attribute instance-attribute

abs_dir_path: str property

abs_file_path: str property

dependencies_install_directory: Optional[str] = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

has_requirements: bool property

input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

install_dependencies_on_init: bool = field(default=True, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

name: str = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

off_prompt: bool = field(default=False, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

output_memory: Optional[dict[str, list[TaskMemory]]] = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

requirements_path: str property

verbose: bool = field(default=False, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    if (
        self.install_dependencies_on_init
        and self.has_requirements
        and not self.are_requirements_met(self.requirements_path)
    ):
        self.install_dependencies(os.environ.copy())

activity_schemas()

Source code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]:
    schemas = []

    for activity in self.activities():
        schema_dict: dict[Literal | schema.Optional, Any] = {
            Literal("name"): self.name,
            Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
        }

        activity_schema = self.activity_schema(activity)
        # If no schema is defined, we just make `input` optional instead of omitting it.
        # This works better with lower-end models that may accidentally pass in an empty dict.
        if activity_schema is None:
            schema_dict[schema.Optional("input")] = {}
        else:
            schema_dict[Literal("input")] = activity_schema.schema

        schemas.append(Schema(schema_dict))

    return schemas

after_run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
def after_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: BaseArtifact,
) -> BaseArtifact:
    super().after_run()

    if self.output_memory:
        output_memories = self.output_memory[getattr(activity, "name")] or []
        for memory in output_memories:
            value = memory.process_output(activity, subtask, value)

        return value
    else:
        return value

are_requirements_met(requirements_path)

Source code in griptape/tools/base_tool.py
def are_requirements_met(self, requirements_path: str) -> bool:
    requirements = Path(requirements_path).read_text().splitlines()

    try:
        for requirement in requirements:
            importlib.metadata.version(requirement)
        return True
    except importlib.metadata.PackageNotFoundError:
        return False

before_run(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
    super().before_run()

    return action.input

find_input_memory(memory_name)

Source code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
    if self.input_memory:
        return next((m for m in self.input_memory if m.name == memory_name), None)
    else:
        return None

install_dependencies(env=None)

Source code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    env = env or {}

    command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

    if self.dependencies_install_directory is None:
        command.extend(["-U"])
    else:
        command.extend(["-t", self.dependencies_install_directory])

    subprocess.run(
        command,
        env=env,
        cwd=self.tool_dir(),
        stdout=None if self.verbose else subprocess.DEVNULL,
        stderr=None if self.verbose else subprocess.DEVNULL,
    )

run(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
    try:
        output = self.before_run(activity, subtask, action)

        output = self.try_run(activity, subtask, action, output)

        output = self.after_run(activity, subtask, action, output)
    except Exception as e:
        logging.error(traceback.format_exc())
        output = ErrorArtifact(str(e), exception=e)

    return output

schema()

Source code in griptape/tools/base_tool.py
def schema(self) -> dict:
    full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

    return full_schema.json_schema(f"{self.name} ToolAction Schema")

to_native_tool_name(activity)

Converts a Tool's name and an Activity into to a native tool name.

The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

Parameters:

Name Type Description Default
activity Callable

Activity to convert

required

Returns:

Name Type Description
str str

Native tool name.

Source code in griptape/tools/base_tool.py
def to_native_tool_name(self, activity: Callable) -> str:
    """Converts a Tool's name and an Activity into to a native tool name.

    The native tool name is a combination of the Tool's name and the Activity's name.
    The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

    Args:
        activity: Activity to convert

    Returns:
        str: Native tool name.
    """
    tool_name = self.name
    if re.match(r"^[a-zA-Z0-9]+$", tool_name) is None:
        raise ValueError("Tool name can only contain letters and numbers.")

    activity_name = self.activity_name(activity)
    if re.match(r"^[a-zA-Z0-9_]+$", activity_name) is None:
        raise ValueError("Activity name can only contain letters, numbers, and underscores.")

    return f"{tool_name}_{activity_name}"

tool_dir()

Source code in griptape/tools/base_tool.py
def tool_dir(self) -> str:
    class_file = inspect.getfile(self.__class__)

    return os.path.dirname(os.path.abspath(class_file))

try_run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
@observable(tags=["Tool.run()"])
def try_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: Optional[dict],
) -> BaseArtifact:
    activity_result = activity(deepcopy(value))

    if isinstance(activity_result, BaseArtifact):
        result = activity_result
    else:
        logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

        if activity_result is None:
            result = InfoArtifact("Tool returned an empty value")
        else:
            result = InfoArtifact(activity_result)

    return result

validate()

Source code in griptape/tools/base_tool.py
def validate(self) -> bool:
    if not os.path.exists(self.requirements_path):
        raise Exception(f"{self.REQUIREMENTS_FILE} not found")
    return True

validate_output_memory(_, output_memory)

Source code in griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
    if output_memory:
        for activity_name, memory_list in output_memory.items():
            if not self.find_activity(activity_name):
                raise ValueError(f"activity {activity_name} doesn't exist")
            if memory_list is None:
                raise ValueError(f"memory list for activity '{activity_name}' can't be None")

            output_memory_names = [memory.name for memory in memory_list]

            if len(output_memory_names) > len(set(output_memory_names)):
                raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

CalculatorTool

Bases: BaseTool

Source code in griptape/tools/calculator/tool.py
class CalculatorTool(BaseTool):
    @activity(
        config={
            "description": "Can be used for computing simple numerical or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. "
                        "Don't use variables. Don't use any imports or external libraries",
                    ): str,
                },
            ),
        },
    )
    def calculate(self, params: dict) -> BaseArtifact:
        import numexpr  # pyright: ignore[reportMissingImports]

        try:
            expression = params["values"]["expression"]

            return TextArtifact(numexpr.evaluate(expression))
        except Exception as e:
            return ErrorArtifact(f"error calculating: {e}")

calculate(params)

Source code in griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for computing simple numerical or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. "
                    "Don't use variables. Don't use any imports or external libraries",
                ): str,
            },
        ),
    },
)
def calculate(self, params: dict) -> BaseArtifact:
    import numexpr  # pyright: ignore[reportMissingImports]

    try:
        expression = params["values"]["expression"]

        return TextArtifact(numexpr.evaluate(expression))
    except Exception as e:
        return ErrorArtifact(f"error calculating: {e}")

ComputerTool

Bases: BaseTool

Source code in griptape/tools/computer/tool.py
@define
class ComputerTool(BaseTool):
    local_workdir: Optional[str] = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True),
        kw_only=True,
    )

    _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            self._tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self._tempdir.name

    @docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename",
                        description="name of the file to put the Python code in before executing it",
                    ): str,
                },
            ),
        },
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        },
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        from docker.models.containers import Container

        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

            container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
                stdout=True,
                stderr=True,
                detach=True,
            )

            if isinstance(container, Container):
                container.wait()

                stderr = container.logs(stdout=False, stderr=True).decode().strip()
                stdout = container.logs(stdout=True, stderr=False).decode().strip()

                container.stop()
                container.remove()

                if stderr:
                    return ErrorArtifact(stderr)
                else:
                    return TextArtifact(stdout)
            else:
                return ErrorArtifact("error running container")
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            Path(local_file_path).write_text(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> Optional[DockerClient]:
        import docker

        try:
            return docker.from_env()
        except Exception as e:
            logging.error(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        from docker.errors import NotFound
        from docker.models.containers import Container

        try:
            existing_container = self.docker_client.containers.get(name)
            if isinstance(existing_container, Container):
                existing_container.remove(force=True)

                logging.info("Removed existing container %s", name)
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        with tempfile.TemporaryDirectory() as temp_dir:
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            if isinstance(image, tuple):
                logging.info("Built image: %s", image[0].short_id)

    def dependencies(self) -> list[str]:
        with open(self.requirements_txt_path) as file:
            return [line.strip() for line in file]

    def __del__(self) -> None:
        if self._tempdir:
            self._tempdir.cleanup()

container_workdir: str = field(default='/griptape', kw_only=True) class-attribute instance-attribute

docker_client: DockerClient = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) class-attribute instance-attribute

dockerfile_path: str = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True) class-attribute instance-attribute

env_vars: dict = field(factory=dict, kw_only=True) class-attribute instance-attribute

local_workdir: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_txt_path: str = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    super().__attrs_post_init__()

    if self.local_workdir:
        Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
    else:
        self._tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self._tempdir.name

__del__()

Source code in griptape/tools/computer/tool.py
def __del__(self) -> None:
    if self._tempdir:
        self._tempdir.cleanup()

build_image(tool)

Source code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    with tempfile.TemporaryDirectory() as temp_dir:
        shutil.copy(self.dockerfile_path, temp_dir)
        shutil.copy(self.requirements_txt_path, temp_dir)

        image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

        if isinstance(image, tuple):
            logging.info("Built image: %s", image[0].short_id)

container_name(tool)

Source code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    import stringcase  # pyright: ignore[reportMissingImports]

    return f"{stringcase.snakecase(tool.name)}_container"

default_docker_client()

Source code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]:
    import docker

    try:
        return docker.from_env()
    except Exception as e:
        logging.error(e)

        return None

dependencies()

Source code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    with open(self.requirements_txt_path) as file:
        return [line.strip() for line in file]

execute_code(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename",
                    description="name of the file to put the Python code in before executing it",
                ): str,
            },
        ),
    },
)
def execute_code(self, params: dict) -> BaseArtifact:
    code = params["values"]["code"]
    filename = params["values"]["filename"]

    return self.execute_code_in_container(filename, code)

execute_code_in_container(filename, code)

Source code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    container_file_path = os.path.join(self.container_workdir, filename)

    if self.local_workdir:
        tempdir = None
        local_workdir = self.local_workdir
    else:
        tempdir = tempfile.TemporaryDirectory()
        local_workdir = tempdir.name

    local_file_path = os.path.join(local_workdir, filename)

    try:
        Path(local_file_path).write_text(code)

        return self.execute_command_in_container(f"python {container_file_path}")
    except Exception as e:
        return ErrorArtifact(f"error executing code: {e}")
    finally:
        if tempdir:
            tempdir.cleanup()

execute_command(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    },
)
def execute_command(self, params: dict) -> BaseArtifact:
    command = params["values"]["command"]

    return self.execute_command_in_container(command)

execute_command_in_container(command)

Source code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    from docker.models.containers import Container

    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

        container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
            stdout=True,
            stderr=True,
            detach=True,
        )

        if isinstance(container, Container):
            container.wait()

            stderr = container.logs(stdout=False, stderr=True).decode().strip()
            stdout = container.logs(stdout=True, stderr=False).decode().strip()

            container.stop()
            container.remove()

            if stderr:
                return ErrorArtifact(stderr)
            else:
                return TextArtifact(stdout)
        else:
            return ErrorArtifact("error running container")
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")

image_name(tool)

Source code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    import stringcase  # pyright: ignore[reportMissingImports]

    return f"{stringcase.snakecase(tool.name)}_image"

install_dependencies(env=None)

Source code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    super().install_dependencies(env)

    self.remove_existing_container(self.container_name(self))
    self.build_image(self)

remove_existing_container(name)

Source code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    from docker.errors import NotFound
    from docker.models.containers import Container

    try:
        existing_container = self.docker_client.containers.get(name)
        if isinstance(existing_container, Container):
            existing_container.remove(force=True)

            logging.info("Removed existing container %s", name)
    except NotFound:
        pass

validate_docker_client(_, docker_client)

Source code in griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
    if not docker_client:
        raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

DateTimeTool

Bases: BaseTool

Source code in griptape/tools/date_time/tool.py
class DateTimeTool(BaseTool):
    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self) -> BaseArtifact:
        try:
            current_datetime = datetime.now()

            return TextArtifact(str(current_datetime))
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": Schema(
                {
                    Literal(
                        "relative_date_string",
                        description='Relative date in English. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str,
                },
            ),
        },
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            else:
                return ErrorArtifact("invalid date string")
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

get_current_datetime()

Source code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self) -> BaseArtifact:
    try:
        current_datetime = datetime.now()

        return TextArtifact(str(current_datetime))
    except Exception as e:
        return ErrorArtifact(f"error getting current datetime: {e}")

get_relative_datetime(params)

Source code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": Schema(
            {
                Literal(
                    "relative_date_string",
                    description='Relative date in English. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str,
            },
        ),
    },
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        else:
            return ErrorArtifact("invalid date string")
    except Exception as e:
        return ErrorArtifact(f"error getting current datetime: {e}")

EmailTool

Bases: BaseTool

Tool for working with email.

Attributes:

Name Type Description
username Optional[str]

Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com.

password Optional[str]

Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password.

email_max_retrieve_count Optional[int]

Used to limit the number of messages retrieved during any given activities.

smtp_host Optional[str]

Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity.

smtp_port Optional[int]

Port of the SMTP server. Example: 465. Required when using the send activity.

smtp_use_ssl bool

Whether to use SSL when sending email via the SMTP protocol.

smtp_user Optional[str]

Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity.

smtp_password Optional[str]

Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity.

imap_url Optional[str]

Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity.

imap_user Optional[str]

Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity.

imap_password Optional[str]

Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity.

mailboxes dict[str, Optional[str]]

Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}

email_loader EmailLoader

Instance of EmailLoader.

Source code in griptape/tools/email/tool.py
@define
class EmailTool(BaseTool):
    """Tool for working with email.

    Attributes:
        username: Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol.
            Example: bender@futurama.com.
        password: Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail,
            this would be an App Password.
        email_max_retrieve_count: Used to limit the number of messages retrieved during any given activities.
        smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the `send` activity.
        smtp_port: Port of the SMTP server. Example: 465. Required when using the `send` activity.
        smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol.
        smtp_user: Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the `send` activity.
        smtp_password: Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the `send` activity.
        imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the `retrieve` activity.
        imap_user: Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the `retrieve` activity.
        imap_password: Password to retrieve email via the IMAP protocol.  Overrides password for IMAP only. Required when using the `retrieve` activity.
        mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the `retrieve` activity.
            Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
        email_loader: Instance of `EmailLoader`.
    """

    username: Optional[str] = field(default=None, kw_only=True)
    password: Optional[str] = field(default=None, kw_only=True)
    email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True)
    smtp_host: Optional[str] = field(default=None, kw_only=True)
    smtp_port: Optional[int] = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: Optional[str] = field(default=None, kw_only=True)
    imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: dict[str, Optional[str]] = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                },
            ),
        },
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        if self.mailboxes is None:
            return ErrorArtifact("mailboxes is required")

        values = params["values"]
        max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count

        return self.email_loader.load(
            EmailLoader.EmailQuery(
                label=values["label"],
                key=values.get("key"),
                search_criteria=values.get("search_criteria"),
                max_count=max_count,
            ),
        )

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                },
            ),
        },
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        if self.smtp_user is None:
            return ErrorArtifact("smtp_user is required")
        if self.smtp_password is None:
            return ErrorArtifact("smtp_password is required")
        if self.smtp_host is None:
            return ErrorArtifact("smtp_host is required")
        if self.smtp_port is None:
            return ErrorArtifact("smtp_port is required")

        msg = MIMEText(values["body"])
        msg["Subject"] = values["subject"]
        msg["From"] = self.smtp_user
        msg["To"] = values["to"]

        try:
            with self._create_smtp_client(self.smtp_host, self.smtp_port) as client:
                client.login(self.smtp_user, self.smtp_password)
                client.sendmail(msg["From"], [msg["To"]], msg.as_string())
                return InfoArtifact("email was successfully sent")
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error sending email: {e}")

    def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
        if self.smtp_use_ssl:
            return smtplib.SMTP_SSL(smtp_host, smtp_port)
        else:
            return smtplib.SMTP(smtp_host, smtp_port)

email_loader: EmailLoader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute

email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

imap_url: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

mailboxes: dict[str, Optional[str]] = field(default=None, kw_only=True) class-attribute instance-attribute

password: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_host: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

smtp_port: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_use_ssl: bool = field(default=True, kw_only=True) class-attribute instance-attribute

smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

username: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

retrieve(params)

Source code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            },
        ),
    },
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    if self.mailboxes is None:
        return ErrorArtifact("mailboxes is required")

    values = params["values"]
    max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count

    return self.email_loader.load(
        EmailLoader.EmailQuery(
            label=values["label"],
            key=values.get("key"),
            search_criteria=values.get("search_criteria"),
            max_count=max_count,
        ),
    )

send(params)

Source code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            },
        ),
    },
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]
    if self.smtp_user is None:
        return ErrorArtifact("smtp_user is required")
    if self.smtp_password is None:
        return ErrorArtifact("smtp_password is required")
    if self.smtp_host is None:
        return ErrorArtifact("smtp_host is required")
    if self.smtp_port is None:
        return ErrorArtifact("smtp_port is required")

    msg = MIMEText(values["body"])
    msg["Subject"] = values["subject"]
    msg["From"] = self.smtp_user
    msg["To"] = values["to"]

    try:
        with self._create_smtp_client(self.smtp_host, self.smtp_port) as client:
            client.login(self.smtp_user, self.smtp_password)
            client.sendmail(msg["From"], [msg["To"]], msg.as_string())
            return InfoArtifact("email was successfully sent")
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error sending email: {e}")

ExtractionTool

Bases: BaseTool, RuleMixin

Tool for using an Extraction Engine.

Attributes:

Name Type Description
extraction_engine BaseExtractionEngine

ExtractionEngine.

Source code in griptape/tools/extraction/tool.py
@define(kw_only=True)
class ExtractionTool(BaseTool, RuleMixin):
    """Tool for using an Extraction Engine.

    Attributes:
        extraction_engine: `ExtractionEngine`.
    """

    extraction_engine: BaseExtractionEngine = field()

    @activity(
        config={
            "description": "Can be used extract structured text from data.",
            "schema": Schema(
                {
                    Literal("data"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        data = params["values"]["data"]

        if isinstance(data, str):
            artifacts = ListArtifact([TextArtifact(data)])
        else:
            memory = self.find_input_memory(data["memory_name"])
            artifact_namespace = data["artifact_namespace"]

            if memory is not None:
                artifacts = memory.load_artifacts(artifact_namespace)
            else:
                return ErrorArtifact("memory not found")

        return self.extraction_engine.extract_artifacts(artifacts)

extraction_engine: BaseExtractionEngine = field() class-attribute instance-attribute

extract(params)

Source code in griptape/tools/extraction/tool.py
@activity(
    config={
        "description": "Can be used extract structured text from data.",
        "schema": Schema(
            {
                Literal("data"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    data = params["values"]["data"]

    if isinstance(data, str):
        artifacts = ListArtifact([TextArtifact(data)])
    else:
        memory = self.find_input_memory(data["memory_name"])
        artifact_namespace = data["artifact_namespace"]

        if memory is not None:
            artifacts = memory.load_artifacts(artifact_namespace)
        else:
            return ErrorArtifact("memory not found")

    return self.extraction_engine.extract_artifacts(artifacts)

FileManagerTool

Bases: BaseTool

FileManagerTool is a tool that can be used to list, load, and save files.

Attributes:

Name Type Description
file_manager_driver BaseFileManagerDriver

File Manager Driver to use to list, load, and save files.

Source code in griptape/tools/file_manager/tool.py
@define
class FileManagerTool(BaseTool):
    """FileManagerTool is a tool that can be used to list, load, and save files.

    Attributes:
        file_manager_driver: File Manager Driver to use to list, load, and save files.
    """

    file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)

    loaders: dict[str, loaders.BaseLoader] = field(
        default=Factory(
            lambda self: {
                "application/pdf": loaders.PdfLoader(file_manager_driver=self.file_manager_driver),
                "text/csv": loaders.CsvLoader(file_manager_driver=self.file_manager_driver),
                "text": loaders.TextLoader(file_manager_driver=self.file_manager_driver),
                "image": loaders.ImageLoader(file_manager_driver=self.file_manager_driver),
                "application/octet-stream": BlobLoader(file_manager_driver=self.file_manager_driver),
            },
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to list files on disk",
            "schema": Schema(
                {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
            ),
        },
    )
    def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        path = params["values"]["path"]
        return self.file_manager_driver.list_files(path)

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): Schema([str]),
                },
            ),
        },
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        paths = params["values"]["paths"]
        artifacts = []

        for path in paths:
            abs_path = os.path.join(self.file_manager_driver.workdir, path)
            mime_type = get_mime_type(abs_path)
            loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)))

            artifact = loader.load(path)
            if isinstance(artifact, ListArtifact):
                artifacts.extend(artifact.value)
            else:
                artifacts.append(artifact)
        return ListArtifact(artifacts)

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                },
            ),
        },
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]
        memory_name = params["values"]["memory_name"]
        artifact_namespace = params["values"]["artifact_namespace"]

        memory = self.find_input_memory(params["values"]["memory_name"])
        if not memory:
            return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

        list_artifact = memory.load_artifacts(artifact_namespace)

        if len(list_artifact) == 0:
            return ErrorArtifact(
                f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
            )

        for artifact in list_artifact.value:
            formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
            try:
                value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
                self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
            except FileNotFoundError:
                return ErrorArtifact("Path not found")
            except IsADirectoryError:
                return ErrorArtifact("Path is a directory")
            except NotADirectoryError:
                return ErrorArtifact("Not a directory")
            except Exception as e:
                return ErrorArtifact(f"Failed to load file: {str(e)}")

        return InfoArtifact("Successfully saved memory artifacts to disk")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                },
            ),
        },
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        path = params["values"]["path"]
        content = params["values"]["content"]
        return self.file_manager_driver.save_file(path, content)

file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute

loaders: dict[str, loaders.BaseLoader] = field(default=Factory(lambda self: {'application/pdf': loaders.PdfLoader(file_manager_driver=self.file_manager_driver), 'text/csv': loaders.CsvLoader(file_manager_driver=self.file_manager_driver), 'text': loaders.TextLoader(file_manager_driver=self.file_manager_driver), 'image': loaders.ImageLoader(file_manager_driver=self.file_manager_driver), 'application/octet-stream': BlobLoader(file_manager_driver=self.file_manager_driver)}, takes_self=True), kw_only=True) class-attribute instance-attribute

list_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to list files on disk",
        "schema": Schema(
            {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
        ),
    },
)
def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    path = params["values"]["path"]
    return self.file_manager_driver.list_files(path)

load_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): Schema([str]),
            },
        ),
    },
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    paths = params["values"]["paths"]
    artifacts = []

    for path in paths:
        abs_path = os.path.join(self.file_manager_driver.workdir, path)
        mime_type = get_mime_type(abs_path)
        loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)))

        artifact = loader.load(path)
        if isinstance(artifact, ListArtifact):
            artifacts.extend(artifact.value)
        else:
            artifacts.append(artifact)
    return ListArtifact(artifacts)

save_content_to_file(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            },
        ),
    },
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    path = params["values"]["path"]
    content = params["values"]["content"]
    return self.file_manager_driver.save_file(path, content)

save_memory_artifacts_to_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            },
        ),
    },
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    dir_name = params["values"]["dir_name"]
    file_name = params["values"]["file_name"]
    memory_name = params["values"]["memory_name"]
    artifact_namespace = params["values"]["artifact_namespace"]

    memory = self.find_input_memory(params["values"]["memory_name"])
    if not memory:
        return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

    list_artifact = memory.load_artifacts(artifact_namespace)

    if len(list_artifact) == 0:
        return ErrorArtifact(
            f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
        )

    for artifact in list_artifact.value:
        formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
        try:
            value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
            self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
        except FileNotFoundError:
            return ErrorArtifact("Path not found")
        except IsADirectoryError:
            return ErrorArtifact("Path is a directory")
        except NotADirectoryError:
            return ErrorArtifact("Not a directory")
        except Exception as e:
            return ErrorArtifact(f"Failed to load file: {str(e)}")

    return InfoArtifact("Successfully saved memory artifacts to disk")

GriptapeCloudToolTool

Bases: BaseGriptapeCloudTool

Runs a Griptape Cloud hosted Tool.

Attributes:

Name Type Description
tool_id str

The ID of the tool to run.

Source code in griptape/tools/griptape_cloud_tool/tool.py
@define()
class GriptapeCloudToolTool(BaseGriptapeCloudTool):
    """Runs a Griptape Cloud hosted Tool.

    Attributes:
        tool_id: The ID of the tool to run.
    """

    tool_id: str = field(kw_only=True)

    def __attrs_post_init__(self) -> None:
        self._init_activities()

    def _init_activities(self) -> None:
        schema = self._get_schema()
        tool_name, activity_schemas = self._parse_schema(schema)

        if self.name == self.__class__.__name__:
            self.name = tool_name

        for activity_name, (description, activity_schema) in activity_schemas.items():
            activity_handler = self._create_activity_handler(activity_name, description, activity_schema)

            setattr(self, activity_name, MethodType(activity_handler, self))

    def _get_schema(self) -> dict:
        response = requests.get(urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi"), headers=self.headers)

        response.raise_for_status()
        schema = response.json()

        if not isinstance(schema, dict):
            raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {schema}")

        if "error" in schema and "tool_run_id" in schema:
            raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {schema['error']}")

        return schema

    def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
        """Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas."""
        activities = {}

        name = schema.get("info", {}).get("title")

        for path, path_info in schema.get("paths", {}).items():
            if not path.startswith("/activities"):
                continue
            for method, method_info in path_info.items():
                if "post" in method.lower():
                    activity_name = method_info["operationId"]
                    description = method_info.get("description", "")

                    activity_schema = self.__extract_schema_from_ref(
                        schema,
                        method_info.get("requestBody", {})
                        .get("content", {})
                        .get("application/json", {})
                        .get("schema", {}),
                    )

                    activities[activity_name] = (description, activity_schema)

        return name, activities

    def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
        """Extracts a schema from a $ref if present, resolving it into native schema properties."""
        if "$ref" in schema_ref:
            # Resolve the reference and retrieve the schema data
            ref_path = schema_ref["$ref"].split("/")[-1]
            schema_data = schema["components"]["schemas"].get(ref_path, {})
        else:
            # Use the provided schema directly if no $ref is found
            schema_data = schema_ref

        # Convert the schema_data dictionary into a Schema with its properties
        properties = {}
        for prop, prop_info in schema_data.get("properties", {}).items():
            prop_type = prop_info.get("type", "string")
            prop_description = prop_info.get("description", "")
            schema_prop = Literal(prop, description=prop_description)
            is_optional = prop not in schema_data.get("required", [])

            if is_optional:
                schema_prop = SchemaOptional(schema_prop)

            properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info)

        return Schema(properties)

    def _map_openapi_type_to_python(self, openapi_type: str, schema_info: Optional[dict] = None) -> type | list[type]:
        """Maps OpenAPI types to native Python types."""
        type_mapping = {"string": str, "integer": int, "boolean": bool, "number": float, "object": dict}

        if openapi_type == "array" and schema_info is not None and "items" in schema_info:
            items_type = schema_info["items"].get("type", "string")
            return [self._map_openapi_type_to_python(items_type)]  # pyright: ignore[reportReturnType]
        else:
            return type_mapping.get(openapi_type, str)

    def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
        """Creates an activity handler method for the tool."""

        @activity(config={"name": activity_name, "description": description, "schema": activity_schema})
        def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
            return self._run_activity(activity_name, values)

        return activity_handler

    def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
        """Runs an activity on the tool with the provided parameters."""
        url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")

        response = requests.post(url, json=params, headers=self.headers)

        response.raise_for_status()

        try:
            return BaseArtifact.from_dict(response.json())
        except ValueError:
            return TextArtifact(response.text)

tool_id: str = field(kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/griptape_cloud_tool/tool.py
def __attrs_post_init__(self) -> None:
    self._init_activities()

__extract_schema_from_ref(schema, schema_ref)

Extracts a schema from a $ref if present, resolving it into native schema properties.

Source code in griptape/tools/griptape_cloud_tool/tool.py
def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
    """Extracts a schema from a $ref if present, resolving it into native schema properties."""
    if "$ref" in schema_ref:
        # Resolve the reference and retrieve the schema data
        ref_path = schema_ref["$ref"].split("/")[-1]
        schema_data = schema["components"]["schemas"].get(ref_path, {})
    else:
        # Use the provided schema directly if no $ref is found
        schema_data = schema_ref

    # Convert the schema_data dictionary into a Schema with its properties
    properties = {}
    for prop, prop_info in schema_data.get("properties", {}).items():
        prop_type = prop_info.get("type", "string")
        prop_description = prop_info.get("description", "")
        schema_prop = Literal(prop, description=prop_description)
        is_optional = prop not in schema_data.get("required", [])

        if is_optional:
            schema_prop = SchemaOptional(schema_prop)

        properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info)

    return Schema(properties)

ImageQueryTool

Bases: BaseTool

Source code in griptape/tools/image_query/tool.py
@define
class ImageQueryTool(BaseTool):
    prompt_driver: BasePromptDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images on disk.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
                },
            ),
        },
    )
    def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        query = params["values"]["query"]
        image_paths = params["values"]["image_paths"]

        image_artifacts = []
        for image_path in image_paths:
            image_artifacts.append(self.image_loader.load(image_path))

        return cast(
            TextArtifact,
            self.prompt_driver.run(
                PromptStack.from_artifact(
                    ListArtifact([TextArtifact(query), *image_artifacts]),
                )
            ).to_artifact(),
        )

    @activity(
        config={
            "description": "This tool can be used to query the contents of images in memory.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_artifacts", description="Image artifact memory references."): [
                        {"image_artifact_namespace": str, "image_artifact_name": str},
                    ],
                    "memory_name": str,
                },
            ),
        },
    )
    def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        query = params["values"]["query"]
        image_artifact_references = params["values"]["image_artifacts"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        image_artifacts = []
        for image_artifact_reference in image_artifact_references:
            try:
                image_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    ImageArtifact,
                )

                image_artifacts.append(cast(ImageArtifact, image_artifact))
            except ValueError:
                # If we're unable to parse the artifact as an ImageArtifact, attempt to
                # parse a BlobArtifact and load it as an ImageArtifact.
                blob_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    BlobArtifact,
                )

                image_artifacts.append(self.image_loader.load(blob_artifact.value))
            except Exception as e:
                return ErrorArtifact(str(e))

        return cast(
            TextArtifact,
            self.prompt_driver.run(
                PromptStack.from_artifact(
                    ListArtifact([TextArtifact(query), *image_artifacts]),
                )
            ).to_artifact(),
        )

image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True) class-attribute instance-attribute

prompt_driver: BasePromptDriver = field(kw_only=True) class-attribute instance-attribute

query_image_from_disk(params)

Source code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images on disk.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
            },
        ),
    },
)
def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    query = params["values"]["query"]
    image_paths = params["values"]["image_paths"]

    image_artifacts = []
    for image_path in image_paths:
        image_artifacts.append(self.image_loader.load(image_path))

    return cast(
        TextArtifact,
        self.prompt_driver.run(
            PromptStack.from_artifact(
                ListArtifact([TextArtifact(query), *image_artifacts]),
            )
        ).to_artifact(),
    )

query_images_from_memory(params)

Source code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images in memory.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_artifacts", description="Image artifact memory references."): [
                    {"image_artifact_namespace": str, "image_artifact_name": str},
                ],
                "memory_name": str,
            },
        ),
    },
)
def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    query = params["values"]["query"]
    image_artifact_references = params["values"]["image_artifacts"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    image_artifacts = []
    for image_artifact_reference in image_artifact_references:
        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_reference["image_artifact_namespace"],
                image_artifact_reference["image_artifact_name"],
                ImageArtifact,
            )

            image_artifacts.append(cast(ImageArtifact, image_artifact))
        except ValueError:
            # If we're unable to parse the artifact as an ImageArtifact, attempt to
            # parse a BlobArtifact and load it as an ImageArtifact.
            blob_artifact = load_artifact_from_memory(
                memory,
                image_artifact_reference["image_artifact_namespace"],
                image_artifact_reference["image_artifact_name"],
                BlobArtifact,
            )

            image_artifacts.append(self.image_loader.load(blob_artifact.value))
        except Exception as e:
            return ErrorArtifact(str(e))

    return cast(
        TextArtifact,
        self.prompt_driver.run(
            PromptStack.from_artifact(
                ListArtifact([TextArtifact(query), *image_artifacts]),
            )
        ).to_artifact(),
    )

InpaintingImageGenerationTool

Bases: BaseImageGenerationTool

A tool that can be used to generate prompted inpaintings of an image.

Attributes:

Name Type Description
image_generation_driver BaseImageGenerationDriver

The image generation driver used to generate the image.

output_dir BaseImageGenerationDriver

If provided, the generated image will be written to disk in output_dir.

output_file BaseImageGenerationDriver

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/inpainting_image_generation/tool.py
@define
class InpaintingImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted inpaintings of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        return self._generate_inpainting(
            prompt, negative_prompt, cast(ImageArtifact, input_artifact), cast(ImageArtifact, mask_artifact)
        )

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_namespace,
                image_artifact_name,
                ImageArtifact,
            )
            mask_artifact = load_artifact_from_memory(
                memory,
                mask_artifact_namespace,
                mask_artifact_name,
                ImageArtifact,
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_inpainting(
            prompt, negative_prompt, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)
        )

    def _generate_inpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact:
        output_artifact = self.image_generation_driver.run_image_inpainting(
            prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

image_generation_driver: BaseImageGenerationDriver = field(kw_only=True) class-attribute instance-attribute

image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

image_inpainting_from_file(params)

Source code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    prompt = params["values"]["prompt"]
    negative_prompt = params["values"]["negative_prompt"]
    image_file = params["values"]["image_file"]
    mask_file = params["values"]["mask_file"]

    input_artifact = self.image_loader.load(image_file)
    mask_artifact = self.image_loader.load(mask_file)

    return self._generate_inpainting(
        prompt, negative_prompt, cast(ImageArtifact, input_artifact), cast(ImageArtifact, mask_artifact)
    )

image_inpainting_from_memory(params)

Source code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "image_artifact_namespace": str,
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    prompt = params["values"]["prompt"]
    negative_prompt = params["values"]["negative_prompt"]
    image_artifact_namespace = params["values"]["image_artifact_namespace"]
    image_artifact_name = params["values"]["image_artifact_name"]
    mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
    mask_artifact_name = params["values"]["mask_artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(
            memory,
            image_artifact_namespace,
            image_artifact_name,
            ImageArtifact,
        )
        mask_artifact = load_artifact_from_memory(
            memory,
            mask_artifact_namespace,
            mask_artifact_name,
            ImageArtifact,
        )
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_inpainting(
        prompt, negative_prompt, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)
    )

OutpaintingImageGenerationTool

Bases: BaseImageGenerationTool

A tool that can be used to generate prompted outpaintings of an image.

Attributes:

Name Type Description
image_generation_driver BaseImageGenerationDriver

The image generation driver used to generate the image.

output_dir BaseImageGenerationDriver

If provided, the generated image will be written to disk in output_dir.

output_file BaseImageGenerationDriver

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/outpainting_image_generation/tool.py
@define
class OutpaintingImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted outpaintings of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        return self._generate_outpainting(prompt, negative_prompt, input_artifact, mask_artifact)

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "mask_artifact_namespace": str,
                },
            ),
        },
    )
    def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_namespace,
                image_artifact_name,
                ImageArtifact,
            )
            mask_artifact = load_artifact_from_memory(
                memory,
                mask_artifact_namespace,
                mask_artifact_name,
                ImageArtifact,
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_outpainting(
            prompt, negative_prompt, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)