Skip to content

Stream

Stream

A wrapper for Structures that converts CompletionChunkEvents into an iterator of TextArtifacts.

It achieves this by running the Structure in a separate thread, listening for events from the Structure, and yielding those events.

See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators

Attributes:

Name Type Description
structure Structure

The Structure to wrap.

_event_queue Queue[BaseEvent]

A queue to hold events from the Structure.

Source code in griptape/utils/stream.py
@define
class Stream:
    """A wrapper for Structures that converts `CompletionChunkEvent`s into an iterator of TextArtifacts.

    It achieves this by running the Structure in a separate thread, listening for events from the Structure,
    and yielding those events.

    See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators

    Attributes:
        structure: The Structure to wrap.
        _event_queue: A queue to hold events from the Structure.
    """

    structure: Structure = field()

    @structure.validator  # pyright: ignore
    def validate_structure(self, _, structure: Structure):
        if structure and not structure.config.global_drivers.prompt_driver.stream:
            raise ValueError("prompt driver does not have streaming enabled, enable with stream=True")

    _event_queue: Queue[BaseEvent] = field(default=Factory(lambda: Queue()))

    def run(self, *args) -> Iterator[TextArtifact]:
        t = Thread(target=self._run_structure, args=args)
        t.start()

        while True:
            event = self._event_queue.get()
            if isinstance(event, FinishStructureRunEvent):
                break
            elif isinstance(event, FinishPromptEvent):
                yield TextArtifact(value="\n")
            elif isinstance(event, CompletionChunkEvent):
                yield TextArtifact(value=event.token)
        t.join()

    def _run_structure(self, *args):
        def event_handler(event: BaseEvent):
            self._event_queue.put(event)

        stream_event_listener = EventListener(
            handler=event_handler, event_types=[CompletionChunkEvent, FinishPromptEvent, FinishStructureRunEvent]
        )
        self.structure.add_event_listener(stream_event_listener)

        self.structure.run(*args)

        self.structure.remove_event_listener(stream_event_listener)

structure: Structure = field() class-attribute instance-attribute

run(*args)

Source code in griptape/utils/stream.py
def run(self, *args) -> Iterator[TextArtifact]:
    t = Thread(target=self._run_structure, args=args)
    t.start()

    while True:
        event = self._event_queue.get()
        if isinstance(event, FinishStructureRunEvent):
            break
        elif isinstance(event, FinishPromptEvent):
            yield TextArtifact(value="\n")
        elif isinstance(event, CompletionChunkEvent):
            yield TextArtifact(value=event.token)
    t.join()

validate_structure(_, structure)

Source code in griptape/utils/stream.py
@structure.validator  # pyright: ignore
def validate_structure(self, _, structure: Structure):
    if structure and not structure.config.global_drivers.prompt_driver.stream:
        raise ValueError("prompt driver does not have streaming enabled, enable with stream=True")