Event Listener Drivers
Overview
Event Listener Drivers are used to send Griptape Events to external services.
You can instantiate Drivers and pass them to Event Listeners in your Structure:
import os
from griptape.drivers import AmazonSqsEventListenerDriver
from griptape.events import EventBus, EventListener
from griptape.rules import Rule
from griptape.structures import Agent

EventBus.add_event_listeners(
    [
        EventListener(
            driver=AmazonSqsEventListenerDriver(
                queue_url=os.environ["AMAZON_SQS_QUEUE_URL"],
            ),
        ),
    ]
)

agent = Agent(
    rules=[
        Rule(value="You will be provided with a block of text, and your task is to extract a list of keywords from it.")
    ],
)

agent.run(
"""Black-on-black ware is a 20th- and 21st-century pottery tradition developed by the Puebloan Native American ceramic artists in Northern New Mexico.
Traditional reduction-fired blackware has been made for centuries by pueblo artists.
Black-on-black ware of the past century is produced with a smooth surface, with the designs applied through selective burnishing or the application of refractory slip.
Another style involves carving or incising designs and selectively polishing the raised areas.
For generations several families from Kha'po Owingeh and P'ohwhóge Owingeh pueblos have been making black-on-black ware with the techniques passed down from matriarch potters. Artists from other pueblos have also produced black-on-black ware.
Several contemporary artists have created works honoring the pottery of their ancestors."""
)
Or use them independently:
from griptape.artifacts import TextArtifact
from griptape.drivers import GriptapeCloudEventListenerDriver
from griptape.events import FinishStructureRunEvent
# By default, GriptapeCloudEventListenerDriver uses the API key provided
# in the GT_CLOUD_API_KEY environment variable.
event_driver = GriptapeCloudEventListenerDriver()
done_event = FinishStructureRunEvent(
    output_task_input=TextArtifact("Just started!"),
    output_task_output=TextArtifact("All done!"),
)
event_driver.publish_event(done_event)
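To make the division of labor between an Event Listener and its Driver concrete, here is a toy model in plain Python. It is a conceptual sketch only, not Griptape's implementation: `ToyEvent`, `ToyFinishEvent`, `ToyDriver`, and `ToyListener` are hypothetical names, and the `isinstance`-based filtering rule is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyEvent:
    """Hypothetical stand-in for a Griptape Event."""
    name: str


@dataclass
class ToyFinishEvent(ToyEvent):
    """Hypothetical stand-in for an event emitted when a run finishes."""


@dataclass
class ToyDriver:
    """Hypothetical driver: records payloads instead of calling a service."""
    sent: list = field(default_factory=list)

    def publish_event(self, event: ToyEvent) -> None:
        # A real driver would forward this payload to a queue, webhook, etc.
        self.sent.append({"type": type(event).__name__, "name": event.name})


@dataclass
class ToyListener:
    """Hypothetical listener: filters by event type, then hands off to the driver."""
    driver: ToyDriver
    event_types: Optional[list] = None  # None means "forward every event"

    def handle(self, event: ToyEvent) -> None:
        # Filtering rule assumed for this sketch: match on isinstance.
        if self.event_types is None or any(isinstance(event, t) for t in self.event_types):
            self.driver.publish_event(event)


driver = ToyDriver()
listener = ToyListener(driver=driver, event_types=[ToyFinishEvent])
listener.handle(ToyEvent(name="start"))         # filtered out
listener.handle(ToyFinishEvent(name="finish"))  # forwarded to the driver
print(driver.sent)
```

Only the finish event reaches the driver, which mirrors how `event_types` narrows what a listener forwards in the examples on this page.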
Event Listener Drivers
Griptape offers the following Event Listener Drivers for forwarding Griptape Events.
Amazon SQS
Info
This driver requires the drivers-event-listener-amazon-sqs extra.
The AmazonSqsEventListenerDriver sends Events to an Amazon SQS queue.
import os
from griptape.drivers import AmazonSqsEventListenerDriver
from griptape.events import EventBus, EventListener
from griptape.rules import Rule
from griptape.structures import Agent

EventBus.add_event_listeners(
    [
        EventListener(
            driver=AmazonSqsEventListenerDriver(
                queue_url=os.environ["AMAZON_SQS_QUEUE_URL"],
            ),
        ),
    ]
)

agent = Agent(
    rules=[
        Rule(value="You will be provided with a block of text, and your task is to extract a list of keywords from it.")
    ],
)

agent.run(
"""Black-on-black ware is a 20th- and 21st-century pottery tradition developed by the Puebloan Native American ceramic artists in Northern New Mexico.
Traditional reduction-fired blackware has been made for centuries by pueblo artists.
Black-on-black ware of the past century is produced with a smooth surface, with the designs applied through selective burnishing or the application of refractory slip.
Another style involves carving or incising designs and selectively polishing the raised areas.
For generations several families from Kha'po Owingeh and P'ohwhóge Owingeh pueblos have been making black-on-black ware with the techniques passed down from matriarch potters. Artists from other pueblos have also produced black-on-black ware.
Several contemporary artists have created works honoring the pottery of their ancestors."""
)
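On the consuming side, something has to read those messages back off the queue. The sketch below assumes each message body is a JSON-encoded event payload, which is worth verifying against your installed Griptape version. It uses boto3's `receive_message` and `delete_message`; the helper names `parse_event_message` and `poll_events` are hypothetical.

```python
import json


def parse_event_message(message: dict) -> dict:
    """Decode one SQS message, assuming its Body is a JSON event payload."""
    return json.loads(message["Body"])


def poll_events(queue_url: str, max_messages: int = 10) -> list:
    """Fetch and delete up to `max_messages` messages, returning decoded payloads."""
    import boto3  # imported lazily so the parsing helper works without boto3

    client = boto3.client("sqs")
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows at most 10 per call
        WaitTimeSeconds=5,  # long polling
    )
    payloads = []
    for message in response.get("Messages", []):
        payloads.append(parse_event_message(message))
        # Delete only after the message decoded successfully.
        client.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
    return payloads


# Usage (requires AWS credentials and the same queue URL as the example above):
# import os
# for payload in poll_events(os.environ["AMAZON_SQS_QUEUE_URL"]):
#     print(payload)
```

Deleting a message only after it has been decoded means a malformed body stays on the queue for inspection instead of being silently dropped.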
AWS IoT
Info
This driver requires the drivers-event-listener-amazon-iot extra.
The AwsIotCoreEventListenerDriver sends Events to the AWS IoT Message Broker.
import os
from griptape.configs import Defaults
from griptape.configs.drivers import DriversConfig
from griptape.drivers import AwsIotCoreEventListenerDriver, OpenAiChatPromptDriver
from griptape.events import EventBus, EventListener, FinishStructureRunEvent
from griptape.rules import Rule
from griptape.structures import Agent

Defaults.drivers_config = DriversConfig(prompt_driver=OpenAiChatPromptDriver(model="gpt-3.5-turbo", temperature=0.7))

EventBus.add_event_listeners(
    [
        EventListener(
            event_types=[FinishStructureRunEvent],
            driver=AwsIotCoreEventListenerDriver(
                topic=os.environ["AWS_IOT_CORE_TOPIC"],
                iot_endpoint=os.environ["AWS_IOT_CORE_ENDPOINT"],
            ),
        ),
    ]
)

agent = Agent(
    rules=[Rule(value="You will be provided with a text, and your task is to extract the airport codes from it.")],
)

agent.run("I want to fly from Orlando to Boston")
Griptape Cloud
The GriptapeCloudEventListenerDriver sends Events to Griptape Cloud.
Note
This Driver is required when using the Griptape Cloud Managed Structures feature. For local development, you can use the Skatepark Emulator.
from griptape.drivers import GriptapeCloudEventListenerDriver
from griptape.events import EventBus, EventListener, FinishStructureRunEvent
from griptape.structures import Agent

EventBus.add_event_listeners(
    [
        EventListener(
            event_types=[FinishStructureRunEvent],
            # By default, GriptapeCloudEventListenerDriver uses the API key provided
            # in the GT_CLOUD_API_KEY environment variable.
            driver=GriptapeCloudEventListenerDriver(),
        ),
    ]
)

agent = Agent()

agent.run("Create a list of 8 questions for an interview with a science fiction author.")
Webhook Event Listener Driver
The WebhookEventListenerDriver sends Events to any Webhook URL.
import os
from griptape.drivers import WebhookEventListenerDriver
from griptape.events import EventBus, EventListener, FinishStructureRunEvent
from griptape.structures import Agent

EventBus.add_event_listeners(
    [
        EventListener(
            event_types=[FinishStructureRunEvent],
            driver=WebhookEventListenerDriver(
                webhook_url=os.environ["WEBHOOK_URL"],
            ),
        ),
    ]
)

agent = Agent()

agent.run("Analyze the pros and cons of remote work vs. office work")
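To try this driver locally, something needs to be listening at `WEBHOOK_URL`. The following is a minimal receiver built on the Python standard library. It assumes the driver delivers each event as an HTTP POST with a JSON body, which you should confirm for your version; `EventHandler`, `received`, and `start_receiver` are hypothetical names.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

received: list = []  # decoded payloads, in arrival order


class EventHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            received.append(json.loads(body))
            self.send_response(200)
        except ValueError:
            # Covers malformed JSON and undecodable bytes.
            self.send_response(400)
        self.end_headers()

    def log_message(self, format: str, *args) -> None:
        pass  # silence per-request logging


def start_receiver(port: int = 0) -> HTTPServer:
    """Start the receiver on a background thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), EventHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


server = start_receiver()
print(f"WEBHOOK_URL=http://127.0.0.1:{server.server_port}")
```

Because the server runs on a daemon thread, a standalone script needs to keep the main thread alive (for example by running the Agent in the same process) for the receiver to stay up.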
Pusher
Info
This driver requires the drivers-event-listener-pusher extra.
The PusherEventListenerDriver sends Events to Pusher.
import os
from griptape.drivers import PusherEventListenerDriver
from griptape.events import EventBus, EventListener, FinishStructureRunEvent
from griptape.structures import Agent

EventBus.add_event_listeners(
    [
        EventListener(
            event_types=[FinishStructureRunEvent],
            driver=PusherEventListenerDriver(
                batched=False,
                app_id=os.environ["PUSHER_APP_ID"],
                key=os.environ["PUSHER_KEY"],
                secret=os.environ["PUSHER_SECRET"],
                cluster=os.environ["PUSHER_CLUSTER"],
                channel="my-channel",
                event_name="my-event",
            ),
        ),
    ],
)

agent = Agent()

agent.run("Analyze the pros and cons of remote work vs. office work")