Skip to content

event_bus

EventBus = _EventBus() module-attribute

_event_listeners = ContextVar('event_listeners', default=None) module-attribute

_EventBus

Bases: SingletonMixin

Source code in griptape/events/event_bus.py
@define
class _EventBus(SingletonMixin):
    """Registry that forwards events to the listeners registered in the current context.

    The listener list is not kept on the instance: it is the value of the
    module-level ``_event_listeners`` context variable. Adding or removing a
    listener rebinds that variable to a newly built list rather than mutating
    the existing list in place.
    """

    @property
    def event_listeners(self) -> list[EventListener]:
        """Return the listener list held by ``_event_listeners``.

        If the variable still holds ``None``, an empty list is stored in it
        first and that list is returned.
        """
        registered = _event_listeners.get()
        if registered is not None:
            return registered

        registered = []
        _event_listeners.set(registered)
        return registered

    @event_listeners.setter
    def event_listeners(self, event_listeners: list[EventListener]) -> None:
        """Store ``event_listeners`` as the new value of ``_event_listeners``."""
        _event_listeners.set(event_listeners)

    def add_event_listener(self, event_listener: EventListener) -> EventListener:
        """Register ``event_listener`` unless it is already present.

        Returns:
            The ``event_listener`` argument, whether or not it was added.
        """
        registered = self.event_listeners
        already_present = event_listener in registered
        if not already_present:
            # Build a new list instead of appending to the current one.
            self.event_listeners = registered + [event_listener]

        return event_listener

    def add_event_listeners(self, event_listeners: list[EventListener]) -> list[EventListener]:
        """Register each listener in order via ``add_event_listener``.

        Returns:
            The values returned by ``add_event_listener``, in input order.
        """
        return list(map(self.add_event_listener, event_listeners))

    def remove_event_listener(self, event_listener: EventListener) -> None:
        """Drop every registered listener that compares equal to ``event_listener``.

        Does nothing when the listener is not registered.
        """
        registered = self.event_listeners
        if event_listener not in registered:
            return

        self.event_listeners = [kept for kept in registered if kept != event_listener]

    def remove_event_listeners(self, event_listeners: list[EventListener]) -> None:
        """Unregister each listener in order via ``remove_event_listener``."""
        for stale in event_listeners:
            self.remove_event_listener(stale)

    def publish_event(self, event: BaseEvent, *, flush: bool = False) -> None:
        """Forward ``event`` and ``flush`` to every registered listener, in order."""
        recipients = self.event_listeners
        for recipient in recipients:
            recipient.publish_event(event, flush=flush)

    def clear_event_listeners(self) -> None:
        """Replace the current listener list with a new empty list."""
        self.event_listeners = []

event_listeners property writable

add_event_listener(event_listener)

Source code in griptape/events/event_bus.py
def add_event_listener(self, event_listener: EventListener) -> EventListener:
    """Register ``event_listener`` unless it is already present.

    Returns:
        The ``event_listener`` argument, whether or not it was added.
    """
    registered = self.event_listeners
    already_present = event_listener in registered
    if not already_present:
        # Assign a new list instead of appending to the current one.
        self.event_listeners = registered + [event_listener]

    return event_listener

add_event_listeners(event_listeners)

Source code in griptape/events/event_bus.py
def add_event_listeners(self, event_listeners: list[EventListener]) -> list[EventListener]:
    """Register each listener in order via ``add_event_listener``.

    Returns:
        The values returned by ``add_event_listener``, in input order.
    """
    return list(map(self.add_event_listener, event_listeners))

clear_event_listeners()

Source code in griptape/events/event_bus.py
def clear_event_listeners(self) -> None:
    """Replace the current listener list with a new empty list."""
    self.event_listeners = []

publish_event(event, *, flush=False)

Source code in griptape/events/event_bus.py
def publish_event(self, event: BaseEvent, *, flush: bool = False) -> None:
    """Forward ``event`` and ``flush`` to every registered listener, in order."""
    recipients = self.event_listeners
    for recipient in recipients:
        recipient.publish_event(event, flush=flush)

remove_event_listener(event_listener)

Source code in griptape/events/event_bus.py
def remove_event_listener(self, event_listener: EventListener) -> None:
    """Drop every registered listener that compares equal to ``event_listener``.

    Does nothing when the listener is not registered.
    """
    registered = self.event_listeners
    if event_listener not in registered:
        return

    # Assign a filtered copy instead of mutating the current list.
    self.event_listeners = [kept for kept in registered if kept != event_listener]

remove_event_listeners(event_listeners)

Source code in griptape/events/event_bus.py
def remove_event_listeners(self, event_listeners: list[EventListener]) -> None:
    """Unregister each listener in order via ``remove_event_listener``."""
    for stale in event_listeners:
        self.remove_event_listener(stale)