Skip to content

pgai

__all__ = ['PgAiKnowledgeBaseVectorStoreDriver'] module-attribute

PgAiKnowledgeBaseVectorStoreDriver

Bases: BaseVectorStoreDriver

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
@define
class PgAiKnowledgeBaseVectorStoreDriver(BaseVectorStoreDriver):
    """Query-only vector store driver backed by the SQL function `aidb.retrieve_text`.

    Only `query` is implemented. Every upsert, load and delete method raises
    `NotImplementedError`.

    Attributes:
        connection_string: Passed to `sqlalchemy.create_engine` when the engine is built.
        knowledge_base_name: Passed as the first argument of `aidb.retrieve_text`.
        embedding_driver: Defaults to a `DummyEmbeddingDriver`; not an `__init__` argument (`init=False`).
        _engine: Optional SQLAlchemy engine, accepted by `__init__` under the alias `engine`.
    """

    connection_string: str = field(kw_only=True, metadata={"serializable": True})
    knowledge_base_name: str = field(kw_only=True, metadata={"serializable": True})
    embedding_driver: BaseEmbeddingDriver = field(
        default=Factory(lambda: DummyEmbeddingDriver()),
        metadata={"serializable": True},
        kw_only=True,
        init=False,
    )
    _engine: sqlalchemy.Engine = field(default=None, kw_only=True, alias="engine", metadata={"serializable": False})

    @lazy_property()
    def engine(self) -> sqlalchemy.Engine:
        """Build a SQLAlchemy engine from `connection_string`.

        NOTE(review): `lazy_property` presumably stores the result in `_engine` and
        skips this body when an engine was supplied - confirm against its definition.
        """
        return import_optional_dependency("sqlalchemy").create_engine(self.connection_string)

    def query(
        self,
        query: str | TextArtifact | ImageArtifact,
        *,
        count: Optional[int] = BaseVectorStoreDriver.DEFAULT_QUERY_COUNT,
        **kwargs,
    ) -> list[BaseVectorStoreDriver.Entry]:
        """Run `query` through `aidb.retrieve_text` and convert the rows to entries.

        Args:
            query: Text to search for, given as a string or a `TextArtifact`.
            count: Forwarded as the third argument of `aidb.retrieve_text`.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            One entry per returned row. The first row field becomes the entry id, the
            second is wrapped in a `TextArtifact` stored as JSON under `meta["artifact"]`,
            and the third is converted to the float score.

        Raises:
            ValueError: If `query` is an `ImageArtifact`.
        """
        if isinstance(query, ImageArtifact):
            raise ValueError(f"{self.__class__.__name__} does not support querying with Image Artifacts.")

        import csv  # local import keeps this block self-contained

        # Bind the plain text: the artifact object itself is not a value the
        # database driver knows how to send as a parameter.
        query_text = query.value if isinstance(query, TextArtifact) else query

        sqlalchemy = import_optional_dependency("sqlalchemy")

        with sqlalchemy.orm.Session(self.engine) as session:
            retrieval = sqlalchemy.func.aidb.retrieve_text(self.knowledge_base_name, query_text, count)
            rows = session.query(retrieval).all()

        entries = []
        for (row,) in rows:
            # Each row is one composite literal such as '(id,text,score)'.
            # PostgreSQL double-quotes fields that contain commas, quotes,
            # parentheses, backslashes or whitespace and doubles embedded quotes
            # and backslashes, so a bare split(",") would shift the fields as soon
            # as the text contains a comma. csv understands exactly this quoting.
            inner = row[1:-1] if row.startswith("(") and row.endswith(")") else row
            fields = next(csv.reader([inner], quotechar='"', doublequote=True, escapechar="\\"), [])
            entries.append(
                BaseVectorStoreDriver.Entry(
                    id=fields[0],
                    score=float(fields[2]),
                    meta={"artifact": TextArtifact(fields[1]).to_json()},
                )
            )

        return entries

    def upsert_vector(
        self,
        vector: list[float],
        vector_id: Optional[str] = None,
        namespace: Optional[str] = None,
        meta: Optional[dict] = None,
        **kwargs,
    ) -> str:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support vector upsert.")

    def upsert_text_artifact(
        self,
        artifact: TextArtifact,
        namespace: Optional[str] = None,
        meta: Optional[dict] = None,
        vector_id: Optional[str] = None,
        **kwargs,
    ) -> str:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support text artifact upsert.")

    def upsert_text(
        self,
        string: str,
        vector_id: Optional[str] = None,
        namespace: Optional[str] = None,
        meta: Optional[dict] = None,
        **kwargs,
    ) -> str:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support text upsert.")

    def load_entry(self, vector_id: str, *, namespace: Optional[str] = None) -> BaseVectorStoreDriver.Entry:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support entry loading.")

    def load_entries(self, *, namespace: Optional[str] = None) -> list[BaseVectorStoreDriver.Entry]:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support entry loading.")

    def load_artifacts(self, *, namespace: Optional[str] = None) -> ListArtifact:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support Artifact loading.")

    def delete_vector(self, vector_id: str) -> NoReturn:
        """Unsupported: always raises `NotImplementedError`."""
        raise NotImplementedError(f"{self.__class__.__name__} does not support deletion.")

_engine = field(default=None, kw_only=True, alias='engine', metadata={'serializable': False}) class-attribute instance-attribute

connection_string = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

embedding_driver = field(default=Factory(lambda: DummyEmbeddingDriver()), metadata={'serializable': True}, kw_only=True, init=False) class-attribute instance-attribute

knowledge_base_name = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

delete_vector(vector_id)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def delete_vector(self, vector_id: str) -> NoReturn:
    """Reject the call: this driver cannot delete vectors.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support deletion."
    raise NotImplementedError(message)

engine()

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
@lazy_property()
def engine(self) -> sqlalchemy.Engine:
    """Build a SQLAlchemy engine from `connection_string`.

    NOTE(review): `lazy_property` presumably caches the returned engine on the
    instance - confirm against its definition.
    """
    sqlalchemy_module = import_optional_dependency("sqlalchemy")
    return sqlalchemy_module.create_engine(self.connection_string)

load_artifacts(*, namespace=None)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def load_artifacts(self, *, namespace: Optional[str] = None) -> ListArtifact:
    """Reject the call: this driver cannot load artifacts.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support Artifact loading."
    raise NotImplementedError(message)

load_entries(*, namespace=None)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def load_entries(self, *, namespace: Optional[str] = None) -> list[BaseVectorStoreDriver.Entry]:
    """Reject the call: this driver cannot load entries.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support entry loading."
    raise NotImplementedError(message)

load_entry(vector_id, *, namespace=None)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def load_entry(self, vector_id: str, *, namespace: Optional[str] = None) -> BaseVectorStoreDriver.Entry:
    """Reject the call: this driver cannot load a single entry.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support entry loading."
    raise NotImplementedError(message)

query(query, *, count=BaseVectorStoreDriver.DEFAULT_QUERY_COUNT, **kwargs)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def query(
    self,
    query: str | TextArtifact | ImageArtifact,
    *,
    count: Optional[int] = BaseVectorStoreDriver.DEFAULT_QUERY_COUNT,
    **kwargs,
) -> list[BaseVectorStoreDriver.Entry]:
    """Run `query` through `aidb.retrieve_text` and convert the rows to entries.

    Args:
        query: Text to search for, given as a string or a `TextArtifact`.
        count: Forwarded as the third argument of `aidb.retrieve_text`.
        **kwargs: Accepted for interface compatibility; not used.

    Returns:
        One entry per returned row. The first row field becomes the entry id, the
        second is wrapped in a `TextArtifact` stored as JSON under `meta["artifact"]`,
        and the third is converted to the float score.

    Raises:
        ValueError: If `query` is an `ImageArtifact`.
    """
    if isinstance(query, ImageArtifact):
        raise ValueError(f"{self.__class__.__name__} does not support querying with Image Artifacts.")

    import csv  # local import keeps this block self-contained

    # Bind the plain text: the artifact object itself is not a value the
    # database driver knows how to send as a parameter.
    query_text = query.value if isinstance(query, TextArtifact) else query

    sqlalchemy = import_optional_dependency("sqlalchemy")

    with sqlalchemy.orm.Session(self.engine) as session:
        retrieval = sqlalchemy.func.aidb.retrieve_text(self.knowledge_base_name, query_text, count)
        rows = session.query(retrieval).all()

    entries = []
    for (row,) in rows:
        # Each row is one composite literal such as '(id,text,score)'.
        # PostgreSQL double-quotes fields that contain commas, quotes,
        # parentheses, backslashes or whitespace and doubles embedded quotes
        # and backslashes, so a bare split(",") would shift the fields as soon
        # as the text contains a comma. csv understands exactly this quoting.
        inner = row[1:-1] if row.startswith("(") and row.endswith(")") else row
        fields = next(csv.reader([inner], quotechar='"', doublequote=True, escapechar="\\"), [])
        entries.append(
            BaseVectorStoreDriver.Entry(
                id=fields[0],
                score=float(fields[2]),
                meta={"artifact": TextArtifact(fields[1]).to_json()},
            )
        )

    return entries

upsert_text(string, vector_id=None, namespace=None, meta=None, **kwargs)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def upsert_text(
    self,
    string: str,
    vector_id: Optional[str] = None,
    namespace: Optional[str] = None,
    meta: Optional[dict] = None,
    **kwargs,
) -> str:
    """Reject the call: this driver cannot upsert raw text.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support text upsert."
    raise NotImplementedError(message)

upsert_text_artifact(artifact, namespace=None, meta=None, vector_id=None, **kwargs)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def upsert_text_artifact(
    self,
    artifact: TextArtifact,
    namespace: Optional[str] = None,
    meta: Optional[dict] = None,
    vector_id: Optional[str] = None,
    **kwargs,
) -> str:
    """Reject the call: this driver cannot upsert text artifacts.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support text artifact upsert."
    raise NotImplementedError(message)

upsert_vector(vector, vector_id=None, namespace=None, meta=None, **kwargs)

Source code in griptape/drivers/vector/pgai_knowledge_base_vector_store_driver.py
def upsert_vector(
    self,
    vector: list[float],
    vector_id: Optional[str] = None,
    namespace: Optional[str] = None,
    meta: Optional[dict] = None,
    **kwargs,
) -> str:
    """Reject the call: this driver cannot upsert vectors.

    Raises:
        NotImplementedError: Always, with a message naming the concrete driver class.
    """
    message = f"{self.__class__.__name__} does not support vector upsert."
    raise NotImplementedError(message)