Skip to content

loaders

__all__ = ['BaseLoader', 'BaseFileLoader', 'TextLoader', 'PdfLoader', 'WebLoader', 'SqlLoader', 'CsvLoader', 'EmailLoader', 'ImageLoader', 'AudioLoader', 'BlobLoader'] module-attribute

AudioLoader

Bases: BaseFileLoader[AudioArtifact]

Loads audio content into audio artifacts.

Source code in griptape/loaders/audio_loader.py
@define
class AudioLoader(BaseFileLoader[AudioArtifact]):
    """Loads audio content into audio artifacts."""

    def parse(self, data: bytes) -> AudioArtifact:
        """Wrap raw audio bytes in an `AudioArtifact`.

        The artifact's format is the extension reported by `filetype.guess`.

        Args:
            data: The raw audio bytes.

        Returns:
            An `AudioArtifact` built from `data` and the detected format.

        Raises:
            ValueError: If `filetype.guess` cannot identify the data.
        """
        # filetype.guess returns None for unrecognized data; fail with a clear
        # message instead of an AttributeError on `None.extension`.
        guessed_type = filetype.guess(data)
        if guessed_type is None:
            raise ValueError("Could not determine the file type of the audio data")

        return AudioArtifact(data, format=guessed_type.extension)

parse(data)

Source code in griptape/loaders/audio_loader.py
def parse(self, data: bytes) -> AudioArtifact:
    """Wrap raw audio bytes in an `AudioArtifact`.

    The artifact's format is the extension reported by `filetype.guess`.

    Args:
        data: The raw audio bytes.

    Returns:
        An `AudioArtifact` built from `data` and the detected format.

    Raises:
        ValueError: If `filetype.guess` cannot identify the data.
    """
    # filetype.guess returns None for unrecognized data; fail with a clear
    # message instead of an AttributeError on `None.extension`.
    guessed_type = filetype.guess(data)
    if guessed_type is None:
        raise ValueError("Could not determine the file type of the audio data")

    return AudioArtifact(data, format=guessed_type.extension)

BaseFileLoader

Bases: BaseLoader[Union[str, PathLike], bytes, A], ABC

Source code in griptape/loaders/base_file_loader.py
@define
class BaseFileLoader(BaseLoader[Union[str, PathLike], bytes, A], ABC):
    """Base class for loaders whose sources are read through a file manager driver.

    Attributes:
        file_manager_driver: Driver used to load the source; defaults to a
            `LocalFileManagerDriver` created with `workdir=None`.
        encoding: Encoding used to convert text returned by the driver into bytes.
    """

    file_manager_driver: BaseFileManagerDriver = field(
        default=Factory(lambda: LocalFileManagerDriver(workdir=None)),
        kw_only=True,
    )
    encoding: str = field(default="utf-8", kw_only=True)

    def fetch(self, source: str | PathLike | bytes) -> bytes:
        """Return the bytes for `source`.

        Args:
            source: A path-like source handed to the file manager driver as a string.
                Raw bytes are still accepted (deprecated) and returned unchanged.

        Returns:
            The loaded content as bytes; text content is encoded with `self.encoding`.
        """
        if not isinstance(source, bytes):
            loaded = self.file_manager_driver.load_file(str(source)).value
            # The driver may hand back text; normalize it to bytes.
            return loaded.encode(self.encoding) if isinstance(loaded, str) else loaded

        # Deprecated path: the caller already holds the raw bytes.
        deprecation_warn(
            "Using bytes as the source is deprecated and will be removed in a future release. "
            "Please use a string or PathLike object instead."
        )
        return source

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver(workdir=None)), kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/base_file_loader.py
def fetch(self, source: str | PathLike | bytes) -> bytes:
    """Return the bytes for `source`.

    Args:
        source: A path-like source handed to the file manager driver as a string.
            Raw bytes are still accepted (deprecated) and returned unchanged.

    Returns:
        The loaded content as bytes; text content is encoded with `self.encoding`.
    """
    if not isinstance(source, bytes):
        loaded = self.file_manager_driver.load_file(str(source)).value
        # The driver may hand back text; normalize it to bytes.
        return loaded.encode(self.encoding) if isinstance(loaded, str) else loaded

    # Deprecated path: the caller already holds the raw bytes.
    deprecation_warn(
        "Using bytes as the source is deprecated and will be removed in a future release. "
        "Please use a string or PathLike object instead."
    )
    return source

BaseLoader

Bases: FuturesExecutorMixin, ABC, Generic[S, F, A]

Fetches data from a source, parses it, and returns an Artifact.

Attributes:

Name Type Description
reference Optional[Reference]

The optional Reference to set on the Artifact.

Source code in griptape/loaders/base_loader.py
@define
class BaseLoader(FuturesExecutorMixin, ABC, Generic[S, F, A]):
    """Fetches data from a source, parses it, and returns an Artifact.

    Attributes:
        reference: The optional `Reference` to set on the Artifact.
    """

    reference: Optional[Reference] = field(default=None, kw_only=True)

    def load(self, source: S) -> A:
        """Fetch `source`, parse the result, and stamp the loader's reference on the artifact."""
        artifact = self.parse(self.fetch(source))
        artifact.reference = self.reference
        return artifact

    @abstractmethod
    def fetch(self, source: S) -> F:
        """Fetches data from the source."""
        ...

    @abstractmethod
    def parse(self, data: F) -> A:
        """Parses the fetched data and returns an Artifact."""
        ...

    def load_collection(
        self,
        sources: list[Any],
    ) -> Mapping[str, A]:
        """Loads a collection of sources and returns a dictionary of Artifacts."""
        # Key the sources first so that sources sharing a key are submitted
        # to the executor only once.
        unique_sources = {}
        for source in sources:
            unique_sources[self.to_key(source)] = source

        pending = {key: self.futures_executor.submit(self.load, source) for key, source in unique_sources.items()}
        return execute_futures_dict(pending)

    def to_key(self, source: S) -> str:
        """Converts the source to a key for the collection."""
        # Bytes are hashed directly; anything else is hashed via its string form.
        return bytes_to_hash(source) if isinstance(source, bytes) else str_to_hash(str(source))

reference: Optional[Reference] = field(default=None, kw_only=True) class-attribute instance-attribute

fetch(source) abstractmethod

Fetches data from the source.

Source code in griptape/loaders/base_loader.py
@abstractmethod
def fetch(self, source: S) -> F:
    """Fetches data from the source.

    Abstract method: the body consists only of this docstring.

    Args:
        source: The source to fetch data from.

    Returns:
        The fetched data; `load` passes it on to `parse`.
    """

load(source)

Source code in griptape/loaders/base_loader.py
def load(self, source: S) -> A:
    """Fetch `source`, parse the result, and stamp the loader's reference on the artifact.

    Args:
        source: The source handed to `fetch`.

    Returns:
        The artifact produced by `parse`, with `reference` set to `self.reference`.
    """
    artifact = self.parse(self.fetch(source))
    artifact.reference = self.reference
    return artifact

load_collection(sources)

Loads a collection of sources and returns a dictionary of Artifacts.

Source code in griptape/loaders/base_loader.py
def load_collection(
    self,
    sources: list[Any],
) -> Mapping[str, A]:
    """Loads a collection of sources and returns a dictionary of Artifacts.

    Args:
        sources: The sources to load; each one is keyed with `to_key`.

    Returns:
        The result of `execute_futures_dict` over one `load` future per key.
    """
    # Key the sources first so that sources sharing a key are submitted
    # to the executor only once.
    unique_sources = {}
    for source in sources:
        unique_sources[self.to_key(source)] = source

    pending = {key: self.futures_executor.submit(self.load, source) for key, source in unique_sources.items()}
    return execute_futures_dict(pending)

parse(data) abstractmethod

Parses the fetched data and returns an Artifact.

Source code in griptape/loaders/base_loader.py
@abstractmethod
def parse(self, data: F) -> A:
    """Parses the fetched data and returns an Artifact.

    Abstract method: the body consists only of this docstring.

    Args:
        data: The data returned by `fetch`.

    Returns:
        The Artifact built from `data`; `load` sets its `reference` afterwards.
    """

to_key(source)

Converts the source to a key for the collection.

Source code in griptape/loaders/base_loader.py
def to_key(self, source: S) -> str:
    """Converts the source to a key for the collection.

    Args:
        source: The source to derive a key for.

    Returns:
        `bytes_to_hash(source)` for bytes, otherwise `str_to_hash` of the source's string form.
    """
    return bytes_to_hash(source) if isinstance(source, bytes) else str_to_hash(str(source))

BlobLoader

Bases: BaseFileLoader[BlobArtifact]

Source code in griptape/loaders/blob_loader.py
@define
class BlobLoader(BaseFileLoader[BlobArtifact]):
    """Loads file content into blob artifacts."""

    def parse(self, data: bytes) -> BlobArtifact:
        """Wrap `data` in a `BlobArtifact`, forwarding the loader's encoding when one is set."""
        if self.encoding is not None:
            return BlobArtifact(data, encoding=self.encoding)

        # No encoding configured: let BlobArtifact use its own default.
        return BlobArtifact(data)

parse(data)

Source code in griptape/loaders/blob_loader.py
def parse(self, data: bytes) -> BlobArtifact:
    """Wrap `data` in a `BlobArtifact`, forwarding the loader's encoding when one is set.

    Args:
        data: The raw bytes to wrap.

    Returns:
        A `BlobArtifact` for `data`.
    """
    if self.encoding is not None:
        return BlobArtifact(data, encoding=self.encoding)

    # No encoding configured: let BlobArtifact use its own default.
    return BlobArtifact(data)

CsvLoader

Bases: BaseFileLoader[ListArtifact[TextArtifact]]

Source code in griptape/loaders/csv_loader.py
@define
class CsvLoader(BaseFileLoader[ListArtifact[TextArtifact]]):
    """Loads CSV content into a list of text artifacts, one per data row.

    Attributes:
        delimiter: Field delimiter passed to `csv.DictReader`.
        encoding: Encoding used to decode the fetched bytes.
        formatter_fn: Turns a row dictionary into the text of its artifact; by default
            one `key: value` line per column.
    """

    delimiter: str = field(default=",", kw_only=True)
    encoding: str = field(default="utf-8", kw_only=True)
    formatter_fn: Callable[[dict], str] = field(
        default=lambda value: "\n".join(f"{key}: {val}" for key, val in value.items()), kw_only=True
    )

    def parse(self, data: bytes) -> ListArtifact[TextArtifact]:
        """Parse CSV bytes into one `TextArtifact` per data row.

        Args:
            data: The raw CSV bytes; the first row is used as the header.

        Returns:
            A `ListArtifact` of `TextArtifact`s whose `meta["row_num"]` is the zero-based
            index of the row among the data rows.
        """
        # newline="" lets the stream split lines on "\r" as well, as the csv module
        # expects. Without it, CR-only line endings reach the reader as a single line
        # and raise csv.Error.
        stream = StringIO(data.decode(self.encoding), newline="")
        reader = csv.DictReader(stream, delimiter=self.delimiter)

        return ListArtifact(
            [TextArtifact(self.formatter_fn(row), meta={"row_num": row_num}) for row_num, row in enumerate(reader)]
        )

delimiter: str = field(default=',', kw_only=True) class-attribute instance-attribute

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

formatter_fn: Callable[[dict], str] = field(default=lambda value: '\n'.join(f'{key}: {val}' for (key, val) in value.items()), kw_only=True) class-attribute instance-attribute

parse(data)

Source code in griptape/loaders/csv_loader.py
def parse(self, data: bytes) -> ListArtifact[TextArtifact]:
    """Parse CSV bytes into one `TextArtifact` per data row.

    Args:
        data: The raw CSV bytes; the first row is used as the header.

    Returns:
        A `ListArtifact` of `TextArtifact`s whose `meta["row_num"]` is the zero-based
        index of the row among the data rows.
    """
    # newline="" lets the stream split lines on "\r" as well, as the csv module
    # expects. Without it, CR-only line endings reach the reader as a single line
    # and raise csv.Error.
    stream = StringIO(data.decode(self.encoding), newline="")
    reader = csv.DictReader(stream, delimiter=self.delimiter)

    return ListArtifact(
        [TextArtifact(self.formatter_fn(row), meta={"row_num": row_num}) for row_num, row in enumerate(reader)]
    )

EmailLoader

Bases: BaseLoader['EmailLoader.EmailQuery', list[bytes], ListArtifact]

Source code in griptape/loaders/email_loader.py
@define
class EmailLoader(BaseLoader["EmailLoader.EmailQuery", list[bytes], ListArtifact]):  # pyright: ignore[reportGeneralTypeIssues]
    """Loads emails from an IMAP server (over SSL) into text artifacts.

    Attributes:
        imap_url: Host passed to `imaplib.IMAP4_SSL`.
        username: Username used to log in.
        password: Password used to log in.
    """

    @define(frozen=True)
    class EmailQuery:
        """An email retrieval query.

        Attributes:
            label: Label to retrieve emails from such as 'INBOX' or 'SENT'.
            key: Optional key for filtering such as 'FROM' or 'SUBJECT'.
            search_criteria: Optional search criteria to filter emails by key.
            max_count: Optional max email count.
        """

        # NOTE: `fetch` unpacks this object positionally via `astuple`; keep the field order.
        label: str = field(kw_only=True)
        key: Optional[str] = field(default=None, kw_only=True)
        search_criteria: Optional[str] = field(default=None, kw_only=True)
        max_count: Optional[int] = field(default=None, kw_only=True)

    imap_url: str = field(kw_only=True)
    username: str = field(kw_only=True)
    password: str = field(kw_only=True)

    def fetch(self, source: EmailLoader.EmailQuery) -> list[bytes]:
        """Fetch the raw RFC822 bytes of the messages selected by `source`.

        Messages are returned newest-first (highest message number first). When both
        `key` and `search_criteria` are set, only the messages returned by the IMAP
        search are fetched; otherwise every message in the mailbox is a candidate.
        `max_count`, when set, keeps only the newest `max_count` candidates.

        Args:
            source: The query describing mailbox label, optional filter and limit.

        Returns:
            The raw bytes of each fetched message.

        Raises:
            Exception: If the mailbox cannot be selected or its message count cannot be read.
        """
        label, key, search_criteria, max_count = astuple(source)

        mail_bytes = []
        with imaplib.IMAP4_SSL(self.imap_url) as client:
            client.login(self.username, self.password)

            mailbox = client.select(f'"{label}"', readonly=True)
            if mailbox[0] != "OK":
                raise Exception(mailbox[1][0].decode())

            if key and search_criteria:
                _typ, [message_numbers] = client.search(None, key, f'"{search_criteria}"')
                # Keep the matched message numbers themselves. Fetching `count..1`
                # instead would return unrelated messages from the start of the mailbox.
                message_ids = (message_numbers or b"").decode().split()
            elif len(mailbox) > 1 and mailbox[1] and mailbox[1][0] is not None:
                message_ids = [str(number) for number in range(1, int(mailbox[1][0]) + 1)]
            else:
                raise Exception("unable to parse number of messages")

            if max_count:
                # A non-positive limit selects nothing, matching the previous range arithmetic.
                message_ids = message_ids[-max_count:] if max_count > 0 else []

            for message_id in reversed(message_ids):
                _result, data = client.fetch(message_id, "(RFC822)")

                if not data or data[0] is None:
                    continue

                mail_bytes.append(data[0][1])

            client.close()

        return mail_bytes

    def parse(self, data: list[bytes]) -> ListArtifact[TextArtifact]:
        """Parse raw messages into text artifacts, skipping messages without plain-text content."""
        mailparser = import_optional_dependency("mailparser")
        artifacts = []
        for byte in data:
            message = mailparser.parse_from_bytes(byte)

            # Note: mailparser only populates the text_plain field
            # if the message content type is explicitly set to 'text/plain'.
            if message.text_plain:
                artifacts.append(TextArtifact("\n".join(message.text_plain)))

        return ListArtifact(artifacts)

    def _count_messages(self, message_numbers: bytes) -> int:
        """Count the space-separated message numbers in an IMAP search response."""
        return len(list(filter(None, message_numbers.decode().split(" "))))

imap_url: str = field(kw_only=True) class-attribute instance-attribute

password: str = field(kw_only=True) class-attribute instance-attribute

username: str = field(kw_only=True) class-attribute instance-attribute

EmailQuery

An email retrieval query.

Attributes:

Name Type Description
label str

Label to retrieve emails from such as 'INBOX' or 'SENT'.

key Optional[str]

Optional key for filtering such as 'FROM' or 'SUBJECT'.

search_criteria Optional[str]

Optional search criteria to filter emails by key.

max_count Optional[int]

Optional max email count.

Source code in griptape/loaders/email_loader.py
@define(frozen=True)
class EmailQuery:
    """An immutable (frozen) email retrieval query.

    Attributes:
        label: Label to retrieve emails from such as 'INBOX' or 'SENT'.
        key: Optional key for filtering such as 'FROM' or 'SUBJECT'.
        search_criteria: Optional search criteria to filter emails by key.
        max_count: Optional max email count.
    """

    # NOTE: EmailLoader.fetch unpacks this object positionally via `astuple`
    # (label, key, search_criteria, max_count), so the field order must not change.
    label: str = field(kw_only=True)
    key: Optional[str] = field(default=None, kw_only=True)
    search_criteria: Optional[str] = field(default=None, kw_only=True)
    max_count: Optional[int] = field(default=None, kw_only=True)
key: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute
label: str = field(kw_only=True) class-attribute instance-attribute
max_count: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute
search_criteria: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/email_loader.py
def fetch(self, source: EmailLoader.EmailQuery) -> list[bytes]:
    """Fetch the raw RFC822 bytes of the messages selected by `source`.

    Messages are returned newest-first (highest message number first). When both
    `key` and `search_criteria` are set, only the messages returned by the IMAP
    search are fetched; otherwise every message in the mailbox is a candidate.
    `max_count`, when set, keeps only the newest `max_count` candidates.

    Args:
        source: The query describing mailbox label, optional filter and limit.

    Returns:
        The raw bytes of each fetched message.

    Raises:
        Exception: If the mailbox cannot be selected or its message count cannot be read.
    """
    label, key, search_criteria, max_count = astuple(source)

    mail_bytes = []
    with imaplib.IMAP4_SSL(self.imap_url) as client:
        client.login(self.username, self.password)

        mailbox = client.select(f'"{label}"', readonly=True)
        if mailbox[0] != "OK":
            raise Exception(mailbox[1][0].decode())

        if key and search_criteria:
            _typ, [message_numbers] = client.search(None, key, f'"{search_criteria}"')
            # Keep the matched message numbers themselves. Fetching `count..1`
            # instead would return unrelated messages from the start of the mailbox.
            message_ids = (message_numbers or b"").decode().split()
        elif len(mailbox) > 1 and mailbox[1] and mailbox[1][0] is not None:
            message_ids = [str(number) for number in range(1, int(mailbox[1][0]) + 1)]
        else:
            raise Exception("unable to parse number of messages")

        if max_count:
            # A non-positive limit selects nothing, matching the previous range arithmetic.
            message_ids = message_ids[-max_count:] if max_count > 0 else []

        for message_id in reversed(message_ids):
            _result, data = client.fetch(message_id, "(RFC822)")

            if not data or data[0] is None:
                continue

            mail_bytes.append(data[0][1])

        client.close()

    return mail_bytes

parse(data)

Source code in griptape/loaders/email_loader.py
def parse(self, data: list[bytes]) -> ListArtifact[TextArtifact]:
    """Parse raw messages into text artifacts.

    Args:
        data: Raw message bytes, one entry per email.

    Returns:
        A `ListArtifact` with one `TextArtifact` per message that has plain-text
        content; the message's `text_plain` parts are joined with newlines.
    """
    mailparser = import_optional_dependency("mailparser")
    parsed_messages = (mailparser.parse_from_bytes(raw_message) for raw_message in data)

    # Note: mailparser only populates the text_plain field
    # if the message content type is explicitly set to 'text/plain'.
    return ListArtifact(
        [TextArtifact("\n".join(parsed.text_plain)) for parsed in parsed_messages if parsed.text_plain]
    )

ImageLoader

Bases: BaseFileLoader[ImageArtifact]

Loads images into image artifacts.

Attributes:

Name Type Description
format Optional[str]

If provided, attempts to ensure image artifacts are in this format when loaded. For example, when set to 'PNG', loading image.jpg will return an ImageArtifact containing the image bytes in PNG format.

Source code in griptape/loaders/image_loader.py
@define
class ImageLoader(BaseFileLoader[ImageArtifact]):
    """Loads images into image artifacts.

    Attributes:
        format: If provided, attempts to ensure image artifacts are in this format when loaded.
            For example, when set to 'PNG', loading image.jpg will return an ImageArtifact
            containing the image bytes in PNG format.
    """

    format: Optional[str] = field(default=None, kw_only=True)

    def parse(self, data: bytes) -> ImageArtifact:
        """Open `data` with PIL and wrap it in an `ImageArtifact`, re-encoding if `format` is set."""
        pil_image = import_optional_dependency("PIL.Image")
        image = pil_image.open(BytesIO(data))

        target_format = self.format
        if target_format is not None:
            # Re-encode into the requested format and continue with the converted image.
            converted = BytesIO()
            image.save(converted, format=target_format)
            image = pil_image.open(converted)
            data = converted.getvalue()

        return ImageArtifact(
            data,
            format=image.format.lower(),
            width=image.width,
            height=image.height,
        )

format: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

parse(data)

Source code in griptape/loaders/image_loader.py
def parse(self, data: bytes) -> ImageArtifact:
    """Open `data` with PIL and wrap it in an `ImageArtifact`.

    Args:
        data: The raw image bytes.

    Returns:
        An `ImageArtifact` carrying the image bytes, lower-cased format, width and
        height. When `self.format` is set, the bytes are re-encoded in that format first.
    """
    pil_image = import_optional_dependency("PIL.Image")
    image = pil_image.open(BytesIO(data))

    target_format = self.format
    if target_format is not None:
        # Re-encode into the requested format and continue with the converted image.
        converted = BytesIO()
        image.save(converted, format=target_format)
        image = pil_image.open(converted)
        data = converted.getvalue()

    return ImageArtifact(
        data,
        format=image.format.lower(),
        width=image.width,
        height=image.height,
    )

PdfLoader

Bases: BaseFileLoader

Source code in griptape/loaders/pdf_loader.py
@define
class PdfLoader(BaseFileLoader):
    """Loads PDF content into a list of text artifacts, one per page."""

    def parse(
        self,
        data: bytes,
        *,
        password: Optional[str] = None,
    ) -> ListArtifact:
        """Extract the text of every page of the PDF in `data` using `pypdf`."""
        pypdf = import_optional_dependency("pypdf")
        pdf_reader = pypdf.PdfReader(BytesIO(data), strict=True, password=password)

        page_artifacts = []
        for page in pdf_reader.pages:
            page_artifacts.append(TextArtifact(page.extract_text()))

        return ListArtifact(page_artifacts)

parse(data, *, password=None)

Source code in griptape/loaders/pdf_loader.py
def parse(
    self,
    data: bytes,
    *,
    password: Optional[str] = None,
) -> ListArtifact:
    """Extract the text of every page of the PDF in `data` using `pypdf`.

    Args:
        data: The raw PDF bytes.
        password: Optional password forwarded to `pypdf.PdfReader`.

    Returns:
        A `ListArtifact` with one `TextArtifact` per page, in page order.
    """
    pypdf = import_optional_dependency("pypdf")
    pdf_reader = pypdf.PdfReader(BytesIO(data), strict=True, password=password)

    page_artifacts = []
    for page in pdf_reader.pages:
        page_artifacts.append(TextArtifact(page.extract_text()))

    return ListArtifact(page_artifacts)

SqlLoader

Bases: BaseLoader[str, list[RowResult], ListArtifact[TextArtifact]]

Source code in griptape/loaders/sql_loader.py
@define
class SqlLoader(BaseLoader[str, list[BaseSqlDriver.RowResult], ListArtifact[TextArtifact]]):
    """Loads the rows returned by a SQL query into text artifacts.

    Attributes:
        sql_driver: Driver used to execute the query.
        formatter_fn: Turns a row's cells into the text of its artifact; by default
            one `key: value` line per cell.
    """

    sql_driver: BaseSqlDriver = field(kw_only=True)
    formatter_fn: Callable[[dict], str] = field(
        default=lambda value: "\n".join(f"{key}: {val}" for key, val in value.items()), kw_only=True
    )

    def fetch(self, source: str) -> list[BaseSqlDriver.RowResult]:
        """Execute `source` through the SQL driver; a falsy result becomes an empty list."""
        rows = self.sql_driver.execute_query(source)
        return rows or []

    def parse(self, data: list[BaseSqlDriver.RowResult]) -> ListArtifact[TextArtifact]:
        """Format each row's cells into a `TextArtifact`."""
        row_artifacts = []
        for row in data:
            row_artifacts.append(TextArtifact(self.formatter_fn(row.cells)))
        return ListArtifact(row_artifacts)

formatter_fn: Callable[[dict], str] = field(default=lambda value: '\n'.join(f'{key}: {val}' for (key, val) in value.items()), kw_only=True) class-attribute instance-attribute

sql_driver: BaseSqlDriver = field(kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/sql_loader.py
def fetch(self, source: str) -> list[BaseSqlDriver.RowResult]:
    """Execute `source` through the SQL driver.

    Args:
        source: The query handed to `sql_driver.execute_query`.

    Returns:
        The driver's result, or an empty list when that result is falsy.
    """
    rows = self.sql_driver.execute_query(source)
    return rows or []

parse(data)

Source code in griptape/loaders/sql_loader.py
def parse(self, data: list[BaseSqlDriver.RowResult]) -> ListArtifact[TextArtifact]:
    """Format each row's cells into a `TextArtifact`.

    Args:
        data: The rows returned by `fetch`.

    Returns:
        A `ListArtifact` with one `TextArtifact` per row, in row order.
    """
    row_artifacts = []
    for row in data:
        row_artifacts.append(TextArtifact(self.formatter_fn(row.cells)))
    return ListArtifact(row_artifacts)

TextLoader

Bases: BaseFileLoader[TextArtifact]

Source code in griptape/loaders/text_loader.py
@define
class TextLoader(BaseFileLoader[TextArtifact]):
    """Loads text content into text artifacts.

    Attributes:
        encoding: Encoding used to decode bytes and recorded on the artifact.
    """

    encoding: str = field(default="utf-8", kw_only=True)

    def parse(self, data: str | bytes) -> TextArtifact:
        """Wrap `data` in a `TextArtifact`, decoding bytes with `self.encoding` first."""
        text = data if isinstance(data, str) else data.decode(self.encoding)
        return TextArtifact(text, encoding=self.encoding)

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

parse(data)

Source code in griptape/loaders/text_loader.py
def parse(self, data: str | bytes) -> TextArtifact:
    """Wrap `data` in a `TextArtifact`.

    Args:
        data: Text, or bytes that are decoded with `self.encoding`.

    Returns:
        A `TextArtifact` holding the text, created with `encoding=self.encoding`.
    """
    text = data if isinstance(data, str) else data.decode(self.encoding)
    return TextArtifact(text, encoding=self.encoding)

WebLoader

Bases: BaseLoader[str, str, TextArtifact]

Source code in griptape/loaders/web_loader.py
@define
class WebLoader(BaseLoader[str, str, TextArtifact]):
    """Loads web pages into text artifacts through a web scraper driver.

    Attributes:
        web_scraper_driver: Driver used to fetch and extract pages; defaults to a
            `TrafilaturaWebScraperDriver`.
    """

    web_scraper_driver: BaseWebScraperDriver = field(
        default=Factory(lambda: TrafilaturaWebScraperDriver()),
        kw_only=True,
    )

    def fetch(self, source: str) -> str:
        """Return what the driver's `fetch_url` yields for `source`."""
        fetched = self.web_scraper_driver.fetch_url(source)
        return fetched

    def parse(self, data: str) -> TextArtifact:
        """Return what the driver's `extract_page` yields for `data`."""
        extracted = self.web_scraper_driver.extract_page(data)
        return extracted

web_scraper_driver: BaseWebScraperDriver = field(default=Factory(lambda: TrafilaturaWebScraperDriver()), kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/web_loader.py
def fetch(self, source: str) -> str:
    """Return what the driver's `fetch_url` yields for `source`.

    Args:
        source: The value handed to `web_scraper_driver.fetch_url`.

    Returns:
        The driver's result, unchanged.
    """
    fetched = self.web_scraper_driver.fetch_url(source)
    return fetched

parse(data)

Source code in griptape/loaders/web_loader.py
def parse(self, data: str) -> TextArtifact:
    """Return what the driver's `extract_page` yields for `data`.

    Args:
        data: The value returned by `fetch`.

    Returns:
        The driver's result, unchanged.
    """
    extracted = self.web_scraper_driver.extract_page(data)
    return extracted