A wrapper for Structures that converts BaseChunkEvent
s into an iterator of TextArtifacts.
It achieves this by running the Structure in a separate thread, listening for events from the Structure,
and yielding those events.
See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators
Attributes:
Source code in griptape/utils/stream.py
| @define
class Stream:
"""A wrapper for Structures that converts `BaseChunkEvent`s into an iterator of TextArtifacts.
It achieves this by running the Structure in a separate thread, listening for events from the Structure,
and yielding those events.
See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators
Attributes:
structure: The Structure to wrap.
_event_queue: A queue to hold events from the Structure.
"""
structure: Structure = field()
@structure.validator # pyright: ignore[reportAttributeAccessIssue]
def validate_structure(self, _: Attribute, structure: Structure) -> None:
from griptape.tasks import PromptTask
streaming_tasks = [
task for task in structure.tasks if isinstance(task, PromptTask) and task.prompt_driver.stream
]
if not streaming_tasks:
raise ValueError("Structure does not have any streaming tasks, enable with stream=True")
_event_queue: Queue[BaseEvent] = field(default=Factory(lambda: Queue()))
def run(self, *args) -> Iterator[TextArtifact]:
t = Thread(target=self._run_structure, args=args)
t.start()
action_str = ""
while True:
event = self._event_queue.get()
if isinstance(event, FinishStructureRunEvent):
break
elif isinstance(event, FinishPromptEvent):
yield TextArtifact(value="\n")
elif isinstance(event, TextChunkEvent):
yield TextArtifact(value=event.token)
elif isinstance(event, ActionChunkEvent):
if event.tag is not None and event.name is not None and event.path is not None:
yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")
if event.partial_input is not None:
action_str += event.partial_input
try:
yield TextArtifact(json.dumps(json.loads(action_str), indent=2))
action_str = ""
except Exception:
pass
t.join()
def _run_structure(self, *args) -> None:
def event_handler(event: BaseEvent) -> None:
self._event_queue.put(event)
stream_event_listener = EventListener(
on_event=event_handler,
event_types=[BaseChunkEvent, FinishPromptEvent, FinishStructureRunEvent],
)
EventBus.add_event_listener(stream_event_listener)
self.structure.run(*args)
EventBus.remove_event_listener(stream_event_listener)
|
structure: Structure = field()
class-attribute
instance-attribute
run(*args)
Source code in griptape/utils/stream.py
| def run(self, *args) -> Iterator[TextArtifact]:
t = Thread(target=self._run_structure, args=args)
t.start()
action_str = ""
while True:
event = self._event_queue.get()
if isinstance(event, FinishStructureRunEvent):
break
elif isinstance(event, FinishPromptEvent):
yield TextArtifact(value="\n")
elif isinstance(event, TextChunkEvent):
yield TextArtifact(value=event.token)
elif isinstance(event, ActionChunkEvent):
if event.tag is not None and event.name is not None and event.path is not None:
yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")
if event.partial_input is not None:
action_str += event.partial_input
try:
yield TextArtifact(json.dumps(json.loads(action_str), indent=2))
action_str = ""
except Exception:
pass
t.join()
|
validate_structure(_, structure)
Source code in griptape/utils/stream.py
| @structure.validator # pyright: ignore[reportAttributeAccessIssue]
def validate_structure(self, _: Attribute, structure: Structure) -> None:
from griptape.tasks import PromptTask
streaming_tasks = [
task for task in structure.tasks if isinstance(task, PromptTask) and task.prompt_driver.stream
]
if not streaming_tasks:
raise ValueError("Structure does not have any streaming tasks, enable with stream=True")
|