Skip to content

loaders

__all__ = ['BaseLoader', 'BaseFileLoader', 'TextLoader', 'JsonLoader', 'PdfLoader', 'WebLoader', 'SqlLoader', 'CsvLoader', 'EmailLoader', 'ImageLoader', 'AudioLoader', 'BlobLoader'] module-attribute

AudioLoader

Bases: BaseFileLoader[AudioArtifact]

Loads audio content into audio artifacts.

Source code in griptape/loaders/audio_loader.py
@define
class AudioLoader(BaseFileLoader[AudioArtifact]):
    """Loader that wraps raw audio bytes in an `AudioArtifact`."""

    def try_parse(self, data: bytes) -> AudioArtifact:
        """Builds an `AudioArtifact`, detecting the audio format from the bytes.

        Args:
            data: The raw audio bytes.

        Returns:
            An `AudioArtifact` whose `format` is the extension reported by `filetype.guess`.

        Raises:
            ValueError: If `filetype.guess` cannot identify the data.
        """
        guessed_type = filetype.guess(data)
        if guessed_type is not None:
            return AudioArtifact(data, format=guessed_type.extension)

        raise ValueError("Could not determine the file type of the audio data")

try_parse(data)

Source code in griptape/loaders/audio_loader.py
def try_parse(self, data: bytes) -> AudioArtifact:
    """Builds an `AudioArtifact`, detecting the audio format from the bytes.

    Raises:
        ValueError: If `filetype.guess` cannot identify the data.
    """
    guessed_type = filetype.guess(data)
    if guessed_type is not None:
        return AudioArtifact(data, format=guessed_type.extension)

    raise ValueError("Could not determine the file type of the audio data")

BaseFileLoader

Bases: BaseLoader[Union[str, PathLike], bytes, A], ABC

Source code in griptape/loaders/base_file_loader.py
@define
class BaseFileLoader(BaseLoader[Union[str, PathLike], bytes, A], ABC):
    """Base class for loaders that read their source through a file manager driver.

    Attributes:
        file_manager_driver: Driver used to load and save files; defaults to a `LocalFileManagerDriver`.
        encoding: Encoding used to turn loaded text into bytes, and applied to artifacts on save.
    """

    file_manager_driver: BaseFileManagerDriver = field(
        default=Factory(lambda: LocalFileManagerDriver()),
        kw_only=True,
    )
    encoding: str = field(default="utf-8", kw_only=True)

    def fetch(self, source: str | PathLike) -> bytes:
        """Loads the file at `source` and returns its content as bytes."""
        loaded = self.file_manager_driver.load_file(str(source)).value
        # The driver may hand back text; normalize it to bytes with the loader's encoding.
        return loaded.encode(self.encoding) if isinstance(loaded, str) else loaded

    def save(self, destination: str | PathLike, artifact: A) -> None:
        """Saves the Artifact to a destination."""
        # The artifact is serialized using the loader's encoding, not its own.
        artifact.encoding = self.encoding
        payload = artifact.to_bytes()
        self.file_manager_driver.save_file(str(destination), payload)

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/base_file_loader.py
def fetch(self, source: str | PathLike) -> bytes:
    """Loads the file at `source` and returns its content as bytes."""
    loaded = self.file_manager_driver.load_file(str(source)).value
    # The driver may hand back text; normalize it to bytes with the loader's encoding.
    return loaded.encode(self.encoding) if isinstance(loaded, str) else loaded

save(destination, artifact)

Saves the Artifact to a destination.

Source code in griptape/loaders/base_file_loader.py
def save(self, destination: str | PathLike, artifact: A) -> None:
    """Saves the Artifact to a destination."""
    # The artifact is serialized using the loader's encoding, not its own.
    artifact.encoding = self.encoding
    payload = artifact.to_bytes()
    self.file_manager_driver.save_file(str(destination), payload)

BaseLoader

Bases: FuturesExecutorMixin, ABC, Generic[S, F, A]

Fetches data from a source, parses it, and returns an Artifact.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| reference | Optional[Reference] | The optional Reference to set on the Artifact. |

Source code in griptape/loaders/base_loader.py
@define
class BaseLoader(FuturesExecutorMixin, ABC, Generic[S, F, A]):
    """Base class for loaders: fetches raw data from a source, then parses it into an Artifact.

    Attributes:
        reference: The optional `Reference` to set on the Artifact.
    """

    reference: Optional[Reference] = field(default=None, kw_only=True)

    def load(self, source: S) -> A:
        """Fetches `source` and parses the fetched data into an Artifact."""
        return self.parse(self.fetch(source))

    @abstractmethod
    def fetch(self, source: S) -> F:
        """Fetches data from the source."""
        ...

    def parse(self, data: F) -> A:
        """Parses the fetched data and sets the loader's reference on the resulting Artifact."""
        parsed = self.try_parse(data)
        parsed.reference = self.reference
        return parsed

    def try_parse(self, data: F) -> A:
        """Parses the fetched data and returns an Artifact.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        # TODO: Mark as abstract method for griptape 2.0
        raise NotImplementedError()

    def load_collection(
        self,
        sources: list[Any],
    ) -> Mapping[str, A]:
        """Loads a collection of sources and returns a dictionary of Artifacts."""
        # Key the sources first: sources that map to the same key collapse into
        # one entry, so each is only submitted to the executor once.
        unique_sources = {self.to_key(source): source for source in sources}

        with self.create_futures_executor() as executor:
            pending = {
                key: executor.submit(with_contextvars(self.load), source) for key, source in unique_sources.items()
            }
            return execute_futures_dict(pending)

    def to_key(self, source: S) -> str:
        """Converts the source to a key for the collection."""
        if not isinstance(source, bytes):
            # Anything that is not bytes is hashed via its string form.
            return str_to_hash(str(source))
        return bytes_to_hash(source)

reference: Optional[Reference] = field(default=None, kw_only=True) class-attribute instance-attribute

fetch(source) abstractmethod

Fetches data from the source.

Source code in griptape/loaders/base_loader.py
@abstractmethod
def fetch(self, source: S) -> F:
    """Fetches data from the source.

    Abstract: concrete loaders must implement this. `load` passes the value
    returned here straight to `parse`.

    Args:
        source: The source to fetch from.

    Returns:
        The fetched data.
    """

load(source)

Source code in griptape/loaders/base_loader.py
def load(self, source: S) -> A:
    """Fetches `source` and parses the fetched data into an Artifact."""
    return self.parse(self.fetch(source))

load_collection(sources)

Loads a collection of sources and returns a dictionary of Artifacts.

Source code in griptape/loaders/base_loader.py
def load_collection(
    self,
    sources: list[Any],
) -> Mapping[str, A]:
    """Loads a collection of sources and returns a dictionary of Artifacts."""
    # Key the sources first: sources that map to the same key collapse into
    # one entry, so each is only submitted to the executor once.
    unique_sources = {self.to_key(source): source for source in sources}

    with self.create_futures_executor() as executor:
        pending = {
            key: executor.submit(with_contextvars(self.load), source) for key, source in unique_sources.items()
        }
        return execute_futures_dict(pending)

parse(data)

Parses the fetched data and returns an Artifact.

Source code in griptape/loaders/base_loader.py
def parse(self, data: F) -> A:
    """Parses the fetched data and sets the loader's reference on the resulting Artifact."""
    parsed = self.try_parse(data)
    parsed.reference = self.reference
    return parsed

to_key(source)

Converts the source to a key for the collection.

Source code in griptape/loaders/base_loader.py
def to_key(self, source: S) -> str:
    """Converts the source to a key for the collection."""
    if not isinstance(source, bytes):
        # Anything that is not bytes is hashed via its string form.
        return str_to_hash(str(source))
    return bytes_to_hash(source)

try_parse(data)

Parses the fetched data and returns an Artifact.

Source code in griptape/loaders/base_loader.py
def try_parse(self, data: F) -> A:
    """Parses the fetched data and returns an Artifact.

    Called by `parse`, which then sets the loader's reference on the result.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    # TODO: Mark as abstract method for griptape 2.0
    raise NotImplementedError()

BlobLoader

Bases: BaseFileLoader[BlobArtifact]

Source code in griptape/loaders/blob_loader.py
@define
class BlobLoader(BaseFileLoader[BlobArtifact]):
    """Loader that wraps raw bytes in a `BlobArtifact`."""

    def try_parse(self, data: bytes) -> BlobArtifact:
        """Builds a `BlobArtifact` from `data`, passing the loader's encoding when one is set."""
        if self.encoding is not None:
            return BlobArtifact(data, encoding=self.encoding)
        return BlobArtifact(data)

try_parse(data)

Source code in griptape/loaders/blob_loader.py
def try_parse(self, data: bytes) -> BlobArtifact:
    """Builds a `BlobArtifact` from `data`, passing the loader's encoding when one is set."""
    if self.encoding is not None:
        return BlobArtifact(data, encoding=self.encoding)
    return BlobArtifact(data)

CsvLoader

Bases: BaseFileLoader[ListArtifact[TextArtifact]]

Source code in griptape/loaders/csv_loader.py
@define
class CsvLoader(BaseFileLoader[ListArtifact[TextArtifact]]):
    """Loader that turns each CSV row into a `TextArtifact`.

    Attributes:
        delimiter: Field delimiter handed to `csv.DictReader`.
        encoding: Encoding used to decode the CSV bytes.
        format_row: Renders a row dict as text; the default emits one `key: value` line per column.
    """

    delimiter: str = field(default=",", kw_only=True)
    encoding: str = field(default="utf-8", kw_only=True)
    format_row: Callable[[dict], str] = field(
        default=lambda value: "\n".join(f"{key}: {val}" for key, val in value.items()), kw_only=True
    )

    def try_parse(self, data: bytes) -> ListArtifact[TextArtifact]:
        """Decodes the CSV bytes and returns one `TextArtifact` per row, tagged with its zero-based row number."""
        text = data.decode(self.encoding)
        rows = csv.DictReader(StringIO(text), delimiter=self.delimiter)

        row_artifacts = []
        for row_num, row in enumerate(rows):
            row_artifacts.append(TextArtifact(self.format_row(row), meta={"row_num": row_num}))

        return ListArtifact(row_artifacts)

delimiter: str = field(default=',', kw_only=True) class-attribute instance-attribute

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

format_row: Callable[[dict], str] = field(default=lambda value: '\n'.join(f'{key}: {val}' for (key, val) in value.items()), kw_only=True) class-attribute instance-attribute

try_parse(data)

Source code in griptape/loaders/csv_loader.py
def try_parse(self, data: bytes) -> ListArtifact[TextArtifact]:
    """Decodes the CSV bytes and returns one `TextArtifact` per row, tagged with its zero-based row number."""
    text = data.decode(self.encoding)
    rows = csv.DictReader(StringIO(text), delimiter=self.delimiter)

    row_artifacts = []
    for row_num, row in enumerate(rows):
        row_artifacts.append(TextArtifact(self.format_row(row), meta={"row_num": row_num}))

    return ListArtifact(row_artifacts)

EmailLoader

Bases: BaseLoader['EmailLoader.EmailQuery', list[bytes], ListArtifact]

Source code in griptape/loaders/email_loader.py
@define
class EmailLoader(BaseLoader["EmailLoader.EmailQuery", list[bytes], ListArtifact]):  # pyright: ignore[reportGeneralTypeIssues]
    """Loads emails from an IMAP mailbox and parses their plain-text bodies into text artifacts.

    Attributes:
        imap_url: Host passed to `imaplib.IMAP4_SSL`.
        username: Username used to log in.
        password: Password used to log in.
    """

    @define(frozen=True)
    class EmailQuery:
        """An email retrieval query.

        Attributes:
            label: Label to retrieve emails from such as 'INBOX' or 'SENT'.
            key: Optional key for filtering such as 'FROM' or 'SUBJECT'.
            search_criteria: Optional search criteria to filter emails by key.
            max_count: Optional max email count.
        """

        label: str = field(kw_only=True)
        key: Optional[str] = field(default=None, kw_only=True)
        search_criteria: Optional[str] = field(default=None, kw_only=True)
        max_count: Optional[int] = field(default=None, kw_only=True)

    imap_url: str = field(kw_only=True)
    username: str = field(kw_only=True)
    password: str = field(kw_only=True)

    def fetch(self, source: EmailLoader.EmailQuery) -> list[bytes]:
        """Fetches the raw RFC822 bytes of the emails selected by `source`.

        The mailbox is opened read-only. When both `key` and `search_criteria`
        are set, only the messages returned by the IMAP search are fetched;
        otherwise every message in the mailbox is a candidate. Messages are
        fetched from the highest message number down, limited to `max_count`
        when it is set.

        Args:
            source: The query describing which emails to retrieve.

        Returns:
            The raw message bytes, highest message number first.

        Raises:
            Exception: If the mailbox cannot be selected or its message count cannot be parsed.
        """
        label, key, search_criteria, max_count = astuple(source)

        mail_bytes = []
        with imaplib.IMAP4_SSL(self.imap_url) as client:
            client.login(self.username, self.password)

            mailbox = client.select(f'"{label}"', readonly=True)
            if mailbox[0] != "OK":
                raise Exception(mailbox[1][0].decode())  # pyright: ignore[reportOptionalMemberAccess] Unsure what mailbox[1][0] is, so leaving as-is

            if key and search_criteria:
                _typ, [message_numbers] = client.search(None, key, f'"{search_criteria}"')
                # Keep the actual message numbers returned by the search: they are
                # not necessarily 1..N, so counting them and fetching 1..N would
                # retrieve messages that did not match.
                message_ids = [int(number) for number in message_numbers.decode().split(" ") if number]
            elif len(mailbox) > 1 and mailbox[1] and mailbox[1][0] is not None:
                message_ids = range(1, int(mailbox[1][0]) + 1)
            else:
                raise Exception("unable to parse number of messages")

            # Skip all but the last `max_count` candidates (no limit when max_count is unset or 0).
            top_n = max(0, len(message_ids) - max_count) if max_count else 0
            for message_id in reversed(message_ids[top_n:]):
                _result, data = client.fetch(str(message_id), "(RFC822)")

                if data is None or not data or data[0] is None:
                    continue

                mail_bytes.append(data[0][1])

            client.close()

        return mail_bytes

    def try_parse(self, data: list[bytes]) -> ListArtifact[TextArtifact]:
        """Parses raw email bytes into text artifacts, one per message that has a plain-text body."""
        mailparser = import_optional_dependency("mailparser")
        artifacts = []
        for byte in data:
            message = mailparser.parse_from_bytes(byte)

            # Note: mailparser only populates the text_plain field
            # if the message content type is explicitly set to 'text/plain'.
            if message.text_plain:
                artifacts.append(TextArtifact("\n".join(message.text_plain)))

        return ListArtifact(artifacts)

    def _count_messages(self, message_numbers: bytes) -> int:
        """Counts the space-separated message numbers in an IMAP search response."""
        return len(list(filter(None, message_numbers.decode().split(" "))))

imap_url: str = field(kw_only=True) class-attribute instance-attribute

password: str = field(kw_only=True) class-attribute instance-attribute

username: str = field(kw_only=True) class-attribute instance-attribute

EmailQuery

An email retrieval query.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| label | str | Label to retrieve emails from such as 'INBOX' or 'SENT'. |
| key | Optional[str] | Optional key for filtering such as 'FROM' or 'SUBJECT'. |
| search_criteria | Optional[str] | Optional search criteria to filter emails by key. |
| max_count | Optional[int] | Optional max email count. |

Source code in griptape/loaders/email_loader.py
@define(frozen=True)
class EmailQuery:
    """An email retrieval query.

    Instances are frozen and every field is keyword-only.

    Attributes:
        label: Label to retrieve emails from such as 'INBOX' or 'SENT'.
        key: Optional key for filtering such as 'FROM' or 'SUBJECT'.
        search_criteria: Optional search criteria to filter emails by key.
        max_count: Optional max email count.
    """

    label: str = field(kw_only=True)
    key: Optional[str] = field(default=None, kw_only=True)
    search_criteria: Optional[str] = field(default=None, kw_only=True)
    max_count: Optional[int] = field(default=None, kw_only=True)
key: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute
label: str = field(kw_only=True) class-attribute instance-attribute
max_count: Optional[int] = field(default=None, kw_only=True) class-attribute instance-attribute
search_criteria: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/email_loader.py
def fetch(self, source: EmailLoader.EmailQuery) -> list[bytes]:
    """Fetches the raw RFC822 bytes of the emails selected by `source`.

    The mailbox is opened read-only. When both `key` and `search_criteria`
    are set, only the messages returned by the IMAP search are fetched;
    otherwise every message in the mailbox is a candidate. Messages are
    fetched from the highest message number down, limited to `max_count`
    when it is set.

    Args:
        source: The query describing which emails to retrieve.

    Returns:
        The raw message bytes, highest message number first.

    Raises:
        Exception: If the mailbox cannot be selected or its message count cannot be parsed.
    """
    label, key, search_criteria, max_count = astuple(source)

    mail_bytes = []
    with imaplib.IMAP4_SSL(self.imap_url) as client:
        client.login(self.username, self.password)

        mailbox = client.select(f'"{label}"', readonly=True)
        if mailbox[0] != "OK":
            raise Exception(mailbox[1][0].decode())  # pyright: ignore[reportOptionalMemberAccess] Unsure what mailbox[1][0] is, so leaving as-is

        if key and search_criteria:
            _typ, [message_numbers] = client.search(None, key, f'"{search_criteria}"')
            # Keep the actual message numbers returned by the search: they are
            # not necessarily 1..N, so counting them and fetching 1..N would
            # retrieve messages that did not match.
            message_ids = [int(number) for number in message_numbers.decode().split(" ") if number]
        elif len(mailbox) > 1 and mailbox[1] and mailbox[1][0] is not None:
            message_ids = range(1, int(mailbox[1][0]) + 1)
        else:
            raise Exception("unable to parse number of messages")

        # Skip all but the last `max_count` candidates (no limit when max_count is unset or 0).
        top_n = max(0, len(message_ids) - max_count) if max_count else 0
        for message_id in reversed(message_ids[top_n:]):
            _result, data = client.fetch(str(message_id), "(RFC822)")

            if data is None or not data or data[0] is None:
                continue

            mail_bytes.append(data[0][1])

        client.close()

    return mail_bytes

try_parse(data)

Source code in griptape/loaders/email_loader.py
def try_parse(self, data: list[bytes]) -> ListArtifact[TextArtifact]:
    """Parses raw email bytes into text artifacts, one per message that has a plain-text body."""
    mailparser = import_optional_dependency("mailparser")
    # Lazily parse each raw message so parsing and filtering stay interleaved.
    messages = (mailparser.parse_from_bytes(raw_message) for raw_message in data)

    # Note: mailparser only populates the text_plain field
    # if the message content type is explicitly set to 'text/plain'.
    return ListArtifact(
        [TextArtifact("\n".join(message.text_plain)) for message in messages if message.text_plain]
    )

ImageLoader

Bases: BaseFileLoader[ImageArtifact]

Loads images into image artifacts.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| format | Optional[str] | If provided, attempts to ensure image artifacts are in this format when loaded. For example, when set to 'PNG', loading image.jpg will return an ImageArtifact containing the image bytes in PNG format. |

Source code in griptape/loaders/image_loader.py
@define
class ImageLoader(BaseFileLoader[ImageArtifact]):
    """Loader that turns image bytes into an `ImageArtifact`.

    Attributes:
        format: If provided, attempts to ensure image artifacts are in this format when loaded.
                For example, when set to 'PNG', loading image.jpg will return an ImageArtifact containing the image
                    bytes in PNG format.
    """

    format: Optional[str] = field(default=None, kw_only=True)

    def try_parse(self, data: bytes) -> ImageArtifact:
        """Opens the image with PIL, re-encoding it to `self.format` when one is set."""
        pil_image = import_optional_dependency("PIL.Image")
        image = pil_image.open(BytesIO(data))

        if self.format is None:
            # No conversion requested: keep the original bytes.
            return ImageArtifact(data, format=image.format.lower(), width=image.width, height=image.height)

        # Re-encode into the requested format, then reopen the result so the
        # reported format and dimensions describe the converted image.
        buffer = BytesIO()
        image.save(buffer, format=self.format)
        converted = pil_image.open(buffer)

        return ImageArtifact(
            buffer.getvalue(), format=converted.format.lower(), width=converted.width, height=converted.height
        )

format: Optional[str] = field(default=None, kw_only=True) class-attribute instance-attribute

try_parse(data)

Source code in griptape/loaders/image_loader.py
def try_parse(self, data: bytes) -> ImageArtifact:
    """Opens the image with PIL, re-encoding it to `self.format` when one is set."""
    pil_image = import_optional_dependency("PIL.Image")
    image = pil_image.open(BytesIO(data))

    if self.format is None:
        # No conversion requested: keep the original bytes.
        return ImageArtifact(data, format=image.format.lower(), width=image.width, height=image.height)

    # Re-encode into the requested format, then reopen the result so the
    # reported format and dimensions describe the converted image.
    buffer = BytesIO()
    image.save(buffer, format=self.format)
    converted = pil_image.open(buffer)

    return ImageArtifact(
        buffer.getvalue(), format=converted.format.lower(), width=converted.width, height=converted.height
    )

JsonLoader

Bases: BaseFileLoader[JsonArtifact]

Source code in griptape/loaders/json_loader.py
@define
class JsonLoader(BaseFileLoader[JsonArtifact]):
    """Loads JSON content into a `JsonArtifact`."""

    def parse(self, data: bytes) -> JsonArtifact:
        """Parses JSON bytes into a `JsonArtifact`.

        Args:
            data: The JSON document as bytes.

        Returns:
            A `JsonArtifact` holding the decoded JSON value, with the loader's
            encoding and reference applied.
        """
        artifact = JsonArtifact(json.loads(data), encoding=self.encoding)
        # This override replaces BaseLoader.parse, so the reference has to be
        # set here; otherwise it would be silently dropped for JSON sources.
        artifact.reference = self.reference

        return artifact

parse(data)

Source code in griptape/loaders/json_loader.py
def parse(self, data: bytes) -> JsonArtifact:
    """Parses JSON bytes into a `JsonArtifact`.

    Args:
        data: The JSON document as bytes.

    Returns:
        A `JsonArtifact` holding the decoded JSON value, with the loader's
        encoding and reference applied.
    """
    artifact = JsonArtifact(json.loads(data), encoding=self.encoding)
    # This override replaces BaseLoader.parse, so the reference has to be
    # set here; otherwise it would be silently dropped for JSON sources.
    artifact.reference = self.reference

    return artifact

PdfLoader

Bases: BaseFileLoader

Source code in griptape/loaders/pdf_loader.py
@define
class PdfLoader(BaseFileLoader):
    """Loader that extracts the text of each PDF page into a `TextArtifact`."""

    def try_parse(
        self,
        data: bytes,
        *,
        password: Optional[str] = None,
    ) -> ListArtifact:
        """Reads the PDF with pypdf and returns one `TextArtifact` per page.

        Args:
            data: The PDF file content.
            password: Optional password handed to `pypdf.PdfReader`.
        """
        pypdf = import_optional_dependency("pypdf")
        reader = pypdf.PdfReader(BytesIO(data), strict=True, password=password)

        page_artifacts = []
        for page in reader.pages:
            page_artifacts.append(TextArtifact(page.extract_text()))

        return ListArtifact(page_artifacts)

try_parse(data, *, password=None)

Source code in griptape/loaders/pdf_loader.py
def try_parse(
    self,
    data: bytes,
    *,
    password: Optional[str] = None,
) -> ListArtifact:
    """Reads the PDF with pypdf and returns one `TextArtifact` per page.

    Args:
        data: The PDF file content.
        password: Optional password handed to `pypdf.PdfReader`.
    """
    pypdf = import_optional_dependency("pypdf")
    reader = pypdf.PdfReader(BytesIO(data), strict=True, password=password)

    page_artifacts = []
    for page in reader.pages:
        page_artifacts.append(TextArtifact(page.extract_text()))

    return ListArtifact(page_artifacts)

SqlLoader

Bases: BaseLoader[str, list[RowResult], ListArtifact[TextArtifact]]

Source code in griptape/loaders/sql_loader.py
@define
class SqlLoader(BaseLoader[str, list[BaseSqlDriver.RowResult], ListArtifact[TextArtifact]]):
    """Loads the rows returned by a SQL query into text artifacts.

    Attributes:
        sql_driver: Driver used to execute the query.
        format_row: Renders a row's cells as text; the default emits one `key: value` line per cell.
    """

    sql_driver: BaseSqlDriver = field(kw_only=True)
    format_row: Callable[[dict], str] = field(
        default=lambda value: "\n".join(f"{key}: {val}" for key, val in value.items()), kw_only=True
    )

    def fetch(self, source: str) -> list[BaseSqlDriver.RowResult]:
        """Executes the query in `source`; a falsy driver result becomes an empty list."""
        return self.sql_driver.execute_query(source) or []

    def parse(self, data: list[BaseSqlDriver.RowResult]) -> ListArtifact[TextArtifact]:
        """Formats each row into a `TextArtifact` and returns them as a `ListArtifact`.

        Args:
            data: The rows returned by `fetch`.

        Returns:
            A `ListArtifact` with one `TextArtifact` per row and the loader's reference applied.
        """
        artifact = ListArtifact([TextArtifact(self.format_row(row.cells)) for row in data])
        # This override replaces BaseLoader.parse, so the reference has to be
        # set here; otherwise it would be silently dropped for SQL sources.
        artifact.reference = self.reference

        return artifact

format_row: Callable[[dict], str] = field(default=lambda value: '\n'.join(f'{key}: {val}' for (key, val) in value.items()), kw_only=True) class-attribute instance-attribute

sql_driver: BaseSqlDriver = field(kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/sql_loader.py
def fetch(self, source: str) -> list[BaseSqlDriver.RowResult]:
    """Executes the query in `source`; a falsy driver result becomes an empty list."""
    rows = self.sql_driver.execute_query(source)
    return rows or []

parse(data)

Source code in griptape/loaders/sql_loader.py
def parse(self, data: list[BaseSqlDriver.RowResult]) -> ListArtifact[TextArtifact]:
    """Formats each row into a `TextArtifact` and returns them as a `ListArtifact`.

    Args:
        data: The rows returned by `fetch`.

    Returns:
        A `ListArtifact` with one `TextArtifact` per row and the loader's reference applied.
    """
    artifact = ListArtifact([TextArtifact(self.format_row(row.cells)) for row in data])
    # This override replaces BaseLoader.parse, so the reference has to be
    # set here; otherwise it would be silently dropped for SQL sources.
    artifact.reference = self.reference

    return artifact

TextLoader

Bases: BaseFileLoader[TextArtifact]

Source code in griptape/loaders/text_loader.py
@define
class TextLoader(BaseFileLoader[TextArtifact]):
    """Loader that turns text, or bytes decoded with `encoding`, into a `TextArtifact`."""

    encoding: str = field(default="utf-8", kw_only=True)

    def try_parse(self, data: str | bytes) -> TextArtifact:
        """Builds a `TextArtifact` from `data`, decoding it first when it is not already a string."""
        text = data if isinstance(data, str) else data.decode(self.encoding)
        return TextArtifact(text, encoding=self.encoding)

encoding: str = field(default='utf-8', kw_only=True) class-attribute instance-attribute

try_parse(data)

Source code in griptape/loaders/text_loader.py
def try_parse(self, data: str | bytes) -> TextArtifact:
    """Builds a `TextArtifact` from `data`, decoding it first when it is not already a string."""
    text = data if isinstance(data, str) else data.decode(self.encoding)
    return TextArtifact(text, encoding=self.encoding)

WebLoader

Bases: BaseLoader[str, str, TextArtifact]

Source code in griptape/loaders/web_loader.py
@define
class WebLoader(BaseLoader[str, str, TextArtifact]):
    """Loader that fetches a URL and extracts its content through a web scraper driver.

    Attributes:
        web_scraper_driver: Driver used to fetch and extract pages; defaults to a `TrafilaturaWebScraperDriver`.
    """

    web_scraper_driver: BaseWebScraperDriver = field(
        default=Factory(lambda: TrafilaturaWebScraperDriver()),
        kw_only=True,
    )

    def fetch(self, source: str) -> str:
        """Fetches the page at the URL `source` via the scraper driver."""
        page = self.web_scraper_driver.fetch_url(source)
        return page

    def try_parse(self, data: str) -> TextArtifact:
        """Extracts a `TextArtifact` from the fetched page via the scraper driver."""
        extracted = self.web_scraper_driver.extract_page(data)
        return extracted

web_scraper_driver: BaseWebScraperDriver = field(default=Factory(lambda: TrafilaturaWebScraperDriver()), kw_only=True) class-attribute instance-attribute

fetch(source)

Source code in griptape/loaders/web_loader.py
def fetch(self, source: str) -> str:
    """Fetches the page at the URL `source` via the scraper driver."""
    page = self.web_scraper_driver.fetch_url(source)
    return page

try_parse(data)

Source code in griptape/loaders/web_loader.py
def try_parse(self, data: str) -> TextArtifact:
    """Extracts a `TextArtifact` from the fetched page via the scraper driver."""
    extracted = self.web_scraper_driver.extract_page(data)
    return extracted