Skip to content

stream

Stream

A wrapper for Structures filters Events relevant to text output and converts them to TextArtifacts.

Attributes:

Name Type Description
structure Structure

The Structure to wrap.

Source code in griptape/utils/stream.py
@define
class Stream:
    """A wrapper for Structures filters Events relevant to text output and converts them to TextArtifacts.

    Attributes:
        structure: The Structure to wrap.
    """

    structure: Structure = field()

    def run(self, *args) -> Iterator[TextArtifact]:
        action_str = ""

        for event in self.structure.run_stream(
            *args, event_types=[TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent]
        ):
            if isinstance(event, FinishStructureRunEvent):
                break
            elif isinstance(event, FinishPromptEvent):
                yield TextArtifact(value="\n")
            elif isinstance(event, TextChunkEvent):
                yield TextArtifact(value=event.token)
            elif isinstance(event, ActionChunkEvent):
                if event.tag is not None and event.name is not None and event.path is not None:
                    yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")
                if event.partial_input is not None:
                    action_str += event.partial_input
                    try:
                        yield TextArtifact(json.dumps(json.loads(action_str), indent=2))
                        action_str = ""
                    except Exception:
                        pass

structure: Structure = field() class-attribute instance-attribute

run(*args)

Source code in griptape/utils/stream.py
def run(self, *args) -> Iterator[TextArtifact]:
    action_str = ""

    for event in self.structure.run_stream(
        *args, event_types=[TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent]
    ):
        if isinstance(event, FinishStructureRunEvent):
            break
        elif isinstance(event, FinishPromptEvent):
            yield TextArtifact(value="\n")
        elif isinstance(event, TextChunkEvent):
            yield TextArtifact(value=event.token)
        elif isinstance(event, ActionChunkEvent):
            if event.tag is not None and event.name is not None and event.path is not None:
                yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")
            if event.partial_input is not None:
                action_str += event.partial_input
                try:
                    yield TextArtifact(json.dumps(json.loads(action_str), indent=2))
                    action_str = ""
                except Exception:
                    pass