Skip to content

Loaders

__all__ = ['BaseLoader', 'BaseTextLoader', 'TextLoader', 'PdfLoader', 'WebLoader', 'SqlLoader', 'CsvLoader', 'DataFrameLoader', 'EmailLoader', 'ImageLoader', 'BlobLoader'] module-attribute

BaseLoader

Bases: ABC

Source code in griptape/loaders/base_loader.py
@define
class BaseLoader(ABC):
    """Abstract base class for loaders that turn a source into artifacts.

    Attributes:
        futures_executor: Executor on which `load_collection` schedules the individual `load` calls.
            Defaults to a newly created `ThreadPoolExecutor`.
        encoding: Optional encoding name made available to subclasses. Defaults to `None`.
    """

    futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True)
    encoding: Optional[str] = field(default=None, kw_only=True)

    @abstractmethod
    def load(self, source: Any, *args, **kwargs) -> BaseArtifact | Sequence[BaseArtifact]:
        """Load a single source. Concrete loaders must implement this."""

    def load_collection(
        self, sources: list[Any], *args, **kwargs
    ) -> Mapping[str, BaseArtifact | Sequence[BaseArtifact | Sequence[BaseArtifact]]]:
        """Load several sources through the executor, keyed by `to_key(source)`.

        Args:
            sources: The sources to load. Sources that produce the same key are loaded only once.
            *args: Extra positional arguments forwarded to `load`.
            **kwargs: Extra keyword arguments forwarded to `load`.

        Returns:
            Whatever `execute_futures_dict` produces for the key -> future mapping.
        """
        # Key every source up front: sources sharing a key collapse into a single
        # entry, so no source is submitted to the executor twice.
        unique_sources = {}
        for source in sources:
            unique_sources[self.to_key(source)] = source

        pending = {}
        for key, source in unique_sources.items():
            pending[key] = self.futures_executor.submit(self.load, source, *args, **kwargs)

        return execute_futures_dict(pending)

    def to_key(self, source: Any, *args, **kwargs) -> str:
        """Return the hash-based key identifying `source` in `load_collection` results.

        Bytes are hashed with `bytes_to_hash`; everything else is hashed with
        `str_to_hash`, after conversion via `str()` when it is not already a string.
        """
        if isinstance(source, bytes):
            return bytes_to_hash(source)

        text = source if isinstance(source, str) else str(source)
        return str_to_hash(text)

encoding: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

futures_executor: futures.Executor = field(default=Factory(lambda: futures.ThreadPoolExecutor()), kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs) abstractmethod

Source code in griptape/loaders/base_loader.py
# Abstract loading hook: concrete loaders implement this to turn a single
# `source` into one artifact or a sequence of artifacts. Extra positional and
# keyword arguments are accepted so subclasses can take loader-specific options.
@abstractmethod
def load(self, source: Any, *args, **kwargs) -> BaseArtifact | Sequence[BaseArtifact]: ...

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/base_loader.py
def load_collection(
    self, sources: list[Any], *args, **kwargs
) -> Mapping[str, BaseArtifact | Sequence[BaseArtifact | Sequence[BaseArtifact]]]:
    """Load several sources through the executor, keyed by `to_key(source)`.

    Args:
        sources: The sources to load. Sources that produce the same key are loaded only once.
        *args: Extra positional arguments forwarded to `load`.
        **kwargs: Extra keyword arguments forwarded to `load`.

    Returns:
        Whatever `execute_futures_dict` produces for the key -> future mapping.
    """
    # Key every source up front: sources sharing a key collapse into a single
    # entry, so no source is submitted to the executor twice.
    unique_sources = {}
    for source in sources:
        unique_sources[self.to_key(source)] = source

    pending = {}
    for key, source in unique_sources.items():
        pending[key] = self.futures_executor.submit(self.load, source, *args, **kwargs)

    return execute_futures_dict(pending)

to_key(source, *args, **kwargs)

Source code in griptape/loaders/base_loader.py
def to_key(self, source: Any, *args, **kwargs) -> str:
    """Return the hash-based key identifying `source`.

    Bytes are hashed with `bytes_to_hash`; everything else is hashed with
    `str_to_hash`, after conversion via `str()` when it is not already a string.
    """
    if isinstance(source, bytes):
        return bytes_to_hash(source)

    text = source if isinstance(source, str) else str(source)
    return str_to_hash(text)

BaseTextLoader

Bases: BaseLoader, ABC

Source code in griptape/loaders/base_text_loader.py
@define
class BaseTextLoader(BaseLoader, ABC):
    """Abstract base for loaders that produce chunked text artifacts.

    Attributes:
        tokenizer: Tokenizer used by the default chunker and to derive `max_tokens`.
        max_tokens: Chunk size limit; defaults to `tokenizer.max_input_tokens * MAX_TOKEN_RATIO`, rounded.
        chunker: Chunker applied to loaded text; defaults to a `TextChunker` built from
            `tokenizer` and `max_tokens`.
        embedding_driver: When set, an embedding is generated for every chunk.
        encoding: Encoding assigned to every produced artifact. Defaults to "utf-8".
    """

    MAX_TOKEN_RATIO = 0.5

    tokenizer: OpenAiTokenizer = field(
        default=Factory(lambda: OpenAiTokenizer(model=OpenAiTokenizer.DEFAULT_OPENAI_GPT_3_CHAT_MODEL)), kw_only=True
    )
    max_tokens: int = field(
        default=Factory(lambda self: round(self.tokenizer.max_input_tokens * self.MAX_TOKEN_RATIO), takes_self=True),
        kw_only=True,
    )
    chunker: BaseChunker = field(
        default=Factory(
            lambda self: TextChunker(tokenizer=self.tokenizer, max_tokens=self.max_tokens), takes_self=True
        ),
        kw_only=True,
    )
    embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True)
    encoding: str = field(default="utf-8", kw_only=True)

    def load_collection(self, sources: list[Any], *args, **kwargs) -> dict[str, ErrorArtifact | list[TextArtifact]]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, Union[ErrorArtifact, list[TextArtifact]]], loaded)

    def _text_to_artifacts(self, text: str) -> list[TextArtifact]:
        """Split `text` into chunks, optionally embed them, and stamp each with `self.encoding`.

        Args:
            text: The text to convert.

        Returns:
            The chunk artifacts, in chunk order.
        """
        # Without a chunker the whole text becomes a single artifact.
        chunks = self.chunker.chunk(text) if self.chunker else [TextArtifact(text)]

        if self.embedding_driver:
            for chunk in chunks:
                chunk.generate_embedding(self.embedding_driver)

        artifacts = []
        for chunk in chunks:
            chunk.encoding = self.encoding
            artifacts.append(chunk)

        return artifacts

MAX_TOKEN_RATIO = 0.5 class-attribute instance-attribute

chunker: BaseChunker = field(default=Factory(lambda self: TextChunker(tokenizer=self.tokenizer, max_tokens=self.max_tokens), takes_self=True), kw_only=True) class-attribute instance-attribute

embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

max_tokens: int = field(default=Factory(lambda self: round(self.tokenizer.max_input_tokens * self.MAX_TOKEN_RATIO), takes_self=True), kw_only=True) class-attribute instance-attribute

tokenizer: OpenAiTokenizer = field(default=Factory(lambda: OpenAiTokenizer(model=OpenAiTokenizer.DEFAULT_OPENAI_GPT_3_CHAT_MODEL)), kw_only=True) class-attribute instance-attribute

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/base_text_loader.py
def load_collection(self, sources: list[Any], *args, **kwargs) -> dict[str, ErrorArtifact | list[TextArtifact]]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, Union[ErrorArtifact, list[TextArtifact]]], loaded)

BlobLoader

Bases: BaseLoader

Source code in griptape/loaders/blob_loader.py
@define
class BlobLoader(BaseLoader):
    """Loader that wraps a source in a `BlobArtifact`."""

    def load(self, source: Any, *args, **kwargs) -> BlobArtifact | ErrorArtifact:
        """Wrap `source` in a `BlobArtifact`, passing `encoding` only when one is configured."""
        if self.encoding is not None:
            return BlobArtifact(source, encoding=self.encoding)

        # No encoding configured: let BlobArtifact use its own default.
        return BlobArtifact(source)

    def load_collection(self, sources: list[bytes | str], *args, **kwargs) -> dict[str, BlobArtifact | ErrorArtifact]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, Union[BlobArtifact, ErrorArtifact]], loaded)

load(source, *args, **kwargs)

Source code in griptape/loaders/blob_loader.py
def load(self, source: Any, *args, **kwargs) -> BlobArtifact | ErrorArtifact:
    """Wrap `source` in a `BlobArtifact`, passing `encoding` only when one is configured."""
    if self.encoding is not None:
        return BlobArtifact(source, encoding=self.encoding)

    # No encoding configured: let BlobArtifact use its own default.
    return BlobArtifact(source)

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/blob_loader.py
def load_collection(self, sources: list[bytes | str], *args, **kwargs) -> dict[str, BlobArtifact | ErrorArtifact]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, Union[BlobArtifact, ErrorArtifact]], loaded)

CsvLoader

Bases: BaseLoader

Source code in griptape/loaders/csv_loader.py
@define
class CsvLoader(BaseLoader):
    """Loader that parses CSV text into one `CsvRowArtifact` per row.

    Attributes:
        embedding_driver: When set, an embedding is generated for every row artifact.
        delimiter: Field delimiter handed to `csv.DictReader`. Defaults to ",".
        encoding: Encoding used to decode `bytes` sources. Defaults to "utf-8".
    """

    embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True)
    delimiter: str = field(default=",", kw_only=True)
    encoding: str = field(default="utf-8", kw_only=True)

    def load(self, source: bytes | str, *args, **kwargs) -> ErrorArtifact | list[CsvRowArtifact]:
        """Parse CSV content into row artifacts.

        Args:
            source: CSV content as text, or as bytes decoded with `self.encoding`.

        Returns:
            One `CsvRowArtifact` per parsed row, or an `ErrorArtifact` when the bytes cannot be
            decoded or the source is a `bytearray`/`memoryview`.
        """
        if isinstance(source, (bytearray, memoryview)):
            return ErrorArtifact(f"Unsupported source type: {type(source)}")

        if isinstance(source, bytes):
            try:
                source = source.decode(encoding=self.encoding)
            except UnicodeDecodeError:
                return ErrorArtifact(f"Failed to decode bytes to string using encoding: {self.encoding}")

        rows = csv.DictReader(StringIO(source), delimiter=self.delimiter)
        artifacts = [CsvRowArtifact(row) for row in rows]

        if self.embedding_driver:
            for artifact in artifacts:
                artifact.generate_embedding(self.embedding_driver)

        return artifacts

    def load_collection(
        self, sources: list[bytes | str], *args, **kwargs
    ) -> dict[str, ErrorArtifact | list[CsvRowArtifact]]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, Union[ErrorArtifact, list[CsvRowArtifact]]], loaded)

delimiter: str = field(default=',', kw_only=True) class-attribute instance-attribute

embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/csv_loader.py
def load(self, source: bytes | str, *args, **kwargs) -> ErrorArtifact | list[CsvRowArtifact]:
    """Parse CSV content into row artifacts.

    Args:
        source: CSV content as text, or as bytes decoded with `self.encoding`.

    Returns:
        One `CsvRowArtifact` per parsed row, or an `ErrorArtifact` when the bytes cannot be
        decoded or the source is a `bytearray`/`memoryview`.
    """
    if isinstance(source, (bytearray, memoryview)):
        return ErrorArtifact(f"Unsupported source type: {type(source)}")

    if isinstance(source, bytes):
        try:
            source = source.decode(encoding=self.encoding)
        except UnicodeDecodeError:
            return ErrorArtifact(f"Failed to decode bytes to string using encoding: {self.encoding}")

    rows = csv.DictReader(StringIO(source), delimiter=self.delimiter)
    artifacts = [CsvRowArtifact(row) for row in rows]

    if self.embedding_driver:
        for artifact in artifacts:
            artifact.generate_embedding(self.embedding_driver)

    return artifacts

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/csv_loader.py
def load_collection(
    self, sources: list[bytes | str], *args, **kwargs
) -> dict[str, ErrorArtifact | list[CsvRowArtifact]]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, Union[ErrorArtifact, list[CsvRowArtifact]]], loaded)

DataFrameLoader

Bases: BaseLoader

Source code in griptape/loaders/dataframe_loader.py
@define
class DataFrameLoader(BaseLoader):
    """Loader that converts each row of a DataFrame into a `CsvRowArtifact`.

    Attributes:
        embedding_driver: When set, an embedding is generated for every row artifact.
    """

    embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True)

    def load(self, source: DataFrame, *args, **kwargs) -> list[CsvRowArtifact]:
        """Build one `CsvRowArtifact` per record of `source.to_dict(orient="records")`."""
        records = source.to_dict(orient="records")
        artifacts = [CsvRowArtifact(record) for record in records]

        if self.embedding_driver:
            for artifact in artifacts:
                artifact.generate_embedding(self.embedding_driver)

        return artifacts

    def load_collection(self, sources: list[DataFrame], *args, **kwargs) -> dict[str, list[CsvRowArtifact]]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, list[CsvRowArtifact]], loaded)

    def to_key(self, source: DataFrame, *args, **kwargs) -> str:
        """Derive the key from the DataFrame's pandas hashes (index included)."""
        hashing = import_optional_dependency("pandas.core.util.hashing")
        hashed = hashing.hash_pandas_object(source, index=True)

        return str_to_hash(str(hashed.values))

embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/dataframe_loader.py
def load(self, source: DataFrame, *args, **kwargs) -> list[CsvRowArtifact]:
    """Build one `CsvRowArtifact` per record of `source.to_dict(orient="records")`.

    When an embedding driver is configured, an embedding is generated for each artifact.
    """
    records = source.to_dict(orient="records")
    artifacts = [CsvRowArtifact(record) for record in records]

    if self.embedding_driver:
        for artifact in artifacts:
            artifact.generate_embedding(self.embedding_driver)

    return artifacts

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/dataframe_loader.py
def load_collection(self, sources: list[DataFrame], *args, **kwargs) -> dict[str, list[CsvRowArtifact]]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, list[CsvRowArtifact]], loaded)

to_key(source, *args, **kwargs)

Source code in griptape/loaders/dataframe_loader.py
def to_key(self, source: DataFrame, *args, **kwargs) -> str:
    """Derive the key from the DataFrame's pandas hashes (index included)."""
    hashing = import_optional_dependency("pandas.core.util.hashing")
    hashed = hashing.hash_pandas_object(source, index=True)

    return str_to_hash(str(hashed.values))

EmailLoader

Bases: BaseLoader

Source code in griptape/loaders/email_loader.py
@define
class EmailLoader(BaseLoader):
    """Loader that retrieves emails over IMAP (SSL) and returns their plain-text bodies.

    Attributes:
        imap_url: Host passed to `imaplib.IMAP4_SSL`.
        username: Login name for the IMAP account.
        password: Password for the IMAP account.
    """

    @define(frozen=True)
    class EmailQuery:
        """An email retrieval query.

        Attributes:
            label: Label to retrieve emails from such as 'INBOX' or 'SENT'.
            key: Optional key for filtering such as 'FROM' or 'SUBJECT'.
            search_criteria: Optional search criteria to filter emails by key.
            max_count: Optional max email count.
        """

        # NOTE: field order matters; `EmailLoader.load` unpacks `astuple(query)` positionally.
        label: str = field(kw_only=True)
        key: Optional[str] = field(default=None, kw_only=True)
        search_criteria: Optional[str] = field(default=None, kw_only=True)
        max_count: Optional[int] = field(default=None, kw_only=True)

    imap_url: str = field(kw_only=True)
    username: str = field(kw_only=True)
    password: str = field(kw_only=True)

    def load(self, source: EmailQuery, *args, **kwargs) -> ListArtifact | ErrorArtifact:
        """Fetch the emails selected by `source` and return their plain-text bodies.

        Messages are fetched from the highest sequence number downwards. When both `key` and
        `search_criteria` are set, only the messages returned by the IMAP search are fetched;
        otherwise every message in the mailbox is a candidate. `max_count`, when truthy, limits
        the fetch to that many of the highest-numbered candidates.

        Args:
            source: The query describing which mailbox and messages to load.

        Returns:
            A `ListArtifact` of `TextArtifact`s (one per message that has plain-text content),
            or an `ErrorArtifact` if anything fails along the way.
        """
        mailparser = import_optional_dependency("mailparser")
        label, key, search_criteria, max_count = astuple(source)

        artifacts = []
        try:
            with imaplib.IMAP4_SSL(self.imap_url) as client:
                client.login(self.username, self.password)

                mailbox = client.select(f'"{label}"', readonly=True)
                if mailbox[0] != "OK":
                    raise Exception(mailbox[1][0].decode())

                if key and search_criteria:
                    _typ, [message_numbers] = client.search(None, key, f'"{search_criteria}"')
                    # SEARCH yields the sequence numbers of the matching messages. They are
                    # generally not the contiguous range 1..N, so those exact numbers must be
                    # fetched rather than just counted.
                    message_ids = [number for number in message_numbers.decode().split(" ") if number]
                elif len(mailbox) > 1 and mailbox[1] and mailbox[1][0] is not None:
                    # SELECT reports the message count; every sequence number 1..count is a candidate.
                    message_ids = range(1, int(mailbox[1][0]) + 1)
                else:
                    raise Exception("unable to parse number of messages")

                # Keep only the `max_count` highest-numbered candidates (all of them when unset).
                first_kept = max(0, len(message_ids) - max_count) if max_count else 0
                for message_id in reversed(message_ids[first_kept:]):
                    _result, data = client.fetch(str(message_id), "(RFC822)")

                    if data is None or not data or data[0] is None:
                        continue

                    message = mailparser.parse_from_bytes(data[0][1])

                    # Note: mailparser only populates the text_plain field
                    # if the message content type is explicitly set to 'text/plain'.
                    if message.text_plain:
                        artifacts.append(TextArtifact("\n".join(message.text_plain)))

                client.close()

                return ListArtifact(artifacts)
        except Exception as e:
            logging.error(e)
            return ErrorArtifact(f"error retrieving email: {e}")

    def _count_messages(self, message_numbers: bytes):
        """Count the space-separated message numbers in an IMAP search response."""
        return len(list(filter(None, message_numbers.decode().split(" "))))

    def load_collection(self, sources: list[EmailQuery], *args, **kwargs) -> dict[str, ListArtifact | ErrorArtifact]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        return cast(dict[str, Union[ListArtifact, ErrorArtifact]], super().load_collection(sources, *args, **kwargs))

imap_url: str = field(kw_only=True) class-attribute instance-attribute

password: str = field(kw_only=True) class-attribute instance-attribute

username: str = field(kw_only=True) class-attribute instance-attribute

EmailQuery

An email retrieval query

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| label | str | Label to retrieve emails from such as 'INBOX' or 'SENT'. |
| key | Optional[str] | Optional key for filtering such as 'FROM' or 'SUBJECT'. |
| search_criteria | Optional[str] | Optional search criteria to filter emails by key. |
| max_count | Optional[int] | Optional max email count. |

Source code in griptape/loaders/email_loader.py
@define(frozen=True)
class EmailQuery:
    """An email retrieval query.

    Instances are frozen and every field is keyword-only.

    Attributes:
        label: Label to retrieve emails from such as 'INBOX' or 'SENT'.
        key: Optional key for filtering such as 'FROM' or 'SUBJECT'.
        search_criteria: Optional search criteria to filter emails by key.
        max_count: Optional max email count.
    """

    # NOTE: field order matters; `EmailLoader.load` unpacks `astuple(query)` positionally.
    label: str = field(kw_only=True)
    key: Optional[str] = field(default=None, kw_only=True)
    search_criteria: Optional[str] = field(default=None, kw_only=True)
    max_count: Optional[int] = field(default=None, kw_only=True)
key: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute
label: str = field(kw_only=True) class-attribute instance-attribute
max_count: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute
search_criteria: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/email_loader.py
def load(self, source: EmailQuery, *args, **kwargs) -> ListArtifact | ErrorArtifact:
    """Fetch the emails selected by `source` and return their plain-text bodies.

    Messages are fetched from the highest sequence number downwards. When both `key` and
    `search_criteria` are set, only the messages returned by the IMAP search are fetched;
    otherwise every message in the mailbox is a candidate. `max_count`, when truthy, limits
    the fetch to that many of the highest-numbered candidates.

    Args:
        source: The query describing which mailbox and messages to load.

    Returns:
        A `ListArtifact` of `TextArtifact`s (one per message that has plain-text content),
        or an `ErrorArtifact` if anything fails along the way.
    """
    mailparser = import_optional_dependency("mailparser")
    label, key, search_criteria, max_count = astuple(source)

    artifacts = []
    try:
        with imaplib.IMAP4_SSL(self.imap_url) as client:
            client.login(self.username, self.password)

            mailbox = client.select(f'"{label}"', readonly=True)
            if mailbox[0] != "OK":
                raise Exception(mailbox[1][0].decode())

            if key and search_criteria:
                _typ, [message_numbers] = client.search(None, key, f'"{search_criteria}"')
                # SEARCH yields the sequence numbers of the matching messages. They are
                # generally not the contiguous range 1..N, so those exact numbers must be
                # fetched rather than just counted.
                message_ids = [number for number in message_numbers.decode().split(" ") if number]
            elif len(mailbox) > 1 and mailbox[1] and mailbox[1][0] is not None:
                # SELECT reports the message count; every sequence number 1..count is a candidate.
                message_ids = range(1, int(mailbox[1][0]) + 1)
            else:
                raise Exception("unable to parse number of messages")

            # Keep only the `max_count` highest-numbered candidates (all of them when unset).
            first_kept = max(0, len(message_ids) - max_count) if max_count else 0
            for message_id in reversed(message_ids[first_kept:]):
                _result, data = client.fetch(str(message_id), "(RFC822)")

                if data is None or not data or data[0] is None:
                    continue

                message = mailparser.parse_from_bytes(data[0][1])

                # Note: mailparser only populates the text_plain field
                # if the message content type is explicitly set to 'text/plain'.
                if message.text_plain:
                    artifacts.append(TextArtifact("\n".join(message.text_plain)))

            client.close()

            return ListArtifact(artifacts)
    except Exception as e:
        logging.error(e)
        return ErrorArtifact(f"error retrieving email: {e}")

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/email_loader.py
def load_collection(self, sources: list[EmailQuery], *args, **kwargs) -> dict[str, ListArtifact | ErrorArtifact]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, Union[ListArtifact, ErrorArtifact]], loaded)

ImageLoader

Bases: BaseLoader

Loads images into image artifacts.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| format | Optional[str] | If provided, attempts to ensure image artifacts are in this format when loaded. For example, when set to 'PNG', loading image.jpg will return an ImageArtifact containing the image bytes in PNG format. |

Source code in griptape/loaders/image_loader.py
@define
class ImageLoader(BaseLoader):
    """Loads images into image artifacts.

    Attributes:
        format: If provided, attempts to ensure image artifacts are in this format when loaded.
            For example, when set to 'PNG', loading image.jpg will return an ImageArtifact
            containing the image bytes in PNG format.
    """

    format: Optional[str] = field(default=None, kw_only=True)

    FORMAT_TO_MIME_TYPE = {
        "bmp": "image/bmp",
        "gif": "image/gif",
        "jpeg": "image/jpeg",
        "png": "image/png",
        "tiff": "image/tiff",
        "webp": "image/webp",
    }

    def load(self, source: bytes, *args, **kwargs) -> ImageArtifact:
        """Open the image bytes with PIL and wrap them in an `ImageArtifact`.

        When `self.format` is set, the image is first re-saved in that format and the
        re-encoded bytes are what the artifact carries.
        """
        pil_image = import_optional_dependency("PIL.Image")
        image = pil_image.open(BytesIO(source))

        # Re-encode only when a target format was requested.
        if self.format is not None:
            converted = BytesIO()
            image.save(converted, format=self.format)
            image = pil_image.open(converted)
            source = converted.getvalue()

        return ImageArtifact(source, format=image.format.lower(), width=image.width, height=image.height)

    def _get_mime_type(self, image_format: str | None) -> str:
        """Map an image format name (case-insensitive) to its MIME type.

        Raises:
            ValueError: If `image_format` is None or not present in `FORMAT_TO_MIME_TYPE`.
        """
        if image_format is None:
            raise ValueError("image_format is None")

        normalized = image_format.lower()
        if normalized not in self.FORMAT_TO_MIME_TYPE:
            raise ValueError(f"Unsupported image format {image_format}")

        return self.FORMAT_TO_MIME_TYPE[normalized]

    def load_collection(self, sources: list[bytes], *args, **kwargs) -> dict[str, ImageArtifact]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, ImageArtifact], loaded)

FORMAT_TO_MIME_TYPE = {'bmp': 'image/bmp', 'gif': 'image/gif', 'jpeg': 'image/jpeg', 'png': 'image/png', 'tiff': 'image/tiff', 'webp': 'image/webp'} class-attribute instance-attribute

format: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/image_loader.py
def load(self, source: bytes, *args, **kwargs) -> ImageArtifact:
    """Open the image bytes with PIL and wrap them in an `ImageArtifact`.

    When `self.format` is set, the image is first re-saved in that format and the
    re-encoded bytes are what the artifact carries.
    """
    pil_image = import_optional_dependency("PIL.Image")
    image = pil_image.open(BytesIO(source))

    # Re-encode only when a target format was requested.
    if self.format is not None:
        converted = BytesIO()
        image.save(converted, format=self.format)
        image = pil_image.open(converted)
        source = converted.getvalue()

    return ImageArtifact(source, format=image.format.lower(), width=image.width, height=image.height)

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/image_loader.py
def load_collection(self, sources: list[bytes], *args, **kwargs) -> dict[str, ImageArtifact]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, ImageArtifact], loaded)

PdfLoader

Bases: BaseTextLoader

Source code in griptape/loaders/pdf_loader.py
@define
class PdfLoader(BaseTextLoader):
    """Text loader that extracts the text of a PDF document.

    Attributes:
        chunker: Defaults to a `PdfChunker` built from `tokenizer` and `max_tokens`.
        encoding: Always `None` by default for this loader.
    """

    chunker: PdfChunker = field(
        default=Factory(lambda self: PdfChunker(tokenizer=self.tokenizer, max_tokens=self.max_tokens), takes_self=True),
        kw_only=True,
    )
    encoding: None = field(default=None, kw_only=True)

    def load(
        self, source: bytes, password: Optional[str] = None, *args, **kwargs
    ) -> ErrorArtifact | list[TextArtifact]:
        """Extract the text of every page, join the pages with newlines, and chunk the result.

        Args:
            source: Raw PDF bytes.
            password: Optional password handed to pypdf's `PdfReader`.
        """
        pypdf = import_optional_dependency("pypdf")
        reader = pypdf.PdfReader(BytesIO(source), strict=True, password=password)
        page_texts = [page.extract_text() for page in reader.pages]
        return self._text_to_artifacts("\n".join(page_texts))

    def load_collection(self, sources: list[bytes], *args, **kwargs) -> dict[str, ErrorArtifact | list[TextArtifact]]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, Union[ErrorArtifact, list[TextArtifact]]], loaded)

chunker: PdfChunker = field(default=Factory(lambda self: PdfChunker(tokenizer=self.tokenizer, max_tokens=self.max_tokens), takes_self=True), kw_only=True) class-attribute instance-attribute

encoding: None = field(default=None, kw_only=True) class-attribute instance-attribute

load(source, password=None, *args, **kwargs)

Source code in griptape/loaders/pdf_loader.py
def load(
    self, source: bytes, password: Optional[str] = None, *args, **kwargs
) -> ErrorArtifact | list[TextArtifact]:
    """Extract the text of every page, join the pages with newlines, and chunk the result.

    Args:
        source: Raw PDF bytes.
        password: Optional password handed to pypdf's `PdfReader`.
    """
    pypdf = import_optional_dependency("pypdf")
    reader = pypdf.PdfReader(BytesIO(source), strict=True, password=password)
    page_texts = [page.extract_text() for page in reader.pages]
    return self._text_to_artifacts("\n".join(page_texts))

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/pdf_loader.py
def load_collection(self, sources: list[bytes], *args, **kwargs) -> dict[str, ErrorArtifact | list[TextArtifact]]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, Union[ErrorArtifact, list[TextArtifact]]], loaded)

SqlLoader

Bases: BaseLoader

Source code in griptape/loaders/sql_loader.py
@define
class SqlLoader(BaseLoader):
    """Loader that runs a SQL query and turns each result row into a `CsvRowArtifact`.

    Attributes:
        sql_driver: Driver used to execute the query.
        embedding_driver: When set, an embedding is generated for every row artifact.
    """

    sql_driver: BaseSqlDriver = field(kw_only=True)
    embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True)

    def load(self, source: str, *args, **kwargs) -> list[CsvRowArtifact]:
        """Execute `source` as a query and wrap the cells of every returned row.

        Returns:
            One `CsvRowArtifact` per row; an empty list when the driver returns nothing.
        """
        rows = self.sql_driver.execute_query(source)
        artifacts = [CsvRowArtifact(row.cells) for row in rows] if rows else []

        if self.embedding_driver:
            for artifact in artifacts:
                artifact.generate_embedding(self.embedding_driver)

        return artifacts

    def load_collection(self, sources: list[str], *args, **kwargs) -> dict[str, list[CsvRowArtifact]]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, list[CsvRowArtifact]], loaded)

embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

sql_driver: BaseSqlDriver = field(kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/sql_loader.py
def load(self, source: str, *args, **kwargs) -> list[CsvRowArtifact]:
    """Execute `source` as a query and wrap the cells of every returned row.

    Returns:
        One `CsvRowArtifact` per row; an empty list when the driver returns nothing.
    """
    rows = self.sql_driver.execute_query(source)
    artifacts = [CsvRowArtifact(row.cells) for row in rows] if rows else []

    if self.embedding_driver:
        for artifact in artifacts:
            artifact.generate_embedding(self.embedding_driver)

    return artifacts

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/sql_loader.py
def load_collection(self, sources: list[str], *args, **kwargs) -> dict[str, list[CsvRowArtifact]]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, list[CsvRowArtifact]], loaded)

TextLoader

Bases: BaseTextLoader

Source code in griptape/loaders/text_loader.py
@define
class TextLoader(BaseTextLoader):
    """Text loader for in-memory text supplied as `str` or `bytes`.

    Attributes:
        tokenizer: Tokenizer used by the default chunker and to derive `max_tokens`.
        max_tokens: Chunk size limit; defaults to `tokenizer.max_input_tokens * MAX_TOKEN_RATIO`, rounded.
        chunker: Defaults to a `TextChunker` built from `tokenizer` and `max_tokens`.
        embedding_driver: When set, an embedding is generated for every chunk.
        encoding: Encoding used to decode `bytes` sources. Defaults to "utf-8".
    """

    MAX_TOKEN_RATIO = 0.5

    tokenizer: OpenAiTokenizer = field(
        default=Factory(lambda: OpenAiTokenizer(model=OpenAiTokenizer.DEFAULT_OPENAI_GPT_3_CHAT_MODEL)), kw_only=True
    )
    max_tokens: int = field(
        default=Factory(lambda self: round(self.tokenizer.max_input_tokens * self.MAX_TOKEN_RATIO), takes_self=True),
        kw_only=True,
    )
    chunker: TextChunker = field(
        default=Factory(
            lambda self: TextChunker(tokenizer=self.tokenizer, max_tokens=self.max_tokens), takes_self=True
        ),
        kw_only=True,
    )
    embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True)
    encoding: str = field(default="utf-8", kw_only=True)

    def load(self, source: bytes | str, *args, **kwargs) -> ErrorArtifact | list[TextArtifact]:
        """Convert text into chunked artifacts, decoding `bytes` with `self.encoding` first.

        Returns:
            The text artifacts, or an `ErrorArtifact` when the bytes cannot be decoded or the
            source is a `bytearray`/`memoryview`.
        """
        if isinstance(source, (bytearray, memoryview)):
            return ErrorArtifact(f"Unsupported source type: {type(source)}")

        if isinstance(source, bytes):
            try:
                source = source.decode(encoding=self.encoding)
            except UnicodeDecodeError:
                return ErrorArtifact(f"Failed to decode bytes to string using encoding: {self.encoding}")

        return self._text_to_artifacts(source)

    def load_collection(
        self, sources: list[bytes | str], *args, **kwargs
    ) -> dict[str, ErrorArtifact | list[TextArtifact]]:
        """Delegate to the parent `load_collection`, narrowing only the static return type."""
        loaded = super().load_collection(sources, *args, **kwargs)
        return cast(dict[str, Union[ErrorArtifact, list[TextArtifact]]], loaded)

MAX_TOKEN_RATIO = 0.5 class-attribute instance-attribute

chunker: TextChunker = field(default=Factory(lambda self: TextChunker(tokenizer=self.tokenizer, max_tokens=self.max_tokens), takes_self=True), kw_only=True) class-attribute instance-attribute

embedding_driver: Optional[BaseEmbeddingDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

max_tokens: int = field(default=Factory(lambda self: round(self.tokenizer.max_input_tokens * self.MAX_TOKEN_RATIO), takes_self=True), kw_only=True) class-attribute instance-attribute

tokenizer: OpenAiTokenizer = field(default=Factory(lambda: OpenAiTokenizer(model=OpenAiTokenizer.DEFAULT_OPENAI_GPT_3_CHAT_MODEL)), kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/text_loader.py
def load(self, source: bytes | str, *args, **kwargs) -> ErrorArtifact | list[TextArtifact]:
    """Convert text into chunked artifacts, decoding `bytes` with `self.encoding` first.

    Returns:
        The text artifacts, or an `ErrorArtifact` when the bytes cannot be decoded or the
        source is a `bytearray`/`memoryview`.
    """
    if isinstance(source, (bytearray, memoryview)):
        return ErrorArtifact(f"Unsupported source type: {type(source)}")

    if isinstance(source, bytes):
        try:
            source = source.decode(encoding=self.encoding)
        except UnicodeDecodeError:
            return ErrorArtifact(f"Failed to decode bytes to string using encoding: {self.encoding}")

    return self._text_to_artifacts(source)

load_collection(sources, *args, **kwargs)

Source code in griptape/loaders/text_loader.py
def load_collection(
    self, sources: list[bytes | str], *args, **kwargs
) -> dict[str, ErrorArtifact | list[TextArtifact]]:
    """Delegate to the parent `load_collection`, narrowing only the static return type."""
    loaded = super().load_collection(sources, *args, **kwargs)
    return cast(dict[str, Union[ErrorArtifact, list[TextArtifact]]], loaded)

WebLoader

Bases: BaseTextLoader

Source code in griptape/loaders/web_loader.py
@define
class WebLoader(BaseTextLoader):
    """Text loader that scrapes a URL and chunks the scraped text.

    Attributes:
        web_scraper_driver: Driver used to scrape the URL; defaults to a `TrafilaturaWebScraperDriver`.
    """

    web_scraper_driver: BaseWebScraperDriver = field(
        default=Factory(lambda: TrafilaturaWebScraperDriver()), kw_only=True
    )

    def load(self, source: str, *args, **kwargs) -> ErrorArtifact | list[TextArtifact]:
        """Scrape the URL in `source` and convert the scraped artifact's value into text artifacts."""
        scraped = self.web_scraper_driver.scrape_url(source)
        page_text = scraped.value
        return self._text_to_artifacts(page_text)

web_scraper_driver: BaseWebScraperDriver = field(default=Factory(lambda: TrafilaturaWebScraperDriver()), kw_only=True) class-attribute instance-attribute

load(source, *args, **kwargs)

Source code in griptape/loaders/web_loader.py
def load(self, source: str, *args, **kwargs) -> ErrorArtifact | list[TextArtifact]:
    """Scrape the URL in `source` and convert the scraped artifact's value into text artifacts."""
    scraped = self.web_scraper_driver.scrape_url(source)
    page_text = scraped.value
    return self._text_to_artifacts(page_text)