Skip to content

Blob artifact storage

BlobArtifactStorage

Bases: BaseArtifactStorage

Source code in griptape/memory/task/storage/blob_artifact_storage.py
@define
class BlobArtifactStorage(BaseArtifactStorage):
    """In-memory artifact storage that groups ``BlobArtifact`` objects by namespace.

    Summarizing and querying are not implemented for blobs: both methods
    return an ``InfoArtifact`` carrying a fixed message.
    """

    # Maps a namespace to the blobs stored under it, in the order they were stored.
    blobs: dict[str, list[BlobArtifact]] = field(factory=dict, kw_only=True)

    def can_store(self, artifact: BaseArtifact) -> bool:
        """Return True when ``artifact`` is a ``BlobArtifact``."""
        return isinstance(artifact, BlobArtifact)

    def store_artifact(self, namespace: str, artifact: BaseArtifact) -> None:
        """Append ``artifact`` to the list kept for ``namespace``.

        The namespace entry is created on first use.

        Raises:
            ValueError: If ``artifact`` is not a ``BlobArtifact``.
        """
        if not isinstance(artifact, BlobArtifact):
            raise ValueError("Artifact must be of instance BlobArtifact")

        self.blobs.setdefault(namespace, []).append(artifact)

    def load_artifacts(self, namespace: str) -> ListArtifact:
        """Return a ``ListArtifact`` wrapping the blobs stored under ``namespace``.

        An unknown namespace yields a ``ListArtifact`` built from an empty list.
        """
        # Keyed lookup; no need to scan every (key, value) pair of the dict.
        return ListArtifact(self.blobs.get(namespace, []))

    def summarize(self, namespace: str) -> InfoArtifact:
        """Return an ``InfoArtifact`` with a fixed message; ``namespace`` is not used."""
        return InfoArtifact("can't summarize artifacts")

    def query(self, namespace: str, query: str, metadata: Any = None) -> InfoArtifact:
        """Return an ``InfoArtifact`` with a fixed message; the arguments are not used."""
        return InfoArtifact("can't query artifacts")

blobs: dict[str, list[BlobArtifact]] = field(factory=dict, kw_only=True) (class attribute, instance attribute)

can_store(artifact)

Source code in griptape/memory/task/storage/blob_artifact_storage.py
def can_store(self, artifact: BaseArtifact) -> bool:
    """Report whether ``artifact`` is a ``BlobArtifact``."""
    is_blob = isinstance(artifact, BlobArtifact)
    return is_blob

load_artifacts(namespace)

Source code in griptape/memory/task/storage/blob_artifact_storage.py
def load_artifacts(self, namespace: str) -> ListArtifact:
    """Return a ``ListArtifact`` wrapping the blobs stored under ``namespace``.

    An unknown namespace yields a ``ListArtifact`` built from an empty list.
    """
    # Keyed lookup; no need to scan every (key, value) pair of the dict.
    return ListArtifact(self.blobs.get(namespace, []))

query(namespace, query, metadata=None)

Source code in griptape/memory/task/storage/blob_artifact_storage.py
def query(self, namespace: str, query: str, metadata: Any = None) -> InfoArtifact:
    """Return an ``InfoArtifact`` with a fixed message; the arguments are not used."""
    unsupported = InfoArtifact("can't query artifacts")
    return unsupported

store_artifact(namespace, artifact)

Source code in griptape/memory/task/storage/blob_artifact_storage.py
def store_artifact(self, namespace: str, artifact: BaseArtifact) -> None:
    """Append ``artifact`` to the list kept for ``namespace``.

    The namespace entry is created on first use.

    Raises:
        ValueError: If ``artifact`` is not a ``BlobArtifact``.
    """
    if not isinstance(artifact, BlobArtifact):
        raise ValueError("Artifact must be of instance BlobArtifact")

    bucket = self.blobs.setdefault(namespace, [])
    bucket.append(artifact)

summarize(namespace)

Source code in griptape/memory/task/storage/blob_artifact_storage.py
def summarize(self, namespace: str) -> InfoArtifact:
    """Return an ``InfoArtifact`` with a fixed message; ``namespace`` is not used."""
    unsupported = InfoArtifact("can't summarize artifacts")
    return unsupported