Skip to content

Events

__all__ = ['BaseEvent', 'BaseTaskEvent', 'StartTaskEvent', 'FinishTaskEvent', 'BaseActionsSubtaskEvent', 'StartActionsSubtaskEvent', 'FinishActionsSubtaskEvent', 'BasePromptEvent', 'StartPromptEvent', 'FinishPromptEvent', 'StartStructureRunEvent', 'FinishStructureRunEvent', 'CompletionChunkEvent', 'EventListener', 'StartImageGenerationEvent', 'FinishImageGenerationEvent', 'StartImageQueryEvent', 'FinishImageQueryEvent'] module-attribute

BaseActionsSubtaskEvent

Bases: BaseTaskEvent, ABC

Source code in griptape/events/base_actions_subtask_event.py
@define
class BaseActionsSubtaskEvent(BaseTaskEvent, ABC):
    """Abstract base for events built from an ``ActionsSubtask``.

    On top of the fields inherited from ``BaseTaskEvent`` this class declares
    three keyword-only fields, each flagged ``serializable`` in its metadata.
    """

    # Filled from ``task.parent_task_id`` by ``from_task``.
    subtask_parent_task_id: Optional[str] = field(kw_only=True, metadata={"serializable": True})
    # Filled from ``task.thought`` by ``from_task``.
    subtask_thought: Optional[str] = field(kw_only=True, metadata={"serializable": True})
    # Filled from ``task.actions_to_dicts()`` by ``from_task``.
    subtask_actions: Optional[list[dict]] = field(kw_only=True, metadata={"serializable": True})

    @classmethod
    def from_task(cls, task: BaseTask) -> BaseActionsSubtaskEvent:
        """Create an event of type ``cls`` populated from ``task``.

        Args:
            task: The task to read from; it must be an ``ActionsSubtask``.

        Returns:
            A new ``cls`` instance whose task and subtask fields are copied
            from the corresponding attributes of ``task``.

        Raises:
            ValueError: If ``task`` is not an ``ActionsSubtask``.
        """
        if not isinstance(task, ActionsSubtask):
            # The offending object is the task argument, not an event, and the
            # required class is ``ActionsSubtask`` (plural "Actions").
            raise ValueError("Task must be an instance of ActionsSubtask.")
        return cls(
            task_id=task.id,
            task_parent_ids=task.parent_ids,
            task_child_ids=task.child_ids,
            task_input=task.input,
            task_output=task.output,
            subtask_parent_task_id=task.parent_task_id,
            subtask_thought=task.thought,
            subtask_actions=task.actions_to_dicts(),
        )

subtask_actions: Optional[list[dict]] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

subtask_parent_task_id: Optional[str] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

subtask_thought: Optional[str] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

from_task(task) classmethod

Source code in griptape/events/base_actions_subtask_event.py
@classmethod
def from_task(cls, task: BaseTask) -> BaseActionsSubtaskEvent:
    """Create an event of type ``cls`` populated from ``task``.

    Args:
        task: The task to read from; it must be an ``ActionsSubtask``.

    Returns:
        A new ``cls`` instance whose task and subtask fields are copied from
        the corresponding attributes of ``task``.

    Raises:
        ValueError: If ``task`` is not an ``ActionsSubtask``.
    """
    if not isinstance(task, ActionsSubtask):
        # The offending object is the task argument, not an event, and the
        # required class is ``ActionsSubtask`` (plural "Actions").
        raise ValueError("Task must be an instance of ActionsSubtask.")
    return cls(
        task_id=task.id,
        task_parent_ids=task.parent_ids,
        task_child_ids=task.child_ids,
        task_input=task.input,
        task_output=task.output,
        subtask_parent_task_id=task.parent_task_id,
        subtask_thought=task.thought,
        subtask_actions=task.actions_to_dicts(),
    )

BaseEvent

Bases: SerializableMixin, ABC

Source code in griptape/events/base_event.py
@define
class BaseEvent(SerializableMixin, ABC):
    """Abstract root of the event classes; carries only a creation timestamp."""

    # Keyword-only and flagged serializable. The lambda defers the lookup of
    # ``time.time`` until the factory runs for a new instance.
    timestamp: float = field(
        factory=lambda: time.time(),
        kw_only=True,
        metadata={"serializable": True},
    )

timestamp: float = field(default=Factory(lambda: time.time()), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

BasePromptEvent

Bases: BaseEvent, ABC

Source code in griptape/events/base_prompt_event.py
@define
class BasePromptEvent(BaseEvent, ABC):
    """Abstract base for prompt events; adds a model name and a token count.

    Both fields are required, keyword-only and flagged serializable.
    """

    model: str = field(
        metadata={"serializable": True},
        kw_only=True,
    )
    token_count: int = field(
        metadata={"serializable": True},
        kw_only=True,
    )

model: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

token_count: int = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

BaseTaskEvent

Bases: BaseEvent, ABC

Source code in griptape/events/base_task_event.py
@define
class BaseTaskEvent(BaseEvent, ABC):
    """Abstract base for events that carry a task's ids, input and output.

    Every field is required, keyword-only and flagged serializable.
    """

    # Identity of the task and of its neighbours.
    task_id: str = field(metadata={"serializable": True}, kw_only=True)
    task_parent_ids: list[str] = field(metadata={"serializable": True}, kw_only=True)
    task_child_ids: list[str] = field(metadata={"serializable": True}, kw_only=True)

    # Input is a single artifact, a tuple of artifacts, or an
    # (artifact, sequence-of-artifacts) pair.
    task_input: Union[
        BaseArtifact,
        tuple[BaseArtifact, ...],
        tuple[BaseArtifact, Sequence[BaseArtifact]],
    ] = field(metadata={"serializable": True}, kw_only=True)
    # Output may be ``None``.
    task_output: Optional[BaseArtifact] = field(metadata={"serializable": True}, kw_only=True)

task_child_ids: list[str] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

task_id: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

task_input: Union[BaseArtifact, tuple[BaseArtifact, ...], tuple[BaseArtifact, Sequence[BaseArtifact]]] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

task_output: Optional[BaseArtifact] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

task_parent_ids: list[str] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

CompletionChunkEvent

Bases: BaseEvent

Source code in griptape/events/completion_chunk_event.py
6
7
8
@define
class CompletionChunkEvent(BaseEvent):
    """Event carrying a single ``token`` string.

    NOTE(review): presumably one streamed piece of a completion; confirm at the emitting site.
    """

    # Required, keyword-only, flagged serializable.
    token: str = field(
        metadata={"serializable": True},
        kw_only=True,
    )

token: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

EventListener

Source code in griptape/events/event_listener.py
@define
class EventListener:
    """Runs a handler on matching events and optionally forwards the result to a driver.

    Attributes:
        handler: Callable applied to each accepted event. The default handler
            returns ``event.to_dict()``.
        event_types: Event classes to accept, or ``None`` to accept every
            event. Matching is on the exact class of the event.
        driver: Optional driver whose ``publish_event`` receives the handler's
            dict payload, or the event itself when the handler did not return
            a dict.
    """

    handler: Callable[[BaseEvent], Optional[dict]] = field(factory=lambda: lambda event: event.to_dict())
    event_types: Optional[list[type[BaseEvent]]] = field(kw_only=True, default=None)
    driver: Optional[BaseEventListenerDriver] = field(kw_only=True, default=None)

    def publish_event(self, event: BaseEvent) -> None:
        """Handle ``event`` if its type is accepted and forward it to the driver, if any.

        Args:
            event: The event to process.
        """
        accepted = self.event_types

        # Exact-class membership test: instances of subclasses of a listed
        # type are not accepted.
        if accepted is not None and type(event) not in accepted:
            return

        # The handler runs even when no driver is configured.
        payload = self.handler(event)
        if self.driver is None:
            return

        # ``isinstance`` already rejects ``None``; any non-dict payload falls
        # back to publishing the event object itself.
        self.driver.publish_event(payload if isinstance(payload, dict) else event)

driver: Optional[BaseEventListenerDriver] = field(default=None, kw_only=True) class-attribute instance-attribute

event_types: Optional[list[type[BaseEvent]]] = field(default=None, kw_only=True) class-attribute instance-attribute

handler: Callable[[BaseEvent], Optional[dict]] = field(default=Factory(lambda: lambda event: event.to_dict())) class-attribute instance-attribute

publish_event(event)

Source code in griptape/events/event_listener.py
def publish_event(self, event: BaseEvent) -> None:
    """Handle ``event`` if its type is accepted and forward it to the driver, if any.

    Args:
        event: The event to process.
    """
    accepted = self.event_types

    # Exact-class membership test: instances of subclasses of a listed type
    # are not accepted.
    if accepted is not None and type(event) not in accepted:
        return

    # The handler runs even when no driver is configured.
    payload = self.handler(event)
    if self.driver is None:
        return

    # ``isinstance`` already rejects ``None``; any non-dict payload falls back
    # to publishing the event object itself.
    self.driver.publish_event(payload if isinstance(payload, dict) else event)

FinishActionsSubtaskEvent

Bases: BaseActionsSubtaskEvent

Source code in griptape/events/finish_actions_subtask_event.py
6
7
8
@define
class FinishActionsSubtaskEvent(BaseActionsSubtaskEvent):
    """Concrete ``BaseActionsSubtaskEvent`` that declares no fields or methods of its own."""

FinishImageGenerationEvent

Bases: BaseImageGenerationEvent

Source code in griptape/events/finish_image_generation_event.py
@define
class FinishImageGenerationEvent(BaseImageGenerationEvent):
    """Concrete ``BaseImageGenerationEvent`` that declares no fields or methods of its own."""

FinishImageQueryEvent

Bases: BaseImageQueryEvent

Source code in griptape/events/finish_image_query_event.py
6
7
8
@define
class FinishImageQueryEvent(BaseImageQueryEvent):
    """Image-query event that adds a ``result`` string to ``BaseImageQueryEvent``."""

    # Required, keyword-only, flagged serializable.
    result: str = field(
        metadata={"serializable": True},
        kw_only=True,
    )

result: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

FinishPromptEvent

Bases: BasePromptEvent

Source code in griptape/events/finish_prompt_event.py
5
6
7
@define
class FinishPromptEvent(BasePromptEvent):
    """Prompt event that adds a ``result`` string to ``BasePromptEvent``."""

    # Required, keyword-only, flagged serializable.
    result: str = field(
        metadata={"serializable": True},
        kw_only=True,
    )

result: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

FinishStructureRunEvent

Bases: BaseEvent

Source code in griptape/events/finish_structure_run_event.py
@define
class FinishStructureRunEvent(BaseEvent):
    """Event carrying a structure id plus the input and output of an output task.

    All fields are keyword-only and flagged serializable; only ``structure_id``
    has a default.
    """

    # Optional identifier; defaults to ``None``.
    structure_id: Optional[str] = field(default=None, metadata={"serializable": True}, kw_only=True)
    # A single artifact, a tuple of artifacts, or an
    # (artifact, sequence-of-artifacts) pair.
    output_task_input: Union[
        BaseArtifact,
        tuple[BaseArtifact, ...],
        tuple[BaseArtifact, Sequence[BaseArtifact]],
    ] = field(metadata={"serializable": True}, kw_only=True)
    # Required, but may be ``None``.
    output_task_output: Optional[BaseArtifact] = field(metadata={"serializable": True}, kw_only=True)

output_task_input: Union[BaseArtifact, tuple[BaseArtifact, ...], tuple[BaseArtifact, Sequence[BaseArtifact]]] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

output_task_output: Optional[BaseArtifact] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

structure_id: Optional[str] = field(kw_only=True, default=None, metadata={'serializable': True}) class-attribute instance-attribute

FinishTaskEvent

Bases: BaseTaskEvent

Source code in griptape/events/finish_task_event.py
6
7
8
@define
class FinishTaskEvent(BaseTaskEvent):
    """Concrete ``BaseTaskEvent`` that declares no fields or methods of its own."""

StartActionsSubtaskEvent

Bases: BaseActionsSubtaskEvent

Source code in griptape/events/start_actions_subtask_event.py
6
7
8
@define
class StartActionsSubtaskEvent(BaseActionsSubtaskEvent):
    """Concrete ``BaseActionsSubtaskEvent`` that declares no fields or methods of its own."""

StartImageGenerationEvent

Bases: BaseImageGenerationEvent

Source code in griptape/events/start_image_generation_event.py
@define
class StartImageGenerationEvent(BaseImageGenerationEvent):
    """Image-generation event carrying the prompts and optional negative prompts.

    Both fields are keyword-only and flagged serializable.
    """

    # Required list of prompt strings.
    prompts: list[str] = field(
        metadata={"serializable": True},
        kw_only=True,
    )
    # Optional list of negative prompt strings; defaults to ``None``.
    negative_prompts: Optional[list[str]] = field(
        metadata={"serializable": True},
        kw_only=True,
        default=None,
    )

negative_prompts: Optional[list[str]] = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

prompts: list[str] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

StartImageQueryEvent

Bases: BaseImageQueryEvent

Source code in griptape/events/start_image_query_event.py
6
7
8
9
@define
class StartImageQueryEvent(BaseImageQueryEvent):
    """Image-query event carrying the query string and a list of image info strings.

    Both fields are required, keyword-only and flagged serializable.
    """

    query: str = field(
        metadata={"serializable": True},
        kw_only=True,
    )
    # NOTE(review): the format of each entry is not visible here; confirm with the emitter.
    images_info: list[str] = field(
        metadata={"serializable": True},
        kw_only=True,
    )

images_info: list[str] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

query: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

StartPromptEvent

Bases: BasePromptEvent

Source code in griptape/events/start_prompt_event.py
@define
class StartPromptEvent(BasePromptEvent):
    """Prompt event carrying a ``PromptStack`` and a prompt string.

    Both fields are required, keyword-only and flagged serializable.
    """

    prompt_stack: PromptStack = field(
        metadata={"serializable": True},
        kw_only=True,
    )
    prompt: str = field(
        metadata={"serializable": True},
        kw_only=True,
    )

prompt: str = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

prompt_stack: PromptStack = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

StartStructureRunEvent

Bases: BaseEvent

Source code in griptape/events/start_structure_run_event.py
@define
class StartStructureRunEvent(BaseEvent):
    """Event carrying a structure id plus the input and output of an input task.

    All fields are keyword-only and flagged serializable; only ``structure_id``
    has a default.
    """

    # Optional identifier; defaults to ``None``.
    structure_id: Optional[str] = field(default=None, metadata={"serializable": True}, kw_only=True)
    # A single artifact, a tuple of artifacts, or an
    # (artifact, sequence-of-artifacts) pair.
    input_task_input: Union[
        BaseArtifact,
        tuple[BaseArtifact, ...],
        tuple[BaseArtifact, Sequence[BaseArtifact]],
    ] = field(metadata={"serializable": True}, kw_only=True)
    # Required, but may be ``None``.
    input_task_output: Optional[BaseArtifact] = field(metadata={"serializable": True}, kw_only=True)

input_task_input: Union[BaseArtifact, tuple[BaseArtifact, ...], tuple[BaseArtifact, Sequence[BaseArtifact]]] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

input_task_output: Optional[BaseArtifact] = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

structure_id: Optional[str] = field(kw_only=True, default=None, metadata={'serializable': True}) class-attribute instance-attribute

StartTaskEvent

Bases: BaseTaskEvent

Source code in griptape/events/start_task_event.py
6
7
8
@define
class StartTaskEvent(BaseTaskEvent):
    """Concrete ``BaseTaskEvent`` that declares no fields or methods of its own."""