Skip to content

Tools

__all__ = ['BaseTool', 'BaseAwsClient', 'AwsIamClient', 'AwsS3Client', 'BaseGoogleClient', 'GoogleGmailClient', 'GoogleDocsClient', 'GoogleCalendarClient', 'GoogleDriveClient', 'Calculator', 'WebSearch', 'WebScraper', 'SqlClient', 'EmailClient', 'RestApiClient', 'FileManager', 'VectorStoreClient', 'DateTime', 'TaskMemoryClient', 'Computer', 'OpenWeatherClient', 'PromptImageGenerationClient', 'VariationImageGenerationClient', 'InpaintingImageGenerationClient', 'OutpaintingImageGenerationClient', 'GriptapeCloudKnowledgeBaseClient', 'StructureRunClient', 'ImageQueryClient'] module-attribute

AwsIamClient

Bases: BaseAwsClient

Source code in griptape/tools/aws_iam_client/tool.py
@define
class AwsIamClient(BaseAwsClient):
    iam_client: Client = field(default=Factory(lambda self: self.session.client("iam"), takes_self=True), kw_only=True)

    @activity(
        config={
            "description": "Can be use to get a policy for an AWS IAM user.",
            "schema": Schema(
                {
                    Literal("user_name", description="Username of the AWS IAM user."): str,
                    Literal(
                        "policy_name",
                        description="PolicyName of the AWS IAM Policy embedded in the specified IAM user.",
                    ): str,
                }
            ),
        }
    )
    def get_user_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            policy = self.iam_client.get_user_policy(
                UserName=params["values"]["user_name"], PolicyName=params["values"]["policy_name"]
            )
            return TextArtifact(policy["PolicyDocument"])
        except Exception as e:
            return ErrorArtifact(f"error returning policy document: {e}")

    @activity(config={"description": "Can be used to list AWS MFA Devices"})
    def list_mfa_devices(self, _: dict) -> ListArtifact | ErrorArtifact:
        try:
            devices = self.iam_client.list_mfa_devices()
            return ListArtifact([TextArtifact(str(d)) for d in devices["MFADevices"]])
        except Exception as e:
            return ErrorArtifact(f"error listing mfa devices: {e}")

    @activity(
        config={
            "description": "Can be used to list policies for a given IAM user.",
            "schema": Schema(
                {Literal("user_name", description="Username of the AWS IAM user for which to list policies."): str}
            ),
        }
    )
    def list_user_policies(self, params: dict) -> ListArtifact | ErrorArtifact:
        try:
            policies = self.iam_client.list_user_policies(UserName=params["values"]["user_name"])
            policy_names = policies["PolicyNames"]

            attached_policies = self.iam_client.list_attached_user_policies(UserName=params["values"]["user_name"])
            attached_policy_names = [p["PolicyName"] for p in attached_policies["AttachedPolicies"]]

            return ListArtifact([TextArtifact(str(p)) for p in policy_names + attached_policy_names])
        except Exception as e:
            return ErrorArtifact(f"error listing iam user policies: {e}")

    @activity(config={"description": "Can be used to list AWS IAM users."})
    def list_users(self, _: dict) -> ListArtifact | ErrorArtifact:
        try:
            users = self.iam_client.list_users()
            return ListArtifact([TextArtifact(str(u)) for u in users["Users"]])
        except Exception as e:
            return ErrorArtifact(f"error listing s3 users: {e}")

iam_client: Client = field(default=Factory(lambda self: self.session.client('iam'), takes_self=True), kw_only=True) class-attribute instance-attribute

get_user_policy(params)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(
    config={
        "description": "Can be use to get a policy for an AWS IAM user.",
        "schema": Schema(
            {
                Literal("user_name", description="Username of the AWS IAM user."): str,
                Literal(
                    "policy_name",
                    description="PolicyName of the AWS IAM Policy embedded in the specified IAM user.",
                ): str,
            }
        ),
    }
)
def get_user_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        policy = self.iam_client.get_user_policy(
            UserName=params["values"]["user_name"], PolicyName=params["values"]["policy_name"]
        )
        return TextArtifact(policy["PolicyDocument"])
    except Exception as e:
        return ErrorArtifact(f"error returning policy document: {e}")

list_mfa_devices(_)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(config={"description": "Can be used to list AWS MFA Devices"})
def list_mfa_devices(self, _: dict) -> ListArtifact | ErrorArtifact:
    try:
        devices = self.iam_client.list_mfa_devices()
        return ListArtifact([TextArtifact(str(d)) for d in devices["MFADevices"]])
    except Exception as e:
        return ErrorArtifact(f"error listing mfa devices: {e}")

list_user_policies(params)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(
    config={
        "description": "Can be used to list policies for a given IAM user.",
        "schema": Schema(
            {Literal("user_name", description="Username of the AWS IAM user for which to list policies."): str}
        ),
    }
)
def list_user_policies(self, params: dict) -> ListArtifact | ErrorArtifact:
    try:
        policies = self.iam_client.list_user_policies(UserName=params["values"]["user_name"])
        policy_names = policies["PolicyNames"]

        attached_policies = self.iam_client.list_attached_user_policies(UserName=params["values"]["user_name"])
        attached_policy_names = [p["PolicyName"] for p in attached_policies["AttachedPolicies"]]

        return ListArtifact([TextArtifact(str(p)) for p in policy_names + attached_policy_names])
    except Exception as e:
        return ErrorArtifact(f"error listing iam user policies: {e}")

list_users(_)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(config={"description": "Can be used to list AWS IAM users."})
def list_users(self, _: dict) -> ListArtifact | ErrorArtifact:
    try:
        users = self.iam_client.list_users()
        return ListArtifact([TextArtifact(str(u)) for u in users["Users"]])
    except Exception as e:
        return ErrorArtifact(f"error listing s3 users: {e}")

AwsS3Client

Bases: BaseAwsClient

Source code in griptape/tools/aws_s3_client/tool.py
@define
class AwsS3Client(BaseAwsClient):
    s3_client: Client = field(default=Factory(lambda self: self.session.client("s3"), takes_self=True), kw_only=True)

    @activity(
        config={
            "description": "Can be used to get an access control list (ACL) of an AWS S3 bucket.",
            "schema": Schema(
                {
                    Literal(
                        "bucket_name",
                        description="The bucket name that contains the object for which to get the ACL information.",
                    ): str
                }
            ),
        }
    )
    def get_bucket_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            acl = self.s3_client.get_bucket_acl(Bucket=params["values"]["bucket_name"])
            return TextArtifact(acl)
        except Exception as e:
            return ErrorArtifact(f"error getting bucket acl: {e}")

    @activity(
        config={
            "description": "Can be used to get an AWS S3 bucket policy.",
            "schema": Schema(
                {Literal("bucket_name", description="The bucket name for which to get the bucket policy."): str}
            ),
        }
    )
    def get_bucket_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            policy = self.s3_client.get_bucket_policy(Bucket=params["values"]["bucket_name"])
            return TextArtifact(policy)
        except Exception as e:
            return ErrorArtifact(f"error getting bucket policy: {e}")

    @activity(
        config={
            "description": "Can be used to get an access control list (ACL) of an object in the AWS S3 bucket.",
            "schema": Schema(
                {
                    Literal("bucket_name", description="Name of the AWS S3 bucket for which to get an ACL."): str,
                    Literal("object_key", description="Key of the object for which to get the ACL information."): str,
                }
            ),
        }
    )
    def get_object_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            acl = self.s3_client.get_object_acl(
                Bucket=params["values"]["bucket_name"], Key=params["values"]["object_key"]
            )
            return TextArtifact(acl)
        except Exception as e:
            return ErrorArtifact(f"error getting object acl: {e}")

    @activity(config={"description": "Can be used to list all AWS S3 buckets."})
    def list_s3_buckets(self, _: dict) -> ListArtifact | ErrorArtifact:
        try:
            buckets = self.s3_client.list_buckets()

            return ListArtifact([TextArtifact(str(b)) for b in buckets["Buckets"]])
        except Exception as e:
            return ErrorArtifact(f"error listing s3 buckets: {e}")

    @activity(
        config={
            "description": "Can be used to list all objects in an AWS S3 bucket.",
            "schema": Schema({Literal("bucket_name", description="The name of the S3 bucket to list."): str}),
        }
    )
    def list_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
        try:
            objects = self.s3_client.list_objects_v2(Bucket=params["values"]["bucket_name"])

            return ListArtifact([TextArtifact(str(o)) for o in objects["Contents"]])
        except Exception as e:
            return ErrorArtifact(f"error listing objects in bucket: {e}")

    @activity(
        config={
            "description": "Can be used to upload memory artifacts to an AWS S3 bucket",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "bucket_name": str,
                    Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                }
            ),
        }
    )
    def upload_memory_artifacts_to_s3(self, params: dict) -> InfoArtifact | ErrorArtifact:
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        bucket_name = params["values"]["bucket_name"]
        object_key = params["values"]["object_key"]

        if memory:
            artifacts = memory.load_artifacts(artifact_namespace)

            if len(artifacts) == 0:
                return ErrorArtifact("no artifacts found")
            elif len(artifacts) == 1:
                try:
                    self._upload_object(bucket_name, object_key, artifacts.value[0].value)

                    return InfoArtifact("uploaded successfully")
                except Exception as e:
                    return ErrorArtifact(f"error uploading objects to the bucket: {e}")
            else:
                try:
                    for a in artifacts.value:
                        self._upload_object(bucket_name, object_key, a.value)

                    return InfoArtifact("uploaded successfully")
                except Exception as e:
                    return ErrorArtifact(f"error uploading objects to the bucket: {e}")
        else:
            return ErrorArtifact("memory not found")

    @activity(
        config={
            "description": "Can be used to upload content to an AWS S3 bucket",
            "schema": Schema(
                {
                    "bucket_name": str,
                    Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                    "content": str,
                }
            ),
        }
    )
    def upload_content_to_s3(self, params: dict) -> ErrorArtifact | InfoArtifact:
        content = params["values"]["content"]
        bucket_name = params["values"]["bucket_name"]
        object_key = params["values"]["object_key"]

        try:
            self._upload_object(bucket_name, object_key, content)

            return InfoArtifact("uploaded successfully")
        except Exception as e:
            return ErrorArtifact(f"error uploading objects to the bucket: {e}")

    @activity(
        config={
            "description": "Can be used to download objects from AWS S3",
            "schema": Schema(
                {
                    Literal("objects", description="A list of bucket name and object key pairs to download"): [
                        {
                            Literal(
                                "bucket_name", description="The name of the bucket to download the object from"
                            ): str,
                            Literal(
                                "object_key", description="The name of the object key to download from the bucket"
                            ): str,
                        }
                    ]
                }
            ),
        }
    )
    def download_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
        objects = params["values"]["objects"]
        artifacts = []
        for object_info in objects:
            try:
                obj = self.s3_client.get_object(Bucket=object_info["bucket_name"], Key=object_info["object_key"])

                content = obj["Body"].read()
                artifacts.append(BlobArtifact(content, name=object_info["object_key"]))

            except Exception as e:
                return ErrorArtifact(f"error downloading objects from bucket: {e}")

        return ListArtifact(artifacts)

    def _upload_object(self, bucket_name: str, object_name: str, value: Any) -> None:
        self.s3_client.create_bucket(Bucket=bucket_name)

        self.s3_client.upload_fileobj(
            Fileobj=io.BytesIO(value.encode() if isinstance(value, str) else value), Bucket=bucket_name, Key=object_name
        )

s3_client: Client = field(default=Factory(lambda self: self.session.client('s3'), takes_self=True), kw_only=True) class-attribute instance-attribute

download_objects(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to download objects from AWS S3",
        "schema": Schema(
            {
                Literal("objects", description="A list of bucket name and object key pairs to download"): [
                    {
                        Literal(
                            "bucket_name", description="The name of the bucket to download the object from"
                        ): str,
                        Literal(
                            "object_key", description="The name of the object key to download from the bucket"
                        ): str,
                    }
                ]
            }
        ),
    }
)
def download_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
    objects = params["values"]["objects"]
    artifacts = []
    for object_info in objects:
        try:
            obj = self.s3_client.get_object(Bucket=object_info["bucket_name"], Key=object_info["object_key"])

            content = obj["Body"].read()
            artifacts.append(BlobArtifact(content, name=object_info["object_key"]))

        except Exception as e:
            return ErrorArtifact(f"error downloading objects from bucket: {e}")

    return ListArtifact(artifacts)

get_bucket_acl(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an access control list (ACL) of an AWS S3 bucket.",
        "schema": Schema(
            {
                Literal(
                    "bucket_name",
                    description="The bucket name that contains the object for which to get the ACL information.",
                ): str
            }
        ),
    }
)
def get_bucket_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        acl = self.s3_client.get_bucket_acl(Bucket=params["values"]["bucket_name"])
        return TextArtifact(acl)
    except Exception as e:
        return ErrorArtifact(f"error getting bucket acl: {e}")

get_bucket_policy(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an AWS S3 bucket policy.",
        "schema": Schema(
            {Literal("bucket_name", description="The bucket name for which to get the bucket policy."): str}
        ),
    }
)
def get_bucket_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        policy = self.s3_client.get_bucket_policy(Bucket=params["values"]["bucket_name"])
        return TextArtifact(policy)
    except Exception as e:
        return ErrorArtifact(f"error getting bucket policy: {e}")

get_object_acl(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an access control list (ACL) of an object in the AWS S3 bucket.",
        "schema": Schema(
            {
                Literal("bucket_name", description="Name of the AWS S3 bucket for which to get an ACL."): str,
                Literal("object_key", description="Key of the object for which to get the ACL information."): str,
            }
        ),
    }
)
def get_object_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        acl = self.s3_client.get_object_acl(
            Bucket=params["values"]["bucket_name"], Key=params["values"]["object_key"]
        )
        return TextArtifact(acl)
    except Exception as e:
        return ErrorArtifact(f"error getting object acl: {e}")

list_objects(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to list all objects in an AWS S3 bucket.",
        "schema": Schema({Literal("bucket_name", description="The name of the S3 bucket to list."): str}),
    }
)
def list_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
    try:
        objects = self.s3_client.list_objects_v2(Bucket=params["values"]["bucket_name"])

        return ListArtifact([TextArtifact(str(o)) for o in objects["Contents"]])
    except Exception as e:
        return ErrorArtifact(f"error listing objects in bucket: {e}")

list_s3_buckets(_)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(config={"description": "Can be used to list all AWS S3 buckets."})
def list_s3_buckets(self, _: dict) -> ListArtifact | ErrorArtifact:
    try:
        buckets = self.s3_client.list_buckets()

        return ListArtifact([TextArtifact(str(b)) for b in buckets["Buckets"]])
    except Exception as e:
        return ErrorArtifact(f"error listing s3 buckets: {e}")

upload_content_to_s3(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to upload content to an AWS S3 bucket",
        "schema": Schema(
            {
                "bucket_name": str,
                Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                "content": str,
            }
        ),
    }
)
def upload_content_to_s3(self, params: dict) -> ErrorArtifact | InfoArtifact:
    content = params["values"]["content"]
    bucket_name = params["values"]["bucket_name"]
    object_key = params["values"]["object_key"]

    try:
        self._upload_object(bucket_name, object_key, content)

        return InfoArtifact("uploaded successfully")
    except Exception as e:
        return ErrorArtifact(f"error uploading objects to the bucket: {e}")

upload_memory_artifacts_to_s3(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to upload memory artifacts to an AWS S3 bucket",
        "schema": Schema(
            {
                "memory_name": str,
                "artifact_namespace": str,
                "bucket_name": str,
                Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
            }
        ),
    }
)
def upload_memory_artifacts_to_s3(self, params: dict) -> InfoArtifact | ErrorArtifact:
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]
    bucket_name = params["values"]["bucket_name"]
    object_key = params["values"]["object_key"]

    if memory:
        artifacts = memory.load_artifacts(artifact_namespace)

        if len(artifacts) == 0:
            return ErrorArtifact("no artifacts found")
        elif len(artifacts) == 1:
            try:
                self._upload_object(bucket_name, object_key, artifacts.value[0].value)

                return InfoArtifact("uploaded successfully")
            except Exception as e:
                return ErrorArtifact(f"error uploading objects to the bucket: {e}")
        else:
            try:
                for a in artifacts.value:
                    self._upload_object(bucket_name, object_key, a.value)

                return InfoArtifact("uploaded successfully")
            except Exception as e:
                return ErrorArtifact(f"error uploading objects to the bucket: {e}")
    else:
        return ErrorArtifact("memory not found")

BaseAwsClient

Bases: BaseTool, ABC

Source code in griptape/tools/base_aws_client.py
@define
class BaseAwsClient(BaseTool, ABC):
    session: boto3.Session = field(kw_only=True)

    @activity(config={"description": "Can be used to get current AWS account and IAM principal."})
    def get_current_aws_identity(self, params: dict) -> BaseArtifact:
        try:
            session = self.session
            sts = session.client("sts")
            return TextArtifact(str(sts.get_caller_identity()))
        except Exception as e:
            return ErrorArtifact(f"error getting current aws caller identity: {e}")

session: boto3.Session = field(kw_only=True) class-attribute instance-attribute

get_current_aws_identity(params)

Source code in griptape/tools/base_aws_client.py
@activity(config={"description": "Can be used to get current AWS account and IAM principal."})
def get_current_aws_identity(self, params: dict) -> BaseArtifact:
    try:
        session = self.session
        sts = session.client("sts")
        return TextArtifact(str(sts.get_caller_identity()))
    except Exception as e:
        return ErrorArtifact(f"error getting current aws caller identity: {e}")

BaseGoogleClient

Bases: BaseTool, ABC

Source code in griptape/tools/base_google_client.py
@define
class BaseGoogleClient(BaseTool, ABC):
    DRIVE_FILE_SCOPES = ["https://www.googleapis.com/auth/drive.file"]

    DRIVE_AUTH_SCOPES = ["https://www.googleapis.com/auth/drive"]

    service_account_credentials: dict = field(kw_only=True)

    def _build_client(self, scopes: list[str], service_name: str, version: str, owner_email: str) -> Any:
        from google.oauth2 import service_account  # pyright: ignore
        from googleapiclient.discovery import build  # pyright: ignore

        credentials = service_account.Credentials.from_service_account_info(
            self.service_account_credentials, scopes=scopes
        )

        return build(serviceName=service_name, version=version, credentials=credentials.with_subject(owner_email))

    def _convert_path_to_file_id(self, service: Any, path: str) -> Optional[str]:
        parts = path.split("/")
        current_id = "root"

        for idx, part in enumerate(parts):
            if idx == len(parts) - 1:
                query = f"name='{part}' and '{current_id}' in parents"
            else:
                query = f"name='{part}' and '{current_id}' in parents and mimeType='application/vnd.google-apps.folder'"

            response = service.files().list(q=query).execute()
            files = response.get("files", [])

            if not files:
                if idx != len(parts) - 1:
                    folder_metadata = {
                        "name": part,
                        "mimeType": "application/vnd.google-apps.folder",
                        "parents": [current_id],
                    }
                    folder = service.files().create(body=folder_metadata, fields="id").execute()
                    current_id = folder.get("id")
                else:
                    current_id = None
            else:
                current_id = files[0]["id"]

        return current_id

DRIVE_AUTH_SCOPES = ['https://www.googleapis.com/auth/drive'] class-attribute instance-attribute

DRIVE_FILE_SCOPES = ['https://www.googleapis.com/auth/drive.file'] class-attribute instance-attribute

service_account_credentials: dict = field(kw_only=True) class-attribute instance-attribute

BaseTool

Bases: ActivityMixin, ABC

Abstract class for all tools to inherit from for.

Attributes:

Name Type Description
name str

Tool name.

input_memory Optional[list[TaskMemory]]

TaskMemory available in tool activities. Gets automatically set if None.

output_memory Optional[dict[str, list[TaskMemory]]]

TaskMemory that activities write to be default. Gets automatically set if None.

install_dependencies_on_init bool

Determines whether dependencies from the tool requirements.txt file are installed in init.

dependencies_install_directory Optional[str]

Custom dependency install directory.

verbose bool

Determines whether tool operations (such as dependency installation) should be verbose.

off_prompt bool

Determines whether tool activity output goes to the output memory.

Source code in griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, ABC):
    """Abstract class for all tools to inherit from for.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to be default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    MANIFEST_FILE = "manifest.yml"
    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(default=Factory(lambda self: self.class_name, takes_self=True), kw_only=True)
    input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True)
    output_memory: Optional[dict[str, list[TaskMemory]]] = field(default=None, kw_only=True)
    install_dependencies_on_init: bool = field(default=True, kw_only=True)
    dependencies_install_directory: Optional[str] = field(default=None, kw_only=True)
    verbose: bool = field(default=False, kw_only=True)
    off_prompt: bool = field(default=True, kw_only=True)

    def __attrs_post_init__(self) -> None:
        if self.install_dependencies_on_init:
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore
    def validate_output_memory(self, _, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")
                if memory_list is None:
                    raise ValueError(f"memory list for activity '{activity_name}' can't be None")

                output_memory_names = [memory.name for memory in memory_list]

                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def class_name(self):
        return self.__class__.__name__

    @property
    def manifest_path(self) -> str:
        return os.path.join(self.abs_dir_path, self.MANIFEST_FILE)

    @property
    def requirements_path(self) -> str:
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def manifest(self) -> dict:
        with open(self.manifest_path) as yaml_file:
            return yaml.safe_load(yaml_file)

    @property
    def abs_file_path(self):
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self):
        return os.path.dirname(self.abs_file_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} Action Schema")

    def activity_schemas(self) -> list[Schema]:
        return [
            Schema(
                {
                    Literal("name"): self.name,
                    Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
                    **self.activity_to_input(
                        activity
                    ),  # Unpack the dictionary in order to only add the key-values if there are any
                }
            )
            for activity in self.activities()
        ]

    def execute(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> BaseArtifact:
        preprocessed_input = self.before_run(activity, subtask, action)
        output = self.run(activity, subtask, action, preprocessed_input)
        postprocessed_output = self.after_run(activity, subtask, action, output)

        return postprocessed_output

    def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> Optional[dict]:
        return action.input

    def run(
        self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: Optional[dict]
    ) -> BaseArtifact:
        activity_result = activity(value)

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            result = InfoArtifact(activity_result)

        return result

    def after_run(
        self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: BaseArtifact
    ) -> BaseArtifact:
        if value:
            if self.output_memory:
                output_memories = self.output_memory[getattr(activity, "name")] or []
                for memory in output_memories:
                    value = memory.process_output(activity, subtask, value)

                if isinstance(value, BaseArtifact):
                    return value
                else:
                    return TextArtifact(str(value))
            else:
                return value
        else:
            return InfoArtifact("Tool returned an empty value")

    def validate(self) -> bool:
        from griptape.utils import ManifestValidator

        if not os.path.exists(self.manifest_path):
            raise Exception(f"{self.MANIFEST_FILE} not found")

        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")

        ManifestValidator().validate(self.manifest)

        return True

    def tool_dir(self):
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        env = env if env else {}

        command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
        )

    def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        else:
            return None

MANIFEST_FILE = 'manifest.yml' class-attribute instance-attribute

REQUIREMENTS_FILE = 'requirements.txt' class-attribute instance-attribute

abs_dir_path property

abs_file_path property

class_name property

dependencies_install_directory: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True) class-attribute instance-attribute

install_dependencies_on_init: bool = field(default=True, kw_only=True) class-attribute instance-attribute

manifest: dict property

manifest_path: str property

name: str = field(default=Factory(lambda self: self.class_name, takes_self=True), kw_only=True) class-attribute instance-attribute

off_prompt: bool = field(default=True, kw_only=True) class-attribute instance-attribute

output_memory: Optional[dict[str, list[TaskMemory]]] = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_path: str property

verbose: bool = field(default=False, kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    if self.install_dependencies_on_init:
        self.install_dependencies(os.environ.copy())

activity_schemas()

Source code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]:
    return [
        Schema(
            {
                Literal("name"): self.name,
                Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
                **self.activity_to_input(
                    activity
                ),  # Unpack the dictionary in order to only add the key-values if there are any
            }
        )
        for activity in self.activities()
    ]

after_run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
def after_run(
    self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: BaseArtifact
) -> BaseArtifact:
    if value:
        if self.output_memory:
            output_memories = self.output_memory[getattr(activity, "name")] or []
            for memory in output_memories:
                value = memory.process_output(activity, subtask, value)

            if isinstance(value, BaseArtifact):
                return value
            else:
                return TextArtifact(str(value))
        else:
            return value
    else:
        return InfoArtifact("Tool returned an empty value")

before_run(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> Optional[dict]:
    return action.input

execute(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def execute(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> BaseArtifact:
    preprocessed_input = self.before_run(activity, subtask, action)
    output = self.run(activity, subtask, action, preprocessed_input)
    postprocessed_output = self.after_run(activity, subtask, action, output)

    return postprocessed_output

find_input_memory(memory_name)

Source code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
    if self.input_memory:
        return next((m for m in self.input_memory if m.name == memory_name), None)
    else:
        return None

install_dependencies(env=None)

Source code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    env = env if env else {}

    command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

    if self.dependencies_install_directory is None:
        command.extend(["-U"])
    else:
        command.extend(["-t", self.dependencies_install_directory])

    subprocess.run(
        command,
        env=env,
        cwd=self.tool_dir(),
        stdout=None if self.verbose else subprocess.DEVNULL,
        stderr=None if self.verbose else subprocess.DEVNULL,
    )

run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
def run(
    self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: Optional[dict]
) -> BaseArtifact:
    activity_result = activity(value)

    if isinstance(activity_result, BaseArtifact):
        result = activity_result
    else:
        logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

        result = InfoArtifact(activity_result)

    return result

schema()

Source code in griptape/tools/base_tool.py
def schema(self) -> dict:
    full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

    return full_schema.json_schema(f"{self.name} Action Schema")

tool_dir()

Source code in griptape/tools/base_tool.py
def tool_dir(self):
    class_file = inspect.getfile(self.__class__)

    return os.path.dirname(os.path.abspath(class_file))

validate()

Source code in griptape/tools/base_tool.py
def validate(self) -> bool:
    from griptape.utils import ManifestValidator

    if not os.path.exists(self.manifest_path):
        raise Exception(f"{self.MANIFEST_FILE} not found")

    if not os.path.exists(self.requirements_path):
        raise Exception(f"{self.REQUIREMENTS_FILE} not found")

    ManifestValidator().validate(self.manifest)

    return True

validate_output_memory(_, output_memory)

Source code in griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore
def validate_output_memory(self, _, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
    if output_memory:
        for activity_name, memory_list in output_memory.items():
            if not self.find_activity(activity_name):
                raise ValueError(f"activity {activity_name} doesn't exist")
            if memory_list is None:
                raise ValueError(f"memory list for activity '{activity_name}' can't be None")

            output_memory_names = [memory.name for memory in memory_list]

            if len(output_memory_names) > len(set(output_memory_names)):
                raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

Calculator

Bases: BaseTool

Source code in griptape/tools/calculator/tool.py
class Calculator(BaseTool):
    @activity(
        config={
            "description": "Can be used for computing simple numerical or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. "
                        "Don't use variables. Don't use any imports or external libraries",
                    ): str
                }
            ),
        }
    )
    def calculate(self, params: dict) -> BaseArtifact:
        import numexpr  # type: ignore

        try:
            expression = params["values"]["expression"]

            return TextArtifact(numexpr.evaluate(expression))
        except Exception as e:
            return ErrorArtifact(f"error calculating: {e}")

calculate(params)

Source code in griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for computing simple numerical or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. "
                    "Don't use variables. Don't use any imports or external libraries",
                ): str
            }
        ),
    }
)
def calculate(self, params: dict) -> BaseArtifact:
    import numexpr  # type: ignore

    try:
        expression = params["values"]["expression"]

        return TextArtifact(numexpr.evaluate(expression))
    except Exception as e:
        return ErrorArtifact(f"error calculating: {e}")

Computer

Bases: BaseTool

Source code in griptape/tools/computer/tool.py
@define
class Computer(BaseTool):
    local_workdir: Optional[str] = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True
    )

    _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            self._tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self._tempdir.name

    @docker_client.validator  # pyright: ignore
    def validate_docker_client(self, _, docker_client: DockerClient) -> None:
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename", description="name of the file to put the Python code in before executing it"
                    ): str,
                }
            ),
        }
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        }
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}}

            container = self.docker_client.containers.run(
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,
                stdout=True,
                stderr=True,
                detach=True,
            )

            if isinstance(container, Container):
                container.wait()

                stderr = container.logs(stdout=False, stderr=True).decode().strip()
                stdout = container.logs(stdout=True, stderr=False).decode().strip()

                container.stop()
                container.remove()

                if stderr:
                    return ErrorArtifact(stderr)
                else:
                    return TextArtifact(stdout)
            else:
                return ErrorArtifact("error running container")
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            with open(local_file_path, "w") as f:
                f.write(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> Optional[DockerClient]:
        try:
            return docker.from_env()
        except Exception as e:
            logging.error(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        try:
            existing_container = self.docker_client.containers.get(name)
            if isinstance(existing_container, Container):
                existing_container.remove(force=True)

                logging.info(f"Removed existing container: {name}")
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        with tempfile.TemporaryDirectory() as temp_dir:
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            if isinstance(image, tuple):
                logging.info(f"Built image: {image[0].short_id}")

    def dependencies(self) -> list[str]:
        with open(self.requirements_txt_path) as file:
            return [line.strip() for line in file.readlines()]

    def __del__(self) -> None:
        if self._tempdir:
            self._tempdir.cleanup()

container_workdir: str = field(default='/griptape', kw_only=True) class-attribute instance-attribute

docker_client: DockerClient = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) class-attribute instance-attribute

dockerfile_path: str = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True) class-attribute instance-attribute

env_vars: dict = field(factory=dict, kw_only=True) class-attribute instance-attribute

local_workdir: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_txt_path: str = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    super().__attrs_post_init__()

    if self.local_workdir:
        Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
    else:
        self._tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self._tempdir.name

__del__()

Source code in griptape/tools/computer/tool.py
def __del__(self) -> None:
    if self._tempdir:
        self._tempdir.cleanup()

build_image(tool)

Source code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    with tempfile.TemporaryDirectory() as temp_dir:
        shutil.copy(self.dockerfile_path, temp_dir)
        shutil.copy(self.requirements_txt_path, temp_dir)

        image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

        if isinstance(image, tuple):
            logging.info(f"Built image: {image[0].short_id}")

container_name(tool)

Source code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    return f"{stringcase.snakecase(tool.name)}_container"

default_docker_client()

Source code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]:
    try:
        return docker.from_env()
    except Exception as e:
        logging.error(e)

        return None

dependencies()

Source code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    with open(self.requirements_txt_path) as file:
        return [line.strip() for line in file.readlines()]

execute_code(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename", description="name of the file to put the Python code in before executing it"
                ): str,
            }
        ),
    }
)
def execute_code(self, params: dict) -> BaseArtifact:
    code = params["values"]["code"]
    filename = params["values"]["filename"]

    return self.execute_code_in_container(filename, code)

execute_code_in_container(filename, code)

Source code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    container_file_path = os.path.join(self.container_workdir, filename)

    if self.local_workdir:
        tempdir = None
        local_workdir = self.local_workdir
    else:
        tempdir = tempfile.TemporaryDirectory()
        local_workdir = tempdir.name

    local_file_path = os.path.join(local_workdir, filename)

    try:
        with open(local_file_path, "w") as f:
            f.write(code)

        return self.execute_command_in_container(f"python {container_file_path}")
    except Exception as e:
        return ErrorArtifact(f"error executing code: {e}")
    finally:
        if tempdir:
            tempdir.cleanup()

execute_command(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    }
)
def execute_command(self, params: dict) -> BaseArtifact:
    command = params["values"]["command"]

    return self.execute_command_in_container(command)

execute_command_in_container(command)

Source code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}}

        container = self.docker_client.containers.run(
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,
            stdout=True,
            stderr=True,
            detach=True,
        )

        if isinstance(container, Container):
            container.wait()

            stderr = container.logs(stdout=False, stderr=True).decode().strip()
            stdout = container.logs(stdout=True, stderr=False).decode().strip()

            container.stop()
            container.remove()

            if stderr:
                return ErrorArtifact(stderr)
            else:
                return TextArtifact(stdout)
        else:
            return ErrorArtifact("error running container")
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")

image_name(tool)

Source code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    return f"{stringcase.snakecase(tool.name)}_image"

install_dependencies(env=None)

Source code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    super().install_dependencies(env)

    self.remove_existing_container(self.container_name(self))
    self.build_image(self)

remove_existing_container(name)

Source code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    try:
        existing_container = self.docker_client.containers.get(name)
        if isinstance(existing_container, Container):
            existing_container.remove(force=True)

            logging.info(f"Removed existing container: {name}")
    except NotFound:
        pass

validate_docker_client(_, docker_client)

Source code in griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore
def validate_docker_client(self, _, docker_client: DockerClient) -> None:
    if not docker_client:
        raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

DateTime

Bases: BaseTool

Source code in griptape/tools/date_time/tool.py
class DateTime(BaseTool):
    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self, _: dict) -> BaseArtifact:
        try:
            current_datetime = datetime.now()

            return TextArtifact(str(current_datetime))
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": Schema(
                {
                    Literal(
                        "relative_date_string",
                        description='Relative date in English. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str
                }
            ),
        }
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            else:
                return ErrorArtifact("invalid date string")
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

get_current_datetime(_)

Source code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self, _: dict) -> BaseArtifact:
    try:
        current_datetime = datetime.now()

        return TextArtifact(str(current_datetime))
    except Exception as e:
        return ErrorArtifact(f"error getting current datetime: {e}")

get_relative_datetime(params)

Source code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": Schema(
            {
                Literal(
                    "relative_date_string",
                    description='Relative date in English. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str
            }
        ),
    }
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        else:
            return ErrorArtifact("invalid date string")
    except Exception as e:
        return ErrorArtifact(f"error getting current datetime: {e}")

EmailClient

Bases: BaseTool

Tool for working with email

Attributes:

Name Type Description
username Optional[str]

Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com.

password Optional[str]

Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password.

email_max_retrieve_count Optional[int]

Used to limit the number of messages retrieved during any given activities.

smtp_host Optional[str]

Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity.

smtp_port Optional[int]

Port of the SMTP server. Example: 465. Required when using the send activity.

smtp_use_ssl bool

Whether to use SSL when sending email via the SMTP protocol.

smtp_user Optional[str]

Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity.

smtp_password Optional[str]

Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity.

imap_url Optional[str]

Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity.

imap_user Optional[str]

Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity.

imap_password Optional[str]

Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity.

mailboxes dict[str, Optional[str]]

Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}

email_loader EmailLoader

Instance of EmailLoader.

Source code in griptape/tools/email_client/tool.py
@define
class EmailClient(BaseTool):
    """Tool for working with email

    Attributes:
        username: Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol.
            Example: bender@futurama.com.
        password: Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail,
            this would be an App Password.
        email_max_retrieve_count: Used to limit the number of messages retrieved during any given activities.
        smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the `send` activity.
        smtp_port: Port of the SMTP server. Example: 465. Required when using the `send` activity.
        smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol.
        smtp_user: Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the `send` activity.
        smtp_password: Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the `send` activity.
        imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the `retrieve` activity.
        imap_user: Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the `retrieve` activity.
        imap_password: Password to retrieve email via the IMAP protocol.  Overrides password for IMAP only. Required when using the `retrieve` activity.
        mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the `retrieve` activity.
            Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
        email_loader: Instance of `EmailLoader`.
    """

    username: Optional[str] = field(default=None, kw_only=True)
    password: Optional[str] = field(default=None, kw_only=True)
    email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True)
    smtp_host: Optional[str] = field(default=None, kw_only=True)
    smtp_port: Optional[int] = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: Optional[str] = field(default=None, kw_only=True)
    imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: dict[str, Optional[str]] = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'")
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key")
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                }
            ),
        }
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        if self.mailboxes is None:
            return ErrorArtifact("mailboxes is required")

        values = params["values"]
        max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count

        return self.email_loader.load(
            EmailLoader.EmailQuery(
                label=values["label"],
                key=values.get("key"),
                search_criteria=values.get("search_criteria"),
                max_count=max_count,
            )
        )

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                }
            ),
        }
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        if self.smtp_user is None:
            return ErrorArtifact("smtp_user is required")
        if self.smtp_password is None:
            return ErrorArtifact("smtp_password is required")
        if self.smtp_host is None:
            return ErrorArtifact("smtp_host is required")
        if self.smtp_port is None:
            return ErrorArtifact("smtp_port is required")

        msg = MIMEText(values["body"])
        msg["Subject"] = values["subject"]
        msg["From"] = self.smtp_user
        msg["To"] = values["to"]

        try:
            with self._create_smtp_client(self.smtp_host, self.smtp_port) as client:
                client.login(self.smtp_user, self.smtp_password)
                client.sendmail(msg["From"], [msg["To"]], msg.as_string())
                return InfoArtifact("email was successfully sent")
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error sending email: {e}")

    def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
        if self.smtp_use_ssl:
            return smtplib.SMTP_SSL(smtp_host, smtp_port)
        else:
            return smtplib.SMTP(smtp_host, smtp_port)

email_loader: EmailLoader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute

email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

imap_url: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

mailboxes: dict[str, Optional[str]] = field(default=None, kw_only=True) class-attribute instance-attribute

password: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_host: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

smtp_port: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_use_ssl: bool = field(default=True, kw_only=True) class-attribute instance-attribute

smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

username: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

retrieve(params)

Source code in griptape/tools/email_client/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'")
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key")
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            }
        ),
    }
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    if self.mailboxes is None:
        return ErrorArtifact("mailboxes is required")

    values = params["values"]
    max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count

    return self.email_loader.load(
        EmailLoader.EmailQuery(
            label=values["label"],
            key=values.get("key"),
            search_criteria=values.get("search_criteria"),
            max_count=max_count,
        )
    )

send(params)

Source code in griptape/tools/email_client/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            }
        ),
    }
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]
    if self.smtp_user is None:
        return ErrorArtifact("smtp_user is required")
    if self.smtp_password is None:
        return ErrorArtifact("smtp_password is required")
    if self.smtp_host is None:
        return ErrorArtifact("smtp_host is required")
    if self.smtp_port is None:
        return ErrorArtifact("smtp_port is required")

    msg = MIMEText(values["body"])
    msg["Subject"] = values["subject"]
    msg["From"] = self.smtp_user
    msg["To"] = values["to"]

    try:
        with self._create_smtp_client(self.smtp_host, self.smtp_port) as client:
            client.login(self.smtp_user, self.smtp_password)
            client.sendmail(msg["From"], [msg["To"]], msg.as_string())
            return InfoArtifact("email was successfully sent")
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error sending email: {e}")

FileManager

Bases: BaseTool

FileManager is a tool that can be used to list, load, and save files.

Attributes:

Name Type Description
file_manager_driver BaseFileManagerDriver

File Manager Driver to use to list, load, and save files.

Source code in griptape/tools/file_manager/tool.py
@define
class FileManager(BaseTool):
    """
    FileManager is a tool that can be used to list, load, and save files.

    Attributes:
        file_manager_driver: File Manager Driver to use to list, load, and save files.
    """

    file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)

    @activity(
        config={
            "description": "Can be used to list files on disk",
            "schema": Schema(
                {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str}
            ),
        }
    )
    def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        path = params["values"]["path"]
        return self.file_manager_driver.list_files(path)

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): list
                }
            ),
        }
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        paths = params["values"]["paths"]
        artifacts = []

        for path in paths:
            artifact = self.file_manager_driver.load_file(path)
            if isinstance(artifact, ListArtifact):
                artifacts.extend(artifact.value)
            else:
                artifacts.append(artifact)
        return ListArtifact(artifacts)

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]
        memory_name = params["values"]["memory_name"]
        artifact_namespace = params["values"]["artifact_namespace"]

        memory = self.find_input_memory(params["values"]["memory_name"])
        if not memory:
            return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

        list_artifact = memory.load_artifacts(artifact_namespace)

        if len(list_artifact) == 0:
            return ErrorArtifact(
                f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts"
            )

        for artifact in list_artifact.value:
            formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
            result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), artifact.value)
            if isinstance(result, ErrorArtifact):
                return result

        return InfoArtifact("Successfully saved memory artifacts to disk")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                }
            ),
        }
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        path = params["values"]["path"]
        content = params["values"]["content"]
        return self.file_manager_driver.save_file(path, content)

file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute

list_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to list files on disk",
        "schema": Schema(
            {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str}
        ),
    }
)
def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    path = params["values"]["path"]
    return self.file_manager_driver.list_files(path)

load_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): list
            }
        ),
    }
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    paths = params["values"]["paths"]
    artifacts = []

    for path in paths:
        artifact = self.file_manager_driver.load_file(path)
        if isinstance(artifact, ListArtifact):
            artifacts.extend(artifact.value)
        else:
            artifacts.append(artifact)
    return ListArtifact(artifacts)

save_content_to_file(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            }
        ),
    }
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    path = params["values"]["path"]
    content = params["values"]["content"]
    return self.file_manager_driver.save_file(path, content)

save_memory_artifacts_to_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            }
        ),
    }
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    dir_name = params["values"]["dir_name"]
    file_name = params["values"]["file_name"]
    memory_name = params["values"]["memory_name"]
    artifact_namespace = params["values"]["artifact_namespace"]

    memory = self.find_input_memory(params["values"]["memory_name"])
    if not memory:
        return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

    list_artifact = memory.load_artifacts(artifact_namespace)

    if len(list_artifact) == 0:
        return ErrorArtifact(
            f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts"
        )

    for artifact in list_artifact.value:
        formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
        result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), artifact.value)
        if isinstance(result, ErrorArtifact):
            return result

    return InfoArtifact("Successfully saved memory artifacts to disk")

GoogleCalendarClient

Bases: BaseGoogleClient

Source code in griptape/tools/google_cal/tool.py
@define
class GoogleCalendarClient(BaseGoogleClient):
    CREATE_EVENT_SCOPES = ["https://www.googleapis.com/auth/calendar"]

    GET_UPCOMING_EVENTS_SCOPES = ["https://www.googleapis.com/auth/calendar"]

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to get upcoming events from a google calendar",
            "schema": Schema(
                {
                    Literal("calendar_id", description="id of the google calendar such as 'primary'"): str,
                    Literal("max_events", description="maximum number of events to return"): int,
                }
            ),
        }
    )
    def get_upcoming_events(self, params: dict) -> ListArtifact | ErrorArtifact:
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.GET_UPCOMING_EVENTS_SCOPES,
                service_name="calendar",
                version="v3",
                owner_email=self.owner_email,
            )
            now = datetime.datetime.utcnow().isoformat() + "Z"

            events_result = (
                service.events()
                .list(
                    calendarId=values["calendar_id"],
                    timeMin=now,
                    maxResults=values["max_events"],
                    singleEvents=True,
                    orderBy="startTime",
                )
                .execute()
            )
            events = events_result.get("items", [])

            return ListArtifact([TextArtifact(str(e)) for e in events])
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error retrieving calendar events {e}")

    @activity(
        config={
            "description": "Can be used to create an event on a google calendar",
            "schema": Schema(
                {
                    Literal(
                        "start_datetime",
                        description="combined date-time value in string format according to RFC3399 "
                        "excluding the timezone for when the meeting starts",
                    ): str,
                    Literal(
                        "start_time_zone",
                        description="time zone in which the start time is specified in string format "
                        "according to IANA time zone data base name, such as 'Europe/Zurich'",
                    ): str,
                    Literal(
                        "end_datetime",
                        description="combined date-time value in string format according to RFC3399 "
                        "excluding the timezone for when the meeting ends",
                    ): str,
                    Literal(
                        "end_time_zone",
                        description="time zone in which the end time is specified in string format "
                        "according to IANA time zone data base name, such as 'Europe/Zurich'",
                    ): str,
                    Literal("title", description="title of the event"): str,
                    Literal("description", description="description of the event"): str,
                    Literal(
                        "attendees", description="list of the email addresses of attendees using 'email' as key"
                    ): list,
                    Optional(Literal("location", description="location of the event")): str,
                }
            ),
        }
    )
    def create_event(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.CREATE_EVENT_SCOPES, service_name="calendar", version="v3", owner_email=self.owner_email
            )

            event = {
                "summary": values["title"],
                "location": values.get("location"),
                "description": values["description"],
                "start": {"dateTime": values["start_datetime"], "timeZone": values["start_time_zone"]},
                "end": {"dateTime": values["end_datetime"], "timeZone": values["end_time_zone"]},
                "attendees": values["attendees"],
            }
            event = service.events().insert(calendarId="primary", body=event).execute()
            return InfoArtifact(f'A calendar event was successfully created. (Link:{event.get("htmlLink")})')
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error creating calendar event: {e}")

CREATE_EVENT_SCOPES = ['https://www.googleapis.com/auth/calendar'] class-attribute instance-attribute

GET_UPCOMING_EVENTS_SCOPES = ['https://www.googleapis.com/auth/calendar'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

create_event(params)

Source code in griptape/tools/google_cal/tool.py
@activity(
    config={
        "description": "Can be used to create an event on a google calendar",
        "schema": Schema(
            {
                Literal(
                    "start_datetime",
                    description="combined date-time value in string format according to RFC3399 "
                    "excluding the timezone for when the meeting starts",
                ): str,
                Literal(
                    "start_time_zone",
                    description="time zone in which the start time is specified in string format "
                    "according to IANA time zone data base name, such as 'Europe/Zurich'",
                ): str,
                Literal(
                    "end_datetime",
                    description="combined date-time value in string format according to RFC3399 "
                    "excluding the timezone for when the meeting ends",
                ): str,
                Literal(
                    "end_time_zone",
                    description="time zone in which the end time is specified in string format "
                    "according to IANA time zone data base name, such as 'Europe/Zurich'",
                ): str,
                Literal("title", description="title of the event"): str,
                Literal("description", description="description of the event"): str,
                Literal(
                    "attendees", description="list of the email addresses of attendees using 'email' as key"
                ): list,
                Optional(Literal("location", description="location of the event")): str,
            }
        ),
    }
)
def create_event(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.CREATE_EVENT_SCOPES, service_name="calendar", version="v3", owner_email=self.owner_email
        )

        event = {
            "summary": values["title"],
            "location": values.get("location"),
            "description": values["description"],
            "start": {"dateTime": values["start_datetime"], "timeZone": values["start_time_zone"]},
            "end": {"dateTime": values["end_datetime"], "timeZone": values["end_time_zone"]},
            "attendees": values["attendees"],
        }
        event = service.events().insert(calendarId="primary", body=event).execute()
        return InfoArtifact(f'A calendar event was successfully created. (Link:{event.get("htmlLink")})')
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error creating calendar event: {e}")

get_upcoming_events(params)

Source code in griptape/tools/google_cal/tool.py
@activity(
    config={
        "description": "Can be used to get upcoming events from a google calendar",
        "schema": Schema(
            {
                Literal("calendar_id", description="id of the google calendar such as 'primary'"): str,
                Literal("max_events", description="maximum number of events to return"): int,
            }
        ),
    }
)
def get_upcoming_events(self, params: dict) -> ListArtifact | ErrorArtifact:
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.GET_UPCOMING_EVENTS_SCOPES,
            service_name="calendar",
            version="v3",
            owner_email=self.owner_email,
        )
        now = datetime.datetime.utcnow().isoformat() + "Z"

        events_result = (
            service.events()
            .list(
                calendarId=values["calendar_id"],
                timeMin=now,
                maxResults=values["max_events"],
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
        events = events_result.get("items", [])

        return ListArtifact([TextArtifact(str(e)) for e in events])
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error retrieving calendar events {e}")

GoogleDocsClient

Bases: BaseGoogleClient

Source code in griptape/tools/google_docs/tool.py
@define
class GoogleDocsClient(BaseGoogleClient):
    DOCS_SCOPES = ["https://www.googleapis.com/auth/documents"]

    DEFAULT_FOLDER_PATH = "root"

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to append text to a Google Doc.",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Destination file path of Google Doc in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    Literal("text", description="Text to be appended to the Google Doc."): str,
                }
            ),
        }
    )
    def append_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        file_path = values.get("file_path")
        text = values.get("text")

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            document_id = self._convert_path_to_file_id(drive_service, file_path)
            if document_id:
                doc = docs_service.documents().get(documentId=document_id).execute()
                content = doc["body"]["content"]
                last_text = content[-1]["paragraph"]["elements"][-1]["textRun"]["content"]
                append_index = content[-1]["endIndex"]
                if last_text.endswith("\n"):
                    append_index -= 1

                requests = [{"insertText": {"location": {"index": append_index}, "text": text}}]

                docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
                return InfoArtifact("text appended successfully")
            else:
                return ErrorArtifact(f"error appending to Google Doc, file not found for path {file_path}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error appending text to Google Doc with path {file_path}: {e}")

    @activity(
        config={
            "description": "Can be used to prepend text to a Google Doc",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Destination file path of Google Doc in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    Literal("text", description="Text to be prepended to the Google Doc."): str,
                }
            ),
        }
    )
    def prepend_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        file_path = values.get("file_path")
        text = values.get("text")

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            document_id = self._convert_path_to_file_id(drive_service, file_path)
            if document_id:
                doc = docs_service.documents().get(documentId=document_id).execute()

                if len(doc["body"]["content"]) == 1:
                    requests = [{"insertText": {"location": {"index": 1}, "text": text}}]
                else:
                    start_index = doc["body"]["content"][1]["startIndex"]
                    requests = [{"insertText": {"location": {"index": start_index}, "text": text}}]

                docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
                return InfoArtifact("text prepended successfully")
            else:
                return ErrorArtifact(f"error prepending to google doc, file not found for path {file_path}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error prepending text to Google Doc with path {file_path}: {e}")

    @activity(
        config={
            "description": "Can be used to create a new Google Doc and optionally save content to it.",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Name of the file to be created, which will be used to save content in.",
                    ): str,
                    Optional("content", default=None, description="Optional content to be saved in Google Doc."): str,
                    Optional(
                        "folder_path",
                        default=DEFAULT_FOLDER_PATH,
                        description="Path of the folder where the Google doc will be created.",
                    ): str,
                }
            ),
        }
    )
    def save_content_to_google_doc(self, params: dict) -> ErrorArtifact | InfoArtifact:
        values = params["values"]
        file_path = values.get("file_path")
        content = values.get("content")
        folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            body = {"title": file_path}

            doc = docs_service.documents().create(body=body).execute()
            doc_id = doc["documentId"]

            if folder_path.lower() != self.DEFAULT_FOLDER_PATH:
                folder_id = self._convert_path_to_file_id(drive_service, folder_path)
                if folder_id:
                    drive_service.files().update(fileId=doc_id, addParents=folder_id, fields="id, parents").execute()
                else:
                    return ErrorArtifact(f"Error: Folder not found for path {folder_path}")

            if content:
                save_content_params = {"document_id": doc_id, "content": content}
                saved_document_id = self._save_to_doc(save_content_params)
                return InfoArtifact(f"Content has been successfully saved to Google Doc with ID: {saved_document_id}.")
            else:
                return InfoArtifact(f"Google Doc '{file_path}' created with ID: {doc_id}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"Error creating/saving Google Doc: {e}")

    @activity(
        config={
            "description": "Can be used to load content from memory and save it to a new Google Doc "
            "in the specified folder.",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "file_name": str,
                    Optional(
                        "folder_path",
                        description="Path of the folder where the Google Doc should be saved.",
                        default=DEFAULT_FOLDER_PATH,
                    ): str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_google_docs(self, params: dict) -> ErrorArtifact | InfoArtifact:
        values = params["values"]
        memory = self.find_input_memory(values["memory_name"])

        if memory:
            artifacts = memory.load_artifacts(values["artifact_namespace"])

            if artifacts:
                try:
                    file_path = values["file_name"]
                    content = "\n".join([a.value for a in artifacts])

                    save_params = {
                        "file_path": file_path,
                        "content": content,
                        "folder_path": values.get("folder_path", self.DEFAULT_FOLDER_PATH),
                    }

                    return self.save_content_to_google_doc(save_params)

                except Exception as e:
                    return ErrorArtifact(f"Error: {e}")

            else:
                return ErrorArtifact("no artifacts found")
        else:
            return ErrorArtifact("memory not found")

    def _save_to_doc(self, params: dict) -> str:
        service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )

        requests = [{"insertText": {"location": {"index": 1}, "text": params["content"]}}]
        service.documents().batchUpdate(documentId=params["document_id"], body={"requests": requests}).execute()
        return params["document_id"]

DEFAULT_FOLDER_PATH = 'root' class-attribute instance-attribute

DOCS_SCOPES = ['https://www.googleapis.com/auth/documents'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

append_text_to_google_doc(params)

Source code in griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to append text to a Google Doc.",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Destination file path of Google Doc in the POSIX format. "
                    "For example, 'foo/bar/baz.txt'",
                ): str,
                Literal("text", description="Text to be appended to the Google Doc."): str,
            }
        ),
    }
)
def append_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]
    file_path = values.get("file_path")
    text = values.get("text")

    try:
        docs_service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )
        drive_service = self._build_client(
            scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
        )

        document_id = self._convert_path_to_file_id(drive_service, file_path)
        if document_id:
            doc = docs_service.documents().get(documentId=document_id).execute()
            content = doc["body"]["content"]
            last_text = content[-1]["paragraph"]["elements"][-1]["textRun"]["content"]
            append_index = content[-1]["endIndex"]
            if last_text.endswith("\n"):
                append_index -= 1

            requests = [{"insertText": {"location": {"index": append_index}, "text": text}}]

            docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
            return InfoArtifact("text appended successfully")
        else:
            return ErrorArtifact(f"error appending to Google Doc, file not found for path {file_path}")

    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error appending text to Google Doc with path {file_path}: {e}")

prepend_text_to_google_doc(params)

Source code in griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to prepend text to a Google Doc",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Destination file path of Google Doc in the POSIX format. "
                    "For example, 'foo/bar/baz.txt'",
                ): str,
                Literal("text", description="Text to be prepended to the Google Doc."): str,
            }
        ),
    }
)
def prepend_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]
    file_path = values.get("file_path")
    text = values.get("text")

    try:
        docs_service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )
        drive_service = self._build_client(
            scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
        )

        document_id = self._convert_path_to_file_id(drive_service, file_path)
        if document_id:
            doc = docs_service.documents().get(documentId=document_id).execute()

            if len(doc["body"]["content"]) == 1:
                requests = [{"insertText": {"location": {"index": 1}, "text": text}}]
            else:
                start_index = doc["body"]["content"][1]["startIndex"]
                requests = [{"insertText": {"location": {"index": start_index}, "text": text}}]

            docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
            return InfoArtifact("text prepended successfully")
        else:
            return ErrorArtifact(f"error prepending to google doc, file not found for path {file_path}")

    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error prepending text to Google Doc with path {file_path}: {e}")

save_content_to_google_doc(params)

Source code in griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to create a new Google Doc and optionally save content to it.",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Name of the file to be created, which will be used to save content in.",
                ): str,
                Optional("content", default=None, description="Optional content to be saved in Google Doc."): str,
                Optional(
                    "folder_path",
                    default=DEFAULT_FOLDER_PATH,
                    description="Path of the folder where the Google doc will be created.",
                ): str,
            }
        ),
    }
)
def save_content_to_google_doc(self, params: dict) -> ErrorArtifact | InfoArtifact:
    values = params["values"]
    file_path = values.get("file_path")
    content = values.get("content")
    folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

    try:
        docs_service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )
        drive_service = self._build_client(
            scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
        )

        body = {"title": file_path}

        doc = docs_service.documents().create(body=body).execute()
        doc_id = doc["documentId"]

        if folder_path.lower() != self.DEFAULT_FOLDER_PATH:
            folder_id = self._convert_path_to_file_id(drive_service, folder_path)
            if folder_id:
                drive_service.files().update(fileId=doc_id, addParents=folder_id, fields="id, parents").execute()
            else:
                return ErrorArtifact(f"Error: Folder not found for path {folder_path}")

        if content:
            save_content_params = {"document_id": doc_id, "content": content}
            saved_document_id = self._save_to_doc(save_content_params)
            return InfoArtifact(f"Content has been successfully saved to Google Doc with ID: {saved_document_id}.")
        else:
            return InfoArtifact(f"Google Doc '{file_path}' created with ID: {doc_id}")

    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"Error creating/saving Google Doc: {e}")

save_memory_artifacts_to_google_docs(params)

Source code in griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to load content from memory and save it to a new Google Doc "
        "in the specified folder.",
        "schema": Schema(
            {
                "memory_name": str,
                "artifact_namespace": str,
                "file_name": str,
                Optional(
                    "folder_path",
                    description="Path of the folder where the Google Doc should be saved.",
                    default=DEFAULT_FOLDER_PATH,
                ): str,
            }
        ),
    }
)
def save_memory_artifacts_to_google_docs(self, params: dict) -> ErrorArtifact | InfoArtifact:
    values = params["values"]
    memory = self.find_input_memory(values["memory_name"])

    if memory:
        artifacts = memory.load_artifacts(values["artifact_namespace"])

        if artifacts:
            try:
                file_path = values["file_name"]
                content = "\n".join([a.value for a in artifacts])

                save_params = {
                    "file_path": file_path,
                    "content": content,
                    "folder_path": values.get("folder_path", self.DEFAULT_FOLDER_PATH),
                }

                return self.save_content_to_google_doc(save_params)

            except Exception as e:
                return ErrorArtifact(f"Error: {e}")

        else:
            return ErrorArtifact("no artifacts found")
    else:
        return ErrorArtifact("memory not found")

GoogleDriveClient

Bases: BaseGoogleClient

Source code in griptape/tools/google_drive/tool.py
@define
class GoogleDriveClient(BaseGoogleClient):
    LIST_FILES_SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

    GOOGLE_EXPORT_MIME_MAPPING = {
        "application/vnd.google-apps.document": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.google-apps.spreadsheet": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.google-apps.presentation": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    }

    DEFAULT_FOLDER_PATH = "root"

    SERVICE_NAME = "drive"

    SERVICE_VERSION = "v3"

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to list files in a specific Google Drive folder.",
            "schema": Schema(
                {
                    schema.Optional(
                        "folder_path",
                        default=DEFAULT_FOLDER_PATH,
                        description="Path of the Google Drive folder (like 'MainFolder/Subfolder1/Subfolder2') "
                        "from which files should be listed.",
                    ): str
                }
            ),
        }
    )
    def list_files(self, params: dict) -> ListArtifact | ErrorArtifact:
        values = params["values"]
        from google.auth.exceptions import MalformedError  # pyright: ignore

        folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

        try:
            service = self._build_client(
                self.LIST_FILES_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
            )

            if folder_path == self.DEFAULT_FOLDER_PATH:
                query = "mimeType != 'application/vnd.google-apps.folder' and 'root' in parents and trashed=false"
            else:
                folder_id = self._convert_path_to_file_id(service, folder_path)
                if folder_id:
                    query = f"'{folder_id}' in parents and trashed=false"
                else:
                    return ErrorArtifact(f"Could not find folder: {folder_path}")

            items = self._list_files(service, query)
            return ListArtifact([TextArtifact(i) for i in items])

        except MalformedError:
            return ErrorArtifact("error listing files due to malformed credentials")
        except Exception as e:
            return ErrorArtifact(f"error listing files from Google Drive: {e}")

    @activity(
        config={
            "description": "Can be used to save memory artifacts to Google Drive using folder paths",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "file_name": str,
                    schema.Optional(
                        "folder_path",
                        description="Path of the Google Drive folder (like 'MainFolder/Subfolder1/Subfolder2') "
                        "where the file should be saved.",
                        default=DEFAULT_FOLDER_PATH,
                    ): str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_drive(self, params: dict) -> ErrorArtifact | InfoArtifact:
        values = params["values"]
        memory = self.find_input_memory(values["memory_name"])
        file_name = values["file_name"]
        folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

        if memory:
            artifacts = memory.load_artifacts(values["artifact_namespace"])

            if artifacts:
                service = self._build_client(
                    self.DRIVE_FILE_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
                )

                if folder_path == self.DEFAULT_FOLDER_PATH:
                    folder_id = self.DEFAULT_FOLDER_PATH
                else:
                    folder_id = self._convert_path_to_file_id(service, folder_path)

                if folder_id:
                    try:
                        if len(artifacts) == 1:
                            self._save_to_drive(file_name, artifacts[0].value, folder_id)
                        else:
                            for a in artifacts:
                                self._save_to_drive(f"{a.name}-{file_name}", a.value, folder_id)

                        return InfoArtifact("saved successfully")

                    except Exception as e:
                        return ErrorArtifact(f"error saving file to Google Drive: {e}")
                else:
                    return ErrorArtifact(f"Could not find folder: {folder_path}")
            else:
                return ErrorArtifact("no artifacts found")
        else:
            return ErrorArtifact("memory not found")

    @activity(
        config={
            "description": "Can be used to save content to a file on Google Drive",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on Google Drive in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                }
            ),
        }
    )
    def save_content_to_drive(self, params: dict) -> ErrorArtifact | InfoArtifact:
        content = params["values"]["content"]
        filename = params["values"]["path"]

        try:
            self._save_to_drive(filename, content)

            return InfoArtifact("saved successfully")
        except Exception as e:
            return ErrorArtifact(f"error saving file to Google Drive: {e}")

    @activity(
        config={
            "description": "Can be used to download multiple files from Google Drive based on a provided list of paths",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="List of paths to files to be loaded in the POSIX format. "
                        "For example, ['foo/bar/file1.txt', 'foo/bar/file2.txt']",
                    ): [str]
                }
            ),
        }
    )
    def download_files(self, params: dict) -> ListArtifact | ErrorArtifact:
        from google.auth.exceptions import MalformedError  # pyright: ignore
        from googleapiclient.errors import HttpError  # pyright: ignore

        values = params["values"]
        downloaded_files = []

        try:
            service = self._build_client(
                self.LIST_FILES_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
            )

            for path in values["paths"]:
                file_id = self._convert_path_to_file_id(service, path)
                if file_id:
                    file_info = service.files().get(fileId=file_id).execute()
                    mime_type = file_info["mimeType"]

                    if mime_type in self.GOOGLE_EXPORT_MIME_MAPPING:
                        export_mime = self.GOOGLE_EXPORT_MIME_MAPPING[mime_type]
                        request = service.files().export_media(fileId=file_id, mimeType=export_mime)
                    else:
                        request = service.files().get_media(fileId=file_id)

                    downloaded_files.append(BlobArtifact(request.execute()))
                else:
                    logging.error(f"Could not find file: {path}")

            return ListArtifact(downloaded_files)
        except HttpError as e:
            return ErrorArtifact(f"error downloading file in Google Drive: {e}")
        except MalformedError:
            return ErrorArtifact("error downloading file due to malformed credentials")
        except Exception as e:
            return ErrorArtifact(f"error downloading file to Google Drive: {e}")

    @activity(
        config={
            "description": "Can search for files on Google Drive based on name or content",
            "schema": Schema(
                {
                    Literal(
                        "search_mode",
                        description="File search mode. Use 'name' to search in file name or "
                        "'content' to search in file content",
                    ): Or("name", "content"),
                    Literal(
                        "search_query",
                        description="Query to search for. If search_mode is 'name', it's the file name. If 'content', "
                        "it's the text within files.",
                    ): str,
                    schema.Optional(
                        "folder_path",
                        description="Path of the Google Drive folder (like 'MainFolder/Subfolder1/Subfolder2') "
                        "where the search should be performed.",
                        default=DEFAULT_FOLDER_PATH,
                    ): str,
                }
            ),
        }
    )
    def search_files(self, params: dict) -> ListArtifact | ErrorArtifact:
        from google.auth.exceptions import MalformedError  # pyright: ignore
        from googleapiclient.errors import HttpError  # pyright: ignore

        values = params["values"]

        search_mode = values["search_mode"]
        folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

        try:
            service = self._build_client(
                self.LIST_FILES_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
            )

            folder_id = None
            if folder_path == self.DEFAULT_FOLDER_PATH:
                folder_id = self.DEFAULT_FOLDER_PATH
            else:
                folder_id = self._convert_path_to_file_id(service, folder_path)

            if folder_id:
                query = None
                if search_mode == "name":
                    query = f"name='{values['search_query']}'"
                elif search_mode == "content":
                    query = f"fullText contains '{values['search_query']}'"
                else:
                    return ErrorArtifact(f"Invalid search mode: {search_mode}")

                query += " and trashed=false"
                if folder_id != self.DEFAULT_FOLDER_PATH:
                    query += f" and '{folder_id}' in parents"

                results = service.files().list(q=query).execute()
                items = results.get("files", [])
                return ListArtifact([TextArtifact(i) for i in items])
            else:
                return ErrorArtifact(f"Folder path {folder_path} not found")

        except HttpError as e:
            return ErrorArtifact(f"error searching for file in Google Drive: {e}")
        except MalformedError:
            return ErrorArtifact("error searching for file due to malformed credentials")
        except Exception as e:
            return ErrorArtifact(f"error searching file to Google Drive: {e}")

    @activity(
        config={
            "description": "Can be used to share a file with a specified user.",
            "schema": Schema(
                {
                    Literal("file_path", description="The path of the file to share"): str,
                    Literal("email_address", description="The email address of the user to share with"): str,
                    schema.Optional(
                        "role",
                        default="reader",
                        description="The role to give to the user, e.g., 'reader', 'writer', or 'commenter'",
                    ): Or("reader", "writer", "commenter"),
                }
            ),
        }
    )
    def share_file(self, params: dict) -> InfoArtifact | ErrorArtifact:
        from google.auth.exceptions import MalformedError  # pyright: ignore
        from googleapiclient.errors import HttpError  # pyright: ignore

        values = params["values"]
        file_path = values.get("file_path")
        email_address = values.get("email_address")
        role = values.get("role", "reader")

        try:
            service = self._build_client(
                scopes=self.DRIVE_AUTH_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            if file_path.lower() == self.DEFAULT_FOLDER_PATH:
                file_id = self.DEFAULT_FOLDER_PATH
            else:
                file_id = self._convert_path_to_file_id(service, file_path)

            if file_id:
                batch_update_permission_request_body = {"role": role, "type": "user", "emailAddress": email_address}
                request = service.permissions().create(
                    fileId=file_id, body=batch_update_permission_request_body, fields="id"
                )
                request.execute()
                return InfoArtifact(f"File at {file_path} shared with {email_address} as a {role}")
            else:
                return ErrorArtifact(f"error finding file at path: {file_path}")
        except HttpError as e:
            return ErrorArtifact(f"error sharing file due to http error: {e}")
        except MalformedError as e:
            return ErrorArtifact(f"error sharing file due to malformed credentials: {e}")
        except Exception as e:
            return ErrorArtifact(f"error sharing file: {e}")

    def _save_to_drive(
        self, filename: str, value: Any, parent_folder_id: Optional[str] = None
    ) -> InfoArtifact | ErrorArtifact:
        from googleapiclient.http import MediaIoBaseUpload  # pyright: ignore

        service = self._build_client(self.DRIVE_FILE_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email)

        if isinstance(value, str):
            value = value.encode()

        parts = filename.split("/")
        if len(parts) > 1:
            directory = "/".join(parts[:-1])
            parent_folder_id = self._convert_path_to_file_id(service, directory)
            if not parent_folder_id:
                return ErrorArtifact(f"Could not find folder: {directory}")
            filename = parts[-1]

        file_metadata = {"name": filename, "parents": []}
        if parent_folder_id:
            file_metadata["parents"] = [parent_folder_id]

        media = MediaIoBaseUpload(BytesIO(value), mimetype="application/octet-stream", resumable=True)

        file = service.files().create(body=file_metadata, media_body=media, fields="id").execute()
        return InfoArtifact(file)

    def _list_files(self, service: Any, query: str) -> list[dict]:
        items = []
        next_page_token = None

        while True:
            results = service.files().list(q=query, pageToken=next_page_token).execute()

            files = results.get("files", [])
            items.extend(files)

            next_page_token = results.get("nextPageToken")
            if not next_page_token:
                break

        return items

DEFAULT_FOLDER_PATH = 'root' class-attribute instance-attribute

GOOGLE_EXPORT_MIME_MAPPING = {'application/vnd.google-apps.document': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/vnd.google-apps.spreadsheet': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/vnd.google-apps.presentation': 'application/vnd.openxmlformats-officedocument.presentationml.presentation'} class-attribute instance-attribute

LIST_FILES_SCOPES = ['https://www.googleapis.com/auth/drive.readonly'] class-attribute instance-attribute

SERVICE_NAME = 'drive' class-attribute instance-attribute

SERVICE_VERSION = 'v3' class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

download_files(params)

Source code in griptape/tools/google_drive/tool.py
@activity(
    config={
        "description": "Can be used to download multiple files from Google Drive based on a provided list of paths",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="List of paths to files to be loaded in the POSIX format. "
                    "For example, ['foo/bar/file1.txt', 'foo/bar/file2.txt']",
                ): [str]
            }
        ),
    }
)
def download_files(self, params: dict) -> ListArtifact | ErrorArtifact:
    from google.auth.exceptions import MalformedError  # pyright: ignore
    from googleapiclient.errors import HttpError  # pyright: ignore

    values = params["values"]
    downloaded_files = []

    try:
        service = self._build_client(
            self.LIST_FILES_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
        )

        for path in values["paths"]:
            file_id = self._convert_path_to_file_id(service, path)
            if file_id:
                file_info = service.files().get(fileId=file_id).execute()
                mime_type = file_info["mimeType"]

                if mime_type in self.GOOGLE_EXPORT_MIME_MAPPING:
                    export_mime = self.GOOGLE_EXPORT_MIME_MAPPING[mime_type]
                    request = service.files().export_media(fileId=file_id, mimeType=export_mime)
                else:
                    request = service.files().get_media(fileId=file_id)

                downloaded_files.append(BlobArtifact(request.execute()))
            else:
                logging.error(f"Could not find file: {path}")

        return ListArtifact(downloaded_files)
    except HttpError as e:
        return ErrorArtifact(f"error downloading file in Google Drive: {e}")
    except MalformedError:
        return ErrorArtifact("error downloading file due to malformed credentials")
    except Exception as e:
        return ErrorArtifact(f"error downloading file to Google Drive: {e}")

list_files(params)

Source code in griptape/tools/google_drive/tool.py
@activity(
    config={
        "description": "Can be used to list files in a specific Google Drive folder.",
        "schema": Schema(
            {
                schema.Optional(
                    "folder_path",
                    default=DEFAULT_FOLDER_PATH,
                    description="Path of the Google Drive folder (like 'MainFolder/Subfolder1/Subfolder2') "
                    "from which files should be listed.",
                ): str
            }
        ),
    }
)
def list_files(self, params: dict) -> ListArtifact | ErrorArtifact:
    values = params["values"]
    from google.auth.exceptions import MalformedError  # pyright: ignore

    folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

    try:
        service = self._build_client(
            self.LIST_FILES_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
        )

        if folder_path == self.DEFAULT_FOLDER_PATH:
            query = "mimeType != 'application/vnd.google-apps.folder' and 'root' in parents and trashed=false"
        else:
            folder_id = self._convert_path_to_file_id(service, folder_path)
            if folder_id:
                query = f"'{folder_id}' in parents and trashed=false"
            else:
                return ErrorArtifact(f"Could not find folder: {folder_path}")

        items = self._list_files(service, query)
        return ListArtifact([TextArtifact(i) for i in items])

    except MalformedError:
        return ErrorArtifact("error listing files due to malformed credentials")
    except Exception as e:
        return ErrorArtifact(f"error listing files from Google Drive: {e}")

save_content_to_drive(params)

Source code in griptape/tools/google_drive/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file on Google Drive",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on Google Drive in the POSIX format. "
                    "For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            }
        ),
    }
)
def save_content_to_drive(self, params: dict) -> ErrorArtifact | InfoArtifact:
    content = params["values"]["content"]
    filename = params["values"]["path"]

    try:
        self._save_to_drive(filename, content)

        return InfoArtifact("saved successfully")
    except Exception as e:
        return ErrorArtifact(f"error saving file to Google Drive: {e}")

save_memory_artifacts_to_drive(params)

Source code in griptape/tools/google_drive/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to Google Drive using folder paths",
        "schema": Schema(
            {
                "memory_name": str,
                "artifact_namespace": str,
                "file_name": str,
                schema.Optional(
                    "folder_path",
                    description="Path of the Google Drive folder (like 'MainFolder/Subfolder1/Subfolder2') "
                    "where the file should be saved.",
                    default=DEFAULT_FOLDER_PATH,
                ): str,
            }
        ),
    }
)
def save_memory_artifacts_to_drive(self, params: dict) -> ErrorArtifact | InfoArtifact:
    values = params["values"]
    memory = self.find_input_memory(values["memory_name"])
    file_name = values["file_name"]
    folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

    if memory:
        artifacts = memory.load_artifacts(values["artifact_namespace"])

        if artifacts:
            service = self._build_client(
                self.DRIVE_FILE_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
            )

            if folder_path == self.DEFAULT_FOLDER_PATH:
                folder_id = self.DEFAULT_FOLDER_PATH
            else:
                folder_id = self._convert_path_to_file_id(service, folder_path)

            if folder_id:
                try:
                    if len(artifacts) == 1:
                        self._save_to_drive(file_name, artifacts[0].value, folder_id)
                    else:
                        for a in artifacts:
                            self._save_to_drive(f"{a.name}-{file_name}", a.value, folder_id)

                    return InfoArtifact("saved successfully")

                except Exception as e:
                    return ErrorArtifact(f"error saving file to Google Drive: {e}")
            else:
                return ErrorArtifact(f"Could not find folder: {folder_path}")
        else:
            return ErrorArtifact("no artifacts found")
    else:
        return ErrorArtifact("memory not found")

search_files(params)

Source code in griptape/tools/google_drive/tool.py
@activity(
    config={
        "description": "Can search for files on Google Drive based on name or content",
        "schema": Schema(
            {
                Literal(
                    "search_mode",
                    description="File search mode. Use 'name' to search in file name or "
                    "'content' to search in file content",
                ): Or("name", "content"),
                Literal(
                    "search_query",
                    description="Query to search for. If search_mode is 'name', it's the file name. If 'content', "
                    "it's the text within files.",
                ): str,
                schema.Optional(
                    "folder_path",
                    description="Path of the Google Drive folder (like 'MainFolder/Subfolder1/Subfolder2') "
                    "where the search should be performed.",
                    default=DEFAULT_FOLDER_PATH,
                ): str,
            }
        ),
    }
)
def search_files(self, params: dict) -> ListArtifact | ErrorArtifact:
    from google.auth.exceptions import MalformedError  # pyright: ignore
    from googleapiclient.errors import HttpError  # pyright: ignore

    values = params["values"]

    search_mode = values["search_mode"]
    folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

    try:
        service = self._build_client(
            self.LIST_FILES_SCOPES, self.SERVICE_NAME, self.SERVICE_VERSION, self.owner_email
        )

        folder_id = None
        if folder_path == self.DEFAULT_FOLDER_PATH:
            folder_id = self.DEFAULT_FOLDER_PATH
        else:
            folder_id = self._convert_path_to_file_id(service, folder_path)

        if folder_id:
            query = None
            if search_mode == "name":
                query = f"name='{values['search_query']}'"
            elif search_mode == "content":
                query = f"fullText contains '{values['search_query']}'"
            else:
                return ErrorArtifact(f"Invalid search mode: {search_mode}")

            query += " and trashed=false"
            if folder_id != self.DEFAULT_FOLDER_PATH:
                query += f" and '{folder_id}' in parents"

            results = service.files().list(q=query).execute()
            items = results.get("files", [])
            return ListArtifact([TextArtifact(i) for i in items])
        else:
            return ErrorArtifact(f"Folder path {folder_path} not found")

    except HttpError as e:
        return ErrorArtifact(f"error searching for file in Google Drive: {e}")
    except MalformedError:
        return ErrorArtifact("error searching for file due to malformed credentials")
    except Exception as e:
        return ErrorArtifact(f"error searching file to Google Drive: {e}")

share_file(params)

Source code in griptape/tools/google_drive/tool.py
@activity(
    config={
        "description": "Can be used to share a file with a specified user.",
        "schema": Schema(
            {
                Literal("file_path", description="The path of the file to share"): str,
                Literal("email_address", description="The email address of the user to share with"): str,
                schema.Optional(
                    "role",
                    default="reader",
                    description="The role to give to the user, e.g., 'reader', 'writer', or 'commenter'",
                ): Or("reader", "writer", "commenter"),
            }
        ),
    }
)
def share_file(self, params: dict) -> InfoArtifact | ErrorArtifact:
    from google.auth.exceptions import MalformedError  # pyright: ignore
    from googleapiclient.errors import HttpError  # pyright: ignore

    values = params["values"]
    file_path = values.get("file_path")
    email_address = values.get("email_address")
    role = values.get("role", "reader")

    try:
        service = self._build_client(
            scopes=self.DRIVE_AUTH_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
        )

        if file_path.lower() == self.DEFAULT_FOLDER_PATH:
            file_id = self.DEFAULT_FOLDER_PATH
        else:
            file_id = self._convert_path_to_file_id(service, file_path)

        if file_id:
            batch_update_permission_request_body = {"role": role, "type": "user", "emailAddress": email_address}
            request = service.permissions().create(
                fileId=file_id, body=batch_update_permission_request_body, fields="id"
            )
            request.execute()
            return InfoArtifact(f"File at {file_path} shared with {email_address} as a {role}")
        else:
            return ErrorArtifact(f"error finding file at path: {file_path}")
    except HttpError as e:
        return ErrorArtifact(f"error sharing file due to http error: {e}")
    except MalformedError as e:
        return ErrorArtifact(f"error sharing file due to malformed credentials: {e}")
    except Exception as e:
        return ErrorArtifact(f"error sharing file: {e}")

GoogleGmailClient

Bases: BaseGoogleClient

Source code in griptape/tools/google_gmail/tool.py
@define
class GoogleGmailClient(BaseGoogleClient):
    CREATE_DRAFT_EMAIL_SCOPES = ["https://www.googleapis.com/auth/gmail.compose"]

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to create a draft email in GMail",
            "schema": Schema(
                {
                    Literal("to", description="email address which to send to"): str,
                    Literal("subject", description="subject of the email"): str,
                    Literal("body", description="body of the email"): str,
                }
            ),
        }
    )
    def create_draft_email(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.CREATE_DRAFT_EMAIL_SCOPES, service_name="gmail", version="v1", owner_email=self.owner_email
            )

            message = EmailMessage()
            message.set_content(values["body"])
            message["To"] = values["to"]
            message["From"] = self.owner_email
            message["Subject"] = values["subject"]

            encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
            create_message = {"message": {"raw": encoded_message}}
            draft = service.users().drafts().create(userId="me", body=create_message).execute()
            return InfoArtifact(f'An email draft was successfully created (ID: {draft["id"]})')

        except Exception as error:
            logging.error(error)
            return ErrorArtifact(f"error creating draft email: {error}")

CREATE_DRAFT_EMAIL_SCOPES = ['https://www.googleapis.com/auth/gmail.compose'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

create_draft_email(params)

Source code in griptape/tools/google_gmail/tool.py
@activity(
    config={
        "description": "Can be used to create a draft email in GMail",
        "schema": Schema(
            {
                Literal("to", description="email address which to send to"): str,
                Literal("subject", description="subject of the email"): str,
                Literal("body", description="body of the email"): str,
            }
        ),
    }
)
def create_draft_email(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.CREATE_DRAFT_EMAIL_SCOPES, service_name="gmail", version="v1", owner_email=self.owner_email
        )

        message = EmailMessage()
        message.set_content(values["body"])
        message["To"] = values["to"]
        message["From"] = self.owner_email
        message["Subject"] = values["subject"]

        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        create_message = {"message": {"raw": encoded_message}}
        draft = service.users().drafts().create(userId="me", body=create_message).execute()
        return InfoArtifact(f'An email draft was successfully created (ID: {draft["id"]})')

    except Exception as error:
        logging.error(error)
        return ErrorArtifact(f"error creating draft email: {error}")

GriptapeCloudKnowledgeBaseClient

Bases: BaseGriptapeCloudClient

Attributes:

Name Type Description
description Optional[str]

LLM-friendly knowledge base description.

knowledge_base_id str

ID of the Griptape Cloud Knowledge Base.

Source code in griptape/tools/griptape_cloud_knowledge_base_client/tool.py
@define
class GriptapeCloudKnowledgeBaseClient(BaseGriptapeCloudClient):
    """
    Attributes:
        description: LLM-friendly knowledge base description.
        knowledge_base_id: ID of the Griptape Cloud Knowledge Base.
    """

    description: Optional[str] = field(default=None, kw_only=True)
    knowledge_base_id: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to search a knowledge base with the following description: {{ _self._get_knowledge_base_description() }}",
            "schema": Schema(
                {Literal("query", description="A natural language search query to run against the knowledge base"): str}
            ),
        }
    )
    def query(self, params: dict) -> TextArtifact | ErrorArtifact:
        from requests import post, exceptions

        query = params["values"]["query"]
        url = urljoin(self.base_url.strip("/"), f"/api/knowledge-bases/{self.knowledge_base_id}/query")

        try:
            response = post(url, json={"query": query}, headers=self.headers)

            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    def _get_knowledge_base_description(self) -> str:
        from requests import get

        if self.description:
            return self.description
        else:
            url = urljoin(self.base_url.strip("/"), f"/api/knowledge-bases/{self.knowledge_base_id}/")

            response = get(url, headers=self.headers).json()
            if "description" in response:
                return response["description"]
            else:
                raise ValueError(f'Error getting Knowledge Base description: {response["message"]}')

description: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

knowledge_base_id: str = field(kw_only=True) class-attribute instance-attribute

query(params)

Source code in griptape/tools/griptape_cloud_knowledge_base_client/tool.py
@activity(
    config={
        "description": "Can be used to search a knowledge base with the following description: {{ _self._get_knowledge_base_description() }}",
        "schema": Schema(
            {Literal("query", description="A natural language search query to run against the knowledge base"): str}
        ),
    }
)
def query(self, params: dict) -> TextArtifact | ErrorArtifact:
    from requests import post, exceptions

    query = params["values"]["query"]
    url = urljoin(self.base_url.strip("/"), f"/api/knowledge-bases/{self.knowledge_base_id}/query")

    try:
        response = post(url, json={"query": query}, headers=self.headers)

        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

ImageQueryClient

Bases: BaseTool

Source code in griptape/tools/image_query_client/tool.py
@define
class ImageQueryClient(BaseTool):
    image_query_engine: ImageQueryEngine = field(kw_only=True)
    image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images on disk.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_paths", description="The paths to an image files on disk."): list[str],
                }
            ),
        }
    )
    def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        query = params["values"]["query"]
        image_paths = params["values"]["image_paths"]

        image_artifacts = []
        for image_path in image_paths:
            with open(image_path, "rb") as f:
                image_artifacts.append(self.image_loader.load(f.read()))

        return self.image_query_engine.run(query, image_artifacts)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images in memory.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_artifacts", description="Image artifact memory references."): [
                        {"image_artifact_namespace": str, "image_artifact_name": str}
                    ],
                    "memory_name": str,
                }
            ),
        }
    )
    def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        query = params["values"]["query"]
        image_artifact_references = params["values"]["image_artifacts"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        image_artifacts = []
        for image_artifact_reference in image_artifact_references:
            try:
                image_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    ImageArtifact,
                )

                image_artifacts.append(cast(ImageArtifact, image_artifact))
            except ValueError:
                # If we're unable to parse the artifact as an ImageArtifact, attempt to
                # parse a BlobArtifact and load it as an ImageArtifact.
                blob_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    BlobArtifact,
                )

                image_artifacts.append(self.image_loader.load(blob_artifact.value))
            except Exception as e:
                return ErrorArtifact(str(e))

        return self.image_query_engine.run(query, image_artifacts)

image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True) class-attribute instance-attribute

image_query_engine: ImageQueryEngine = field(kw_only=True) class-attribute instance-attribute

query_image_from_disk(params)

Source code in griptape/tools/image_query_client/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images on disk.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_paths", description="The paths to an image files on disk."): list[str],
            }
        ),
    }
)
def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    query = params["values"]["query"]
    image_paths = params["values"]["image_paths"]

    image_artifacts = []
    for image_path in image_paths:
        with open(image_path, "rb") as f:
            image_artifacts.append(self.image_loader.load(f.read()))

    return self.image_query_engine.run(query, image_artifacts)

query_images_from_memory(params)

Source code in griptape/tools/image_query_client/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images in memory.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_artifacts", description="Image artifact memory references."): [
                    {"image_artifact_namespace": str, "image_artifact_name": str}
                ],
                "memory_name": str,
            }
        ),
    }
)
def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    query = params["values"]["query"]
    image_artifact_references = params["values"]["image_artifacts"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    image_artifacts = []
    for image_artifact_reference in image_artifact_references:
        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_reference["image_artifact_namespace"],
                image_artifact_reference["image_artifact_name"],
                ImageArtifact,
            )

            image_artifacts.append(cast(ImageArtifact, image_artifact))
        except ValueError:
            # If we're unable to parse the artifact as an ImageArtifact, attempt to
            # parse a BlobArtifact and load it as an ImageArtifact.
            blob_artifact = load_artifact_from_memory(
                memory,
                image_artifact_reference["image_artifact_namespace"],
                image_artifact_reference["image_artifact_name"],
                BlobArtifact,
            )

            image_artifacts.append(self.image_loader.load(blob_artifact.value))
        except Exception as e:
            return ErrorArtifact(str(e))

    return self.image_query_engine.run(query, image_artifacts)

InpaintingImageGenerationClient

Bases: ImageArtifactFileOutputMixin, BaseTool

A tool that can be used to generate prompted inpaintings of an image.

Attributes:

Name Type Description
engine InpaintingImageGenerationEngine

The inpainting image generation engine used to generate the image.

output_dir InpaintingImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file InpaintingImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/inpainting_image_generation_client/tool.py
@define
class InpaintingImageGenerationClient(ImageArtifactFileOutputMixin, BaseTool):
    """A tool that can be used to generate prompted inpaintings of an image.

    Attributes:
        engine: The inpainting image generation engine used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    engine: InpaintingImageGenerationEngine = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Can be used to modify an image within a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                }
            ),
        }
    )
    def image_inpainting_from_file(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        output_artifact = self.engine.run(
            prompts=prompts, negative_prompts=negative_prompts, image=input_artifact, mask=mask_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

    @activity(
        config={
            "description": "Can be used to modify an image within a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                }
            ),
        }
    )
    def image_inpainting_from_memory(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory, image_artifact_namespace, image_artifact_name, ImageArtifact
            )
            mask_artifact = load_artifact_from_memory(
                memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_inpainting(
            prompts, negative_prompts, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)
        )

    def _generate_inpainting(
        self,
        prompts: list[str],
        negative_prompts: list[str],
        image_artifact: ImageArtifact,
        mask_artifact: ImageArtifact,
    ) -> ImageArtifact:
        output_artifact = self.engine.run(
            prompts=prompts, negative_prompts=negative_prompts, image=image_artifact, mask=mask_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

engine: InpaintingImageGenerationEngine = field(kw_only=True) class-attribute instance-attribute

image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

image_inpainting_from_file(params)

Source code in griptape/tools/inpainting_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to modify an image within a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            }
        ),
    }
)
def image_inpainting_from_file(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]
    image_file = params["values"]["image_file"]
    mask_file = params["values"]["mask_file"]

    input_artifact = self.image_loader.load(image_file)
    mask_artifact = self.image_loader.load(mask_file)

    output_artifact = self.engine.run(
        prompts=prompts, negative_prompts=negative_prompts, image=input_artifact, mask=mask_artifact
    )

    if self.output_dir or self.output_file:
        self._write_to_file(output_artifact)

    return output_artifact

image_inpainting_from_memory(params)

Source code in griptape/tools/inpainting_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to modify an image within a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
                "memory_name": str,
                "image_artifact_namespace": str,
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            }
        ),
    }
)
def image_inpainting_from_memory(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]
    image_artifact_namespace = params["values"]["image_artifact_namespace"]
    image_artifact_name = params["values"]["image_artifact_name"]
    mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
    mask_artifact_name = params["values"]["mask_artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(
            memory, image_artifact_namespace, image_artifact_name, ImageArtifact
        )
        mask_artifact = load_artifact_from_memory(
            memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact
        )
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_inpainting(
        prompts, negative_prompts, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)
    )

OpenWeatherClient

Bases: BaseTool

Source code in griptape/tools/openweather_client/tool.py
@define
class OpenWeatherClient(BaseTool):
    BASE_URL = "https://api.openweathermap.org/data/3.0/onecall"
    GEOCODING_URL = "https://api.openweathermap.org/geo/1.0/direct"
    US_STATE_CODES = [
        "AL",
        "AK",
        "AZ",
        "AR",
        "CA",
        "CO",
        "CT",
        "DE",
        "FL",
        "GA",
        "HI",
        "ID",
        "IL",
        "IN",
        "IA",
        "KS",
        "KY",
        "LA",
        "ME",
        "MD",
        "MA",
        "MI",
        "MN",
        "MS",
        "MO",
        "MT",
        "NE",
        "NV",
        "NH",
        "NJ",
        "NM",
        "NY",
        "NC",
        "ND",
        "OH",
        "OK",
        "OR",
        "PA",
        "RI",
        "SC",
        "SD",
        "TN",
        "TX",
        "UT",
        "VT",
        "VA",
        "WA",
        "WV",
        "WI",
        "WY",
    ]
    api_key: str = field(kw_only=True)
    units: str = field(default="imperial", kw_only=True)

    @activity(
        config={
            "description": "Can be used to fetch the latitude and longitude for a given location.",
            "schema": Schema(
                {
                    Literal(
                        "location",
                        description="Location to fetch coordinates for. "
                        "For US cities, use the format 'city_name, state_code'. "
                        "For non-US cities, use 'city_name, country_code'. "
                        "For cities without specifying state or country, simply use 'city_name'.",
                    ): str
                }
            ),
        }
    )
    def get_coordinates_by_location(self, params: dict) -> InfoArtifact | ErrorArtifact:
        location = params["values"].get("location")
        coordinates = self._fetch_coordinates(location)
        if coordinates:
            lat, lon = coordinates
            return InfoArtifact(f"Coordinates for {location}: Latitude: {lat}, Longitude: {lon}")
        else:
            return ErrorArtifact(f"Error fetching coordinates for location: {location}")

    def _fetch_coordinates(self, location: str) -> Optional[tuple[float, Optional[float]]]:
        parts = location.split(",")
        if len(parts) == 2 and parts[1].strip() in self.US_STATE_CODES:
            location += ", US"
        request_params = {"q": location, "limit": 1, "appid": self.api_key}
        try:
            response = requests.get(self.GEOCODING_URL, params=request_params)
            if response.status_code == 200:
                data = response.json()
                if data and isinstance(data, list):
                    return data[0]["lat"], data[0]["lon"]
            else:
                logging.error(
                    f"Error fetching coordinates. HTTP Status Code: {response.status_code}. Response: {response.text}"
                )
        except Exception as e:
            logging.error(f"Error fetching coordinates: {e}")
        return None

    @activity(
        config={
            "description": "Can be used to fetch current weather data for a given location. "
            "Temperatures are returned in {{ _self.units }} by default.",
            "schema": Schema({Literal("location", description="Location to fetch weather data for."): str}),
        }
    )
    def get_current_weather_by_location(self, params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
        location = params["values"].get("location")
        coordinates = self._fetch_coordinates(location)
        if coordinates:
            lat, lon = coordinates
            request_params = {
                "lat": lat,
                "lon": lon,
                "exclude": "minutely,hourly,daily,alerts",
                "appid": self.api_key,
                "units": self.units,
            }
            return self._fetch_weather_data(request_params)
        else:
            return ErrorArtifact(f"Error fetching coordinates for location: {location}")

    @activity(
        config={
            "description": "Can be used to fetch hourly forecast for a given location up to 48 hours ahead. "
            "Temperatures are returned in {{ _self.units }} by default.",
            "schema": Schema({Literal("location", description="Location to fetch hourly forecast for."): str}),
        }
    )
    def get_hourly_forecast_by_location(self, params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
        location = params["values"].get("location")
        coordinates = self._fetch_coordinates(location)
        if coordinates:
            lat, lon = coordinates
            request_params = {
                "lat": lat,
                "lon": lon,
                "exclude": "minutely,current,daily,alerts",
                "appid": self.api_key,
                "units": self.units,
            }
            return self._fetch_weather_data(request_params)
        else:
            return ErrorArtifact(f"Error fetching coordinates for location: {location}")

    @activity(
        config={
            "description": "Can be used to fetch daily forecast for a given location up to 8 days ahead. "
            "Temperatures are returned in {{ _self.units }} by default.",
            "schema": Schema({Literal("location", description="Location to fetch daily forecast for."): str}),
        }
    )
    def get_daily_forecast_by_location(self, params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
        location = params["values"].get("location")
        coordinates = self._fetch_coordinates(location)
        if coordinates:
            lat, lon = coordinates
            request_params = {
                "lat": lat,
                "lon": lon,
                "exclude": "minutely,hourly,current,alerts",
                "appid": self.api_key,
                "units": self.units,
            }
            return self._fetch_weather_data(request_params)
        else:
            return ErrorArtifact(f"Error fetching coordinates for location: {location}")

    def _fetch_weather_data(self, request_params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
        try:
            response = requests.get(self.BASE_URL, params=request_params)
            if response.status_code == 200:
                data = response.json()
                if isinstance(data, list):
                    wrapped_data = [InfoArtifact(item) for item in data]
                    return ListArtifact(wrapped_data)
                else:
                    return TextArtifact(str(data))
            else:
                logging.error(f"Error fetching weather data. HTTP Status Code: {response.status_code}")
                return ErrorArtifact("Error fetching weather data from OpenWeather API")
        except Exception as e:
            logging.error(f"Error fetching weather data: {e}")
            return ErrorArtifact(f"Error fetching weather data: {e}")

BASE_URL = 'https://api.openweathermap.org/data/3.0/onecall' class-attribute instance-attribute

GEOCODING_URL = 'https://api.openweathermap.org/geo/1.0/direct' class-attribute instance-attribute

US_STATE_CODES = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'] class-attribute instance-attribute

api_key: str = field(kw_only=True) class-attribute instance-attribute

units: str = field(default='imperial', kw_only=True) class-attribute instance-attribute

get_coordinates_by_location(params)

Source code in griptape/tools/openweather_client/tool.py
@activity(
    config={
        "description": "Can be used to fetch the latitude and longitude for a given location.",
        "schema": Schema(
            {
                Literal(
                    "location",
                    description="Location to fetch coordinates for. "
                    "For US cities, use the format 'city_name, state_code'. "
                    "For non-US cities, use 'city_name, country_code'. "
                    "For cities without specifying state or country, simply use 'city_name'.",
                ): str
            }
        ),
    }
)
def get_coordinates_by_location(self, params: dict) -> InfoArtifact | ErrorArtifact:
    location = params["values"].get("location")
    coordinates = self._fetch_coordinates(location)
    if coordinates:
        lat, lon = coordinates
        return InfoArtifact(f"Coordinates for {location}: Latitude: {lat}, Longitude: {lon}")
    else:
        return ErrorArtifact(f"Error fetching coordinates for location: {location}")

get_current_weather_by_location(params)

Source code in griptape/tools/openweather_client/tool.py
@activity(
    config={
        "description": "Can be used to fetch current weather data for a given location. "
        "Temperatures are returned in {{ _self.units }} by default.",
        "schema": Schema({Literal("location", description="Location to fetch weather data for."): str}),
    }
)
def get_current_weather_by_location(self, params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
    location = params["values"].get("location")
    coordinates = self._fetch_coordinates(location)
    if coordinates:
        lat, lon = coordinates
        request_params = {
            "lat": lat,
            "lon": lon,
            "exclude": "minutely,hourly,daily,alerts",
            "appid": self.api_key,
            "units": self.units,
        }
        return self._fetch_weather_data(request_params)
    else:
        return ErrorArtifact(f"Error fetching coordinates for location: {location}")

get_daily_forecast_by_location(params)

Source code in griptape/tools/openweather_client/tool.py
@activity(
    config={
        "description": "Can be used to fetch daily forecast for a given location up to 8 days ahead. "
        "Temperatures are returned in {{ _self.units }} by default.",
        "schema": Schema({Literal("location", description="Location to fetch daily forecast for."): str}),
    }
)
def get_daily_forecast_by_location(self, params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
    location = params["values"].get("location")
    coordinates = self._fetch_coordinates(location)
    if coordinates:
        lat, lon = coordinates
        request_params = {
            "lat": lat,
            "lon": lon,
            "exclude": "minutely,hourly,current,alerts",
            "appid": self.api_key,
            "units": self.units,
        }
        return self._fetch_weather_data(request_params)
    else:
        return ErrorArtifact(f"Error fetching coordinates for location: {location}")

get_hourly_forecast_by_location(params)

Source code in griptape/tools/openweather_client/tool.py
@activity(
    config={
        "description": "Can be used to fetch hourly forecast for a given location up to 48 hours ahead. "
        "Temperatures are returned in {{ _self.units }} by default.",
        "schema": Schema({Literal("location", description="Location to fetch hourly forecast for."): str}),
    }
)
def get_hourly_forecast_by_location(self, params: dict) -> ListArtifact | TextArtifact | ErrorArtifact:
    location = params["values"].get("location")
    coordinates = self._fetch_coordinates(location)
    if coordinates:
        lat, lon = coordinates
        request_params = {
            "lat": lat,
            "lon": lon,
            "exclude": "minutely,current,daily,alerts",
            "appid": self.api_key,
            "units": self.units,
        }
        return self._fetch_weather_data(request_params)
    else:
        return ErrorArtifact(f"Error fetching coordinates for location: {location}")

OutpaintingImageGenerationClient

Bases: ImageArtifactFileOutputMixin, BaseTool

A tool that can be used to generate prompted outpaintings of an image.

Attributes:

Name Type Description
engine OutpaintingImageGenerationEngine

The outpainting image generation engine used to generate the image.

output_dir OutpaintingImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file OutpaintingImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/outpainting_image_generation_client/tool.py
@define
class OutpaintingImageGenerationClient(ImageArtifactFileOutputMixin, BaseTool):
    """A tool that can be used to generate prompted outpaintings of an image.

    Attributes:
        engine: The outpainting image generation engine used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    engine: OutpaintingImageGenerationEngine = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Can be used to modify an image outside a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                }
            ),
        }
    )
    def image_outpainting_from_file(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        return self._generate_outpainting(prompts, negative_prompts, input_artifact, mask_artifact)

    @activity(
        config={
            "description": "Can be used to modify an image outside a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "mask_artifact_namespace": str,
                }
            ),
        }
    )
    def image_outpainting_from_memory(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory, image_artifact_namespace, image_artifact_name, ImageArtifact
            )
            mask_artifact = load_artifact_from_memory(
                memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_outpainting(
            prompts, negative_prompts, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)
        )

    def _generate_outpainting(
        self,
        prompts: list[str],
        negative_prompts: list[str],
        image_artifact: ImageArtifact,
        mask_artifact: ImageArtifact,
    ):
        output_artifact = self.engine.run(
            prompts=prompts, negative_prompts=negative_prompts, image=image_artifact, mask=mask_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

engine: OutpaintingImageGenerationEngine = field(kw_only=True) class-attribute instance-attribute

image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

image_outpainting_from_file(params)

Source code in griptape/tools/outpainting_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to modify an image outside a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            }
        ),
    }
)
def image_outpainting_from_file(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]
    image_file = params["values"]["image_file"]
    mask_file = params["values"]["mask_file"]

    input_artifact = self.image_loader.load(image_file)
    mask_artifact = self.image_loader.load(mask_file)

    return self._generate_outpainting(prompts, negative_prompts, input_artifact, mask_artifact)

image_outpainting_from_memory(params)

Source code in griptape/tools/outpainting_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to modify an image outside a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
                "memory_name": str,
                "image_artifact_namespace": str,
                "mask_artifact_namespace": str,
            }
        ),
    }
)
def image_outpainting_from_memory(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]
    image_artifact_namespace = params["values"]["image_artifact_namespace"]
    image_artifact_name = params["values"]["image_artifact_name"]
    mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
    mask_artifact_name = params["values"]["mask_artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(
            memory, image_artifact_namespace, image_artifact_name, ImageArtifact
        )
        mask_artifact = load_artifact_from_memory(
            memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact
        )
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_outpainting(
        prompts, negative_prompts, cast(ImageArtifact, image_artifact), cast(ImageArtifact, mask_artifact)
    )

PromptImageGenerationClient

Bases: ImageArtifactFileOutputMixin, BaseTool

A tool that can be used to generate an image from a text prompt.

Attributes:

Name Type Description
engine PromptImageGenerationEngine

The prompt image generation engine used to generate the image.

output_dir PromptImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file PromptImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/prompt_image_generation_client/tool.py
@define
class PromptImageGenerationClient(ImageArtifactFileOutputMixin, BaseTool):
    """A tool that can be used to generate an image from a text prompt.

    Attributes:
        engine: The prompt image generation engine used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    engine: PromptImageGenerationEngine = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to generate an image from text prompts.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                }
            ),
        }
    )
    def generate_image(self, params: dict[str, dict[str, list[str]]]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]

        output_artifact = self.engine.run(prompts=prompts, negative_prompts=negative_prompts)

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

engine: PromptImageGenerationEngine = field(kw_only=True) class-attribute instance-attribute

generate_image(params)

Source code in griptape/tools/prompt_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to generate an image from text prompts.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
            }
        ),
    }
)
def generate_image(self, params: dict[str, dict[str, list[str]]]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]

    output_artifact = self.engine.run(prompts=prompts, negative_prompts=negative_prompts)

    if self.output_dir or self.output_file:
        self._write_to_file(output_artifact)

    return output_artifact

RestApiClient

Bases: BaseTool

Attributes:

Name Type Description
base_url str

The base url that will be used for the request.

path Optional[str]

The resource path that will be appended to base_url.

description str

A description of what the REST API does.

request_body_schema Optional[str]

A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests.

request_query_params_schema Optional[str]

A JSON schema string describing the available query parameters.

request_path_params_schema Optional[str]

A JSON schema string describing the available path parameters. The schema must describe an array of string values.

response_body_schema Optional[str]

A JSON schema string describing the response body.

request_headers Optional[Dict[str, str]]

Headers to include in the requests.

Source code in griptape/tools/rest_api_client/tool.py
@define
class RestApiClient(BaseTool):
    """
    Attributes:
        base_url: The base url that will be used for the request.
        path: The resource path that will be appended to base_url.
        description: A description of what the REST API does.
        request_body_schema: A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests.
        request_query_params_schema: A JSON schema string describing the available query parameters.
        request_path_params_schema: A JSON schema string describing the available path parameters. The schema must describe an array of string values.
        response_body_schema: A JSON schema string describing the response body.
        request_headers: Headers to include in the requests.
    """

    base_url: str = field(kw_only=True)
    path: Optional[str] = field(default=None, kw_only=True)
    description: str = field(kw_only=True)
    request_path_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_query_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_body_schema: Optional[str] = field(default=None, kw_only=True)
    response_body_schema: Optional[str] = field(default=None, kw_only=True)
    request_headers: Optional[Dict[str, str]] = field(default=None, kw_only=True)

    @property
    def full_url(self) -> str:
        return self._build_url(self.base_url, path=self.path)

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        }
    )
    def put(self, params: dict) -> BaseArtifact:
        from requests import put, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path
        body = values["body"]
        url = self._build_url(base_url, path=path)

        try:
            response = put(url, json=body, timeout=30, headers=self.request_headers)

            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """
            ),
            "schema": Schema(
                {
                    Literal("path_params", description="The request path parameters."): list,
                    Literal("body", description="The request body."): dict,
                }
            ),
        }
    )
    def patch(self, params: dict) -> BaseArtifact:
        from requests import patch, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path
        body = values["body"]
        path_params = values["path_params"]
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = patch(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        }
    )
    def post(self, params: dict) -> BaseArtifact:
        from requests import post, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path
        url = self._build_url(base_url, path=path)
        body = values["body"]

        try:
            response = post(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """
            ),
            "schema": schema.Optional(
                Schema(
                    {
                        schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                        schema.Optional(Literal("path_params", description="The request path parameters.")): list,
                    }
                )
            ),
        }
    )
    def get(self, params: dict) -> BaseArtifact:
        from requests import get, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path

        query_params = {}
        path_params = []
        if values:
            query_params = values.get("query_params", {})
            path_params = values.get("path_params", [])
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = get(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                """
            ),
            "schema": Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): list,
                }
            ),
        }
    )
    def delete(self, params: dict) -> BaseArtifact:
        from requests import delete, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path

        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    def _build_url(self, base_url, path=None, path_params=None):
        url = ""

        if path:
            url += path.strip("/")

        if path_params:
            url += f'/{str.join("/", map(str, path_params))}'

        return urljoin(base_url.strip("/"), url)

base_url: str = field(kw_only=True) class-attribute instance-attribute

description: str = field(kw_only=True) class-attribute instance-attribute

full_url: str property

path: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

request_body_schema: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

request_headers: Optional[Dict[str, str]] = field(default=None, kw_only=True) class-attribute instance-attribute

request_path_params_schema: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

request_query_params_schema: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

response_body_schema: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

delete(params)

Source code in griptape/tools/rest_api_client/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            """
        ),
        "schema": Schema(
            {
                schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                schema.Optional(Literal("path_params", description="The request path parameters.")): list,
            }
        ),
    }
)
def delete(self, params: dict) -> BaseArtifact:
    from requests import delete, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path

    query_params = values.get("query_params", {})
    path_params = values.get("path_params", [])
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

get(params)

Source code in griptape/tools/rest_api_client/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """
        ),
        "schema": schema.Optional(
            Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): list,
                }
            )
        ),
    }
)
def get(self, params: dict) -> BaseArtifact:
    from requests import get, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path

    query_params = {}
    path_params = []
    if values:
        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        response = get(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

patch(params)

Source code in griptape/tools/rest_api_client/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """
        ),
        "schema": Schema(
            {
                Literal("path_params", description="The request path parameters."): list,
                Literal("body", description="The request body."): dict,
            }
        ),
    }
)
def patch(self, params: dict) -> BaseArtifact:
    from requests import patch, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path
    body = values["body"]
    path_params = values["path_params"]
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        response = patch(url, json=body, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

post(params)

Source code in griptape/tools/rest_api_client/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    }
)
def post(self, params: dict) -> BaseArtifact:
    from requests import post, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path
    url = self._build_url(base_url, path=path)
    body = values["body"]

    try:
        response = post(url, json=body, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

put(params)

Source code in griptape/tools/rest_api_client/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    }
)
def put(self, params: dict) -> BaseArtifact:
    from requests import put, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path
    body = values["body"]
    url = self._build_url(base_url, path=path)

    try:
        response = put(url, json=body, timeout=30, headers=self.request_headers)

        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

SqlClient

Bases: BaseTool

Source code in griptape/tools/sql_client/tool.py
@define
class SqlClient(BaseTool):
    sql_loader: SqlLoader = field(kw_only=True)
    schema_name: Optional[str] = field(default=None, kw_only=True)
    table_name: str = field(kw_only=True)
    table_description: Optional[str] = field(default=None, kw_only=True)
    engine_name: Optional[str] = field(default=None, kw_only=True)

    @property
    def full_table_name(self) -> str:
        return f"{self.schema_name}.{self.table_name}" if self.schema_name else self.table_name

    @property
    def table_schema(self) -> Optional[str]:
        return self.sql_loader.sql_driver.get_table_schema(self.full_table_name, schema=self.schema_name)

    @activity(
        config={
            "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
            "in table {{ _self.full_table_name }}. "
            "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
            "the original question. "
            "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
            "to get better results. "
            "You can use JOINs if more tables are available in other tools.\n"
            "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
            "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
            "schema": Schema({"sql_query": str}),
        }
    )
    def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        try:
            query = params["values"]["sql_query"]
            rows = self.sql_loader.load(query)
        except Exception as e:
            return ErrorArtifact(f"error executing query: {e}")

        if len(rows) > 0:
            return ListArtifact(rows)
        else:
            return InfoArtifact("No results found")

engine_name: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

full_table_name: str property

schema_name: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

sql_loader: SqlLoader = field(kw_only=True) class-attribute instance-attribute

table_description: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

table_name: str = field(kw_only=True) class-attribute instance-attribute

table_schema: Optional[str] property

execute_query(params)

Source code in griptape/tools/sql_client/tool.py
@activity(
    config={
        "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
        "in table {{ _self.full_table_name }}. "
        "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
        "the original question. "
        "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
        "to get better results. "
        "You can use JOINs if more tables are available in other tools.\n"
        "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
        "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
        "schema": Schema({"sql_query": str}),
    }
)
def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    try:
        query = params["values"]["sql_query"]
        rows = self.sql_loader.load(query)
    except Exception as e:
        return ErrorArtifact(f"error executing query: {e}")

    if len(rows) > 0:
        return ListArtifact(rows)
    else:
        return InfoArtifact("No results found")

StructureRunClient

Bases: BaseTool

Attributes:

Name Type Description
description str

A description of what the Structure does.

driver BaseStructureRunDriver

Driver to run the Structure.

Source code in griptape/tools/structure_run_client/tool.py
@define
class StructureRunClient(BaseTool):
    """
    Attributes:
        description: A description of what the Structure does.
        driver: Driver to run the Structure.
    """

    description: str = field(kw_only=True)
    driver: BaseStructureRunDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to run a Griptape Structure with the following description: {{ self.description }}",
            "schema": Schema(
                {Literal("args", description="A list of string arguments to submit to the Structure Run"): list}
            ),
        }
    )
    def run_structure(self, params: dict) -> BaseArtifact:
        args: list[str] = params["values"]["args"]

        return self.driver.run(*[TextArtifact(arg) for arg in args])

description: str = field(kw_only=True) class-attribute instance-attribute

driver: BaseStructureRunDriver = field(kw_only=True) class-attribute instance-attribute

run_structure(params)

Source code in griptape/tools/structure_run_client/tool.py
@activity(
    config={
        "description": "Can be used to run a Griptape Structure with the following description: {{ self.description }}",
        "schema": Schema(
            {Literal("args", description="A list of string arguments to submit to the Structure Run"): list}
        ),
    }
)
def run_structure(self, params: dict) -> BaseArtifact:
    args: list[str] = params["values"]["args"]

    return self.driver.run(*[TextArtifact(arg) for arg in args])

TaskMemoryClient

Bases: BaseTool

Source code in griptape/tools/task_memory_client/tool.py
@define
class TaskMemoryClient(BaseTool):
    off_prompt: bool = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to summarize memory content",
            "schema": Schema({"memory_name": str, "artifact_namespace": str}),
        }
    )
    def summarize(self, params: dict) -> TextArtifact | InfoArtifact | ErrorArtifact:
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]

        if memory:
            return memory.summarize_namespace(artifact_namespace)
        else:
            return ErrorArtifact("memory not found")

    @activity(
        config={
            "description": "Can be used to search and query memory content",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    Literal(
                        "query",
                        description="A natural language search query in the form of a question with enough "
                        "contextual information for another person to understand what the query is about",
                    ): str,
                }
            ),
        }
    )
    def query(self, params: dict) -> TextArtifact | InfoArtifact | ErrorArtifact:
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        query = params["values"]["query"]

        if memory:
            return memory.query_namespace(namespace=artifact_namespace, query=query)
        else:
            return ErrorArtifact("memory not found")

off_prompt: bool = field(kw_only=True) class-attribute instance-attribute

query(params)

Source code in griptape/tools/task_memory_client/tool.py
@activity(
    config={
        "description": "Can be used to search and query memory content",
        "schema": Schema(
            {
                "memory_name": str,
                "artifact_namespace": str,
                Literal(
                    "query",
                    description="A natural language search query in the form of a question with enough "
                    "contextual information for another person to understand what the query is about",
                ): str,
            }
        ),
    }
)
def query(self, params: dict) -> TextArtifact | InfoArtifact | ErrorArtifact:
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]
    query = params["values"]["query"]

    if memory:
        return memory.query_namespace(namespace=artifact_namespace, query=query)
    else:
        return ErrorArtifact("memory not found")

summarize(params)

Source code in griptape/tools/task_memory_client/tool.py
@activity(
    config={
        "description": "Can be used to summarize memory content",
        "schema": Schema({"memory_name": str, "artifact_namespace": str}),
    }
)
def summarize(self, params: dict) -> TextArtifact | InfoArtifact | ErrorArtifact:
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]

    if memory:
        return memory.summarize_namespace(artifact_namespace)
    else:
        return ErrorArtifact("memory not found")

VariationImageGenerationClient

Bases: ImageArtifactFileOutputMixin, BaseTool

A tool that can be used to generate prompted variations of an image.

Attributes:

Name Type Description
engine VariationImageGenerationEngine

The variation image generation engine used to generate the image.

output_dir VariationImageGenerationEngine

If provided, the generated image will be written to disk in output_dir.

output_file VariationImageGenerationEngine

If provided, the generated image will be written to disk as output_file.

Source code in griptape/tools/variation_image_generation_client/tool.py
@define
class VariationImageGenerationClient(ImageArtifactFileOutputMixin, BaseTool):
    """A tool that can be used to generate prompted variations of an image.

    Attributes:
        engine: The variation image generation engine used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    engine: VariationImageGenerationEngine = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Can be used to generate a variation of a given input image file.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                }
            ),
        }
    )
    def image_variation_from_file(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]
        image_file = params["values"]["image_file"]

        image_artifact = self.image_loader.load(image_file)
        if isinstance(image_artifact, ErrorArtifact):
            return image_artifact

        return self._generate_variation(prompts, negative_prompts, image_artifact)

    @activity(
        config={
            "description": "Can be used to generate a variation of a given input image artifact in memory.",
            "schema": Schema(
                {
                    Literal(
                        "prompts",
                        description="A detailed list of features and descriptions to include in the generated image.",
                    ): list[str],
                    Literal(
                        "negative_prompts",
                        description="A detailed list of features and descriptions to avoid in the generated image.",
                    ): list[str],
                    "memory_name": str,
                    "artifact_namespace": str,
                    "artifact_name": str,
                }
            ),
        }
    )
    def image_variation_from_memory(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
        prompts = params["values"]["prompts"]
        negative_prompts = params["values"]["negative_prompts"]
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(memory, artifact_namespace, artifact_name, ImageArtifact)
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_variation(prompts, negative_prompts, cast(ImageArtifact, image_artifact))

    def _generate_variation(self, prompts: list[str], negative_prompts: list[str], image_artifact: ImageArtifact):
        output_artifact = self.engine.run(prompts=prompts, negative_prompts=negative_prompts, image=image_artifact)

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact

engine: VariationImageGenerationEngine = field(kw_only=True) class-attribute instance-attribute

image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

image_variation_from_file(params)

Source code in griptape/tools/variation_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to generate a variation of a given input image file.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
            }
        ),
    }
)
def image_variation_from_file(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]
    image_file = params["values"]["image_file"]

    image_artifact = self.image_loader.load(image_file)
    if isinstance(image_artifact, ErrorArtifact):
        return image_artifact

    return self._generate_variation(prompts, negative_prompts, image_artifact)

image_variation_from_memory(params)

Source code in griptape/tools/variation_image_generation_client/tool.py
@activity(
    config={
        "description": "Can be used to generate a variation of a given input image artifact in memory.",
        "schema": Schema(
            {
                Literal(
                    "prompts",
                    description="A detailed list of features and descriptions to include in the generated image.",
                ): list[str],
                Literal(
                    "negative_prompts",
                    description="A detailed list of features and descriptions to avoid in the generated image.",
                ): list[str],
                "memory_name": str,
                "artifact_namespace": str,
                "artifact_name": str,
            }
        ),
    }
)
def image_variation_from_memory(self, params: dict[str, Any]) -> ImageArtifact | ErrorArtifact:
    prompts = params["values"]["prompts"]
    negative_prompts = params["values"]["negative_prompts"]
    artifact_namespace = params["values"]["artifact_namespace"]
    artifact_name = params["values"]["artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(memory, artifact_namespace, artifact_name, ImageArtifact)
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_variation(prompts, negative_prompts, cast(ImageArtifact, image_artifact))

VectorStoreClient

Bases: BaseTool

Attributes:

Name Type Description
description str

LLM-friendly vector DB description.

namespace Optional[str]

Vector storage namespace.

query_engine VectorQueryEngine

BaseQueryEngine.

top_n int

Max number of results returned for the query engine query.

Source code in griptape/tools/vector_store_client/tool.py
@define
class VectorStoreClient(BaseTool):
    """
    Attributes:
        description: LLM-friendly vector DB description.
        namespace: Vector storage namespace.
        query_engine: `BaseQueryEngine`.
        top_n: Max number of results returned for the query engine query.
    """

    DEFAULT_TOP_N = 5

    description: str = field(kw_only=True)
    query_engine: VectorQueryEngine = field(kw_only=True)
    top_n: int = field(default=DEFAULT_TOP_N, kw_only=True)
    namespace: Optional[str] = field(default=None, kw_only=True)

    @activity(
        config={
            "description": "Can be used to search a vector database with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal(
                        "query", description="A natural language search query to run against the vector database"
                    ): str
                }
            ),
        }
    )
    def search(self, params: dict) -> BaseArtifact:
        query = params["values"]["query"]

        try:
            return self.query_engine.query(query, top_n=self.top_n, namespace=self.namespace)
        except Exception as e:
            return ErrorArtifact(f"error querying vector store: {e}")

DEFAULT_TOP_N = 5 class-attribute instance-attribute

description: str = field(kw_only=True) class-attribute instance-attribute

namespace: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

query_engine: VectorQueryEngine = field(kw_only=True) class-attribute instance-attribute

top_n: int = field(default=DEFAULT_TOP_N, kw_only=True) class-attribute instance-attribute

search(params)

Source code in griptape/tools/vector_store_client/tool.py
@activity(
    config={
        "description": "Can be used to search a vector database with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal(
                    "query", description="A natural language search query to run against the vector database"
                ): str
            }
        ),
    }
)
def search(self, params: dict) -> BaseArtifact:
    query = params["values"]["query"]

    try:
        return self.query_engine.query(query, top_n=self.top_n, namespace=self.namespace)
    except Exception as e:
        return ErrorArtifact(f"error querying vector store: {e}")

WebScraper

Bases: BaseTool

Source code in griptape/tools/web_scraper/tool.py
@define
class WebScraper(BaseTool):
    web_loader: WebLoader = field(default=Factory(lambda: WebLoader()), kw_only=True)

    @activity(
        config={
            "description": "Can be used to browse a web page and load its content",
            "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
        }
    )
    def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
        url = params["values"]["url"]

        try:
            result = self.web_loader.load(url)
            if isinstance(result, ErrorArtifact):
                return result
            else:
                return ListArtifact(result)
        except Exception as e:
            return ErrorArtifact("Error getting page content: " + str(e))

web_loader: WebLoader = field(default=Factory(lambda: WebLoader()), kw_only=True) class-attribute instance-attribute

get_content(params)

Source code in griptape/tools/web_scraper/tool.py
@activity(
    config={
        "description": "Can be used to browse a web page and load its content",
        "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
    }
)
def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
    url = params["values"]["url"]

    try:
        result = self.web_loader.load(url)
        if isinstance(result, ErrorArtifact):
            return result
        else:
            return ListArtifact(result)
    except Exception as e:
        return ErrorArtifact("Error getting page content: " + str(e))

WebSearch

Bases: BaseTool

Source code in griptape/tools/web_search/tool.py
@define
class WebSearch(BaseTool):
    results_count: int = field(default=5, kw_only=True)
    google_api_lang: str = field(default="lang_en", kw_only=True)
    google_api_key: str = field(kw_only=True)
    google_api_search_id: str = field(kw_only=True)
    google_api_country: str = field(default="us", kw_only=True)

    @activity(
        config={
            "description": "Can be used for searching the web",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                    ): str
                }
            ),
        }
    )
    def search(self, props: dict) -> ListArtifact | ErrorArtifact:
        query = props["values"]["query"]

        try:
            return ListArtifact([TextArtifact(str(result)) for result in self._search_google(query)])
        except Exception as e:
            return ErrorArtifact(f"error searching Google: {e}")

    def _search_google(self, query: str) -> list[dict]:
        url = (
            f"https://www.googleapis.com/customsearch/v1?"
            f"key={self.google_api_key}&"
            f"cx={self.google_api_search_id}&"
            f"q={query}&"
            f"start=0&"
            f"lr={self.google_api_lang}&"
            f"num={self.results_count}&"
            f"gl={self.google_api_country}"
        )
        response = requests.get(url)

        if response.status_code == 200:
            data = response.json()

            links = [{"url": r["link"], "title": r["title"], "description": r["snippet"]} for r in data["items"]]

            return links
        else:
            raise Exception(
                f"Google Search API returned an error with status code "
                f"{response.status_code} and reason '{response.reason}'"
            )

google_api_country: str = field(default='us', kw_only=True) class-attribute instance-attribute

google_api_key: str = field(kw_only=True) class-attribute instance-attribute

google_api_lang: str = field(default='lang_en', kw_only=True) class-attribute instance-attribute

google_api_search_id: str = field(kw_only=True) class-attribute instance-attribute

results_count: int = field(default=5, kw_only=True) class-attribute instance-attribute

search(props)

Source code in griptape/tools/web_search/tool.py
@activity(
    config={
        "description": "Can be used for searching the web",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                ): str
            }
        ),
    }
)
def search(self, props: dict) -> ListArtifact | ErrorArtifact:
    query = props["values"]["query"]

    try:
        return ListArtifact([TextArtifact(str(result)) for result in self._search_google(query)])
    except Exception as e:
        return ErrorArtifact(f"error searching Google: {e}")