Skip to content

Tools

__all__ = ['BaseTool', 'BaseAwsClient', 'AwsIamClient', 'AwsS3Client', 'BaseGoogleClient', 'GoogleGmailClient', 'GoogleDocsClient', 'GoogleCalendarClient', 'GoogleDriveClient', 'Calculator', 'WebSearch', 'WebScraper', 'SqlClient', 'EmailClient', 'RestApiClient', 'FileManager', 'VectorStoreClient', 'DateTime', 'TaskMemoryClient', 'Computer', 'OpenWeatherClient', 'PromptImageGenerationClient', 'VariationImageGenerationClient', 'InpaintingImageGenerationClient', 'OutpaintingImageGenerationClient', 'GriptapeCloudKnowledgeBaseClient', 'StructureRunClient', 'ImageQueryClient'] module-attribute

AwsIamClient

Bases: BaseAwsClient

Source code in griptape/tools/aws_iam_client/tool.py
@define
class AwsIamClient(BaseAwsClient):
    iam_client: Client = field(default=Factory(lambda self: self.session.client("iam"), takes_self=True), kw_only=True)

    @activity(
        config={
            "description": "Can be use to get a policy for an AWS IAM user.",
            "schema": Schema(
                {
                    Literal("user_name", description="Username of the AWS IAM user."): str,
                    Literal(
                        "policy_name",
                        description="PolicyName of the AWS IAM Policy embedded in the specified IAM user.",
                    ): str,
                }
            ),
        }
    )
    def get_user_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            policy = self.iam_client.get_user_policy(
                UserName=params["values"]["user_name"], PolicyName=params["values"]["policy_name"]
            )
            return TextArtifact(policy["PolicyDocument"])
        except Exception as e:
            return ErrorArtifact(f"error returning policy document: {e}")

    @activity(config={"description": "Can be used to list AWS MFA Devices"})
    def list_mfa_devices(self, _: dict) -> ListArtifact | ErrorArtifact:
        try:
            devices = self.iam_client.list_mfa_devices()
            return ListArtifact([TextArtifact(str(d)) for d in devices["MFADevices"]])
        except Exception as e:
            return ErrorArtifact(f"error listing mfa devices: {e}")

    @activity(
        config={
            "description": "Can be used to list policies for a given IAM user.",
            "schema": Schema(
                {Literal("user_name", description="Username of the AWS IAM user for which to list policies."): str}
            ),
        }
    )
    def list_user_policies(self, params: dict) -> ListArtifact | ErrorArtifact:
        try:
            policies = self.iam_client.list_user_policies(UserName=params["values"]["user_name"])
            policy_names = policies["PolicyNames"]

            attached_policies = self.iam_client.list_attached_user_policies(UserName=params["values"]["user_name"])
            attached_policy_names = [
                p["PolicyName"] for p in attached_policies["AttachedPolicies"] if "PolicyName" in p
            ]

            return ListArtifact([TextArtifact(str(p)) for p in policy_names + attached_policy_names])
        except Exception as e:
            return ErrorArtifact(f"error listing iam user policies: {e}")

    @activity(config={"description": "Can be used to list AWS IAM users."})
    def list_users(self, _: dict) -> ListArtifact | ErrorArtifact:
        try:
            users = self.iam_client.list_users()
            return ListArtifact([TextArtifact(str(u)) for u in users["Users"]])
        except Exception as e:
            return ErrorArtifact(f"error listing s3 users: {e}")

iam_client: Client = field(default=Factory(lambda self: self.session.client('iam'), takes_self=True), kw_only=True) class-attribute instance-attribute

get_user_policy(params)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(
    config={
        "description": "Can be use to get a policy for an AWS IAM user.",
        "schema": Schema(
            {
                Literal("user_name", description="Username of the AWS IAM user."): str,
                Literal(
                    "policy_name",
                    description="PolicyName of the AWS IAM Policy embedded in the specified IAM user.",
                ): str,
            }
        ),
    }
)
def get_user_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        policy = self.iam_client.get_user_policy(
            UserName=params["values"]["user_name"], PolicyName=params["values"]["policy_name"]
        )
        return TextArtifact(policy["PolicyDocument"])
    except Exception as e:
        return ErrorArtifact(f"error returning policy document: {e}")

list_mfa_devices(_)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(config={"description": "Can be used to list AWS MFA Devices"})
def list_mfa_devices(self, _: dict) -> ListArtifact | ErrorArtifact:
    try:
        devices = self.iam_client.list_mfa_devices()
        return ListArtifact([TextArtifact(str(d)) for d in devices["MFADevices"]])
    except Exception as e:
        return ErrorArtifact(f"error listing mfa devices: {e}")

list_user_policies(params)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(
    config={
        "description": "Can be used to list policies for a given IAM user.",
        "schema": Schema(
            {Literal("user_name", description="Username of the AWS IAM user for which to list policies."): str}
        ),
    }
)
def list_user_policies(self, params: dict) -> ListArtifact | ErrorArtifact:
    try:
        policies = self.iam_client.list_user_policies(UserName=params["values"]["user_name"])
        policy_names = policies["PolicyNames"]

        attached_policies = self.iam_client.list_attached_user_policies(UserName=params["values"]["user_name"])
        attached_policy_names = [
            p["PolicyName"] for p in attached_policies["AttachedPolicies"] if "PolicyName" in p
        ]

        return ListArtifact([TextArtifact(str(p)) for p in policy_names + attached_policy_names])
    except Exception as e:
        return ErrorArtifact(f"error listing iam user policies: {e}")

list_users(_)

Source code in griptape/tools/aws_iam_client/tool.py
@activity(config={"description": "Can be used to list AWS IAM users."})
def list_users(self, _: dict) -> ListArtifact | ErrorArtifact:
    try:
        users = self.iam_client.list_users()
        return ListArtifact([TextArtifact(str(u)) for u in users["Users"]])
    except Exception as e:
        return ErrorArtifact(f"error listing s3 users: {e}")

AwsS3Client

Bases: BaseAwsClient

Source code in griptape/tools/aws_s3_client/tool.py
@define
class AwsS3Client(BaseAwsClient):
    s3_client: Client = field(default=Factory(lambda self: self.session.client("s3"), takes_self=True), kw_only=True)

    @activity(
        config={
            "description": "Can be used to get an access control list (ACL) of an AWS S3 bucket.",
            "schema": Schema(
                {
                    Literal(
                        "bucket_name",
                        description="The bucket name that contains the object for which to get the ACL information.",
                    ): str
                }
            ),
        }
    )
    def get_bucket_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            acl = self.s3_client.get_bucket_acl(Bucket=params["values"]["bucket_name"])
            return TextArtifact(acl)
        except Exception as e:
            return ErrorArtifact(f"error getting bucket acl: {e}")

    @activity(
        config={
            "description": "Can be used to get an AWS S3 bucket policy.",
            "schema": Schema(
                {Literal("bucket_name", description="The bucket name for which to get the bucket policy."): str}
            ),
        }
    )
    def get_bucket_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            policy = self.s3_client.get_bucket_policy(Bucket=params["values"]["bucket_name"])
            return TextArtifact(policy)
        except Exception as e:
            return ErrorArtifact(f"error getting bucket policy: {e}")

    @activity(
        config={
            "description": "Can be used to get an access control list (ACL) of an object in the AWS S3 bucket.",
            "schema": Schema(
                {
                    Literal("bucket_name", description="Name of the AWS S3 bucket for which to get an ACL."): str,
                    Literal("object_key", description="Key of the object for which to get the ACL information."): str,
                }
            ),
        }
    )
    def get_object_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
        try:
            acl = self.s3_client.get_object_acl(
                Bucket=params["values"]["bucket_name"], Key=params["values"]["object_key"]
            )
            return TextArtifact(acl)
        except Exception as e:
            return ErrorArtifact(f"error getting object acl: {e}")

    @activity(config={"description": "Can be used to list all AWS S3 buckets."})
    def list_s3_buckets(self, _: dict) -> ListArtifact | ErrorArtifact:
        try:
            buckets = self.s3_client.list_buckets()

            return ListArtifact([TextArtifact(str(b)) for b in buckets["Buckets"]])
        except Exception as e:
            return ErrorArtifact(f"error listing s3 buckets: {e}")

    @activity(
        config={
            "description": "Can be used to list all objects in an AWS S3 bucket.",
            "schema": Schema({Literal("bucket_name", description="The name of the S3 bucket to list."): str}),
        }
    )
    def list_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
        try:
            objects = self.s3_client.list_objects_v2(Bucket=params["values"]["bucket_name"])

            if "Contents" not in objects:
                return ErrorArtifact("no objects found in the bucket")

            return ListArtifact([TextArtifact(str(o)) for o in objects["Contents"]])
        except Exception as e:
            return ErrorArtifact(f"error listing objects in bucket: {e}")

    @activity(
        config={
            "description": "Can be used to upload memory artifacts to an AWS S3 bucket",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "bucket_name": str,
                    Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                }
            ),
        }
    )
    def upload_memory_artifacts_to_s3(self, params: dict) -> InfoArtifact | ErrorArtifact:
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        bucket_name = params["values"]["bucket_name"]
        object_key = params["values"]["object_key"]

        if memory:
            artifacts = memory.load_artifacts(artifact_namespace)

            if len(artifacts) == 0:
                return ErrorArtifact("no artifacts found")
            elif len(artifacts) == 1:
                try:
                    self._upload_object(bucket_name, object_key, artifacts.value[0].value)

                    return InfoArtifact("uploaded successfully")
                except Exception as e:
                    return ErrorArtifact(f"error uploading objects to the bucket: {e}")
            else:
                try:
                    for a in artifacts.value:
                        self._upload_object(bucket_name, object_key, a.value)

                    return InfoArtifact("uploaded successfully")
                except Exception as e:
                    return ErrorArtifact(f"error uploading objects to the bucket: {e}")
        else:
            return ErrorArtifact("memory not found")

    @activity(
        config={
            "description": "Can be used to upload content to an AWS S3 bucket",
            "schema": Schema(
                {
                    "bucket_name": str,
                    Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                    "content": str,
                }
            ),
        }
    )
    def upload_content_to_s3(self, params: dict) -> ErrorArtifact | InfoArtifact:
        content = params["values"]["content"]
        bucket_name = params["values"]["bucket_name"]
        object_key = params["values"]["object_key"]

        try:
            self._upload_object(bucket_name, object_key, content)

            return InfoArtifact("uploaded successfully")
        except Exception as e:
            return ErrorArtifact(f"error uploading objects to the bucket: {e}")

    @activity(
        config={
            "description": "Can be used to download objects from AWS S3",
            "schema": Schema(
                {
                    Literal("objects", description="A list of bucket name and object key pairs to download"): [
                        {
                            Literal(
                                "bucket_name", description="The name of the bucket to download the object from"
                            ): str,
                            Literal(
                                "object_key", description="The name of the object key to download from the bucket"
                            ): str,
                        }
                    ]
                }
            ),
        }
    )
    def download_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
        objects = params["values"]["objects"]
        artifacts = []
        for object_info in objects:
            try:
                obj = self.s3_client.get_object(Bucket=object_info["bucket_name"], Key=object_info["object_key"])

                content = obj["Body"].read()
                artifacts.append(BlobArtifact(content, name=object_info["object_key"]))

            except Exception as e:
                return ErrorArtifact(f"error downloading objects from bucket: {e}")

        return ListArtifact(artifacts)

    def _upload_object(self, bucket_name: str, object_name: str, value: Any) -> None:
        self.s3_client.create_bucket(Bucket=bucket_name)

        self.s3_client.upload_fileobj(
            Fileobj=io.BytesIO(value.encode() if isinstance(value, str) else value), Bucket=bucket_name, Key=object_name
        )

s3_client: Client = field(default=Factory(lambda self: self.session.client('s3'), takes_self=True), kw_only=True) class-attribute instance-attribute

download_objects(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to download objects from AWS S3",
        "schema": Schema(
            {
                Literal("objects", description="A list of bucket name and object key pairs to download"): [
                    {
                        Literal(
                            "bucket_name", description="The name of the bucket to download the object from"
                        ): str,
                        Literal(
                            "object_key", description="The name of the object key to download from the bucket"
                        ): str,
                    }
                ]
            }
        ),
    }
)
def download_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
    objects = params["values"]["objects"]
    artifacts = []
    for object_info in objects:
        try:
            obj = self.s3_client.get_object(Bucket=object_info["bucket_name"], Key=object_info["object_key"])

            content = obj["Body"].read()
            artifacts.append(BlobArtifact(content, name=object_info["object_key"]))

        except Exception as e:
            return ErrorArtifact(f"error downloading objects from bucket: {e}")

    return ListArtifact(artifacts)

get_bucket_acl(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an access control list (ACL) of an AWS S3 bucket.",
        "schema": Schema(
            {
                Literal(
                    "bucket_name",
                    description="The bucket name that contains the object for which to get the ACL information.",
                ): str
            }
        ),
    }
)
def get_bucket_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        acl = self.s3_client.get_bucket_acl(Bucket=params["values"]["bucket_name"])
        return TextArtifact(acl)
    except Exception as e:
        return ErrorArtifact(f"error getting bucket acl: {e}")

get_bucket_policy(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an AWS S3 bucket policy.",
        "schema": Schema(
            {Literal("bucket_name", description="The bucket name for which to get the bucket policy."): str}
        ),
    }
)
def get_bucket_policy(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        policy = self.s3_client.get_bucket_policy(Bucket=params["values"]["bucket_name"])
        return TextArtifact(policy)
    except Exception as e:
        return ErrorArtifact(f"error getting bucket policy: {e}")

get_object_acl(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to get an access control list (ACL) of an object in the AWS S3 bucket.",
        "schema": Schema(
            {
                Literal("bucket_name", description="Name of the AWS S3 bucket for which to get an ACL."): str,
                Literal("object_key", description="Key of the object for which to get the ACL information."): str,
            }
        ),
    }
)
def get_object_acl(self, params: dict) -> TextArtifact | ErrorArtifact:
    try:
        acl = self.s3_client.get_object_acl(
            Bucket=params["values"]["bucket_name"], Key=params["values"]["object_key"]
        )
        return TextArtifact(acl)
    except Exception as e:
        return ErrorArtifact(f"error getting object acl: {e}")

list_objects(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to list all objects in an AWS S3 bucket.",
        "schema": Schema({Literal("bucket_name", description="The name of the S3 bucket to list."): str}),
    }
)
def list_objects(self, params: dict) -> ListArtifact | ErrorArtifact:
    try:
        objects = self.s3_client.list_objects_v2(Bucket=params["values"]["bucket_name"])

        if "Contents" not in objects:
            return ErrorArtifact("no objects found in the bucket")

        return ListArtifact([TextArtifact(str(o)) for o in objects["Contents"]])
    except Exception as e:
        return ErrorArtifact(f"error listing objects in bucket: {e}")

list_s3_buckets(_)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(config={"description": "Can be used to list all AWS S3 buckets."})
def list_s3_buckets(self, _: dict) -> ListArtifact | ErrorArtifact:
    try:
        buckets = self.s3_client.list_buckets()

        return ListArtifact([TextArtifact(str(b)) for b in buckets["Buckets"]])
    except Exception as e:
        return ErrorArtifact(f"error listing s3 buckets: {e}")

upload_content_to_s3(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to upload content to an AWS S3 bucket",
        "schema": Schema(
            {
                "bucket_name": str,
                Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
                "content": str,
            }
        ),
    }
)
def upload_content_to_s3(self, params: dict) -> ErrorArtifact | InfoArtifact:
    content = params["values"]["content"]
    bucket_name = params["values"]["bucket_name"]
    object_key = params["values"]["object_key"]

    try:
        self._upload_object(bucket_name, object_key, content)

        return InfoArtifact("uploaded successfully")
    except Exception as e:
        return ErrorArtifact(f"error uploading objects to the bucket: {e}")

upload_memory_artifacts_to_s3(params)

Source code in griptape/tools/aws_s3_client/tool.py
@activity(
    config={
        "description": "Can be used to upload memory artifacts to an AWS S3 bucket",
        "schema": Schema(
            {
                "memory_name": str,
                "artifact_namespace": str,
                "bucket_name": str,
                Literal("object_key", description="Destination object key name. For example, 'baz.txt'"): str,
            }
        ),
    }
)
def upload_memory_artifacts_to_s3(self, params: dict) -> InfoArtifact | ErrorArtifact:
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]
    bucket_name = params["values"]["bucket_name"]
    object_key = params["values"]["object_key"]

    if memory:
        artifacts = memory.load_artifacts(artifact_namespace)

        if len(artifacts) == 0:
            return ErrorArtifact("no artifacts found")
        elif len(artifacts) == 1:
            try:
                self._upload_object(bucket_name, object_key, artifacts.value[0].value)

                return InfoArtifact("uploaded successfully")
            except Exception as e:
                return ErrorArtifact(f"error uploading objects to the bucket: {e}")
        else:
            try:
                for a in artifacts.value:
                    self._upload_object(bucket_name, object_key, a.value)

                return InfoArtifact("uploaded successfully")
            except Exception as e:
                return ErrorArtifact(f"error uploading objects to the bucket: {e}")
    else:
        return ErrorArtifact("memory not found")

BaseAwsClient

Bases: BaseTool, ABC

Source code in griptape/tools/base_aws_client.py
@define
class BaseAwsClient(BaseTool, ABC):
    session: boto3.Session = field(kw_only=True)

    @activity(config={"description": "Can be used to get current AWS account and IAM principal."})
    def get_current_aws_identity(self, params: dict) -> BaseArtifact:
        try:
            session = self.session
            sts = session.client("sts")
            return TextArtifact(str(sts.get_caller_identity()))
        except Exception as e:
            return ErrorArtifact(f"error getting current aws caller identity: {e}")

session: boto3.Session = field(kw_only=True) class-attribute instance-attribute

get_current_aws_identity(params)

Source code in griptape/tools/base_aws_client.py
@activity(config={"description": "Can be used to get current AWS account and IAM principal."})
def get_current_aws_identity(self, params: dict) -> BaseArtifact:
    try:
        session = self.session
        sts = session.client("sts")
        return TextArtifact(str(sts.get_caller_identity()))
    except Exception as e:
        return ErrorArtifact(f"error getting current aws caller identity: {e}")

BaseGoogleClient

Bases: BaseTool, ABC

Source code in griptape/tools/base_google_client.py
@define
class BaseGoogleClient(BaseTool, ABC):
    DRIVE_FILE_SCOPES = ["https://www.googleapis.com/auth/drive.file"]

    DRIVE_AUTH_SCOPES = ["https://www.googleapis.com/auth/drive"]

    service_account_credentials: dict = field(kw_only=True)

    def _build_client(self, scopes: list[str], service_name: str, version: str, owner_email: str) -> Any:
        from google.oauth2 import service_account  # pyright: ignore
        from googleapiclient.discovery import build  # pyright: ignore

        credentials = service_account.Credentials.from_service_account_info(
            self.service_account_credentials, scopes=scopes
        )

        return build(serviceName=service_name, version=version, credentials=credentials.with_subject(owner_email))

    def _convert_path_to_file_id(self, service: Any, path: str) -> Optional[str]:
        parts = path.split("/")
        current_id = "root"

        for idx, part in enumerate(parts):
            if idx == len(parts) - 1:
                query = f"name='{part}' and '{current_id}' in parents"
            else:
                query = f"name='{part}' and '{current_id}' in parents and mimeType='application/vnd.google-apps.folder'"

            response = service.files().list(q=query).execute()
            files = response.get("files", [])

            if not files:
                if idx != len(parts) - 1:
                    folder_metadata = {
                        "name": part,
                        "mimeType": "application/vnd.google-apps.folder",
                        "parents": [current_id],
                    }
                    folder = service.files().create(body=folder_metadata, fields="id").execute()
                    current_id = folder.get("id")
                else:
                    current_id = None
            else:
                current_id = files[0]["id"]

        return current_id

DRIVE_AUTH_SCOPES = ['https://www.googleapis.com/auth/drive'] class-attribute instance-attribute

DRIVE_FILE_SCOPES = ['https://www.googleapis.com/auth/drive.file'] class-attribute instance-attribute

service_account_credentials: dict = field(kw_only=True) class-attribute instance-attribute

BaseTool

Bases: ActivityMixin, ABC

Abstract class for all tools to inherit from for.

Attributes:

Name Type Description
name str

Tool name.

input_memory Optional[list[TaskMemory]]

TaskMemory available in tool activities. Gets automatically set if None.

output_memory Optional[dict[str, list[TaskMemory]]]

TaskMemory that activities write to be default. Gets automatically set if None.

install_dependencies_on_init bool

Determines whether dependencies from the tool requirements.txt file are installed in init.

dependencies_install_directory Optional[str]

Custom dependency install directory.

verbose bool

Determines whether tool operations (such as dependency installation) should be verbose.

off_prompt bool

Determines whether tool activity output goes to the output memory.

Source code in griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, ABC):
    """Abstract class for all tools to inherit from for.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to be default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    MANIFEST_FILE = "manifest.yml"
    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(default=Factory(lambda self: self.class_name, takes_self=True), kw_only=True)
    input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True)
    output_memory: Optional[dict[str, list[TaskMemory]]] = field(default=None, kw_only=True)
    install_dependencies_on_init: bool = field(default=True, kw_only=True)
    dependencies_install_directory: Optional[str] = field(default=None, kw_only=True)
    verbose: bool = field(default=False, kw_only=True)
    off_prompt: bool = field(default=True, kw_only=True)

    def __attrs_post_init__(self) -> None:
        if self.install_dependencies_on_init:
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore
    def validate_output_memory(self, _, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")
                if memory_list is None:
                    raise ValueError(f"memory list for activity '{activity_name}' can't be None")

                output_memory_names = [memory.name for memory in memory_list]

                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def class_name(self):
        return self.__class__.__name__

    @property
    def manifest_path(self) -> str:
        return os.path.join(self.abs_dir_path, self.MANIFEST_FILE)

    @property
    def requirements_path(self) -> str:
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def manifest(self) -> dict:
        with open(self.manifest_path) as yaml_file:
            return yaml.safe_load(yaml_file)

    @property
    def abs_file_path(self):
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self):
        return os.path.dirname(self.abs_file_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} Action Schema")

    def activity_schemas(self) -> list[Schema]:
        return [
            Schema(
                {
                    Literal("name"): self.name,
                    Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
                    **self.activity_to_input(
                        activity
                    ),  # Unpack the dictionary in order to only add the key-values if there are any
                }
            )
            for activity in self.activities()
        ]

    def execute(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> BaseArtifact:
        preprocessed_input = self.before_run(activity, subtask, action)
        output = self.run(activity, subtask, action, preprocessed_input)
        postprocessed_output = self.after_run(activity, subtask, action, output)

        return postprocessed_output

    def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> Optional[dict]:
        return action.input

    def run(
        self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: Optional[dict]
    ) -> BaseArtifact:
        activity_result = activity(value)

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            result = InfoArtifact(activity_result)

        return result

    def after_run(
        self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: BaseArtifact
    ) -> BaseArtifact:
        if value:
            if self.output_memory:
                output_memories = self.output_memory[getattr(activity, "name")] or []
                for memory in output_memories:
                    value = memory.process_output(activity, subtask, value)

                if isinstance(value, BaseArtifact):
                    return value
                else:
                    return TextArtifact(str(value))
            else:
                return value
        else:
            return InfoArtifact("Tool returned an empty value")

    def validate(self) -> bool:
        from griptape.utils import ManifestValidator

        if not os.path.exists(self.manifest_path):
            raise Exception(f"{self.MANIFEST_FILE} not found")

        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")

        ManifestValidator().validate(self.manifest)

        return True

    def tool_dir(self):
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        env = env if env else {}

        command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
        )

    def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        else:
            return None

MANIFEST_FILE = 'manifest.yml' class-attribute instance-attribute

REQUIREMENTS_FILE = 'requirements.txt' class-attribute instance-attribute

abs_dir_path property

abs_file_path property

class_name property

dependencies_install_directory: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True) class-attribute instance-attribute

install_dependencies_on_init: bool = field(default=True, kw_only=True) class-attribute instance-attribute

manifest: dict property

manifest_path: str property

name: str = field(default=Factory(lambda self: self.class_name, takes_self=True), kw_only=True) class-attribute instance-attribute

off_prompt: bool = field(default=True, kw_only=True) class-attribute instance-attribute

output_memory: Optional[dict[str, list[TaskMemory]]] = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_path: str property

verbose: bool = field(default=False, kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    if self.install_dependencies_on_init:
        self.install_dependencies(os.environ.copy())

activity_schemas()

Source code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]:
    return [
        Schema(
            {
                Literal("name"): self.name,
                Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
                **self.activity_to_input(
                    activity
                ),  # Unpack the dictionary in order to only add the key-values if there are any
            }
        )
        for activity in self.activities()
    ]

after_run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
def after_run(
    self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: BaseArtifact
) -> BaseArtifact:
    if value:
        if self.output_memory:
            output_memories = self.output_memory[getattr(activity, "name")] or []
            for memory in output_memories:
                value = memory.process_output(activity, subtask, value)

            if isinstance(value, BaseArtifact):
                return value
            else:
                return TextArtifact(str(value))
        else:
            return value
    else:
        return InfoArtifact("Tool returned an empty value")

before_run(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> Optional[dict]:
    return action.input

execute(activity, subtask, action)

Source code in griptape/tools/base_tool.py
def execute(self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action) -> BaseArtifact:
    preprocessed_input = self.before_run(activity, subtask, action)
    output = self.run(activity, subtask, action, preprocessed_input)
    postprocessed_output = self.after_run(activity, subtask, action, output)

    return postprocessed_output

find_input_memory(memory_name)

Source code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
    if self.input_memory:
        return next((m for m in self.input_memory if m.name == memory_name), None)
    else:
        return None

install_dependencies(env=None)

Source code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    env = env if env else {}

    command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]

    if self.dependencies_install_directory is None:
        command.extend(["-U"])
    else:
        command.extend(["-t", self.dependencies_install_directory])

    subprocess.run(
        command,
        env=env,
        cwd=self.tool_dir(),
        stdout=None if self.verbose else subprocess.DEVNULL,
        stderr=None if self.verbose else subprocess.DEVNULL,
    )

run(activity, subtask, action, value)

Source code in griptape/tools/base_tool.py
def run(
    self, activity: Callable, subtask: ActionsSubtask, action: ActionsSubtask.Action, value: Optional[dict]
) -> BaseArtifact:
    activity_result = activity(value)

    if isinstance(activity_result, BaseArtifact):
        result = activity_result
    else:
        logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

        result = InfoArtifact(activity_result)

    return result

schema()

Source code in griptape/tools/base_tool.py
def schema(self) -> dict:
    full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

    return full_schema.json_schema(f"{self.name} Action Schema")

tool_dir()

Source code in griptape/tools/base_tool.py
def tool_dir(self):
    class_file = inspect.getfile(self.__class__)

    return os.path.dirname(os.path.abspath(class_file))

validate()

Source code in griptape/tools/base_tool.py
def validate(self) -> bool:
    from griptape.utils import ManifestValidator

    if not os.path.exists(self.manifest_path):
        raise Exception(f"{self.MANIFEST_FILE} not found")

    if not os.path.exists(self.requirements_path):
        raise Exception(f"{self.REQUIREMENTS_FILE} not found")

    ManifestValidator().validate(self.manifest)

    return True

validate_output_memory(_, output_memory)

Source code in griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore
def validate_output_memory(self, _, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
    if output_memory:
        for activity_name, memory_list in output_memory.items():
            if not self.find_activity(activity_name):
                raise ValueError(f"activity {activity_name} doesn't exist")
            if memory_list is None:
                raise ValueError(f"memory list for activity '{activity_name}' can't be None")

            output_memory_names = [memory.name for memory in memory_list]

            if len(output_memory_names) > len(set(output_memory_names)):
                raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

Calculator

Bases: BaseTool

Source code in griptape/tools/calculator/tool.py
class Calculator(BaseTool):
    @activity(
        config={
            "description": "Can be used for computing simple numerical or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. "
                        "Don't use variables. Don't use any imports or external libraries",
                    ): str
                }
            ),
        }
    )
    def calculate(self, params: dict) -> BaseArtifact:
        import numexpr  # type: ignore

        try:
            expression = params["values"]["expression"]

            return TextArtifact(numexpr.evaluate(expression))
        except Exception as e:
            return ErrorArtifact(f"error calculating: {e}")

calculate(params)

Source code in griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for computing simple numerical or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. "
                    "Don't use variables. Don't use any imports or external libraries",
                ): str
            }
        ),
    }
)
def calculate(self, params: dict) -> BaseArtifact:
    import numexpr  # type: ignore

    try:
        expression = params["values"]["expression"]

        return TextArtifact(numexpr.evaluate(expression))
    except Exception as e:
        return ErrorArtifact(f"error calculating: {e}")

Computer

Bases: BaseTool

Source code in griptape/tools/computer/tool.py
@define
class Computer(BaseTool):
    local_workdir: Optional[str] = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True
    )

    _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            self._tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self._tempdir.name

    @docker_client.validator  # pyright: ignore
    def validate_docker_client(self, _, docker_client: DockerClient) -> None:
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename", description="name of the file to put the Python code in before executing it"
                    ): str,
                }
            ),
        }
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        }
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}}

            container = self.docker_client.containers.run(
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,
                stdout=True,
                stderr=True,
                detach=True,
            )

            if isinstance(container, Container):
                container.wait()

                stderr = container.logs(stdout=False, stderr=True).decode().strip()
                stdout = container.logs(stdout=True, stderr=False).decode().strip()

                container.stop()
                container.remove()

                if stderr:
                    return ErrorArtifact(stderr)
                else:
                    return TextArtifact(stdout)
            else:
                return ErrorArtifact("error running container")
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            with open(local_file_path, "w") as f:
                f.write(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> Optional[DockerClient]:
        try:
            return docker.from_env()
        except Exception as e:
            logging.error(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        try:
            existing_container = self.docker_client.containers.get(name)
            if isinstance(existing_container, Container):
                existing_container.remove(force=True)

                logging.info(f"Removed existing container: {name}")
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        with tempfile.TemporaryDirectory() as temp_dir:
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            if isinstance(image, tuple):
                logging.info(f"Built image: {image[0].short_id}")

    def dependencies(self) -> list[str]:
        with open(self.requirements_txt_path) as file:
            return [line.strip() for line in file.readlines()]

    def __del__(self) -> None:
        if self._tempdir:
            self._tempdir.cleanup()

container_workdir: str = field(default='/griptape', kw_only=True) class-attribute instance-attribute

docker_client: DockerClient = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) class-attribute instance-attribute

dockerfile_path: str = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True) class-attribute instance-attribute

env_vars: dict = field(factory=dict, kw_only=True) class-attribute instance-attribute

local_workdir: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

requirements_txt_path: str = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    super().__attrs_post_init__()

    if self.local_workdir:
        Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
    else:
        self._tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self._tempdir.name

__del__()

Source code in griptape/tools/computer/tool.py
def __del__(self) -> None:
    if self._tempdir:
        self._tempdir.cleanup()

build_image(tool)

Source code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    with tempfile.TemporaryDirectory() as temp_dir:
        shutil.copy(self.dockerfile_path, temp_dir)
        shutil.copy(self.requirements_txt_path, temp_dir)

        image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

        if isinstance(image, tuple):
            logging.info(f"Built image: {image[0].short_id}")

container_name(tool)

Source code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    return f"{stringcase.snakecase(tool.name)}_container"

default_docker_client()

Source code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]:
    try:
        return docker.from_env()
    except Exception as e:
        logging.error(e)

        return None

dependencies()

Source code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    with open(self.requirements_txt_path) as file:
        return [line.strip() for line in file.readlines()]

execute_code(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename", description="name of the file to put the Python code in before executing it"
                ): str,
            }
        ),
    }
)
def execute_code(self, params: dict) -> BaseArtifact:
    code = params["values"]["code"]
    filename = params["values"]["filename"]

    return self.execute_code_in_container(filename, code)

execute_code_in_container(filename, code)

Source code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    container_file_path = os.path.join(self.container_workdir, filename)

    if self.local_workdir:
        tempdir = None
        local_workdir = self.local_workdir
    else:
        tempdir = tempfile.TemporaryDirectory()
        local_workdir = tempdir.name

    local_file_path = os.path.join(local_workdir, filename)

    try:
        with open(local_file_path, "w") as f:
            f.write(code)

        return self.execute_command_in_container(f"python {container_file_path}")
    except Exception as e:
        return ErrorArtifact(f"error executing code: {e}")
    finally:
        if tempdir:
            tempdir.cleanup()

execute_command(params)

Source code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    }
)
def execute_command(self, params: dict) -> BaseArtifact:
    command = params["values"]["command"]

    return self.execute_command_in_container(command)

execute_command_in_container(command)

Source code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}}

        container = self.docker_client.containers.run(
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,
            stdout=True,
            stderr=True,
            detach=True,
        )

        if isinstance(container, Container):
            container.wait()

            stderr = container.logs(stdout=False, stderr=True).decode().strip()
            stdout = container.logs(stdout=True, stderr=False).decode().strip()

            container.stop()
            container.remove()

            if stderr:
                return ErrorArtifact(stderr)
            else:
                return TextArtifact(stdout)
        else:
            return ErrorArtifact("error running container")
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")

image_name(tool)

Source code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    return f"{stringcase.snakecase(tool.name)}_image"

install_dependencies(env=None)

Source code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    super().install_dependencies(env)

    self.remove_existing_container(self.container_name(self))
    self.build_image(self)

remove_existing_container(name)

Source code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    try:
        existing_container = self.docker_client.containers.get(name)
        if isinstance(existing_container, Container):
            existing_container.remove(force=True)

            logging.info(f"Removed existing container: {name}")
    except NotFound:
        pass

validate_docker_client(_, docker_client)

Source code in griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore
def validate_docker_client(self, _, docker_client: DockerClient) -> None:
    if not docker_client:
        raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

DateTime

Bases: BaseTool

Source code in griptape/tools/date_time/tool.py
class DateTime(BaseTool):
    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self, _: dict) -> BaseArtifact:
        try:
            current_datetime = datetime.now()

            return TextArtifact(str(current_datetime))
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": Schema(
                {
                    Literal(
                        "relative_date_string",
                        description='Relative date in English. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str
                }
            ),
        }
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            else:
                return ErrorArtifact("invalid date string")
        except Exception as e:
            return ErrorArtifact(f"error getting current datetime: {e}")

get_current_datetime(_)

Source code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self, _: dict) -> BaseArtifact:
    try:
        current_datetime = datetime.now()

        return TextArtifact(str(current_datetime))
    except Exception as e:
        return ErrorArtifact(f"error getting current datetime: {e}")

get_relative_datetime(params)

Source code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": Schema(
            {
                Literal(
                    "relative_date_string",
                    description='Relative date in English. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str
            }
        ),
    }
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        else:
            return ErrorArtifact("invalid date string")
    except Exception as e:
        return ErrorArtifact(f"error getting current datetime: {e}")

EmailClient

Bases: BaseTool

Tool for working with email

Attributes:

Name Type Description
username Optional[str]

Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com.

password Optional[str]

Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password.

email_max_retrieve_count Optional[int]

Used to limit the number of messages retrieved during any given activities.

smtp_host Optional[str]

Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity.

smtp_port Optional[int]

Port of the SMTP server. Example: 465. Required when using the send activity.

smtp_use_ssl bool

Whether to use SSL when sending email via the SMTP protocol.

smtp_user Optional[str]

Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity.

smtp_password Optional[str]

Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity.

imap_url Optional[str]

Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity.

imap_user Optional[str]

Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity.

imap_password Optional[str]

Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity.

mailboxes dict[str, Optional[str]]

Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}

email_loader EmailLoader

Instance of EmailLoader.

Source code in griptape/tools/email_client/tool.py
@define
class EmailClient(BaseTool):
    """Tool for working with email

    Attributes:
        username: Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol.
            Example: bender@futurama.com.
        password: Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail,
            this would be an App Password.
        email_max_retrieve_count: Used to limit the number of messages retrieved during any given activities.
        smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the `send` activity.
        smtp_port: Port of the SMTP server. Example: 465. Required when using the `send` activity.
        smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol.
        smtp_user: Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the `send` activity.
        smtp_password: Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the `send` activity.
        imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the `retrieve` activity.
        imap_user: Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the `retrieve` activity.
        imap_password: Password to retrieve email via the IMAP protocol.  Overrides password for IMAP only. Required when using the `retrieve` activity.
        mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the `retrieve` activity.
            Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
        email_loader: Instance of `EmailLoader`.
    """

    username: Optional[str] = field(default=None, kw_only=True)
    password: Optional[str] = field(default=None, kw_only=True)
    email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True)
    smtp_host: Optional[str] = field(default=None, kw_only=True)
    smtp_port: Optional[int] = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: Optional[str] = field(default=None, kw_only=True)
    imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: dict[str, Optional[str]] = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'")
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key")
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                }
            ),
        }
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        if self.mailboxes is None:
            return ErrorArtifact("mailboxes is required")

        values = params["values"]
        max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count

        return self.email_loader.load(
            EmailLoader.EmailQuery(
                label=values["label"],
                key=values.get("key"),
                search_criteria=values.get("search_criteria"),
                max_count=max_count,
            )
        )

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                }
            ),
        }
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        if self.smtp_user is None:
            return ErrorArtifact("smtp_user is required")
        if self.smtp_password is None:
            return ErrorArtifact("smtp_password is required")
        if self.smtp_host is None:
            return ErrorArtifact("smtp_host is required")
        if self.smtp_port is None:
            return ErrorArtifact("smtp_port is required")

        msg = MIMEText(values["body"])
        msg["Subject"] = values["subject"]
        msg["From"] = self.smtp_user
        msg["To"] = values["to"]

        try:
            with self._create_smtp_client(self.smtp_host, self.smtp_port) as client:
                client.login(self.smtp_user, self.smtp_password)
                client.sendmail(msg["From"], [msg["To"]], msg.as_string())
                return InfoArtifact("email was successfully sent")
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error sending email: {e}")

    def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
        if self.smtp_use_ssl:
            return smtplib.SMTP_SSL(smtp_host, smtp_port)
        else:
            return smtplib.SMTP(smtp_host, smtp_port)

email_loader: EmailLoader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute

email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

imap_url: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

mailboxes: dict[str, Optional[str]] = field(default=None, kw_only=True) class-attribute instance-attribute

password: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_host: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

smtp_port: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute

smtp_use_ssl: bool = field(default=True, kw_only=True) class-attribute instance-attribute

smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

username: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

retrieve(params)

Source code in griptape/tools/email_client/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'")
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key")
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            }
        ),
    }
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    if self.mailboxes is None:
        return ErrorArtifact("mailboxes is required")

    values = params["values"]
    max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count

    return self.email_loader.load(
        EmailLoader.EmailQuery(
            label=values["label"],
            key=values.get("key"),
            search_criteria=values.get("search_criteria"),
            max_count=max_count,
        )
    )

send(params)

Source code in griptape/tools/email_client/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            }
        ),
    }
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]
    if self.smtp_user is None:
        return ErrorArtifact("smtp_user is required")
    if self.smtp_password is None:
        return ErrorArtifact("smtp_password is required")
    if self.smtp_host is None:
        return ErrorArtifact("smtp_host is required")
    if self.smtp_port is None:
        return ErrorArtifact("smtp_port is required")

    msg = MIMEText(values["body"])
    msg["Subject"] = values["subject"]
    msg["From"] = self.smtp_user
    msg["To"] = values["to"]

    try:
        with self._create_smtp_client(self.smtp_host, self.smtp_port) as client:
            client.login(self.smtp_user, self.smtp_password)
            client.sendmail(msg["From"], [msg["To"]], msg.as_string())
            return InfoArtifact("email was successfully sent")
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error sending email: {e}")

FileManager

Bases: BaseTool

FileManager is a tool that can be used to list, load, and save files.

Attributes:

Name Type Description
file_manager_driver BaseFileManagerDriver

File Manager Driver to use to list, load, and save files.

Source code in griptape/tools/file_manager/tool.py
@define
class FileManager(BaseTool):
    """
    FileManager is a tool that can be used to list, load, and save files.

    Attributes:
        file_manager_driver: File Manager Driver to use to list, load, and save files.
    """

    file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)

    @activity(
        config={
            "description": "Can be used to list files on disk",
            "schema": Schema(
                {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str}
            ),
        }
    )
    def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        path = params["values"]["path"]
        return self.file_manager_driver.list_files(path)

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): list
                }
            ),
        }
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        paths = params["values"]["paths"]
        artifacts = []

        for path in paths:
            artifact = self.file_manager_driver.load_file(path)
            if isinstance(artifact, ListArtifact):
                artifacts.extend(artifact.value)
            else:
                artifacts.append(artifact)
        return ListArtifact(artifacts)

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]
        memory_name = params["values"]["memory_name"]
        artifact_namespace = params["values"]["artifact_namespace"]

        memory = self.find_input_memory(params["values"]["memory_name"])
        if not memory:
            return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

        list_artifact = memory.load_artifacts(artifact_namespace)

        if len(list_artifact) == 0:
            return ErrorArtifact(
                f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts"
            )

        for artifact in list_artifact.value:
            formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
            result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), artifact.value)
            if isinstance(result, ErrorArtifact):
                return result

        return InfoArtifact("Successfully saved memory artifacts to disk")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                }
            ),
        }
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        path = params["values"]["path"]
        content = params["values"]["content"]
        return self.file_manager_driver.save_file(path, content)

file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute

list_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to list files on disk",
        "schema": Schema(
            {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str}
        ),
    }
)
def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    path = params["values"]["path"]
    return self.file_manager_driver.list_files(path)

load_files_from_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): list
            }
        ),
    }
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    paths = params["values"]["paths"]
    artifacts = []

    for path in paths:
        artifact = self.file_manager_driver.load_file(path)
        if isinstance(artifact, ListArtifact):
            artifacts.extend(artifact.value)
        else:
            artifacts.append(artifact)
    return ListArtifact(artifacts)

save_content_to_file(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            }
        ),
    }
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    path = params["values"]["path"]
    content = params["values"]["content"]
    return self.file_manager_driver.save_file(path, content)

save_memory_artifacts_to_disk(params)

Source code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            }
        ),
    }
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    dir_name = params["values"]["dir_name"]
    file_name = params["values"]["file_name"]
    memory_name = params["values"]["memory_name"]
    artifact_namespace = params["values"]["artifact_namespace"]

    memory = self.find_input_memory(params["values"]["memory_name"])
    if not memory:
        return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

    list_artifact = memory.load_artifacts(artifact_namespace)

    if len(list_artifact) == 0:
        return ErrorArtifact(
            f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts"
        )

    for artifact in list_artifact.value:
        formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
        result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), artifact.value)
        if isinstance(result, ErrorArtifact):
            return result

    return InfoArtifact("Successfully saved memory artifacts to disk")

GoogleCalendarClient

Bases: BaseGoogleClient

Source code in griptape/tools/google_cal/tool.py
@define
class GoogleCalendarClient(BaseGoogleClient):
    CREATE_EVENT_SCOPES = ["https://www.googleapis.com/auth/calendar"]

    GET_UPCOMING_EVENTS_SCOPES = ["https://www.googleapis.com/auth/calendar"]

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to get upcoming events from a google calendar",
            "schema": Schema(
                {
                    Literal("calendar_id", description="id of the google calendar such as 'primary'"): str,
                    Literal("max_events", description="maximum number of events to return"): int,
                }
            ),
        }
    )
    def get_upcoming_events(self, params: dict) -> ListArtifact | ErrorArtifact:
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.GET_UPCOMING_EVENTS_SCOPES,
                service_name="calendar",
                version="v3",
                owner_email=self.owner_email,
            )
            now = datetime.datetime.utcnow().isoformat() + "Z"

            events_result = (
                service.events()
                .list(
                    calendarId=values["calendar_id"],
                    timeMin=now,
                    maxResults=values["max_events"],
                    singleEvents=True,
                    orderBy="startTime",
                )
                .execute()
            )
            events = events_result.get("items", [])

            return ListArtifact([TextArtifact(str(e)) for e in events])
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error retrieving calendar events {e}")

    @activity(
        config={
            "description": "Can be used to create an event on a google calendar",
            "schema": Schema(
                {
                    Literal(
                        "start_datetime",
                        description="combined date-time value in string format according to RFC3399 "
                        "excluding the timezone for when the meeting starts",
                    ): str,
                    Literal(
                        "start_time_zone",
                        description="time zone in which the start time is specified in string format "
                        "according to IANA time zone data base name, such as 'Europe/Zurich'",
                    ): str,
                    Literal(
                        "end_datetime",
                        description="combined date-time value in string format according to RFC3399 "
                        "excluding the timezone for when the meeting ends",
                    ): str,
                    Literal(
                        "end_time_zone",
                        description="time zone in which the end time is specified in string format "
                        "according to IANA time zone data base name, such as 'Europe/Zurich'",
                    ): str,
                    Literal("title", description="title of the event"): str,
                    Literal("description", description="description of the event"): str,
                    Literal(
                        "attendees", description="list of the email addresses of attendees using 'email' as key"
                    ): list,
                    Optional(Literal("location", description="location of the event")): str,
                }
            ),
        }
    )
    def create_event(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]

        try:
            service = self._build_client(
                scopes=self.CREATE_EVENT_SCOPES, service_name="calendar", version="v3", owner_email=self.owner_email
            )

            event = {
                "summary": values["title"],
                "location": values.get("location"),
                "description": values["description"],
                "start": {"dateTime": values["start_datetime"], "timeZone": values["start_time_zone"]},
                "end": {"dateTime": values["end_datetime"], "timeZone": values["end_time_zone"]},
                "attendees": values["attendees"],
            }
            event = service.events().insert(calendarId="primary", body=event).execute()
            return InfoArtifact(f'A calendar event was successfully created. (Link:{event.get("htmlLink")})')
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error creating calendar event: {e}")

CREATE_EVENT_SCOPES = ['https://www.googleapis.com/auth/calendar'] class-attribute instance-attribute

GET_UPCOMING_EVENTS_SCOPES = ['https://www.googleapis.com/auth/calendar'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

create_event(params)

Source code in griptape/tools/google_cal/tool.py
@activity(
    config={
        "description": "Can be used to create an event on a google calendar",
        "schema": Schema(
            {
                Literal(
                    "start_datetime",
                    description="combined date-time value in string format according to RFC3399 "
                    "excluding the timezone for when the meeting starts",
                ): str,
                Literal(
                    "start_time_zone",
                    description="time zone in which the start time is specified in string format "
                    "according to IANA time zone data base name, such as 'Europe/Zurich'",
                ): str,
                Literal(
                    "end_datetime",
                    description="combined date-time value in string format according to RFC3399 "
                    "excluding the timezone for when the meeting ends",
                ): str,
                Literal(
                    "end_time_zone",
                    description="time zone in which the end time is specified in string format "
                    "according to IANA time zone data base name, such as 'Europe/Zurich'",
                ): str,
                Literal("title", description="title of the event"): str,
                Literal("description", description="description of the event"): str,
                Literal(
                    "attendees", description="list of the email addresses of attendees using 'email' as key"
                ): list,
                Optional(Literal("location", description="location of the event")): str,
            }
        ),
    }
)
def create_event(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.CREATE_EVENT_SCOPES, service_name="calendar", version="v3", owner_email=self.owner_email
        )

        event = {
            "summary": values["title"],
            "location": values.get("location"),
            "description": values["description"],
            "start": {"dateTime": values["start_datetime"], "timeZone": values["start_time_zone"]},
            "end": {"dateTime": values["end_datetime"], "timeZone": values["end_time_zone"]},
            "attendees": values["attendees"],
        }
        event = service.events().insert(calendarId="primary", body=event).execute()
        return InfoArtifact(f'A calendar event was successfully created. (Link:{event.get("htmlLink")})')
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error creating calendar event: {e}")

get_upcoming_events(params)

Source code in griptape/tools/google_cal/tool.py
@activity(
    config={
        "description": "Can be used to get upcoming events from a google calendar",
        "schema": Schema(
            {
                Literal("calendar_id", description="id of the google calendar such as 'primary'"): str,
                Literal("max_events", description="maximum number of events to return"): int,
            }
        ),
    }
)
def get_upcoming_events(self, params: dict) -> ListArtifact | ErrorArtifact:
    values = params["values"]

    try:
        service = self._build_client(
            scopes=self.GET_UPCOMING_EVENTS_SCOPES,
            service_name="calendar",
            version="v3",
            owner_email=self.owner_email,
        )
        now = datetime.datetime.utcnow().isoformat() + "Z"

        events_result = (
            service.events()
            .list(
                calendarId=values["calendar_id"],
                timeMin=now,
                maxResults=values["max_events"],
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
        events = events_result.get("items", [])

        return ListArtifact([TextArtifact(str(e)) for e in events])
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error retrieving calendar events {e}")

GoogleDocsClient

Bases: BaseGoogleClient

Source code in griptape/tools/google_docs/tool.py
@define
class GoogleDocsClient(BaseGoogleClient):
    DOCS_SCOPES = ["https://www.googleapis.com/auth/documents"]

    DEFAULT_FOLDER_PATH = "root"

    owner_email: str = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to append text to a Google Doc.",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Destination file path of Google Doc in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    Literal("text", description="Text to be appended to the Google Doc."): str,
                }
            ),
        }
    )
    def append_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        file_path = values.get("file_path")
        text = values.get("text")

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            document_id = self._convert_path_to_file_id(drive_service, file_path)
            if document_id:
                doc = docs_service.documents().get(documentId=document_id).execute()
                content = doc["body"]["content"]
                last_text = content[-1]["paragraph"]["elements"][-1]["textRun"]["content"]
                append_index = content[-1]["endIndex"]
                if last_text.endswith("\n"):
                    append_index -= 1

                requests = [{"insertText": {"location": {"index": append_index}, "text": text}}]

                docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
                return InfoArtifact("text appended successfully")
            else:
                return ErrorArtifact(f"error appending to Google Doc, file not found for path {file_path}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error appending text to Google Doc with path {file_path}: {e}")

    @activity(
        config={
            "description": "Can be used to prepend text to a Google Doc",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Destination file path of Google Doc in the POSIX format. "
                        "For example, 'foo/bar/baz.txt'",
                    ): str,
                    Literal("text", description="Text to be prepended to the Google Doc."): str,
                }
            ),
        }
    )
    def prepend_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
        values = params["values"]
        file_path = values.get("file_path")
        text = values.get("text")

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            document_id = self._convert_path_to_file_id(drive_service, file_path)
            if document_id:
                doc = docs_service.documents().get(documentId=document_id).execute()

                if len(doc["body"]["content"]) == 1:
                    requests = [{"insertText": {"location": {"index": 1}, "text": text}}]
                else:
                    start_index = doc["body"]["content"][1]["startIndex"]
                    requests = [{"insertText": {"location": {"index": start_index}, "text": text}}]

                docs_service.documents().batchUpdate(documentId=document_id, body={"requests": requests}).execute()
                return InfoArtifact("text prepended successfully")
            else:
                return ErrorArtifact(f"error prepending to google doc, file not found for path {file_path}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error prepending text to Google Doc with path {file_path}: {e}")

    @activity(
        config={
            "description": "Can be used to create a new Google Doc and optionally save content to it.",
            "schema": Schema(
                {
                    Literal(
                        "file_path",
                        description="Name of the file to be created, which will be used to save content in.",
                    ): str,
                    Optional("content", default=None, description="Optional content to be saved in Google Doc."): str,
                    Optional(
                        "folder_path",
                        default=DEFAULT_FOLDER_PATH,
                        description="Path of the folder where the Google doc will be created.",
                    ): str,
                }
            ),
        }
    )
    def save_content_to_google_doc(self, params: dict) -> ErrorArtifact | InfoArtifact:
        values = params["values"]
        file_path = values.get("file_path")
        content = values.get("content")
        folder_path = values.get("folder_path", self.DEFAULT_FOLDER_PATH)

        try:
            docs_service = self._build_client(
                scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
            )
            drive_service = self._build_client(
                scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
            )

            body = {"title": file_path}

            doc = docs_service.documents().create(body=body).execute()
            doc_id = doc["documentId"]

            if folder_path.lower() != self.DEFAULT_FOLDER_PATH:
                folder_id = self._convert_path_to_file_id(drive_service, folder_path)
                if folder_id:
                    drive_service.files().update(fileId=doc_id, addParents=folder_id, fields="id, parents").execute()
                else:
                    return ErrorArtifact(f"Error: Folder not found for path {folder_path}")

            if content:
                save_content_params = {"document_id": doc_id, "content": content}
                saved_document_id = self._save_to_doc(save_content_params)
                return InfoArtifact(f"Content has been successfully saved to Google Doc with ID: {saved_document_id}.")
            else:
                return InfoArtifact(f"Google Doc '{file_path}' created with ID: {doc_id}")

        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"Error creating/saving Google Doc: {e}")

    @activity(
        config={
            "description": "Can be used to load content from memory and save it to a new Google Doc "
            "in the specified folder.",
            "schema": Schema(
                {
                    "memory_name": str,
                    "artifact_namespace": str,
                    "file_name": str,
                    Optional(
                        "folder_path",
                        description="Path of the folder where the Google Doc should be saved.",
                        default=DEFAULT_FOLDER_PATH,
                    ): str,
                }
            ),
        }
    )
    def save_memory_artifacts_to_google_docs(self, params: dict) -> ErrorArtifact | InfoArtifact:
        values = params["values"]
        memory = self.find_input_memory(values["memory_name"])

        if memory:
            artifacts = memory.load_artifacts(values["artifact_namespace"])

            if artifacts:
                try:
                    file_path = values["file_name"]
                    content = "\n".join([a.value for a in artifacts])

                    save_params = {
                        "file_path": file_path,
                        "content": content,
                        "folder_path": values.get("folder_path", self.DEFAULT_FOLDER_PATH),
                    }

                    return self.save_content_to_google_doc(save_params)

                except Exception as e:
                    return ErrorArtifact(f"Error: {e}")

            else:
                return ErrorArtifact("no artifacts found")
        else:
            return ErrorArtifact("memory not found")

    def _save_to_doc(self, params: dict) -> str:
        service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )

        requests = [{"insertText": {"location": {"index": 1}, "text": params["content"]}}]
        service.documents().batchUpdate(documentId=params["document_id"], body={"requests": requests}).execute()
        return params["document_id"]

DEFAULT_FOLDER_PATH = 'root' class-attribute instance-attribute

DOCS_SCOPES = ['https://www.googleapis.com/auth/documents'] class-attribute instance-attribute

owner_email: str = field(kw_only=True) class-attribute instance-attribute

append_text_to_google_doc(params)

Source code in griptape/tools/google_docs/tool.py
@activity(
    config={
        "description": "Can be used to append text to a Google Doc.",
        "schema": Schema(
            {
                Literal(
                    "file_path",
                    description="Destination file path of Google Doc in the POSIX format. "
                    "For example, 'foo/bar/baz.txt'",
                ): str,
                Literal("text", description="Text to be appended to the Google Doc."): str,
            }
        ),
    }
)
def append_text_to_google_doc(self, params: dict) -> InfoArtifact | ErrorArtifact:
    values = params["values"]
    file_path = values.get("file_path")
    text = values.get("text")

    try:
        docs_service = self._build_client(
            scopes=self.DOCS_SCOPES, service_name="docs", version="v1", owner_email=self.owner_email
        )
        drive_service = self._build_client(
            scopes=self.DRIVE_FILE_SCOPES, service_name="drive", version="v3", owner_email=self.owner_email
        )

        document_id = self._convert_path_to_file_id(drive_service, file_path)
        if document_id:
            doc = docs_service.documents().get(documentId=document_id).execute()
            content = doc["body"]["content"]
            last_text = content[-1]["paragraph"]["elements"][-1]["textRun"]["content"]
            append_index = content[-1]["endIndex"]
            if last_text.endswith("\n"):
                append_index -= 1

            requests = [{"insertText": {"location": {"index": append_index}, "text": text