Skip to content

Tools

__all__ = ['BaseTool', 'BaseAwsClient', 'AwsIamClient', 'AwsS3Client', 'BaseGoogleClient', 'GoogleGmailClient', 'GoogleDocsClient', 'GoogleCalendarClient', 'GoogleDriveClient', 'Calculator', 'WebSearch', 'WebScraper', 'SqlClient', 'EmailClient', 'RestApiClient', 'FileManager', 'VectorStoreClient', 'DateTime', 'TaskMemoryClient', 'Computer', 'OpenWeatherClient', 'PromptImageGenerationClient', 'VariationImageGenerationClient', 'InpaintingImageGenerationClient', 'OutpaintingImageGenerationClient'] module-attribute

AwsIamClient

Bases: BaseAwsClient

Source code in griptape/griptape/tools/aws_iam_client/tool.py
@define
class AwsIamClient(BaseAwsClient):
    """Tool exposing AWS IAM lookups (users, policies, MFA devices) as activities.

    Each activity wraps a single-purpose IAM client call sequence and converts
    any exception raised inside it into an ``ErrorArtifact``.
    """

    # IAM client; defaults to one created from the tool's session.
    iam_client: boto3.client = field(
        default=Factory(lambda self: self.session.client("iam"), takes_self=True), kw_only=True
    )

    @activity(
        config={
            "description": "Can be used to get a policy for an AWS IAM user.",
            "schema": Schema(
                {
                    Literal("user_name", description="Username of the AWS IAM user."): str,
                    Literal(
                        "policy_name",
                        description="PolicyName of the AWS IAM Policy embedded in the specified IAM user.",
                    ): str,
                }
            ),
        }
    )
    def get_user_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Return the inline policy document attached to an IAM user.

        Args:
            params: Activity input; reads ``params["values"]["user_name"]`` and
                ``params["values"]["policy_name"]``.

        Returns:
            A ``TextArtifact`` built from the response's ``PolicyDocument``, or
            an ``ErrorArtifact`` if anything in the lookup raises.
        """
        try:
            policy = self.iam_client.get_user_policy(
                UserName=params["values"]["user_name"], PolicyName=params["values"]["policy_name"]
            )
            return TextArtifact(policy["PolicyDocument"])
        except Exception as e:
            return ErrorArtifact(f"error returning policy document: {e}")

    @activity(config={"description": "Can be used to list AWS MFA Devices"})
    def list_mfa_devices(self, _: dict) -> ListArtifact | ErrorArtifact:
        """Return one ``TextArtifact`` per entry in the ``MFADevices`` response list.

        The activity input is ignored. Any exception becomes an ``ErrorArtifact``.
        """
        try:
            devices = self.iam_client.list_mfa_devices()
            return ListArtifact([TextArtifact(str(d)) for d in devices["MFADevices"]])
        except Exception as e:
            return ErrorArtifact(f"error listing mfa devices: {e}")

    @activity(
        config={
            "description": "Can be used to list policies for a given IAM user.",
            "schema": Schema(
                {Literal("user_name", description="Username of the AWS IAM user for which to list policies."): str}
            ),
        }
    )
    def list_user_policies(self, params: dict) -> ListArtifact | ErrorArtifact:
        """List the names of a user's inline policies followed by attached policies.

        Args:
            params: Activity input; reads ``params["values"]["user_name"]``.

        Returns:
            A ``ListArtifact`` of policy names (inline first, then attached),
            or an ``ErrorArtifact`` if anything raises.
        """
        try:
            policies = self.iam_client.list_user_policies(UserName=params["values"]["user_name"])
            policy_names = policies["PolicyNames"]

            attached_policies = self.iam_client.list_attached_user_policies(UserName=params["values"]["user_name"])
            attached_policy_names = [p["PolicyName"] for p in attached_policies["AttachedPolicies"]]

            return ListArtifact([TextArtifact(str(p)) for p in policy_names + attached_policy_names])
        except Exception as e:
            return ErrorArtifact(f"error listing iam user policies: {e}")

    @activity(config={"description": "Can be used to list AWS IAM users."})
    def list_users(self, _: dict) -> ListArtifact | ErrorArtifact:
        """Return one ``TextArtifact`` per entry in the ``Users`` response list.

        The activity input is ignored. Any exception becomes an ``ErrorArtifact``.
        """
        try:
            users = self.iam_client.list_users()
            return ListArtifact([TextArtifact(str(u)) for u in users["Users"]])
        except Exception as e:
            # This is the IAM tool; the message previously said "s3 users".
            return ErrorArtifact(f"error listing iam users: {e}")

iam_client: boto3.client = field(default=Factory(lambda : self.session.client('iam'), takes_self=True), kw_only=True) class-attribute instance-attribute

get_user_policy(params)

Source code in griptape/griptape/tools/aws_iam_client/tool.py
@activity(
    config={
        "description": "Can be used to get a policy for an AWS IAM user.",
        "schema": Schema(
            {
                Literal("user_name", description="Username of the AWS IAM user."): str,
                Literal(
                    "policy_name",
                    description="PolicyName of the AWS IAM Policy embedded in the specified IAM user.",
                ): str,
            }
        ),
    }
)
def get_user_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Return the inline policy document attached to an IAM user.

    Args:
        params: Activity input; reads ``params["values"]["user_name"]`` and
            ``params["values"]["policy_name"]``.

    Returns:
        A ``TextArtifact`` built from the response's ``PolicyDocument``, or an
        ``ErrorArtifact`` if anything in the lookup raises.
    """
    try:
        policy = self.iam_client.get_user_policy(
            UserName=params["values"]["user_name"], PolicyName=params["values"]["policy_name"]
        )
        return TextArtifact(policy["PolicyDocument"])
    except Exception as e:
        return ErrorArtifact(f"error returning policy document: {e}")

list_mfa_devices(_)

Source code in griptape/griptape/tools/aws_iam_client/tool.py
@activity(config={"description": "Can be used to list AWS MFA Devices"})
def list_mfa_devices(self, _: dict) -> ListArtifact | ErrorArtifact:
    """List MFA devices via the IAM client.

    The activity input is ignored. Each entry of the response's ``MFADevices``
    list is stringified into a ``TextArtifact``; any exception is returned as
    an ``ErrorArtifact``.
    """
    try:
        response = self.iam_client.list_mfa_devices()
        device_artifacts = [TextArtifact(str(device)) for device in response["MFADevices"]]
        return ListArtifact(device_artifacts)
    except Exception as e:
        return ErrorArtifact(f"error listing mfa devices: {e}")

list_user_policies(params)

Source code in griptape/griptape/tools/aws_iam_client/tool.py
@activity(
    config={
        "description": "Can be used to list policies for a given IAM user.",
        "schema": Schema(
            {Literal("user_name", description="Username of the AWS IAM user for which to list policies."): str}
        ),
    }
)
def list_user_policies(self, params: dict) -> ListArtifact | ErrorArtifact:
    """List a user's inline policy names followed by attached policy names.

    Args:
        params: Activity input; reads ``params["values"]["user_name"]``.

    Returns:
        A ``ListArtifact`` with one ``TextArtifact`` per policy name, or an
        ``ErrorArtifact`` if any step raises.
    """
    try:
        # Inline policies first.
        inline_names = self.iam_client.list_user_policies(UserName=params["values"]["user_name"])["PolicyNames"]

        # Then managed policies attached to the same user.
        attached_response = self.iam_client.list_attached_user_policies(UserName=params["values"]["user_name"])
        attached_names = [entry["PolicyName"] for entry in attached_response["AttachedPolicies"]]

        combined = inline_names + attached_names
        return ListArtifact([TextArtifact(str(name)) for name in combined])
    except Exception as e:
        return ErrorArtifact(f"error listing iam user policies: {e}")

list_users(_)

Source code in griptape/griptape/tools/aws_iam_client/tool.py
@activity(config={"description": "Can be used to list AWS IAM users."})
def list_users(self, _: dict) -> ListArtifact | ErrorArtifact:
    """List IAM users via the IAM client.

    The activity input is ignored. Each entry of the response's ``Users`` list
    is stringified into a ``TextArtifact``; any exception is returned as an
    ``ErrorArtifact``.
    """
    try:
        users = self.iam_client.list_users()
        return ListArtifact([TextArtifact(str(u)) for u in users["Users"]])
    except Exception as e:
        # This is an IAM activity; the message previously said "s3 users".
        return ErrorArtifact(f"error listing iam users: {e}")

AwsS3Client

Bases: BaseAwsClient

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@define
class AwsS3Client(BaseAwsClient):
    """Tool exposing AWS S3 bucket and object operations as activities.

    Failures raised by the S3 calls inside each activity are returned as
    ``ErrorArtifact`` values instead of propagating.
    """

    # S3 client; defaults to one created from the tool's session.
    s3_client: boto3.client = field(
        default=Factory(lambda self: self.session.client("s3"), takes_self=True), kw_only=True
    )

    @activity(
        config={
            "description": "Can be used to get an access control list (ACL) of an AWS S3 bucket.",
            "schema": Schema(
                {
                    Literal(
                        "bucket_name",
                        description="The bucket name that contains the object for which to get the ACL information.",
                    ): str
                }
            ),
        }
    )
    def get_bucket_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Fetch the ACL of ``params["values"]["bucket_name"]``.

        Returns a ``TextArtifact`` wrapping the client response, or an
        ``ErrorArtifact`` if the lookup raises.
        """
        try:
            acl = self.s3_client.get_bucket_acl(Bucket=params["values"]["bucket_name"])
            return TextArtifact(acl)
        except Exception as e:
            return ErrorArtifact(f"error getting bucket acl: {e}")

    @activity(
        config={
            "description": "Can be used to get an AWS S3 bucket policy.",
            "schema": Schema(
                {Literal("bucket_name", description="The bucket name for which to get the bucket policy."): str}
            ),
        }
    )
    def get_bucket_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Fetch the bucket policy of ``params["values"]["bucket_name"]``.

        Returns a ``TextArtifact`` wrapping the client response, or an
        ``ErrorArtifact`` if the lookup raises.
        """
        try:
            policy = self.s3_client.get_bucket_policy(Bucket=params["values"]["bucket_name"])
            return TextArtifact(policy)
        except Exception as e:
            return ErrorArtifact(f"error getting bucket policy: {e}")

    @activity(
        config={
            "description": "Can be used to get an access control list (ACL) of an object in the AWS S3 bucket.",
            "schema": Schema(
                {
                    Literal("bucket_name", description="Name of the AWS S3 bucket for which to get an ACL."): str,
                    Literal("object_key", description="Key of the object for which to get the ACL information."): str,
                }
            ),
        }
    )
    def get_object_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Fetch the ACL of one object, addressed by bucket name and object key.

        Returns a ``TextArtifact`` wrapping the client response, or an
        ``ErrorArtifact`` if the lookup raises.
        """
        try:
            acl = self.s3_client.get_object_acl(
                Bucket=params["values"]["bucket_name"], Key=params["values"]["object_key"]
            )
            return TextArtifact(acl)
        except Exception as e:
            return ErrorArtifact(f"error getting object acl: {e}")

    @activity(config={"description": "Can be used to list all AWS S3 buckets."})
    def list_s3_buckets(self, _: dict) -> ListArtifact | ErrorArtifact:
        """Return one ``TextArtifact`` per entry in the ``Buckets`` response list.

        The activity input is ignored. Any exception becomes an ``ErrorArtifact``.
        """
        try:
            buckets = self.s3_client.list_buckets()

            return ListArtifact([TextArtifact(str(b)) for b in buckets["Buckets"]])
        except Exception as e:
            return ErrorArtifact(f"error listing s3 buckets: {e}")

    @activity(
        config={
            "description": "Can be used to list all objects in an AWS S3 bucket.",
            "schema": Schema({Literal("bucket_name", description="The name of the S3 bucket to list."): str}),
        }
    )
    def list_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
        """List objects in ``params["values"]["bucket_name"]`` using ``list_objects_v2``.

        Returns:
            A ``ListArtifact`` with one stringified entry per object (empty
            when the response has no ``Contents``), or an ``ErrorArtifact``
            if the call raises.
        """
        try:
            objects = self.s3_client.list_objects_v2(Bucket=params["values"]["bucket_name"])

            # The response carries no "Contents" key when nothing matched
            # (e.g. an empty bucket); treat that as an empty listing rather
            # than surfacing a KeyError as an error artifact.
            return ListArtifact([TextArtifact(str(o)) for o in objects.get("Contents", [])])
        except Exception as e:
            return ErrorArtifact(f"error listing objects in bucket: {e}")

    @activity(
        config={
            "description": "Can be used to upload memory artifacts to an AWS S3 bucket",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "bucket_name": str,
                    Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                }
            ),
        }
    )
    def upload_memory_artifacts_to_s3(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Upload artifacts stored in an input memory namespace to S3.

        Args:
            params: Activity input; reads ``memory_name``, ``artifact_namespace``,
                ``bucket_name`` and ``object_key`` from ``params["values"]``.

        Returns:
            ``InfoArtifact`` on success; ``ErrorArtifact`` when the memory is
            not found, the namespace holds no artifacts, or an upload raises.
        """
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        bucket_name = params["values"]["bucket_name"]
        object_key = params["values"]["object_key"]

        if not memory:
            return ErrorArtifact("memory not found")

        artifacts = memory.load_artifacts(artifact_namespace)
        artifact_count = len(artifacts)

        if artifact_count == 0:
            return ErrorArtifact("no artifacts found")

        try:
            if artifact_count == 1:
                self._upload_object(bucket_name, object_key, artifacts.value[0].value)
            else:
                # NOTE(review): every artifact is written to the same object_key,
                # so each upload replaces the previous one - confirm this is intended.
                for a in artifacts.value:
                    self._upload_object(bucket_name, object_key, a.value)

            return InfoArtifact("uploaded successfully")
        except Exception as e:
            return ErrorArtifact(f"error uploading objects to the bucket: {e}")

    @activity(
        config={
            "description": "Can be used to upload content to an AWS S3 bucket",
            "schema": Schema(
                {
                    "bucket_name": str,
                    Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                    "content": str,
                }
            ),
        }
    )
    def upload_content_to_s3(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Upload ``params["values"]["content"]`` to the given bucket and key.

        Returns ``InfoArtifact`` on success or ``ErrorArtifact`` if the upload raises.
        """
        content = params["values"]["content"]
        bucket_name = params["values"]["bucket_name"]
        object_key = params["values"]["object_key"]

        try:
            self._upload_object(bucket_name, object_key, content)

            return InfoArtifact("uploaded successfully")
        except Exception as e:
            return ErrorArtifact(f"error uploading objects to the bucket: {e}")

    @activity(
        config={
            "description": "Can be used to download objects from AWS S3",
            "schema": Schema(
                {
                    Literal("objects", description="A list of bucket name and object key pairs to download"): [
                        {
                            Literal(
                                "bucket_name", description="The name of the bucket to download the object from"
                            ): str,
                            Literal(
                                "object_key", description="The name of the object key to download from the bucket"
                            ): str,
                        }
                    ]
                }
            ),
        }
    )
    def download_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Download each requested object and collect the bodies as blobs.

        Args:
            params: Activity input; ``params["values"]["objects"]`` is iterated
                and each item's ``bucket_name`` and ``object_key`` are read.

        Returns:
            A ``ListArtifact`` of ``BlobArtifact`` values in request order. The
            first failing download aborts the loop and returns an
            ``ErrorArtifact`` instead (already downloaded blobs are discarded).
        """
        objects = params["values"]["objects"]
        artifact = ListArtifact()
        for object_info in objects:
            try:
                obj = self.s3_client.get_object(Bucket=object_info["bucket_name"], Key=object_info["object_key"])

                content = obj["Body"].read()
                artifact.value.append(BlobArtifact(content))

            except Exception as e:
                return ErrorArtifact(f"error downloading objects from bucket: {e}")

        return artifact

    def _upload_object(self, bucket_name: str, object_name: str, value: str | bytes) -> None:
        """Issue ``create_bucket`` for the bucket, then upload ``value`` under ``object_name``.

        ``str`` values are encoded with ``str.encode()`` before upload; any other
        value is handed to ``io.BytesIO`` unchanged.
        """
        # NOTE(review): create_bucket is requested before every upload, including
        # for buckets that may already exist - confirm this is the intended behavior.
        self.s3_client.create_bucket(Bucket=bucket_name)

        self.s3_client.upload_fileobj(
            Fileobj=io.BytesIO(value.encode() if isinstance(value, str) else value), Bucket=bucket_name, Key=object_name
        )

s3_client: boto3.client = field(default=Factory(lambda : self.session.client('s3'), takes_self=True), kw_only=True) class-attribute instance-attribute

download_objects(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to download objects from AWS S3",
        "schema": Schema(
            {
                Literal("objects", description="A list of bucket name and object key pairs to download"): [
                    {
                        Literal(
                            "bucket_name", description="The name of the bucket to download the object from"
                        ): str,
                        Literal(
                            "object_key", description="The name of the object key to download from the bucket"
                        ): str,
                    }
                ]
            }
        ),
    }
)
def download_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Fetch every requested S3 object and return the bodies as blobs.

    Args:
        params: Activity input; ``params["values"]["objects"]`` is iterated and
            each item's ``bucket_name`` and ``object_key`` are read.

    Returns:
        A ``ListArtifact`` holding one ``BlobArtifact`` per object, in request
        order. The first failure stops the loop and an ``ErrorArtifact`` is
        returned instead.
    """
    requested = params["values"]["objects"]
    downloaded = ListArtifact()

    for entry in requested:
        try:
            response = self.s3_client.get_object(Bucket=entry["bucket_name"], Key=entry["object_key"])
            downloaded.value.append(BlobArtifact(response["Body"].read()))
        except Exception as e:
            return ErrorArtifact(f"error downloading objects from bucket: {e}")

    return downloaded

get_bucket_acl(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an access control list (ACL) of an AWS S3 bucket.",
        "schema": Schema(
            {
                Literal(
                    "bucket_name",
                    description="The bucket name that contains the object for which to get the ACL information.",
                ): str
            }
        ),
    }
)
def get_bucket_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Look up the ACL of the bucket named in ``params["values"]["bucket_name"]``.

    Returns a ``TextArtifact`` wrapping the client response, or an
    ``ErrorArtifact`` when the lookup raises.
    """
    try:
        return TextArtifact(self.s3_client.get_bucket_acl(Bucket=params["values"]["bucket_name"]))
    except Exception as e:
        return ErrorArtifact(f"error getting bucket acl: {e}")

get_bucket_policy(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an AWS S3 bucket policy.",
        "schema": Schema(
            {Literal("bucket_name", description="The bucket name for which to get the bucket policy."): str}
        ),
    }
)
def get_bucket_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Look up the policy of the bucket named in ``params["values"]["bucket_name"]``.

    Returns a ``TextArtifact`` wrapping the client response, or an
    ``ErrorArtifact`` when the lookup raises.
    """
    try:
        return TextArtifact(self.s3_client.get_bucket_policy(Bucket=params["values"]["bucket_name"]))
    except Exception as e:
        return ErrorArtifact(f"error getting bucket policy: {e}")

get_object_acl(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an access control list (ACL) of an object in the AWS S3 bucket.",
        "schema": Schema(
            {
                Literal("bucket_name", description="Name of the AWS S3 bucket for which to get an ACL."): str,
                Literal("object_key", description="Key of the object for which to get the ACL information."): str,
            }
        ),
    }
)
def get_object_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Look up the ACL of a single object addressed by bucket name and key.

    Returns a ``TextArtifact`` wrapping the client response, or an
    ``ErrorArtifact`` when the lookup raises.
    """
    try:
        response = self.s3_client.get_object_acl(
            Bucket=params["values"]["bucket_name"],
            Key=params["values"]["object_key"],
        )
        return TextArtifact(response)
    except Exception as e:
        return ErrorArtifact(f"error getting object acl: {e}")

list_objects(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to list all objects in an AWS S3 bucket.",
        "schema": Schema({Literal("bucket_name", description="The name of the S3 bucket to list."): str}),
    }
)
def list_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
    """List objects in ``params["values"]["bucket_name"]`` using ``list_objects_v2``.

    Returns:
        A ``ListArtifact`` with one stringified entry per object (empty when
        the response has no ``Contents``), or an ``ErrorArtifact`` if the call
        raises.
    """
    try:
        objects = self.s3_client.list_objects_v2(Bucket=params["values"]["bucket_name"])

        # The response carries no "Contents" key when nothing matched (e.g. an
        # empty bucket); treat that as an empty listing rather than surfacing a
        # KeyError as an error artifact.
        return ListArtifact([TextArtifact(str(o)) for o in objects.get("Contents", [])])
    except Exception as e:
        return ErrorArtifact(f"error listing objects in bucket: {e}")

list_s3_buckets(_)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(config={"description": "Can be used to list all AWS S3 buckets."})
def list_s3_buckets(self, _: dict) -> ListArtifact | ErrorArtifact:
    """List S3 buckets via the S3 client.

    The activity input is ignored. Each entry of the response's ``Buckets``
    list is stringified into a ``TextArtifact``; any exception is returned as
    an ``ErrorArtifact``.
    """
    try:
        response = self.s3_client.list_buckets()
        bucket_artifacts = [TextArtifact(str(bucket)) for bucket in response["Buckets"]]
        return ListArtifact(bucket_artifacts)
    except Exception as e:
        return ErrorArtifact(f"error listing s3 buckets: {e}")

upload_content_to_s3(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to upload content to an AWS S3 bucket",
        "schema": Schema(
            {
                "bucket_name": str,
                Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                "content": str,
            }
        ),
    }
)
def upload_content_to_s3(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Upload the supplied text content to a bucket under the given key.

    Args:
        params: Activity input; reads ``content``, ``bucket_name`` and
            ``object_key`` from ``params["values"]``.

    Returns:
        ``InfoArtifact`` on success, ``ErrorArtifact`` if the upload raises.
    """
    values = params["values"]
    # Read in the same order as before so a missing key fails identically.
    body = values["content"]
    bucket = values["bucket_name"]
    key = values["object_key"]

    try:
        self._upload_object(bucket, key, body)
        return InfoArtifact("uploaded successfully")
    except Exception as e:
        return ErrorArtifact(f"error uploading objects to the bucket: {e}")

upload_memory_artifacts_to_s3(params)

Source code in griptape/griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to upload memory artifacts to an AWS S3 bucket",
        "schema": Schema(
            {
                "memory_name": str,
                "artifact_namespace": str,
                "bucket_name": str,
                Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
            }
        ),
    }
)
def upload_memory_artifacts_to_s3(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Upload artifacts held in an input memory namespace to S3.

    Args:
        params: Activity input; reads ``memory_name``, ``artifact_namespace``,
            ``bucket_name`` and ``object_key`` from ``params["values"]``.

    Returns:
        ``InfoArtifact`` on success; ``ErrorArtifact`` when the memory is not
        found, the namespace holds no artifacts, or an upload raises.
    """
    memory = self.find_input_memory(params["values"]["memory_name"])
    namespace = params["values"]["artifact_namespace"]
    bucket = params["values"]["bucket_name"]
    key = params["values"]["object_key"]

    if not memory:
        return ErrorArtifact("memory not found")

    loaded = memory.load_artifacts(namespace)

    if len(loaded) == 0:
        return ErrorArtifact("no artifacts found")

    if len(loaded) == 1:
        try:
            self._upload_object(bucket, key, loaded.value[0].value)
            return InfoArtifact("uploaded successfully")
        except Exception as e:
            return ErrorArtifact(f"error uploading objects to the bucket: {e}")

    try:
        # NOTE(review): every artifact goes to the same key, so each upload
        # replaces the previous one - confirm this is intended.
        for item in loaded.value:
            self._upload_object(bucket, key, item.value)
        return InfoArtifact("uploaded successfully")
    except Exception as e:
        return ErrorArtifact(f"error uploading objects to the bucket: {e}")

BaseAwsClient

Bases: BaseTool, ABC

Source code in griptape/griptape/tools/base_aws_client.py
@define
class BaseAwsClient(BaseTool, ABC):
    """Abstract base for AWS tools; holds the boto3 session they build clients from."""

    # Required keyword-only session used to create service clients.
    session: boto3.Session = field(kw_only=True)

    @activity(config={"description": "Can be used to get current AWS account and IAM principal."})
    def get_current_aws_identity(self, params: dict) -> BaseArtifact:
        """Return the STS caller identity for the session as text.

        ``params`` is not read. Any exception is returned as an ``ErrorArtifact``.
        """
        try:
            sts_client = self.session.client("sts")
            identity = sts_client.get_caller_identity()
            return TextArtifact(str(identity))
        except Exception as e:
            return ErrorArtifact(f"error getting current aws caller identity: {e}")

session: boto3.Session = field(kw_only=True) class-attribute instance-attribute

get_current_aws_identity(params)

Source code in griptape/griptape/tools/base_aws_client.py
@activity(config={"description": "Can be used to get current AWS account and IAM principal."})
def get_current_aws_identity(self, params: dict) -> BaseArtifact:
    """Return the STS caller identity for the tool's session as text.

    ``params`` is not read. Any exception is returned as an ``ErrorArtifact``.
    """
    try:
        sts_client = self.session.client("sts")
        identity = sts_client.get_caller_identity()
        return TextArtifact(str(identity))
    except Exception as e:
        return ErrorArtifact(f"error getting current aws caller identity: {e}")

BaseGoogleClient

Bases: BaseTool, ABC

Source code in griptape/griptape/tools/base_google_client.py
@define
class BaseGoogleClient(BaseTool, ABC):
    """Abstract base for Google tools authenticated with a service account.

    Provides helpers for building an API client and for resolving a
    slash-separated Drive path to a file id.
    """

    DRIVE_FILE_SCOPES = ["https://www.googleapis.com/auth/drive.file"]

    DRIVE_AUTH_SCOPES = ["https://www.googleapis.com/auth/drive"]

    # Service account info passed to Credentials.from_service_account_info.
    service_account_credentials: dict = field(kw_only=True)

    def _build_client(self, scopes: list[str], service_name: str, version: str, owner_email: str) -> Any:
        """Build a Google API client acting on behalf of ``owner_email``.

        Args:
            scopes: OAuth scopes requested for the service account credentials.
            service_name: Name of the API passed to ``build`` as ``serviceName``.
            version: API version passed to ``build``.
            owner_email: Subject the credentials are delegated to via ``with_subject``.

        Returns:
            The client object returned by ``googleapiclient.discovery.build``.
        """
        # Imported lazily so the Google packages are only needed when a client is built.
        from google.oauth2 import service_account
        from googleapiclient.discovery import build

        credentials = service_account.Credentials.from_service_account_info(
            self.service_account_credentials, scopes=scopes
        )

        return build(serviceName=service_name, version=version, credentials=credentials.with_subject(owner_email))

    def _convert_path_to_file_id(self, service: Any, path: str) -> Optional[str]:
        """Resolve a ``/``-separated Drive path, starting at ``root``, to a file id.

        Intermediate segments are looked up as folders and are created when
        they do not exist. The final segment is looked up without a MIME type
        filter and is never created.

        Args:
            service: Drive client exposing ``files().list`` and ``files().create``.
            path: Slash-separated path relative to the Drive root.

        Returns:
            The id of the first match for the final segment, or ``None`` when
            the final segment does not exist.
        """
        parts = path.split("/")
        current_id = "root"
        last_idx = len(parts) - 1

        for idx, part in enumerate(parts):
            # Escape backslashes and single quotes so a name containing them
            # cannot terminate the quoted literal in the query string.
            escaped_part = part.replace("\\", "\\\\").replace("'", "\\'")

            if idx == last_idx:
                query = f"name='{escaped_part}' and '{current_id}' in parents"
            else:
                query = (
                    f"name='{escaped_part}' and '{current_id}' in parents "
                    "and mimeType='application/vnd.google-apps.folder'"
                )

            response = service.files().list(q=query).execute()
            files = response.get("files", [])

            if not files:
                if idx != last_idx:
                    # Missing intermediate folder: create it and keep descending.
                    folder_metadata = {
                        "name": part,
                        "mimeType": "application/vnd.google-apps.folder",
                        "parents": [current_id],
                    }
                    folder = service.files().create(body=folder_metadata, fields="id").execute()
                    current_id = folder.get("id")
                else:
                    current_id = None
            else:
                # Several items may share a name; the first result is used.
                current_id = files[0]["id"]

        return current_id

DRIVE_AUTH_SCOPES = ['https://www.googleapis.com/auth/drive'] class-attribute instance-attribute

DRIVE_FILE_SCOPES = ['https://www.googleapis.com/auth/drive.file'] class-attribute instance-attribute

service_account_credentials: dict = field(kw_only=True) class-attribute instance-attribute

BaseTool

Bases: ActivityMixin, ABC

Abstract class for all tools to inherit from.

Attributes:

Name Type Description
name str

Tool name.

input_memory list[TaskMemory] | None

TaskMemory available in tool activities. Gets automatically set if None.

output_memory dict[str, list[TaskMemory]] | None

TaskMemory that activities write to by default. Gets automatically set if None.

install_dependencies_on_init bool

Determines whether dependencies from the tool requirements.txt file are installed in init.

dependencies_install_directory str | None

Custom dependency install directory.

verbose bool

Determines whether tool operations (such as dependency installation) should be verbose.

off_prompt bool

Determines whether tool activity output goes to the output memory.

Source code in griptape/griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, ABC):
    """Abstract class for all tools to inherit from.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to by default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    MANIFEST_FILE = "manifest.yml"
    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(default=Factory(lambda self: self.class_name, takes_self=True), kw_only=True)
    input_memory: list[TaskMemory] | None = field(default=None, kw_only=True)
    output_memory: dict[str, list[TaskMemory]] | None = field(default=None, kw_only=True)
    install_dependencies_on_init: bool = field(default=True, kw_only=True)
    dependencies_install_directory: str | None = field(default=None, kw_only=True)
    verbose: bool = field(default=False, kw_only=True)
    off_prompt: bool = field(default=True, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Install the tool's dependencies after init when enabled, passing a copy of the process environment."""
        if self.install_dependencies_on_init:
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore
    def validate_output_memory(self, _, output_memory: dict[str, list[TaskMemory]] | None) -> None:
        """Validate the ``output_memory`` mapping of activity name to memories.

        Raises:
            ValueError: If a key does not resolve via ``find_activity``, or if
                the memories listed for one activity do not have unique names.
        """
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")

                output_memory_names = [memory.name for memory in memory_list]

                # A set drops duplicates, so a shorter set means a repeated name.
                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def class_name(self) -> str:
        """Name of the concrete class of this instance."""
        return self.__class__.__name__

    @property
    def manifest_path(self) -> str:
        """Path of ``MANIFEST_FILE`` inside the directory of the tool's source file."""
        return os.path.join(self.abs_dir_path, self.MANIFEST_FILE)

    @property
    def requirements_path(self) -> str:
        """Path of ``REQUIREMENTS_FILE`` inside the directory of the tool's source file."""
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def manifest(self) -> dict:
        """Manifest file content parsed with ``yaml.safe_load``; the file is re-read on every access."""
        with open(self.manifest_path) as yaml_file:
            return yaml.safe_load(yaml_file)

    @property
    def abs_file_path(self) -> str:
        """Absolute path of the file that defines the instance's class."""
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self) -> str:
        """Directory containing ``abs_file_path``."""
        return os.path.dirname(self.abs_file_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        """Build the JSON schema describing every action this tool accepts.

        One ``Schema`` is created per entry of ``self.activities()`` (tool name,
        activity name as ``path``, and the activity's input schema wrapped under
        ``values`` when it has one); the per-activity schemas are combined with
        ``Or`` and rendered via ``json_schema``.
        """
        action_schemas = [
            Schema(
                {
                    Literal("name"): self.name,
                    Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
                    Literal("input"): {"values": activity.config["schema"]} if self.activity_schema(activity) else {},
                }
            )
            for activity in self.activities()
        ]
        full_schema = Schema(Or(*action_schemas), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} Action Schema")

    def execute(self, activity: Callable, subtask: ActionSubtask) -> BaseArtifact:
        """Run an activity through the ``before_run`` -> ``run`` -> ``after_run`` pipeline and return the final output."""
        preprocessed_input = self.before_run(activity, subtask.action_input)
        output = self.run(activity, subtask, preprocessed_input)
        postprocessed_output = self.after_run(activity, subtask, output)

        return postprocessed_output

    def before_run(self, activity: Callable, value: dict | None) -> dict | None:
        """Pre-processing hook; this base implementation returns ``value`` unchanged."""
        return value

    def run(self, activity: Callable, subtask: ActionSubtask, value: dict | None) -> BaseArtifact:
        """Call ``activity`` with ``value`` and make sure the result is an artifact.

        A result that is not a ``BaseArtifact`` is logged as a warning and
        wrapped in an ``InfoArtifact``.
        """
        activity_result = activity(value)

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            result = InfoArtifact(activity_result)

        return result

    def after_run(self, activity: Callable, subtask: ActionSubtask, value: BaseArtifact) -> BaseArtifact:
        """Post-process an activity result through the configured output memories.

        A falsy ``value`` yields an ``InfoArtifact`` saying the tool returned an
        empty value. Without output memory, ``value`` is returned as is.
        Otherwise each memory registered under ``activity.name`` processes the
        value in turn, and a final non-artifact result is wrapped in a
        ``TextArtifact``.
        """
        if value:
            if self.output_memory:
                # The memories are read from the instance the activity is bound to.
                for memory in activity.__self__.output_memory.get(activity.name, []):
                    value = memory.process_output(activity, subtask, value)

                if isinstance(value, BaseArtifact):
                    return value
                else:
                    return TextArtifact(str(value))
            else:
                return value
        else:
            return InfoArtifact("Tool returned an empty value")

    def validate(self) -> bool:
        """Check that the manifest and requirements files exist and that the manifest validates.

        Returns:
            ``True`` when all checks pass.

        Raises:
            Exception: If the manifest file or the requirements file is missing.
        """
        from griptape.utils import ManifestValidator

        if not os.path.exists(self.manifest_path):
            raise Exception(f"{self.MANIFEST_FILE} not found")

        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")

        ManifestValidator().validate(self.manifest)

        return True

    def tool_dir(self) -> str:
        """Absolute directory of the file that defines the instance's class."""
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: dict[str, str] | None = None) -> None:
        """Run ``pip install -r requirements.txt`` from the tool directory.

        Uses the current interpreter (``sys.executable -m pip``). Adds ``-U``
        when no install directory is configured, otherwise ``-t <directory>``.
        stdout and stderr are discarded unless ``verbose`` is set. The result
        of ``subprocess.run`` is not inspected.

        Args:
            env: Environment for the subprocess; a falsy value is replaced by
                an empty dict.
        """
        env = env if env else {}

        command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
        )

    def find_input_memory(self, memory_name: str) -> TaskMemory | None:
        """Return the first input memory whose ``name`` equals ``memory_name``, or ``None`` if there is none."""
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        else:
            return None

MANIFEST_FILE = 'manifest.yml' class-attribute instance-attribute

REQUIREMENTS_FILE = 'requirements.txt' class-attribute instance-attribute

abs_dir_path property

abs_file_path property

class_name property

dependencies_install_directory: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

input_memory: list[TaskMemory] | None = field(default=None, kw_only=True) class-attribute instance-attribute

install_dependencies_on_init: bool = field(default=True, kw_only=True) class-attribute instance-attribute

manifest: dict property

manifest_path: str property

name: str = field(default=Factory(lambda : self.class_name, takes_self=True), kw_only=True) class-attribute instance-attribute

off_prompt: bool = field(default=True, kw_only=True) class-attribute instance-attribute

output_memory: dict[str, list[TaskMemory]] | None = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_path: str property

verbose: bool = field(default=False, kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    """Install the tool's dependencies after initialization when ``install_dependencies_on_init`` is set."""
    if not self.install_dependencies_on_init:
        return

    # Hand the installer a copy of the current process environment.
    self.install_dependencies(os.environ.copy())

after_run(activity, subtask, value)

Source code in griptape/griptape/tools/base_tool.py
def after_run(self, activity: Callable, subtask: ActionSubtask, value: BaseArtifact) -> BaseArtifact:
    """Post-process an activity's output, routing it through any configured output memory.

    Args:
        activity: The activity that produced ``value``.
        subtask: The subtask the activity ran for.
        value: The activity output.

    Returns:
        An ``InfoArtifact`` when ``value`` is falsy; ``value`` unchanged when the tool has no
        output memory; otherwise the memory-processed value, wrapped in a ``TextArtifact``
        if it is no longer an artifact.
    """
    if not value:
        return InfoArtifact("Tool returned an empty value")

    if not self.output_memory:
        return value

    # Each memory registered for this activity gets to transform the value in turn.
    for memory in activity.__self__.output_memory.get(activity.name, []):
        value = memory.process_output(activity, subtask, value)

    return value if isinstance(value, BaseArtifact) else TextArtifact(str(value))

before_run(activity, value)

Source code in griptape/griptape/tools/base_tool.py
def before_run(self, activity: Callable, value: dict | None) -> dict | None:
    """Pre-process an activity's input before it runs.

    This implementation returns ``value`` unchanged.
    """
    return value

execute(activity, subtask)

Source code in griptape/griptape/tools/base_tool.py
def execute(self, activity: Callable, subtask: ActionSubtask) -> BaseArtifact:
    """Run ``activity`` for ``subtask`` through the before_run -> run -> after_run pipeline.

    Returns:
        Whatever ``after_run`` returns for the activity's output.
    """
    activity_input = self.before_run(activity, subtask.action_input)
    raw_result = self.run(activity, subtask, activity_input)

    return self.after_run(activity, subtask, raw_result)

find_input_memory(memory_name)

Source code in griptape/griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> TaskMemory | None:
    """Return the first input memory named ``memory_name``, or ``None`` if there is none."""
    if not self.input_memory:
        return None

    for memory in self.input_memory:
        if memory.name == memory_name:
            return memory

    return None

install_dependencies(env=None)

Source code in griptape/griptape/tools/base_tool.py
def install_dependencies(self, env: dict[str, str] | None = None) -> None:
    """Install the tool's ``requirements.txt`` with pip in a subprocess.

    Args:
        env: Environment variables for the pip subprocess. A missing or empty
            mapping results in an empty environment being passed.
    """
    # pip output is only shown when the tool is verbose.
    output_target = None if self.verbose else subprocess.DEVNULL

    # Without a target directory, upgrade in place; otherwise install into that directory.
    target_dir = self.dependencies_install_directory
    extra_args = ["-U"] if target_dir is None else ["-t", target_dir]

    subprocess.run(
        [sys.executable, "-m", "pip", "install", "-r", "requirements.txt", *extra_args],
        env=env or {},
        cwd=self.tool_dir(),
        stdout=output_target,
        stderr=output_target,
    )

run(activity, subtask, value)

Source code in griptape/griptape/tools/base_tool.py
def run(self, activity: Callable, subtask: ActionSubtask, value: dict | None) -> BaseArtifact:
    """Call ``activity`` with ``value`` and make sure the result is an artifact.

    Returns:
        The activity's result if it is a ``BaseArtifact``; otherwise the result wrapped in an
        ``InfoArtifact`` (a warning is logged in that case).
    """
    outcome = activity(value)

    if not isinstance(outcome, BaseArtifact):
        logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

        outcome = InfoArtifact(outcome)

    return outcome

schema()

Source code in griptape/griptape/tools/base_tool.py
def schema(self) -> dict:
    """Build the JSON schema describing every action this tool exposes.

    Returns:
        The JSON schema of an ``Or`` over one sub-schema per activity, each holding the tool
        name, the activity name (as ``path``) and the activity's input schema.
    """
    action_schemas = []

    for activity in self.activities():
        action_schemas.append(
            Schema(
                {
                    Literal("name"): self.name,
                    Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
                    # Activities without a schema accept an empty input.
                    Literal("input"): {"values": activity.config["schema"]} if self.activity_schema(activity) else {},
                }
            )
        )

    combined = Schema(Or(*action_schemas), description=f"{self.name} action schema.")

    return combined.json_schema(f"{self.name} Action Schema")

tool_dir()

Source code in griptape/griptape/tools/base_tool.py
def tool_dir(self):
    """Return the absolute path of the directory containing the file that defines this tool's class."""
    defining_file = os.path.abspath(inspect.getfile(self.__class__))

    return os.path.dirname(defining_file)

validate()

Source code in griptape/griptape/tools/base_tool.py
def validate(self) -> bool:
    """Check that the tool's manifest and requirements files exist and validate the manifest.

    Returns:
        ``True`` when all checks pass.

    Raises:
        Exception: If the manifest file or the requirements file does not exist.
    """
    # Imported here rather than at module level, as in the original implementation.
    from griptape.utils import ManifestValidator

    manifest_missing = not os.path.exists(self.manifest_path)
    if manifest_missing:
        raise Exception(f"{self.MANIFEST_FILE} not found")

    requirements_missing = not os.path.exists(self.requirements_path)
    if requirements_missing:
        raise Exception(f"{self.REQUIREMENTS_FILE} not found")

    validator = ManifestValidator()
    validator.validate(self.manifest)

    return True

validate_output_memory(_, output_memory)

Source code in griptape/griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore
def validate_output_memory(self, _, output_memory: dict[str, list[TaskMemory]] | None) -> None:
    """Validate the ``output_memory`` mapping of activity names to memories.

    Raises:
        ValueError: If a key does not name an existing activity, or if two memories
            configured for the same activity share a name.
    """
    if not output_memory:
        return

    for activity_name, memory_list in output_memory.items():
        if not self.find_activity(activity_name):
            raise ValueError(f"activity {activity_name} doesn't exist")

        names = [memory.name for memory in memory_list]

        # Deduplicating shrinks the collection only when a name occurs more than once.
        if len(set(names)) < len(names):
            raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

Calculator

Bases: BaseTool

Source code in griptape/griptape/tools/calculator/tool.py
class Calculator(BaseTool):
    """Tool that evaluates a single-line arithmetic expression with ``numexpr``."""

    @activity(
        config={
            "description": "Can be used for making simple numeric or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. Don't use any "
                        "imports or external libraries",
                    ): str
                }
            ),
        }
    )
    def calculate(self, params: dict) -> BaseArtifact:
        """Evaluate ``params["values"]["expression"]``.

        Returns:
            A ``TextArtifact`` built from the evaluation result, or an ``ErrorArtifact``
            carrying the exception message if anything inside the ``try`` fails.
        """
        import numexpr

        try:
            result = numexpr.evaluate(params["values"]["expression"])

            return TextArtifact(result)
        except Exception as error:
            return ErrorArtifact(f"error calculating: {error}")

calculate(params)

Source code in griptape/griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for making simple numeric or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. Don't use any "
                    "imports or external libraries",
                ): str
            }
        ),
    }
)
def calculate(self, params: dict) -> BaseArtifact:
    """Evaluate ``params["values"]["expression"]`` with ``numexpr``.

    Returns:
        A ``TextArtifact`` built from the evaluation result, or an ``ErrorArtifact``
        carrying the exception message if anything inside the ``try`` fails.
    """
    import numexpr

    try:
        result = numexpr.evaluate(params["values"]["expression"])

        return TextArtifact(result)
    except Exception as error:
        return ErrorArtifact(f"error calculating: {error}")

Computer

Bases: BaseTool

Source code in griptape/griptape/tools/computer/tool.py
@define
class Computer(BaseTool):
    """Tool that runs Python code and shell commands inside a Docker container.

    Attributes:
        local_workdir: Host directory mounted into the container. When not set, a temporary
            directory is created in ``__attrs_post_init__`` and cleaned up in ``__del__``.
        container_workdir: Path inside the container where ``local_workdir`` is mounted.
        env_vars: Environment variables passed to the container.
        dockerfile_path: Dockerfile used to build the image; defaults to
            ``resources/Dockerfile`` under the tool directory.
        requirements_txt_path: Requirements file copied next to the Dockerfile when building;
            defaults to ``resources/requirements.txt`` under the tool directory.
        docker_client: Docker client used to build images and run containers.
    """

    local_workdir: str | None = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True
    )

    # Holds the temporary work directory created when no local_workdir was supplied.
    __tempdir: tempfile.TemporaryDirectory | None = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Run the base initialization, then make sure a local work directory exists."""
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            # Keep a reference to the TemporaryDirectory so __del__ can clean it up.
            self.__tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self.__tempdir.name

    @docker_client.validator  # pyright: ignore
    def validate_docker_client(self, _, docker_client: DockerClient) -> None:
        """Reject a missing Docker client.

        Raises:
            ValueError: If ``docker_client`` is falsy.
        """
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: dict[str, str] | None = None) -> None:
        """Install base dependencies, remove any existing container for this tool and build its image."""
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename", description="name of the file to put the Python code in before executing it"
                    ): str,
                }
            ),
        }
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        """Write the given code to the given filename and execute it in the container."""
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        }
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        """Execute the given shell command in the container."""
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        """Run ``command`` in a new container with ``local_workdir`` mounted at ``container_workdir``.

        The container is always removed afterwards, including when waiting for it or reading
        its logs fails.

        Args:
            command: Command passed to the container.

        Returns:
            An ``ErrorArtifact`` holding stderr if the command wrote to stderr, a ``TextArtifact``
            holding stdout otherwise, or an ``ErrorArtifact`` describing the exception if
            running the container fails.
        """
        container = None

        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}}

            container = self.docker_client.containers.run(
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,
                stdout=True,
                stderr=True,
                detach=True,
            )

            container.wait()

            stderr = container.logs(stdout=False, stderr=True).decode().strip()
            stdout = container.logs(stdout=True, stderr=False).decode().strip()

            if stderr:
                return ErrorArtifact(stderr)
            else:
                return TextArtifact(stdout)
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")
        finally:
            # Clean up on every path: a container left behind after a failure would still
            # hold this tool's container name. force=True also covers a container that is
            # still running because wait() did not complete.
            if container is not None:
                try:
                    container.remove(force=True)
                except Exception as e:
                    # A cleanup failure must not replace the command's result.
                    logging.warning(f"error removing container: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        """Write ``code`` to ``filename`` in the work directory and run it with ``python`` in the container.

        Returns:
            The artifact from ``execute_command_in_container``, or an ``ErrorArtifact`` if
            writing the file or starting the command raises.
        """
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            with open(local_file_path, "w") as f:
                f.write(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> DockerClient | None:
        """Create a Docker client from the environment; log the error and return ``None`` on failure."""
        try:
            return docker.from_env()
        except Exception as e:
            logging.error(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        """Return the image name for ``tool``: its snake-cased name plus ``_image``."""
        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        """Return the container name for ``tool``: its snake-cased name plus ``_container``."""
        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        """Force-remove the container called ``name``; do nothing if it is not found."""
        try:
            existing_container = self.docker_client.containers.get(name)
            existing_container.remove(force=True)

            logging.info(f"Removed existing container: {name}")
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        """Build the Docker image for ``tool`` from the Dockerfile and requirements file."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # The build context holds just the Dockerfile and the requirements file.
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            # The build result is iterated and its first element is treated as the built image.
            response = list(image)

            logging.info(f"Built image: {response[0].short_id}")

    def dependencies(self) -> list[str]:
        """Return the lines of the requirements file, stripped of surrounding whitespace."""
        with open(self.requirements_txt_path) as file:
            return [line.strip() for line in file.readlines()]

    def __del__(self) -> None:
        """Clean up the temporary work directory, if one was created."""
        if self.__tempdir:
            self.__tempdir.cleanup()

__tempdir: tempfile.TemporaryDirectory | None = field(default=None, kw_only=True) class-attribute instance-attribute

container_workdir: str = field(default='/griptape', kw_only=True) class-attribute instance-attribute

docker_client: DockerClient = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) class-attribute instance-attribute

dockerfile_path: str = field(default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True), kw_only=True) class-attribute instance-attribute

env_vars: dict = field(factory=dict, kw_only=True) class-attribute instance-attribute

local_workdir: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_txt_path: str = field(default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True), kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    """Run the base initialization, then make sure a local work directory exists."""
    super().__attrs_post_init__()

    if not self.local_workdir:
        # No directory supplied: fall back to a temporary one and remember it on the instance.
        self.__tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self.__tempdir.name
        return

    Path(self.local_workdir).mkdir(parents=True, exist_ok=True)

__del__()

Source code in griptape/griptape/tools/computer/tool.py
def __del__(self) -> None:
    """Clean up the temporary work directory, if one was created."""
    if not self.__tempdir:
        return

    self.__tempdir.cleanup()

build_image(tool)

Source code in griptape/griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    """Build the Docker image for ``tool`` from the Dockerfile and requirements file."""
    with tempfile.TemporaryDirectory() as build_context:
        # The build context holds just the Dockerfile and the requirements file.
        shutil.copy(self.dockerfile_path, build_context)
        shutil.copy(self.requirements_txt_path, build_context)

        build_result = self.docker_client.images.build(
            path=build_context, tag=self.image_name(tool), rm=True, forcerm=True
        )

        # The build result is iterated and its first element is treated as the built image.
        built = list(build_result)

        logging.info(f"Built image: {built[0].short_id}")

container_name(tool)

Source code in griptape/griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    """Return the container name for ``tool``: its snake-cased name plus ``_container``."""
    snake_name = stringcase.snakecase(tool.name)

    return f"{snake_name}_container"

default_docker_client()

Source code in griptape/griptape/tools/computer/tool.py
def default_docker_client(self) -> DockerClient | None:
    """Create a Docker client from the environment.

    Returns:
        The client, or ``None`` if creating it raised (the exception is logged as an error).
    """
    try:
        client = docker.from_env()
    except Exception as error:
        logging.error(error)

        return None

    return client

dependencies()

Source code in griptape/griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    """Return the lines of the requirements file, stripped of surrounding whitespace."""
    with open(self.requirements_txt_path) as requirements:
        return [entry.strip() for entry in requirements]

execute_code(params)

Source code in griptape/griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename", description="name of the file to put the Python code in before executing it"
                ): str,
            }
        ),
    }
)
def execute_code(self, params: dict) -> BaseArtifact:
    """Write the given code to the given filename and execute it in the container."""
    values = params["values"]
    code, filename = values["code"], values["filename"]

    return self.execute_code_in_container(filename, code)

execute_code_in_container(filename, code)

Source code in griptape/griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    """Write ``code`` to ``filename`` in the work directory and run it with ``python`` in the container.

    Returns:
        The artifact from ``execute_command_in_container``, or an ``ErrorArtifact`` if
        writing the file or starting the command raises.
    """
    script_in_container = os.path.join(self.container_workdir, filename)

    # Fall back to a throwaway directory when no local work directory is configured.
    tempdir = None if self.local_workdir else tempfile.TemporaryDirectory()
    host_dir = self.local_workdir if tempdir is None else tempdir.name

    script_on_host = os.path.join(host_dir, filename)

    try:
        with open(script_on_host, "w") as script:
            script.write(code)

        return self.execute_command_in_container(f"python {script_in_container}")
    except Exception as error:
        return ErrorArtifact(f"error executing code: {error}")
    finally:
        if tempdir is not None:
            tempdir.cleanup()

execute_command(params)

Source code in griptape/griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    }
)
def execute_command(self, params: dict) -> BaseArtifact:
    """Execute the shell command from ``params["values"]["command"]`` in the container."""
    shell_command = params["values"]["command"]

    return self.execute_command_in_container(shell_command)

execute_command_in_container(command)

Source code in griptape/griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    """Run ``command`` in a new container with ``local_workdir`` mounted at ``container_workdir``.

    The container is always removed afterwards, including when waiting for it or reading
    its logs fails.

    Args:
        command: Command passed to the container.

    Returns:
        An ``ErrorArtifact`` holding stderr if the command wrote to stderr, a ``TextArtifact``
        holding stdout otherwise, or an ``ErrorArtifact`` describing the exception if
        running the container fails.
    """
    container = None

    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}}

        container = self.docker_client.containers.run(
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,
            stdout=True,
            stderr=True,
            detach=True,
        )

        container.wait()

        stderr = container.logs(stdout=False, stderr=True).decode().strip()
        stdout = container.logs(stdout=True, stderr=False).decode().strip()

        if stderr:
            return ErrorArtifact(stderr)
        else:
            return TextArtifact(stdout)
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")
    finally:
        # Clean up on every path: a container left behind after a failure would still
        # hold this tool's container name. force=True also covers a container that is
        # still running because wait() did not complete.
        if container is not None:
            try:
                container.remove(force=True)
            except Exception as e:
                # A cleanup failure must not replace the command's result.
                logging.warning(f"error removing container: {e}")

image_name(tool)

Source code in griptape/griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    """Return the image name for ``tool``: its snake-cased name plus ``_image``."""
    snake_name = stringcase.snakecase(tool.name)

    return f"{snake_name}_image"

install_dependencies(env=None)

Source code in griptape/griptape/tools/computer/tool.py
def install_dependencies(self, env: dict[str, str] | None = None) -> None:
    """Install base dependencies, remove any existing container for this tool and build its image."""
    super().install_dependencies(env)

    own_container = self.container_name(self)
    self.remove_existing_container(own_container)

    self.build_image(self)

remove_existing_container(name)

Source code in griptape/griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    """Force-remove the container called ``name``; do nothing if it is not found."""
    try:
        stale_container = self.docker_client.containers.get(name)
        stale_container.remove(force=True)
    except NotFound:
        # Nothing to remove.
        return
    else:
        logging.info(f"Removed existing container: {name}")

validate_docker_client(_, docker_client)

Source code in griptape/griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore
def validate_docker_client(self, _, docker_client: DockerClient) -> None:
    """Reject a missing Docker client.

    Raises:
        ValueError: If ``docker_client`` is falsy.
    """
    if docker_client:
        return

    raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

DateTime

Bases: BaseTool

Source code in griptape/griptape/tools/date_time/tool.py
class DateTime(BaseTool):
    """Tool that reports the current date and time and resolves relative date expressions."""

    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self, _: dict) -> BaseArtifact:
        """Return the current date and time (``datetime.now()``) as a ``TextArtifact``.

        Returns:
            A ``TextArtifact`` with the stringified datetime, or an ``ErrorArtifact`` if an
            exception is raised.
        """
        try:
            current_datetime = datetime.now()

            return TextArtifact(str(current_datetime))
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": Schema(
                {
                    Literal(
                        "relative_date_string",
                        description='Relative date in English. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str
                }
            ),
        }
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        """Parse ``params["values"]["relative_date_string"]`` with ``dateparser`` and return the result.

        Returns:
            A ``TextArtifact`` with the stringified datetime, an ``ErrorArtifact`` with
            "invalid date string" if parsing yields nothing, or an ``ErrorArtifact``
            describing the exception if one is raised.
        """
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            else:
                return ErrorArtifact("invalid date string")
        except Exception as e:
            # The message names this activity; it previously said "current datetime",
            # which made failures here look like get_current_datetime failures.
            return ErrorArtifact(f"error getting relative datetime: {e}")

get_current_datetime(_)

Source code in griptape/griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self, _: dict) -> BaseArtifact:
    """Return the current date and time (``datetime.now()``) as a ``TextArtifact``.

    Returns:
        A ``TextArtifact`` with the stringified datetime, or an ``ErrorArtifact`` if an
        exception is raised.
    """
    try:
        return TextArtifact(str(datetime.now()))
    except Exception as error:
        return ErrorArtifact(f"error getting current datetime: {error}")

get_relative_datetime(params)

Source code in griptape/griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": Schema(
            {
                Literal(
                    "relative_date_string",
                    description='Relative date in English. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str
            }
        ),
    }
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    """Parse ``params["values"]["relative_date_string"]`` with ``dateparser`` and return the result.

    Returns:
        A ``TextArtifact`` with the stringified datetime, an ``ErrorArtifact`` with
        "invalid date string" if parsing yields nothing, or an ``ErrorArtifact``
        describing the exception if one is raised.
    """
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        else:
            return ErrorArtifact("invalid date string")
    except Exception as e:
        # The message names this activity; it previously said "current datetime",
        # which misattributed failures to the current-datetime activity.
        return ErrorArtifact(f"error getting relative datetime: {e}")

EmailClient

Bases: BaseTool

Tool for working with email

Attributes:

Name Type Description
username str | None

Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com

password str | None

Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password.

email_max_retrieve_count int | None

Used to limit the number of messages retrieved during any given activity.

smtp_host str | None

Hostname or url of the SMTP server. Example: smtp.gmail.com

smtp_port int | None

Port of the SMTP server. Example: 465

smtp_use_ssl bool

Whether to use SSL when sending email via the SMTP protocol.

smtp_user str | None

Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only.

smtp_password str | None

Password to send email via the SMTP protocol. Overrides password for SMTP only.

imap_url str | None

Hostname or url of the IMAP server. Example: imap.gmail.com

imap_user str | None

Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only.

imap_password str | None

Password to retrieve email via the IMAP protocol. Overrides password for IMAP only.

mailboxes dict[str, str] | None

Descriptions of mailboxes available for retrieving email via the IMAP protocol. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}

email_loader EmailLoader

Used to retrieve email.

Source code in griptape/griptape/tools/email_client/tool.py
@define
class EmailClient(BaseTool):
    """Tool for retrieving email over IMAP and sending email over SMTP.

    Attributes:
        username: Username/email address used for both SMTP (sending) and IMAP (retrieving).
            Example: bender@futurama.com
        password: Password used for both SMTP and IMAP. If using gmail, this would be an App Password.
        email_max_retrieve_count: Limit on the number of messages retrieved by a single activity.
        smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com
        smtp_port: Port of the SMTP server. Example: 465
        smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol.
        smtp_user: Username/email address for SMTP only. Overrides username.
        smtp_password: Password for SMTP only. Overrides password.
        imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com
        imap_user: Username/email address for IMAP only. Overrides username.
        imap_password: Password for IMAP only. Overrides password.
        mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol.
            Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
        email_loader: Used to retrieve email.
    """

    username: str | None = field(default=None, kw_only=True)
    password: str | None = field(default=None, kw_only=True)
    email_max_retrieve_count: int | None = field(default=None, kw_only=True)
    smtp_host: str | None = field(default=None, kw_only=True)
    smtp_port: int | None = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: str | None = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: str | None = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: str | None = field(default=None, kw_only=True)
    imap_user: str | None = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: str | None = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: dict[str, str] | None = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'")
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key")
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                }
            ),
        }
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load emails matching the query in ``params["values"]`` through ``email_loader``.

        A truthy ``max_count`` in the params wins; otherwise ``email_max_retrieve_count`` is used.
        """
        values = params["values"]
        requested_count = values.get("max_count")
        max_count = int(requested_count) if requested_count else self.email_max_retrieve_count

        query = EmailLoader.EmailQuery(
            label=values["label"],
            key=values.get("key"),
            search_criteria=values.get("search_criteria"),
            max_count=max_count,
        )

        return self.email_loader.load(query)

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                }
            ),
        }
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Send a plain-text email from ``smtp_user`` to the recipient in ``params["values"]``.

        Returns:
            An ``InfoArtifact`` on success, or an ``ErrorArtifact`` (after logging the error)
            if connecting, logging in or sending raises.
        """
        values = params["values"]

        message = MIMEText(values["body"])
        message["Subject"] = values["subject"]
        message["From"] = self.smtp_user
        message["To"] = values["to"]

        try:
            with self._create_smtp_client() as smtp:
                smtp.login(self.smtp_user, self.smtp_password)
                smtp.sendmail(message["From"], [message["To"]], message.as_string())
                return InfoArtifact("email was successfully sent")
        except Exception as error:
            logging.error(error)
            return ErrorArtifact(f"error sending email: {error}")

    def _create_smtp_client(self) -> smtplib.SMTP | smtplib.SMTP_SSL:
        """Create an SMTP client for ``smtp_host``/``smtp_port``, using SSL when ``smtp_use_ssl`` is set."""
        host = self.smtp_host
        port = int(self.smtp_port)

        client_class = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP

        return client_class(host, port)

email_loader: EmailLoader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute

email_max_retrieve_count: int | None = field(default=None, kw_only=True) class-attribute instance-attribute

imap_password: str | None = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

imap_url: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

imap_user: str | None = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

mailboxes: dict[str, str] | None = field(default=None, kw_only=True) class-attribute instance-attribute

password: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_host: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_password: str | None = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

smtp_port: int | None = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_use_ssl: bool = field(default=True, kw_only=True) class-attribute instance-attribute

smtp_user: str | None = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

username: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

retrieve(params)

Source code in griptape/griptape/tools/email_client/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'")
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key")
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            }
        ),
    }
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load emails matching the query in ``params["values"]`` through ``email_loader``.

    A truthy ``max_count`` in the params wins; otherwise ``email_max_retrieve_count`` is used.
    """
    values = params["values"]
    requested_count = values.get("max_count")
    max_count = int(requested_count) if requested_count else self.email_max_retrieve_count

    query = EmailLoader.EmailQuery(
        label=values["label"],
        key=values.get("key"),
        search_criteria=values.get("search_criteria"),
        max_count=max_count,
    )

    return self.email_loader.load(query)

send(params)

Source code in griptape/griptape/tools/email_client/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            }
        ),
    }
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Send a plain-text email through the client from ``self._create_smtp_client()``.

    Args:
        params: Activity input; ``params["values"]`` holds ``to``, ``subject``
            and ``body``.

    Returns:
        InfoArtifact once the message has been handed to the SMTP client, or
        ErrorArtifact carrying the exception text if anything in the SMTP
        exchange raises.
    """
    values = params["values"]

    message = MIMEText(values["body"])
    message["Subject"] = values["subject"]
    message["From"] = self.smtp_user
    message["To"] = values["to"]

    try:
        with self._create_smtp_client() as smtp:
            smtp.login(self.smtp_user, self.smtp_password)
            smtp.sendmail(message["From"], [message["To"]], message.as_string())
        # Reached only when login, send and client teardown all succeeded.
        return InfoArtifact("email was successfully sent")
    except Exception as error:
        logging.error(error)
        return ErrorArtifact(f"error sending email: {error}")

FileManager

Bases: BaseTool

FileManager is a tool that can be used to load and save files.

Attributes:

Name Type Description
workdir str

The absolute directory to load files from and save files to.

loaders dict[str, BaseLoader]

Dictionary of file extensions and matching loaders to use when loading files in load_files_from_disk.

default_loader BaseLoader

The loader to use when loading files in load_files_from_disk without any matching loader in loaders.

save_file_encoding str | None

The encoding to use when saving files to disk.

Source code in griptape/griptape/tools/file_manager/tool.py
@define
class FileManager(BaseTool):
    """
    FileManager is a tool that can be used to load and save files.

    Every path received by an activity is joined onto `workdir` before it is used.

    Attributes:
        workdir: The absolute directory to load files from and save files to.
        loaders: Dictionary of file extensions and matching loaders to use when loading files in load_files_from_disk.
        default_loader: The loader to use when loading files in load_files_from_disk without any matching loader in `loaders`.
        save_file_encoding: The encoding to use when saving files to disk. When unset, `str.encode()` is called
            without an explicit encoding.
    """

    workdir: str = field(default=Factory(lambda: os.getcwd()), kw_only=True)
    default_loader: BaseLoader = field(default=Factory(lambda: FileLoader()))
    loaders: dict[str, BaseLoader] = field(
        default=Factory(
            lambda: {
                "pdf": PdfLoader(),
                "csv": CsvLoader(),
                "txt": TextLoader(),
                "html": TextLoader(),
                "json": TextLoader(),
                "yaml": TextLoader(),
                "xml": TextLoader(),
            }
        ),
        kw_only=True,
    )
    save_file_encoding: str | None = field(default=None, kw_only=True)

    @workdir.validator  # pyright: ignore
    def validate_workdir(self, _, workdir: str) -> None:
        """Reject a relative `workdir`.

        Raises:
            ValueError: If `workdir` is not an absolute path.
        """
        if not Path(workdir).is_absolute():
            raise ValueError("workdir has to be absolute")

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): []
                }
            ),
        }
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load each requested file with the loader registered for its extension.

        Args:
            params: Activity input; `params["values"]["paths"]` is the list of
                paths to load, each joined onto `workdir`.

        Returns:
            A ListArtifact collecting every artifact the loaders produced.
        """
        list_artifact = ListArtifact()

        # NOTE(review): paths are joined onto workdir without normalisation, so an
        # absolute path or ".." segments resolve outside workdir. Confirm this is intended.
        for path in params["values"]["paths"]:
            full_path = Path(os.path.join(self.workdir, path))
            # Text after the last "."; a path without a dot simply finds no
            # match in `loaders` and falls through to the default loader.
            extension = path.split(".")[-1]
            loader = self.loaders.get(extension) or self.default_loader
            result = loader.load(full_path)

            if isinstance(result, list):
                list_artifact.value.extend(result)
            elif isinstance(result, BaseArtifact):
                list_artifact.value.append(result)
            else:
                logging.warning(f"Unknown loader return type for file {path}")

        return list_artifact

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Destination directory name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Write the artifacts stored under a memory namespace to disk.

        A single artifact is written to `file_name` using its raw `value`;
        several artifacts are each written to `<artifact name>-<file_name>`
        using `to_text()`.

        Args:
            params: Activity input; `params["values"]` holds `memory_name`,
                `artifact_namespace`, `dir_name` and `file_name`.

        Returns:
            InfoArtifact on success; ErrorArtifact when the memory is not found,
            the namespace holds no artifacts, or writing raises.
        """
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]

        if memory:
            list_artifact = memory.load_artifacts(artifact_namespace)

            if len(list_artifact) == 0:
                return ErrorArtifact("no artifacts found")
            elif len(list_artifact) == 1:
                try:
                    self._save_to_disk(os.path.join(self.workdir, dir_name, file_name), list_artifact.value[0].value)

                    return InfoArtifact("saved successfully")
                except Exception as e:
                    return ErrorArtifact(f"error writing file to disk: {e}")
            else:
                try:
                    for a in list_artifact.value:
                        self._save_to_disk(os.path.join(self.workdir, dir_name, f"{a.name}-{file_name}"), a.to_text())

                    return InfoArtifact("saved successfully")
                except Exception as e:
                    return ErrorArtifact(f"error writing file to disk: {e}")
        else:
            return ErrorArtifact("memory not found")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                }
            ),
        }
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Write `content` to `path`, joined onto `workdir`.

        Args:
            params: Activity input; `params["values"]` holds `content` and `path`.

        Returns:
            InfoArtifact on success, or ErrorArtifact carrying the exception text
            when writing raises.
        """
        content = params["values"]["content"]
        new_path = params["values"]["path"]
        full_path = os.path.join(self.workdir, new_path)

        try:
            self._save_to_disk(full_path, content)

            return InfoArtifact("saved successfully")
        except Exception as e:
            return ErrorArtifact(f"error writing file to disk: {e}")

    def _save_to_disk(self, path: str, value: str | bytes) -> None:
        """Write `value` to `path` in binary mode, creating parent directories first.

        Strings are encoded with `save_file_encoding` when it is set and with the
        `str.encode()` default otherwise; any other value is handed to
        `file.write` unchanged.
        """
        os.makedirs(os.path.dirname(path), exist_ok=True)

        with open(path, "wb") as file:
            if isinstance(value, str):
                if self.save_file_encoding:
                    file.write(value.encode(self.save_file_encoding))
                else:
                    file.write(value.encode())
            else:
                file.write(value)

default_loader: BaseLoader = field(default=Factory(lambda : FileLoader())) class-attribute instance-attribute

loaders: dict[str, BaseLoader] = field(default=Factory(lambda : {'pdf': PdfLoader(), 'csv': CsvLoader(), 'txt': TextLoader(), 'html': TextLoader(), 'json': TextLoader(), 'yaml': TextLoader(), 'xml': TextLoader()}), kw_only=True) class-attribute instance-attribute

save_file_encoding: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

workdir: str = field(default=Factory(lambda : os.getcwd()), kw_only=True) class-attribute instance-attribute

load_files_from_disk(params)

Source code in griptape/griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): []
            }
        ),
    }
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load every requested file and gather the results in one ListArtifact.

    Args:
        params: Activity input; ``params["values"]["paths"]`` lists the paths
            to load, each joined onto ``self.workdir``.

    Returns:
        A ListArtifact holding whatever artifacts the loaders produced.
    """
    collected = ListArtifact()

    for relative_path in params["values"]["paths"]:
        target = Path(os.path.join(self.workdir, relative_path))

        # Text after the final "." selects the loader; anything unregistered
        # (including paths with no dot at all) uses the default loader.
        suffix = relative_path.rsplit(".", 1)[-1]
        loader = self.loaders.get(suffix)
        if not loader:
            loader = self.default_loader

        loaded = loader.load(target)

        if isinstance(loaded, list):
            collected.value.extend(loaded)
        elif isinstance(loaded, BaseArtifact):
            collected.value.append(loaded)
        else:
            logging.warning(f"Unknown loader return type for file {relative_path}")

    return collected

save_content_to_file(params)

Source code in griptape/griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            }
        ),
    }
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Persist the supplied content at a path joined onto ``self.workdir``.

    Args:
        params: Activity input; ``params["values"]`` holds ``content`` and ``path``.

    Returns:
        InfoArtifact when the write succeeds, otherwise ErrorArtifact carrying
        the exception text.
    """
    values = params["values"]
    text = values["content"]
    destination = os.path.join(self.workdir, values["path"])

    try:
        self._save_to_disk(destination, text)
        outcome = InfoArtifact("saved successfully")
    except Exception as error:
        outcome = ErrorArtifact(f"error writing file to disk: {error}")

    return outcome

save_memory_artifacts_to_disk(params)

Source code in griptape/griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Destination directory name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            }
        ),
    }
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Dump the artifacts found under a memory namespace into a directory.

    One artifact is written to ``file_name`` using its raw ``value``; several
    artifacts are written to ``<artifact name>-<file_name>`` using ``to_text()``.

    Args:
        params: Activity input; ``params["values"]`` holds ``memory_name``,
            ``artifact_namespace``, ``dir_name`` and ``file_name``.

    Returns:
        InfoArtifact on success; ErrorArtifact when the memory is missing, the
        namespace is empty, or a write raises.
    """
    values = params["values"]
    memory = self.find_input_memory(values["memory_name"])
    namespace = values["artifact_namespace"]
    target_dir = values["dir_name"]
    base_name = values["file_name"]

    if not memory:
        return ErrorArtifact("memory not found")

    artifacts = memory.load_artifacts(namespace)
    count = len(artifacts)
    if count == 0:
        return ErrorArtifact("no artifacts found")

    try:
        if count == 1:
            only = artifacts.value[0]
            self._save_to_disk(os.path.join(self.workdir, target_dir, base_name), only.value)
        else:
            for artifact in artifacts.value:
                destination = os.path.join(self.workdir, target_dir, f"{artifact.name}-{base_name}")
                self._save_to_disk(destination, artifact.to_text())

        return InfoArtifact("saved successfully")
    except Exception as error:
        return ErrorArtifact(f"error writing file to disk: {error}")

validate_workdir(_, workdir)

Source code in griptape/griptape/tools/file_manager/tool.py
@workdir.validator  # pyright: ignore
def validate_workdir(self, _, workdir: str) -> None:
    """Reject a relative ``workdir``.

    Args:
        _: The attribute being validated (unused).
        workdir: The candidate working directory.

    Raises:
        ValueError: If ``workdir`` is not an absolute path.
    """
    if not Path(workdir).is_absolute():
        raise ValueError("workdir has to be absolute")

GoogleCalendarClient

Bases: BaseGoogleClient

Source code in griptape/griptape/tools/google_cal/tool.py
@define
class GoogleCalendarClient(BaseGoogleClient):
    """Tool exposing Google Calendar activities: listing upcoming events and creating events.

    Attributes:
        owner_email: Passed as `owner_email` to `_build_client` for every request.
    """

    CREATE_EVENT_SCOPES = ["https://www.googleapis.com/auth/calendar"]

    GET_UPCOMING_EVENTS_SCOPES = ["https://www.googleapis.com/auth/calendar"]

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to get upcoming events from a google calendar",
            "schema": Schema(
                {
                    Literal("calendar_id", description="id of the google calendar such as 'primary'"): str,
                    Literal("max_events", description="maximum number of events to return"): int,
                }
            ),
        }
    )
    def get_upcoming_events(self, params: dict) -> ListArtifact | ErrorArtifact:
        """List events starting from the current UTC time, ordered by start time.

        Args:
            params: Activity input; `params["values"]` holds `calendar_id` and
                `max_events`.

        Returns:
            A ListArtifact with one TextArtifact (the stringified event) per
            returned item, or an ErrorArtifact if the request raises.
        """
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.GET_UPCOMING_EVENTS_SCOPES,
                service_name="calendar",
                version="v3",
                owner_email=self.owner_email,
            )
            # Timezone-aware replacement for the deprecated datetime.utcnow(); the
            # "+00:00" offset is rewritten to "Z" so the timestamp format is unchanged.
            now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")

            events_result = (
                service.events()
                .list(
                    calendarId=values["calendar_id"],
                    timeMin=now,
                    maxResults=values["max_events"],
                    singleEvents=True,
                    orderBy="startTime",
                )
                .execute()
            )
            events = events_result.get("items", [])

            return ListArtifact([TextArtifact(str(e)) for e in events])
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error retrieving calendar events {e}")

    @activity(
        config={
            "description": "Can be used to create an event on a google calendar",
            "schema": Schema(
                {
                    Literal(
                        "start_datetime",
                        description="combined date-time value in string format according to RFC3339 "
                        "excluding the timezone for when the meeting starts",
                    ): str,
                    Literal(
                        "start_time_zone",
                        description="time zone in which the start time is specified in string format "
                        "according to IANA time zone data base name, such as 'Europe/Zurich'",
                    ): str,
                    Literal(
                        "end_datetime",
                        description="combined date-time value in string format according to RFC3339 "
                        "excluding the timezone for when the meeting ends",
                    ): str,
                    Literal(
                        "end_time_zone",
                        description="time zone in which the end time is specified in string format "
                        "according to IANA time zone data base name, such as 'Europe/Zurich'",
                    ): str,
                    Literal("title", description="title of the event"): str,
                    Literal("description", description="description of the event"): str,
                    Literal(
                        "attendees", description="list of the email addresses of attendees using 'email' as key"
                    ): list,
                    Optional(Literal("location", description="location of the event")): str,
                }
            ),
        }
    )
    def create_event(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Create an event on the "primary" calendar.

        Args:
            params: Activity input; `params["values"]` holds the event title,
                description, attendees, start/end date-times with their time
                zones, and an optional location.

        Returns:
            An InfoArtifact containing the created event's `htmlLink`, or an
            ErrorArtifact if the request raises.
        """
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.CREATE_EVENT_SCOPES, service_name="calendar", version="v3", owner_email=self.owner_email
            )

            event = {
                "summary": values["title"],
                # None when the caller supplied no location.
                "location": values.get("location"),
                "description": values["description"],
                "start": {"dateTime": values["start_datetime"], "timeZone": values["start_time_zone"]},
                "end": {"dateTime": values["end_datetime"], "timeZone": values["end_time_zone"]},
                "attendees": values["attendees"],
            }
            event = service.events().insert(calendarId="primary", body=event).execute()
            return InfoArtifact(f'A calendar event was successfully created. (Link:{event.get("htmlLink")})')
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error creating calendar event: {e}")

CREATE_EVENT_SCOPES = ['https://www.googleapis.com/auth/calendar'] class-attribute instance-attribute

GET_UPCOMING_EVENTS_SCOPES = ['https://www.googleapis.com/auth/calendar'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

create_event(params)

Source code in griptape/griptape/tools/google_cal/tool.py
@activity(
    config={
        "description": "Can be used to create an event on a google calendar",
        "schema": Schema(
            {
                Literal(
                    "start_datetime",
                    description="combined date-time value in string format according to RFC3339 "
                    "excluding the timezone for when the meeting starts",
                ): str,
                Literal(
                    "start_time_zone",
                    description="time zone in which the start time is specified in string format "
                    "according to IANA time zone data base name, such as 'Europe/Zurich'",
                ): str,
                Literal(
                    "end_datetime",
                    description="combined date-time value in string format according to RFC3339 "
                    "excluding the timezone for when the meeting ends",
                ): str,
                Literal(
                    "end_time_zone",
                    description="time zone in which the end time is specified in string format "
                    "according to IANA time zone data base name, such as 'Europe/Zurich'",
                ): str,
                Literal("title", description="title of the event"): str,
                Literal("description", description="description of the event"): str,
                Literal(
                    "attendees", description="list of the email addresses of attendees using 'email' as key"
                ): list,
                Optional(Literal("location", description="location of the event")): str,
            }
        ),
    }
)
def create_event(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Create an event on the "primary" calendar.

    Args:
        params: Activity input; ``params["values"]`` holds the event title,
            description, attendees, start/end date-times with their time
            zones, and an optional location.

    Returns:
        An InfoArtifact containing the created event's ``htmlLink``, or an
        ErrorArtifact if the request raises.
    """
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.CREATE_EVENT_SCOPES, service_name="calendar", version="v3", owner_email=self.owner_email
        )

        event = {
            "summary": values["title"],
            # None when the caller supplied no location.
            "location": values.get("location"),
            "description": values["description"],
            "start": {"dateTime": values["start_datetime"], "timeZone": values["start_time_zone"]},
            "end": {"dateTime": values["end_datetime"], "timeZone": values["end_time_zone"]},
            "attendees": values["attendees"],
        }
        event = service.events().insert(calendarId="primary", body=event).execute()
        return InfoArtifact(f'A calendar event was successfully created. (Link:{event.get("htmlLink")})')
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error creating calendar event: {e}")

get_upcoming_events(params)

Source code in griptape/griptape/tools/google_cal/tool.py
@activity(
    config={
        "description": "Can be used to get upcoming events from a google calendar",
        "schema": Schema(
            {
                Literal("calendar_id", description="id of the google calendar such as 'primary'"): str,
                Literal("max_events", description="maximum number of events to return"): int,
            }
        ),
    }
)
def get_upcoming_events(self, params: dict) -> ListArtifact | ErrorArtifact:
    """List events starting from the current UTC time, ordered by start time.

    Args:
        params: Activity input; ``params["values"]`` holds ``calendar_id`` and
            ``max_events``.

    Returns:
        A ListArtifact with one TextArtifact (the stringified event) per
        returned item, or an ErrorArtifact if the request raises.
    """
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.GET_UPCOMING_EVENTS_SCOPES,
            service_name="calendar",
            version="v3",
            owner_email=self.owner_email,
        )
        # Timezone-aware replacement for the deprecated datetime.utcnow(); the
        # "+00:00" offset is rewritten to "Z" so the timestamp format is unchanged.
        now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")

        events_result = (
            service.events()
            .list(
                calendarId=values["calendar_id"],
                timeMin=now,
                maxResults=values["max_events"],
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
        events = events_result.get("items", [])

        return ListArtifact([TextArtifact(str(e)) for e in events])
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error retrieving calendar events {e}")

GoogleDocsClient

Bases: BaseGoogleClient

Source code in griptape/griptape/tools/google_docs/tool.py
@define
class GoogleDocsClient(BaseGoogleClient):
    """Tool exposing Google Docs activities: append, prepend, create and save-from-memory.

    Attributes:
        owner_email: Passed as `owner_email` to `_build_client` for every request.
    """

    DOCS_SCOPES = ["https://www.googleapis.com/auth/documents"]

    DEFAULT_FOLDER_PATH = "root"

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to append text to a Google Doc.",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Destination file path of Google Doc in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    Literal("text", description="Text to be appended to the Google Doc."): str,
                }
            ),
        }
    )
    def append_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Insert text at the end of an existing Google Doc.

        Args:
            params: Activity input; `params["values"]` holds `file_path` and `text`.

        Returns:
            InfoArtifact on success; ErrorArtifact when the path resolves to no
            file or any request raises.
        """
        values = params["values"]
        file_path = values.get("file_path")
        text = values.get("text")

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            document_id = self._convert_path_to_file_id(drive_service, file_path)
            if document_id:
                doc = docs_service.documents().get(documentId=document_id).execute()
                content = doc["body"]["content"]
                last_text = content[-1]["paragraph"]["elements"][-1]["textRun"]["content"]
                append_index = content[-1]["endIndex"]
                # Step back one index when the last text run ends in a newline,
                # so the text is inserted before it.
                if last_text.endswith("\n"):
                    append_index -= 1

                requests = [{"insertText": {"location": {"index": append_index}, "text": text}}]

                docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
                return InfoArtifact("text appended successfully")
            else:
                return ErrorArtifact(f"error appending to Google Doc, file not found for path {file_path}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error appending text to Google Doc with path {file_path}: {e}")

    @activity(
        config={
            "description": "Can be used to prepend text to a Google Doc",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Destination file path of Google Doc in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    Literal("text", description="Text to be prepended to the Google Doc."): str,
                }
            ),
        }
    )
    def prepend_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Insert text at the start of an existing Google Doc.

        Args:
            params: Activity input; `params["values"]` holds `file_path` and `text`.

        Returns:
            InfoArtifact on success; ErrorArtifact when the path resolves to no
            file or any request raises.
        """
        values = params["values"]
        file_path = values.get("file_path")
        text = values.get("text")

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            document_id = self._convert_path_to_file_id(drive_service, file_path)
            if document_id:
                doc = docs_service.documents().get(documentId=document_id).execute()

                # A body with a single content element is inserted into at index 1;
                # otherwise the second element's startIndex is used.
                if len(doc["body"]["content"]) == 1:
                    requests = [{"insertText": {"location": {"index": 1}, "text": text}}]
                else:
                    start_index = doc["body"]["content"][1]["startIndex"]
                    requests = [{"insertText": {"location": {"index": start_index}, "text": text}}]

                docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
                return InfoArtifact("text prepended successfully")
            else:
                return ErrorArtifact(f"error prepending to google doc, file not found for path {file_path}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error prepending text to Google Doc with path {file_path}: {e}")

    @activity(
        config={
            "description": "Can be used to create a new Google Doc and optionally save content to it.",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Name of the file to be created, which will be used to save content in.",
                    ): str,
                    Optional("content", default=None, description="Optional content to be saved in Google Doc."): str,
                    Optional(
                        "folder_path",
                        default=DEFAULT_FOLDER_PATH,
                        description="Path of the folder where the Google doc will be created.",
                    ): str,
                }
            ),
        }
    )
    def save_content_to_google_doc(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Create a Google Doc, optionally move it into a folder and write content to it.

        Args:
            params: Activity input; `params["values"]` holds `file_path` (used as
                the document title) and the optional `content` and `folder_path`.

        Returns:
            InfoArtifact naming the document ID on success; ErrorArtifact when the
            folder path resolves to nothing or any request raises.
        """
        values = params["values"]
        file_path = values.get("file_path")
        content = values.get("content")
        folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            body = {"title": file_path}

            doc = docs_service.documents().create(body=body).execute()
            doc_id = doc["documentId"]

            # The document already exists at this point; a folder lookup failure
            # below returns an error without removing it.
            if folder_path.lower() != self.DEFAULT_FOLDER_PATH:
                folder_id = self._convert_path_to_file_id(drive_service, folder_path)
                if folder_id:
                    drive_service.files().update(fileId=doc_id, addParents=folder_id, fields="id, parents").execute()
                else:
                    return ErrorArtifact(f"Error: Folder not found for path {folder_path}")

            if content:
                save_content_params = {"document_id": doc_id, "content": content}
                saved_document_id = self._save_to_doc(save_content_params)
                return InfoArtifact(f"Content has been successfully saved to Google Doc with ID: {saved_document_id}.")
            else:
                return InfoArtifact(f"Google Doc '{file_path}' created with ID: {doc_id}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"Error creating/saving Google Doc: {e}")

    @activity(
        config={
            "description": "Can be used to load content from memory and save it to a new Google Doc "
            "in the specified folder.",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "file_name": str,
                    Optional(
                        "folder_path",
                        description="Path of the folder where the Google Doc should be saved.",
                        default=DEFAULT_FOLDER_PATH,
                    ): str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_google_docs(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Join the artifacts of a memory namespace and save them as a new Google Doc.

        Args:
            params: Activity input; `params["values"]` holds `memory_name`,
                `artifact_namespace`, `file_name` and the optional `folder_path`.

        Returns:
            The result of `save_content_to_google_doc`, or an ErrorArtifact when
            the memory is not found, the namespace is empty, or assembling the
            content raises.
        """
        values = params["values"]
        memory = self.find_input_memory(values["memory_name"])

        if memory:
            artifacts = memory.load_artifacts(values["artifact_namespace"])

            if artifacts:
                try:
                    file_path = values["file_name"]
                    content = "\n".join(a.value for a in artifacts)

                    save_values = {
                        "file_path": file_path,
                        "content": content,
                        "folder_path": values.get("folder_path", self.DEFAULT_FOLDER_PATH),
                    }

                    # save_content_to_google_doc reads params["values"], so the
                    # payload must be wrapped; passing it flat raised KeyError.
                    return self.save_content_to_google_doc({"values": save_values})

                except Exception as e:
                    return ErrorArtifact(f"Error: {e}")

            else:
                return ErrorArtifact("no artifacts found")
        else:
            return ErrorArtifact("memory not found")

    def _save_to_doc(self, params: dict) -> str:
        """Insert `params["content"]` at index 1 of the document `params["document_id"]`.

        Returns:
            The document ID that was written to.
        """
        service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )

        requests = [{"insertText": {"location": {"index": 1}, "text": params["content"]}}]
        service.documents().batchUpdate(documentId=params["document_id"], body={"requests": requests}).execute()
        return params["document_id"]

DEFAULT_FOLDER_PATH = 'root' class-attribute instance-attribute

DOCS_SCOPES = ['https://www.googleapis.com/auth/documents'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

append_text_to_google_doc(params)

Source code in griptape/griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to append text to a Google Doc.",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Destination file path of Google Doc in the POSIX format. "
                    "For example, 'foo/bar/baz.txt'",
                ): str,
                Literal("text", description="Text to be appended to the Google Doc."): str,
            }
        ),
    }
)
def append_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Add text to the end of the Google Doc found at ``file_path``.

    Args:
        params: Activity input; ``params["values"]`` holds ``file_path`` and ``text``.

    Returns:
        InfoArtifact when the update request succeeds; ErrorArtifact when no
        file matches the path or any request raises.
    """
    values = params["values"]
    file_path = values.get("file_path")
    text = values.get("text")

    try:
        docs_service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )
        drive_service = self._build_client(
            scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
        )

        document_id = self._convert_path_to_file_id(drive_service, file_path)
        if not document_id:
            return ErrorArtifact(f"error appending to Google Doc, file not found for path {file_path}")

        document = docs_service.documents().get(documentId=document_id).execute()
        final_block = document["body"]["content"][-1]
        trailing_text = final_block["paragraph"]["elements"][-1]["textRun"]["content"]
        end_index = final_block["endIndex"]

        # Insert one position earlier when the last text run ends in a newline.
        insert_at = end_index - 1 if trailing_text.endswith("\n") else end_index

        update_body = {"requests": [{"insertText": {"location": {"index": insert_at}, "text": text}}]}
        docs_service.documents().batchUpdate(documentId=document_id, body=update_body).execute()
        return InfoArtifact("text appended successfully")

    except Exception as error:
        logging.error(error)
        return ErrorArtifact(f"error appending text to Google Doc with path {file_path}: {error}")

prepend_text_to_google_doc(params)

Source code in griptape/griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to prepend text to a Google Doc",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Destination file path of Google Doc in the POSIX format. "
                    "For example, 'foo/bar/baz.txt'",
                ): str,
                Literal("text", description="Text to be prepended to the Google Doc."): str,
            }
        ),
    }
)
def prepend_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Insert text at the start of the Google Doc located at a given path.

    Args:
        params: Activity parameters; ``params["values"]`` supplies
            ``"file_path"`` (path of the document) and ``"text"`` (the
            text to prepend).

    Returns:
        An ``InfoArtifact`` once the batch update has been executed, or an
        ``ErrorArtifact`` when the path does not resolve to a document id
        or when any exception is raised along the way.
    """
    values = params["values"]
    file_path = values.get("file_path")
    text = values.get("text")

    try:
        docs_service = self._build_client(
            scopes=self.DOCS_SCOPES,
            service_name="docs",
            version="v1",
            owner_email=self.owner_email,
        )
        drive_service = self._build_client(
            scopes=self.DRIVE_FILE_SCOPES,
            service_name="drive",
            version="v3",
            owner_email=self.owner_email,
        )

        document_id = self._convert_path_to_file_id(drive_service, file_path)
        if not document_id:
            return ErrorArtifact(f"error prepending to google doc, file not found for path {file_path}")

        document = docs_service.documents().get(documentId=document_id).execute()
        body_content = document["body"]["content"]

        # With a single body element, insert at index 1; otherwise use the
        # start index reported by the second body element.
        insert_at = 1 if len(body_content) == 1 else body_content[1]["startIndex"]

        insert_request = {"insertText": {"location": {"index": insert_at}, "text": text}}
        docs_service.documents().batchUpdate(
            documentId=document_id, body={"requests": [insert_request]}
        ).execute()
        return InfoArtifact("text prepended successfully")

    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error prepending text to Google Doc with path {file_path}: {e}")

save_content_to_google_doc(params)

Source code in griptape/griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to create a new Google Doc and optionally save content to it.",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Name of the file to be created, which will be used to save content in.",
                ): str,
                Optional("content", default=None, description="Optional content to be saved in Google Doc."): str,
                Optional(
                    "folder_path",
                    default=DEFAULT_FOLDER_PATH,
                    description="Path of the folder where the Google doc will be created.",