Skip to content

Marqo vector store driver

MarqoVectorStoreDriver

Bases: BaseVectorStoreDriver

A Vector Store Driver for Marqo.

Attributes:

Name Type Description
api_key str

The API key for the Marqo API.

url str

The URL to the Marqo API.

mq Optional[Client]

An optional Marqo client. Defaults to a new client with the given URL and API key.

index str

The name of the index to use.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
@define
class MarqoVectorStoreDriver(BaseVectorStoreDriver):
    """A Vector Store Driver for Marqo.

    Attributes:
        api_key: The API key for the Marqo API.
        url: The URL to the Marqo API.
        mq: An optional Marqo client. Defaults to a new client with the given URL and API key.
        index: The name of the index to use.
    """

    api_key: str = field(kw_only=True, metadata={"serializable": True})
    url: str = field(kw_only=True, metadata={"serializable": True})
    mq: Optional[marqo.Client] = field(
        default=Factory(
            lambda self: import_optional_dependency("marqo").Client(self.url, api_key=self.api_key), takes_self=True
        ),
        kw_only=True,
    )
    index: str = field(kw_only=True, metadata={"serializable": True})

    def upsert_text(
        self,
        string: str,
        vector_id: Optional[str] = None,
        namespace: Optional[str] = None,
        meta: Optional[dict] = None,
        **kwargs,
    ) -> str:
        """Upsert a text document into the Marqo index.

        Args:
            string: The string to be indexed.
            vector_id: The ID for the vector. If None, no ID is sent so that Marqo can generate one.
            namespace: An optional namespace for the document.
            meta: An optional dictionary of metadata for the document. Stored as its string representation.
            **kwargs: Accepted for interface compatibility; not used by this method.

        Returns:
            str: The ID of the document that was added.

        Raises:
            ValueError: If the response is not a dict with a non-empty "items" list.
        """

        doc = {"Description": string}  # Description will be treated as tensor field

        # Only send an explicit ID when the caller provided one; a None "_id" is not a
        # valid document ID and would prevent Marqo from generating one.
        if vector_id is not None:
            doc["_id"] = vector_id

        # Non-tensor fields
        if meta:
            doc["meta"] = str(meta)
        if namespace:
            doc["namespace"] = namespace

        response = self.mq.index(self.index).add_documents([doc], tensor_fields=["Description"])
        if isinstance(response, dict) and response.get("items"):
            return response["items"][0]["_id"]
        raise ValueError(f"Failed to upsert text: {response}")

    def upsert_text_artifact(
        self, artifact: TextArtifact, namespace: Optional[str] = None, meta: Optional[dict] = None, **kwargs
    ) -> str:
        """Upsert a text artifact into the Marqo index.

        Args:
            artifact: The text artifact to be indexed.
            namespace: An optional namespace for the artifact.
            meta: An optional dictionary of metadata for the artifact. Stored as its string representation.
            **kwargs: Accepted for interface compatibility; not used by this method.

        Returns:
            str: The ID of the artifact that was added.

        Raises:
            ValueError: If the response is not a dict with a non-empty "items" list.
        """

        artifact_json = artifact.to_json()

        doc = {
            "_id": artifact.id,
            "Description": artifact.value,  # Description will be treated as tensor field
            "artifact": str(artifact_json),
        }

        # Optional fields are only written when set, mirroring upsert_text, so that
        # no null-valued fields are sent to Marqo.
        if meta:
            doc["meta"] = str(meta)
        if namespace:
            doc["namespace"] = namespace

        response = self.mq.index(self.index).add_documents([doc], tensor_fields=["Description", "artifact"])
        if isinstance(response, dict) and response.get("items"):
            return response["items"][0]["_id"]
        raise ValueError(f"Failed to upsert text: {response}")

    def load_entry(self, vector_id: str, namespace: Optional[str] = None) -> Optional[BaseVectorStoreDriver.Entry]:
        """Load a document entry from the Marqo index.

        Args:
            vector_id: The ID of the vector to load.
            namespace: The namespace of the vector to load. Not used for the lookup; the
                document is fetched by ID only.

        Returns:
            The loaded Entry if found, otherwise None.
        """
        result = self.mq.index(self.index).get_document(document_id=vector_id, expose_facets=True)

        # A document without tensor facets has no embedding to return.
        if not result or not result.get("_tensor_facets"):
            return None

        return BaseVectorStoreDriver.Entry(
            id=result["_id"],
            meta={k: v for k, v in result.items() if k not in ["_id"]},
            vector=result["_tensor_facets"][0]["_embedding"],
            # Populate the namespace from the stored document, as load_entries does.
            namespace=result.get("namespace"),
        )

    def load_entries(self, namespace: Optional[str] = None) -> list[BaseVectorStoreDriver.Entry]:
        """Load all document entries from the Marqo index.

        Args:
            namespace: The namespace to filter entries by.

        Returns:
            The list of loaded Entries (at most 10000, the search limit used here).
        """

        search_kwargs = {"limit": 10000}
        if namespace:
            search_kwargs["filter_string"] = f"namespace:{namespace}"

        results = self.mq.index(self.index).search("", **search_kwargs)

        # get all _id's from search results
        ids = [hit["_id"] for hit in results["hits"]]
        if not ids:
            # Nothing matched: skip the bulk fetch instead of requesting an empty ID list.
            return []

        # get documents corresponding to the ids
        documents = self.mq.index(self.index).get_documents(document_ids=ids, expose_facets=True)

        # for each document, if it's found and has an embedding, create an Entry object
        excluded_keys = ("_id", "_tensor_facets", "_found")
        entries = []
        for doc in documents["results"]:
            if not doc["_found"]:
                continue
            facets = doc.get("_tensor_facets")
            if not facets:
                # Same guard as load_entry: no tensor facets means no vector to return.
                continue
            entries.append(
                BaseVectorStoreDriver.Entry(
                    id=doc["_id"],
                    vector=facets[0]["_embedding"],
                    meta={k: v for k, v in doc.items() if k not in excluded_keys},
                    namespace=doc.get("namespace"),
                )
            )

        return entries

    def query(
        self,
        query: str,
        count: Optional[int] = None,
        namespace: Optional[str] = None,
        include_vectors: bool = False,
        include_metadata: bool = True,
        **kwargs,
    ) -> list[BaseVectorStoreDriver.QueryResult]:
        """Query the Marqo index for documents.

        Args:
            query: The query string.
            count: The maximum number of results to return. Falls back to
                BaseVectorStoreDriver.DEFAULT_QUERY_COUNT when not set.
            namespace: The namespace to filter results by.
            include_vectors: Whether to include vector data in the results.
            include_metadata: Whether to include metadata in the results.
            **kwargs: Extra search parameters; these override the defaults built here.

        Returns:
            The list of query results.
        """

        params = {
            "limit": count if count else BaseVectorStoreDriver.DEFAULT_QUERY_COUNT,
            "attributes_to_retrieve": ["*"] if include_metadata else ["_id"],
            "filter_string": f"namespace:{namespace}" if namespace else None,
        } | kwargs

        results = self.mq.index(self.index).search(query, **params)

        if include_vectors:
            # Search hits carry no embeddings, so each hit is merged with its full
            # document fetched with facets exposed.
            results["hits"] = [
                {**r, **self.mq.index(self.index).get_document(r["_id"], expose_facets=True)} for r in results["hits"]
            ]

        return [
            BaseVectorStoreDriver.QueryResult(
                id=r["_id"],
                vector=r["_tensor_facets"][0]["_embedding"] if include_vectors else [],
                score=r["_score"],
                meta={k: v for k, v in r.items() if k not in ["_score", "_tensor_facets"]},
            )
            for r in results["hits"]
        ]

    def delete_index(self, name: str) -> dict[str, Any]:
        """Delete an index in the Marqo client.

        Args:
            name: The name of the index to delete.

        Returns:
            The value returned by the Marqo client's delete_index call.
        """

        return self.mq.delete_index(name)

    def get_indexes(self) -> list[str]:
        """Get a list of all indexes in the Marqo client.

        Returns:
            The list of all indexes.
        """

        return [index["index"] for index in self.mq.get_indexes()["results"]]

    def upsert_vector(
        self,
        vector: list[float],
        vector_id: Optional[str] = None,
        namespace: Optional[str] = None,
        meta: Optional[dict] = None,
        **kwargs,
    ) -> str:
        """Upsert a vector into the Marqo index.

        This driver does not support upserting raw vectors, so this method never
        returns normally.

        Args:
            vector: The vector to be indexed.
            vector_id: The ID for the vector.
            namespace: An optional namespace for the vector.
            meta: An optional dictionary of metadata for the vector.

        Raises:
            NotImplementedError: Always; upserting a vector is not supported.
        """

        raise NotImplementedError(f"{self.__class__.__name__} does not support upserting a vector.")

    def delete_vector(self, vector_id: str):
        """Delete a vector from the Marqo index.

        Args:
            vector_id: The ID of the vector to delete.

        Raises:
            NotImplementedError: Always; deletion is not supported by this driver.
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support deletion.")

api_key: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

index: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

mq: Optional[marqo.Client] = field(default=Factory(lambda self: import_optional_dependency('marqo').Client(self.url, api_key=self.api_key), takes_self=True), kw_only=True) class-attribute instance-attribute

url: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

delete_index(name)

Delete an index in the Marqo client.

Parameters:

Name Type Description Default
name str

The name of the index to delete.

required
Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def delete_index(self, name: str) -> dict[str, Any]:
    """Remove the named index through the Marqo client.

    Args:
        name: The name of the index to delete.

    Returns:
        Whatever the Marqo client's delete_index call returns.
    """
    client = self.mq
    outcome = client.delete_index(name)
    return outcome

delete_vector(vector_id)

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def delete_vector(self, vector_id: str):
    """Reject vector deletion, which this driver does not support.

    Args:
        vector_id: The ID of the vector to delete (unused).

    Raises:
        NotImplementedError: Always.
    """
    driver_name = self.__class__.__name__
    raise NotImplementedError(f"{driver_name} does not support deletion.")

get_indexes()

Get a list of all indexes in the Marqo client.

Returns:

Type Description
list[str]

The list of all indexes.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def get_indexes(self) -> list[str]:
    """Collect the names of every index known to the Marqo client.

    Returns:
        The "index" value of each item in the client's "results" listing.
    """
    listing = self.mq.get_indexes()
    names = []
    for item in listing["results"]:
        names.append(item["index"])
    return names

load_entries(namespace=None)

Load all document entries from the Marqo index.

Parameters:

Name Type Description Default
namespace Optional[str]

The namespace to filter entries by.

None

Returns:

Type Description
list[Entry]

The list of loaded Entries.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def load_entries(self, namespace: Optional[str] = None) -> list[BaseVectorStoreDriver.Entry]:
    """Load all document entries from the Marqo index.

    Args:
        namespace: The namespace to filter entries by.

    Returns:
        The list of loaded Entries (at most 10000, the search limit used here).
    """

    search_kwargs = {"limit": 10000}
    if namespace:
        search_kwargs["filter_string"] = f"namespace:{namespace}"

    results = self.mq.index(self.index).search("", **search_kwargs)

    # get all _id's from search results
    ids = [hit["_id"] for hit in results["hits"]]
    if not ids:
        # Nothing matched: skip the bulk fetch instead of requesting an empty ID list.
        return []

    # get documents corresponding to the ids
    documents = self.mq.index(self.index).get_documents(document_ids=ids, expose_facets=True)

    # for each document, if it's found and has an embedding, create an Entry object
    excluded_keys = ("_id", "_tensor_facets", "_found")
    entries = []
    for doc in documents["results"]:
        if not doc["_found"]:
            continue
        facets = doc.get("_tensor_facets")
        if not facets:
            # Same guard as load_entry: no tensor facets means no vector to return.
            continue
        entries.append(
            BaseVectorStoreDriver.Entry(
                id=doc["_id"],
                vector=facets[0]["_embedding"],
                meta={k: v for k, v in doc.items() if k not in excluded_keys},
                namespace=doc.get("namespace"),
            )
        )

    return entries

load_entry(vector_id, namespace=None)

Load a document entry from the Marqo index.

Parameters:

Name Type Description Default
vector_id str

The ID of the vector to load.

required
namespace Optional[str]

The namespace of the vector to load.

None

Returns:

Type Description
Optional[Entry]

The loaded Entry if found, otherwise None.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def load_entry(self, vector_id: str, namespace: Optional[str] = None) -> Optional[BaseVectorStoreDriver.Entry]:
    """Load a document entry from the Marqo index.

    Args:
        vector_id: The ID of the vector to load.
        namespace: The namespace of the vector to load. Not used for the lookup; the
            document is fetched by ID only.

    Returns:
        The loaded Entry if found, otherwise None.
    """
    result = self.mq.index(self.index).get_document(document_id=vector_id, expose_facets=True)

    # A document without tensor facets has no embedding to return.
    if not result or not result.get("_tensor_facets"):
        return None

    return BaseVectorStoreDriver.Entry(
        id=result["_id"],
        meta={k: v for k, v in result.items() if k not in ["_id"]},
        vector=result["_tensor_facets"][0]["_embedding"],
        # Populate the namespace from the stored document, as load_entries does.
        namespace=result.get("namespace"),
    )

query(query, count=None, namespace=None, include_vectors=False, include_metadata=True, **kwargs)

Query the Marqo index for documents.

Parameters:

Name Type Description Default
query str

The query string.

required
count Optional[int]

The maximum number of results to return.

None
namespace Optional[str]

The namespace to filter results by.

None
include_vectors bool

Whether to include vector data in the results.

False
include_metadata bool

Whether to include metadata in the results.

True

Returns:

Type Description
list[QueryResult]

The list of query results.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def query(
    self,
    query: str,
    count: Optional[int] = None,
    namespace: Optional[str] = None,
    include_vectors: bool = False,
    include_metadata: bool = True,
    **kwargs,
) -> list[BaseVectorStoreDriver.QueryResult]:
    """Search the Marqo index and convert the hits into query results.

    Args:
        query: The query string.
        count: The maximum number of results to return; the base driver's
            DEFAULT_QUERY_COUNT is used when this is not set.
        namespace: The namespace to filter results by.
        include_vectors: Whether to include vector data in the results.
        include_metadata: Whether to include metadata in the results.
        **kwargs: Extra search parameters; they take precedence over the ones built here.

    Returns:
        The list of query results.
    """

    search_params = {
        "limit": count or BaseVectorStoreDriver.DEFAULT_QUERY_COUNT,
        "attributes_to_retrieve": ["*"] if include_metadata else ["_id"],
        "filter_string": f"namespace:{namespace}" if namespace else None,
    }
    search_params.update(kwargs)

    response = self.mq.index(self.index).search(query, **search_params)

    if include_vectors:
        # Hits are enriched with the full stored document so the facets are available.
        enriched = []
        for hit in response["hits"]:
            stored = self.mq.index(self.index).get_document(hit["_id"], expose_facets=True)
            enriched.append({**hit, **stored})
        response["hits"] = enriched

    hidden_keys = ("_score", "_tensor_facets")
    query_results = []
    for hit in response["hits"]:
        query_results.append(
            BaseVectorStoreDriver.QueryResult(
                id=hit["_id"],
                vector=hit["_tensor_facets"][0]["_embedding"] if include_vectors else [],
                score=hit["_score"],
                meta={key: value for key, value in hit.items() if key not in hidden_keys},
            )
        )
    return query_results

upsert_text(string, vector_id=None, namespace=None, meta=None, **kwargs)

Upsert a text document into the Marqo index.

Parameters:

Name Type Description Default
string str

The string to be indexed.

required
vector_id Optional[str]

The ID for the vector. If None, Marqo will generate an ID.

None
namespace Optional[str]

An optional namespace for the document.

None
meta Optional[dict]

An optional dictionary of metadata for the document.

None

Returns:

Name Type Description
str str

The ID of the document that was added.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def upsert_text(
    self,
    string: str,
    vector_id: Optional[str] = None,
    namespace: Optional[str] = None,
    meta: Optional[dict] = None,
    **kwargs,
) -> str:
    """Upsert a text document into the Marqo index.

    Args:
        string: The string to be indexed.
        vector_id: The ID for the vector. If None, no ID is sent so that Marqo can generate one.
        namespace: An optional namespace for the document.
        meta: An optional dictionary of metadata for the document. Stored as its string representation.
        **kwargs: Accepted for interface compatibility; not used by this function.

    Returns:
        str: The ID of the document that was added.

    Raises:
        ValueError: If the response is not a dict with a non-empty "items" list.
    """

    doc = {"Description": string}  # Description will be treated as tensor field

    # Only send an explicit ID when the caller provided one; a None "_id" is not a
    # valid document ID and would prevent Marqo from generating one.
    if vector_id is not None:
        doc["_id"] = vector_id

    # Non-tensor fields
    if meta:
        doc["meta"] = str(meta)
    if namespace:
        doc["namespace"] = namespace

    response = self.mq.index(self.index).add_documents([doc], tensor_fields=["Description"])
    if isinstance(response, dict) and response.get("items"):
        return response["items"][0]["_id"]
    raise ValueError(f"Failed to upsert text: {response}")

upsert_text_artifact(artifact, namespace=None, meta=None, **kwargs)

Upsert a text artifact into the Marqo index.

Parameters:

Name Type Description Default
artifact TextArtifact

The text artifact to be indexed.

required
namespace Optional[str]

An optional namespace for the artifact.

None
meta Optional[dict]

An optional dictionary of metadata for the artifact.

None

Returns:

Name Type Description
str str

The ID of the artifact that was added.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def upsert_text_artifact(
    self, artifact: TextArtifact, namespace: Optional[str] = None, meta: Optional[dict] = None, **kwargs
) -> str:
    """Upsert a text artifact into the Marqo index.

    Args:
        artifact: The text artifact to be indexed.
        namespace: An optional namespace for the artifact.
        meta: An optional dictionary of metadata for the artifact. Stored as its string representation.
        **kwargs: Accepted for interface compatibility; not used by this function.

    Returns:
        str: The ID of the artifact that was added.

    Raises:
        ValueError: If the response is not a dict with a non-empty "items" list.
    """

    artifact_json = artifact.to_json()

    doc = {
        "_id": artifact.id,
        "Description": artifact.value,  # Description will be treated as tensor field
        "artifact": str(artifact_json),
    }

    # Optional fields are only written when set, mirroring upsert_text, so that
    # no null-valued fields are sent to Marqo.
    if meta:
        doc["meta"] = str(meta)
    if namespace:
        doc["namespace"] = namespace

    response = self.mq.index(self.index).add_documents([doc], tensor_fields=["Description", "artifact"])
    if isinstance(response, dict) and response.get("items"):
        return response["items"][0]["_id"]
    raise ValueError(f"Failed to upsert text: {response}")

upsert_vector(vector, vector_id=None, namespace=None, meta=None, **kwargs)

Upsert a vector into the Marqo index.

Parameters:

Name Type Description Default
vector list[float]

The vector to be indexed.

required
vector_id Optional[str]

The ID for the vector. If None, Marqo will generate an ID.

None
namespace Optional[str]

An optional namespace for the vector.

None
meta Optional[dict]

An optional dictionary of metadata for the vector.

None

Raises:

Type Description
NotImplementedError

Always raised: this driver does not support upserting a vector.

Returns:

Type Description
str

The ID of the vector that was added.

Source code in griptape/drivers/vector/marqo_vector_store_driver.py
def upsert_vector(
    self,
    vector: list[float],
    vector_id: Optional[str] = None,
    namespace: Optional[str] = None,
    meta: Optional[dict] = None,
    **kwargs,
) -> str:
    """Reject raw-vector upserts, which this driver does not support.

    Args:
        vector: The vector to be indexed (unused).
        vector_id: The ID for the vector (unused).
        namespace: An optional namespace for the vector (unused).
        meta: An optional dictionary of metadata for the vector (unused).

    Raises:
        NotImplementedError: Always; the function never returns normally.
    """
    driver_name = self.__class__.__name__
    raise NotImplementedError(f"{driver_name} does not support upserting a vector.")