Skip to content

Events

__all__ = ['BaseEvent', 'BaseTaskEvent', 'BaseActionSubtaskEvent', 'StartTaskEvent', 'FinishTaskEvent', 'StartActionSubtaskEvent', 'FinishActionSubtaskEvent', 'BasePromptEvent', 'StartPromptEvent', 'FinishPromptEvent', 'StartStructureRunEvent', 'FinishStructureRunEvent', 'CompletionChunkEvent', 'EventListener', 'StartImageGenerationEvent', 'FinishImageGenerationEvent'] module-attribute

BaseActionSubtaskEvent

Bases: BaseTaskEvent, ABC

Source code in griptape/griptape/events/base_action_subtask_event.py
@define
class BaseActionSubtaskEvent(BaseTaskEvent, ABC):
    """Abstract base for events that describe an ``ActionSubtask``.

    Extends ``BaseTaskEvent`` with the subtask's parent task id, thought,
    and action name/path/input. Every field is a required keyword-only
    argument whose annotation permits ``None``.
    """

    subtask_parent_task_id: str | None = field(kw_only=True)
    subtask_thought: str | None = field(kw_only=True)
    subtask_action_name: str | None = field(kw_only=True)
    subtask_action_path: str | None = field(kw_only=True)
    subtask_action_input: dict | None = field(kw_only=True)

    @classmethod
    def from_task(cls, task: ActionSubtask) -> BaseActionSubtaskEvent:
        """Create an event of type ``cls`` by copying attributes from ``task``.

        Args:
            task: The subtask whose id, relations, input/output, thought and
                action details are copied onto the event.

        Returns:
            A new instance of ``cls``.
        """
        # Fields shared with every task event.
        task_fields = {
            "task_id": task.id,
            "task_parent_ids": task.parent_ids,
            "task_child_ids": task.child_ids,
            "task_input": task.input,
            "task_output": task.output,
        }
        # Fields specific to action subtasks.
        subtask_fields = {
            "subtask_parent_task_id": task.parent_task_id,
            "subtask_thought": task.thought,
            "subtask_action_name": task.action_name,
            "subtask_action_path": task.action_path,
            "subtask_action_input": task.action_input,
        }
        return cls(**task_fields, **subtask_fields)

subtask_action_input: dict | None = field(kw_only=True) class-attribute instance-attribute

subtask_action_name: str | None = field(kw_only=True) class-attribute instance-attribute

subtask_action_path: str | None = field(kw_only=True) class-attribute instance-attribute

subtask_parent_task_id: str | None = field(kw_only=True) class-attribute instance-attribute

subtask_thought: str | None = field(kw_only=True) class-attribute instance-attribute

from_task(task) classmethod

Source code in griptape/griptape/events/base_action_subtask_event.py
@classmethod
def from_task(cls, task: ActionSubtask) -> BaseActionSubtaskEvent:
    """Create an event of type ``cls`` by copying attributes from ``task``.

    Args:
        task: The subtask whose id, relations, input/output, thought and
            action details are copied onto the event.

    Returns:
        A new instance of ``cls``.
    """
    # Fields shared with every task event.
    task_fields = {
        "task_id": task.id,
        "task_parent_ids": task.parent_ids,
        "task_child_ids": task.child_ids,
        "task_input": task.input,
        "task_output": task.output,
    }
    # Fields specific to action subtasks.
    subtask_fields = {
        "subtask_parent_task_id": task.parent_task_id,
        "subtask_thought": task.thought,
        "subtask_action_name": task.action_name,
        "subtask_action_path": task.action_path,
        "subtask_action_input": task.action_input,
    }
    return cls(**task_fields, **subtask_fields)

BaseEvent

Bases: ABC

Source code in griptape/griptape/events/base_event.py
@define
class BaseEvent(ABC):
    """Abstract base class for all events.

    Attributes:
        timestamp: Defaults to ``time.time()`` evaluated when the event is
            constructed.
        type: Defaults to the concrete class name; ``from_dict`` uses this
            value to select the schema that deserializes the event.
    """

    timestamp: float = field(default=Factory(lambda: time.time()), kw_only=True)
    type: str = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True)

    @classmethod
    def from_dict(cls, event_dict: dict) -> BaseEvent:
        """Deserialize an event from a dict, dispatching on ``event_dict["type"]``.

        Args:
            event_dict: Serialized event; its ``"type"`` entry must be one of
                the registered event names below.

        Returns:
            The result of loading ``event_dict`` with the matching schema.

        Raises:
            ValueError: If the type is not found in the class registry.
            KeyError: If ``event_dict`` has no ``"type"`` entry.
        """
        # Imported here rather than at module level, as in the to_dict methods.
        from griptape.schemas import (
            StartPromptEventSchema,
            FinishPromptEventSchema,
            StartTaskEventSchema,
            FinishTaskEventSchema,
            StartActionSubtaskEventSchema,
            FinishActionSubtaskEventSchema,
            StartStructureRunEventSchema,
            FinishStructureRunEventSchema,
            CompletionChunkEventSchema,
            StartImageGenerationEventSchema,
            FinishImageGenerationEventSchema,
        )

        # Event type name -> schema. The image generation events are included
        # so that every concrete event that can be dumped can also be loaded.
        schemas_by_type = {
            "StartPromptEvent": StartPromptEventSchema,
            "FinishPromptEvent": FinishPromptEventSchema,
            "StartTaskEvent": StartTaskEventSchema,
            "FinishTaskEvent": FinishTaskEventSchema,
            "StartActionSubtaskEvent": StartActionSubtaskEventSchema,
            "FinishActionSubtaskEvent": FinishActionSubtaskEventSchema,
            "StartStructureRunEvent": StartStructureRunEventSchema,
            "FinishStructureRunEvent": FinishStructureRunEventSchema,
            "CompletionChunkEvent": CompletionChunkEventSchema,
            "StartImageGenerationEvent": StartImageGenerationEventSchema,
            "FinishImageGenerationEvent": FinishImageGenerationEventSchema,
        }
        for type_name, schema in schemas_by_type.items():
            class_registry.register(type_name, schema)

        try:
            return class_registry.get_class(event_dict["type"])().load(event_dict)
        except RegistryError as e:
            # Keep the registry error as the explicit cause for debugging.
            raise ValueError("Unsupported event type") from e

    @classmethod
    def from_json(cls, artifact_str: str) -> BaseEvent:
        """Parse ``artifact_str`` as JSON and deserialize it with ``from_dict``."""
        return cls.from_dict(json.loads(artifact_str))

    def __str__(self) -> str:
        """Return the JSON encoding of ``to_dict()``."""
        return json.dumps(self.to_dict())

    def to_json(self) -> str:
        """Return the JSON encoding of ``to_dict()``."""
        return json.dumps(self.to_dict())

    @abstractmethod
    def to_dict(self) -> dict:
        """Return a dict representation of the event; implemented by subclasses."""
        ...

timestamp: float = field(default=Factory(lambda: time.time()), kw_only=True) class-attribute instance-attribute

type: str = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True) class-attribute instance-attribute

__str__()

Source code in griptape/griptape/events/base_event.py
def __str__(self) -> str:
    """Return the JSON encoding of ``self.to_dict()``."""
    payload = self.to_dict()
    return json.dumps(payload)

from_dict(event_dict) classmethod

Source code in griptape/griptape/events/base_event.py
@classmethod
def from_dict(cls, event_dict: dict) -> BaseEvent:
    """Deserialize an event from a dict, dispatching on ``event_dict["type"]``.

    Args:
        event_dict: Serialized event; its ``"type"`` entry must be one of
            the registered event names below.

    Returns:
        The result of loading ``event_dict`` with the matching schema.

    Raises:
        ValueError: If the type is not found in the class registry.
        KeyError: If ``event_dict`` has no ``"type"`` entry.
    """
    # Imported here rather than at module level, as in the to_dict methods.
    from griptape.schemas import (
        StartPromptEventSchema,
        FinishPromptEventSchema,
        StartTaskEventSchema,
        FinishTaskEventSchema,
        StartActionSubtaskEventSchema,
        FinishActionSubtaskEventSchema,
        StartStructureRunEventSchema,
        FinishStructureRunEventSchema,
        CompletionChunkEventSchema,
        StartImageGenerationEventSchema,
        FinishImageGenerationEventSchema,
    )

    # Event type name -> schema. The image generation events are included
    # so that every concrete event that can be dumped can also be loaded.
    schemas_by_type = {
        "StartPromptEvent": StartPromptEventSchema,
        "FinishPromptEvent": FinishPromptEventSchema,
        "StartTaskEvent": StartTaskEventSchema,
        "FinishTaskEvent": FinishTaskEventSchema,
        "StartActionSubtaskEvent": StartActionSubtaskEventSchema,
        "FinishActionSubtaskEvent": FinishActionSubtaskEventSchema,
        "StartStructureRunEvent": StartStructureRunEventSchema,
        "FinishStructureRunEvent": FinishStructureRunEventSchema,
        "CompletionChunkEvent": CompletionChunkEventSchema,
        "StartImageGenerationEvent": StartImageGenerationEventSchema,
        "FinishImageGenerationEvent": FinishImageGenerationEventSchema,
    }
    for type_name, schema in schemas_by_type.items():
        class_registry.register(type_name, schema)

    try:
        return class_registry.get_class(event_dict["type"])().load(event_dict)
    except RegistryError as e:
        # Keep the registry error as the explicit cause for debugging.
        raise ValueError("Unsupported event type") from e

from_json(artifact_str) classmethod

Source code in griptape/griptape/events/base_event.py
@classmethod
def from_json(cls, artifact_str: str) -> BaseEvent:
    """Parse ``artifact_str`` as JSON and hand the result to ``cls.from_dict``."""
    event_dict = json.loads(artifact_str)
    return cls.from_dict(event_dict)

to_dict() abstractmethod

Source code in griptape/griptape/events/base_event.py
@abstractmethod
def to_dict(self) -> dict:
    """Return a dict representation of the event; implemented by subclasses."""

to_json()

Source code in griptape/griptape/events/base_event.py
def to_json(self) -> str:
    """Return the JSON encoding of ``self.to_dict()``."""
    payload = self.to_dict()
    return json.dumps(payload)

BasePromptEvent

Bases: BaseEvent, ABC

Source code in griptape/griptape/events/base_prompt_event.py
7
8
9
@define
class BasePromptEvent(BaseEvent, ABC):
    """Abstract base for prompt events; adds a token count to ``BaseEvent``."""

    # Required keyword-only field.
    # NOTE(review): which text the count refers to is not visible here — confirm.
    token_count: int = field(kw_only=True)

token_count: int = field(kw_only=True) class-attribute instance-attribute

BaseTaskEvent

Bases: BaseEvent, ABC

Source code in griptape/griptape/events/base_task_event.py
@define
class BaseTaskEvent(BaseEvent, ABC):
    """Abstract base for events that describe a task.

    Records the task's id, the ids of its parents and children, and its
    input and output artifacts. All fields are required keyword-only
    arguments; ``task_output`` is annotated as possibly ``None``.
    """

    task_id: str = field(kw_only=True)
    task_parent_ids: list[str] = field(kw_only=True)
    task_child_ids: list[str] = field(kw_only=True)

    task_input: BaseArtifact = field(kw_only=True)
    task_output: BaseArtifact | None = field(kw_only=True)

    @classmethod
    def from_task(cls, task: BaseTask) -> BaseTaskEvent:
        """Create an event of type ``cls`` by copying attributes from ``task``.

        Args:
            task: The task whose id, parent/child ids, input and output are
                copied onto the event.

        Returns:
            A new instance of ``cls``.
        """
        init_kwargs = {
            "task_id": task.id,
            "task_parent_ids": task.parent_ids,
            "task_child_ids": task.child_ids,
            "task_input": task.input,
            "task_output": task.output,
        }
        return cls(**init_kwargs)

task_child_ids: list[str] = field(kw_only=True) class-attribute instance-attribute

task_id: str = field(kw_only=True) class-attribute instance-attribute

task_input: BaseArtifact = field(kw_only=True) class-attribute instance-attribute

task_output: BaseArtifact | None = field(kw_only=True) class-attribute instance-attribute

task_parent_ids: list[str] = field(kw_only=True) class-attribute instance-attribute

from_task(task) classmethod

Source code in griptape/griptape/events/base_task_event.py
@classmethod
def from_task(cls, task: BaseTask) -> BaseTaskEvent:
    """Create an event of type ``cls`` by copying attributes from ``task``.

    Args:
        task: The task whose id, parent/child ids, input and output are
            copied onto the event.

    Returns:
        A new instance of ``cls``.
    """
    init_kwargs = {
        "task_id": task.id,
        "task_parent_ids": task.parent_ids,
        "task_child_ids": task.child_ids,
        "task_input": task.input,
        "task_output": task.output,
    }
    return cls(**init_kwargs)

CompletionChunkEvent

Bases: BaseEvent

Source code in griptape/griptape/events/completion_chunk_event.py
@define
class CompletionChunkEvent(BaseEvent):
    """Event carrying a single ``token`` string."""

    # Required keyword-only field.
    token: str = field(kw_only=True)

    def to_dict(self) -> dict:
        """Dump this event with ``CompletionChunkEventSchema`` and return a plain dict."""
        # Imported here rather than at module level, as in the sibling events.
        from griptape.schemas import CompletionChunkEventSchema

        schema = CompletionChunkEventSchema()
        return dict(schema.dump(self))

token: str = field(kw_only=True) class-attribute instance-attribute

to_dict()

Source code in griptape/griptape/events/completion_chunk_event.py
def to_dict(self) -> dict:
    """Dump this event with ``CompletionChunkEventSchema`` and return a plain dict."""
    # Imported here rather than at module level, as in the sibling events.
    from griptape.schemas import CompletionChunkEventSchema

    schema = CompletionChunkEventSchema()
    return dict(schema.dump(self))

EventListener

Source code in griptape/griptape/events/event_listener.py
6
7
8
9
@define
class EventListener:
    """Pairs a handler callable with an optional list of event types."""

    # Callable that receives a BaseEvent; its return value is unconstrained (Any).
    handler: Callable[[BaseEvent], Any] = field()
    # Keyword-only; defaults to None.
    # NOTE(review): presumably None means "all event types" — confirm against the dispatcher.
    event_types: Optional[list[Type[BaseEvent]]] = field(default=None, kw_only=True)

event_types: Optional[list[Type[BaseEvent]]] = field(default=None, kw_only=True) class-attribute instance-attribute

handler: Callable[[BaseEvent], Any] = field() class-attribute instance-attribute

FinishActionSubtaskEvent

Bases: BaseActionSubtaskEvent

Source code in griptape/griptape/events/finish_action_subtask_event.py
@define
class FinishActionSubtaskEvent(BaseActionSubtaskEvent):
    """Concrete action-subtask event serialized by ``FinishActionSubtaskEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``FinishActionSubtaskEventSchema`` and return a plain dict."""
        from griptape.schemas import FinishActionSubtaskEventSchema

        schema = FinishActionSubtaskEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/finish_action_subtask_event.py
def to_dict(self) -> dict:
    """Dump this event with ``FinishActionSubtaskEventSchema`` and return a plain dict."""
    from griptape.schemas import FinishActionSubtaskEventSchema

    schema = FinishActionSubtaskEventSchema()
    return dict(schema.dump(self))

FinishImageGenerationEvent

Bases: BaseImageGenerationEvent

Source code in griptape/griptape/events/finish_image_generation_event.py
@define
class FinishImageGenerationEvent(BaseImageGenerationEvent):
    """Concrete image-generation event serialized by ``FinishImageGenerationEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``FinishImageGenerationEventSchema`` and return a plain dict."""
        from griptape.schemas import FinishImageGenerationEventSchema

        schema = FinishImageGenerationEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/finish_image_generation_event.py
def to_dict(self) -> dict:
    """Dump this event with ``FinishImageGenerationEventSchema`` and return a plain dict."""
    from griptape.schemas import FinishImageGenerationEventSchema

    schema = FinishImageGenerationEventSchema()
    return dict(schema.dump(self))

FinishPromptEvent

Bases: BasePromptEvent

Source code in griptape/griptape/events/finish_prompt_event.py
@define
class FinishPromptEvent(BasePromptEvent):
    """Prompt event that additionally carries a ``result`` string."""

    # Required keyword-only field.
    result: str = field(kw_only=True)

    def to_dict(self) -> dict:
        """Dump this event with ``FinishPromptEventSchema`` and return a plain dict."""
        from griptape.schemas import FinishPromptEventSchema

        schema = FinishPromptEventSchema()
        return dict(schema.dump(self))

result: str = field(kw_only=True) class-attribute instance-attribute

to_dict()

Source code in griptape/griptape/events/finish_prompt_event.py
def to_dict(self) -> dict:
    """Dump this event with ``FinishPromptEventSchema`` and return a plain dict."""
    from griptape.schemas import FinishPromptEventSchema

    schema = FinishPromptEventSchema()
    return dict(schema.dump(self))

FinishStructureRunEvent

Bases: BaseEvent

Source code in griptape/griptape/events/finish_structure_run_event.py
@define
class FinishStructureRunEvent(BaseEvent):
    """Concrete event with no extra fields, serialized by ``FinishStructureRunEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``FinishStructureRunEventSchema`` and return a plain dict."""
        from griptape.schemas import FinishStructureRunEventSchema

        schema = FinishStructureRunEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/finish_structure_run_event.py
def to_dict(self) -> dict:
    """Dump this event with ``FinishStructureRunEventSchema`` and return a plain dict."""
    from griptape.schemas import FinishStructureRunEventSchema

    schema = FinishStructureRunEventSchema()
    return dict(schema.dump(self))

FinishTaskEvent

Bases: BaseTaskEvent

Source code in griptape/griptape/events/finish_task_event.py
@define
class FinishTaskEvent(BaseTaskEvent):
    """Concrete task event serialized by ``FinishTaskEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``FinishTaskEventSchema`` and return a plain dict."""
        from griptape.schemas import FinishTaskEventSchema

        schema = FinishTaskEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/finish_task_event.py
def to_dict(self) -> dict:
    """Dump this event with ``FinishTaskEventSchema`` and return a plain dict."""
    from griptape.schemas import FinishTaskEventSchema

    schema = FinishTaskEventSchema()
    return dict(schema.dump(self))

StartActionSubtaskEvent

Bases: BaseActionSubtaskEvent

Source code in griptape/griptape/events/start_action_subtask_event.py
@define
class StartActionSubtaskEvent(BaseActionSubtaskEvent):
    """Concrete action-subtask event serialized by ``StartActionSubtaskEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``StartActionSubtaskEventSchema`` and return a plain dict."""
        from griptape.schemas import StartActionSubtaskEventSchema

        schema = StartActionSubtaskEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/start_action_subtask_event.py
def to_dict(self) -> dict:
    """Dump this event with ``StartActionSubtaskEventSchema`` and return a plain dict."""
    from griptape.schemas import StartActionSubtaskEventSchema

    schema = StartActionSubtaskEventSchema()
    return dict(schema.dump(self))

StartImageGenerationEvent

Bases: BaseImageGenerationEvent

Source code in griptape/griptape/events/start_image_generation_event.py
@define
class StartImageGenerationEvent(BaseImageGenerationEvent):
    """Image-generation event carrying the prompts and optional negative prompts."""

    # Required keyword-only list of prompt strings.
    prompts: list[str] = field(kw_only=True)
    # Keyword-only; defaults to None when not supplied.
    negative_prompts: list[str] | None = field(default=None, kw_only=True)

    def to_dict(self) -> dict:
        """Dump this event with ``StartImageGenerationEventSchema`` and return a plain dict."""
        from griptape.schemas import StartImageGenerationEventSchema

        schema = StartImageGenerationEventSchema()
        return dict(schema.dump(self))

negative_prompts: list[str] | None = field(default=None, kw_only=True) class-attribute instance-attribute

prompts: list[str] = field(kw_only=True) class-attribute instance-attribute

to_dict()

Source code in griptape/griptape/events/start_image_generation_event.py
def to_dict(self) -> dict:
    """Dump this event with ``StartImageGenerationEventSchema`` and return a plain dict."""
    from griptape.schemas import StartImageGenerationEventSchema

    schema = StartImageGenerationEventSchema()
    return dict(schema.dump(self))

StartPromptEvent

Bases: BasePromptEvent

Source code in griptape/griptape/events/start_prompt_event.py
@define
class StartPromptEvent(BasePromptEvent):
    """Prompt event carrying a ``PromptStack`` and a ``prompt`` string."""

    # Both fields are required keyword-only arguments.
    prompt_stack: PromptStack = field(kw_only=True)
    prompt: str = field(kw_only=True)

    def to_dict(self) -> dict:
        """Dump this event with ``StartPromptEventSchema`` and return a plain dict."""
        from griptape.schemas import StartPromptEventSchema

        schema = StartPromptEventSchema()
        return dict(schema.dump(self))

prompt: str = field(kw_only=True) class-attribute instance-attribute

prompt_stack: PromptStack = field(kw_only=True) class-attribute instance-attribute

to_dict()

Source code in griptape/griptape/events/start_prompt_event.py
def to_dict(self) -> dict:
    """Dump this event with ``StartPromptEventSchema`` and return a plain dict."""
    from griptape.schemas import StartPromptEventSchema

    schema = StartPromptEventSchema()
    return dict(schema.dump(self))

StartStructureRunEvent

Bases: BaseEvent

Source code in griptape/griptape/events/start_structure_run_event.py
@define
class StartStructureRunEvent(BaseEvent):
    """Concrete event with no extra fields, serialized by ``StartStructureRunEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``StartStructureRunEventSchema`` and return a plain dict."""
        from griptape.schemas import StartStructureRunEventSchema

        schema = StartStructureRunEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/start_structure_run_event.py
def to_dict(self) -> dict:
    """Dump this event with ``StartStructureRunEventSchema`` and return a plain dict."""
    from griptape.schemas import StartStructureRunEventSchema

    schema = StartStructureRunEventSchema()
    return dict(schema.dump(self))

StartTaskEvent

Bases: BaseTaskEvent

Source code in griptape/griptape/events/start_task_event.py
@define
class StartTaskEvent(BaseTaskEvent):
    """Concrete task event serialized by ``StartTaskEventSchema``."""

    def to_dict(self) -> dict:
        """Dump this event with ``StartTaskEventSchema`` and return a plain dict."""
        from griptape.schemas import StartTaskEventSchema

        schema = StartTaskEventSchema()
        return dict(schema.dump(self))

to_dict()

Source code in griptape/griptape/events/start_task_event.py
def to_dict(self) -> dict:
    """Dump this event with ``StartTaskEventSchema`` and return a plain dict."""
    from griptape.schemas import StartTaskEventSchema

    schema = StartTaskEventSchema()
    return dict(schema.dump(self))