Skip to content

Utils

__all__ = ['Conversation', 'ManifestValidator', 'PythonRunner', 'CommandRunner', 'minify_json', 'J2', 'Chat', 'str_to_hash', 'import_optional_dependency', 'execute_futures_dict', 'TokenCounter', 'PromptStack', 'remove_null_values_in_dict_recursively', 'Stream', 'constants'] module-attribute

Chat

Source code in griptape/griptape/utils/chat.py
@define
class Chat:
    """Interactive console chat loop around a Structure.

    Questions are read with `input`, run through `structure`, and the answers
    are written through `output_fn` until one of `exit_keywords` is entered.
    """

    structure: Structure = field()
    # A Factory builds a fresh list per instance; a plain list default would be
    # shared (and mutable) across every Chat instance.
    exit_keywords: list[str] = field(default=Factory(lambda: ["exit"]), kw_only=True)
    exiting_text: str = field(default="Exiting...", kw_only=True)
    processing_text: str = field(default="Thinking...", kw_only=True)
    intro_text: str | None = field(default=None, kw_only=True)
    prompt_prefix: str = field(default="User: ", kw_only=True)
    response_prefix: str = field(default="Assistant: ", kw_only=True)
    output_fn: Callable[[str], None] = field(
        default=Factory(lambda self: self.default_output_fn, takes_self=True), kw_only=True
    )

    def default_output_fn(self, text: str) -> None:
        """Print `text`; when the prompt driver streams, print without a newline and flush."""
        if self.structure.prompt_driver.stream:
            print(text, end="", flush=True)
        else:
            print(text)

    def start(self) -> None:
        """Run the chat loop until the user enters an exit keyword."""
        if self.intro_text:
            self.output_fn(self.intro_text)
        while True:
            question = input(self.prompt_prefix)

            if question.lower() in self.exit_keywords:
                self.output_fn(self.exiting_text)
                break

            if self.structure.prompt_driver.stream:
                self.output_fn(self.processing_text + "\n")
                stream = Stream(self.structure).run(question)
                # Prefix only the first chunk. Iterating (instead of calling
                # next()) keeps an empty stream from raising StopIteration.
                is_first_chunk = True
                for chunk in stream:
                    if is_first_chunk:
                        self.output_fn(self.response_prefix + chunk.value)
                        is_first_chunk = False
                    else:
                        self.output_fn(chunk.value)
            else:
                self.output_fn(self.processing_text)
                self.output_fn(f"{self.response_prefix}{self.structure.run(question).output_task.output.to_text()}")

exit_keywords: list[str] = field(default=['exit'], kw_only=True) class-attribute instance-attribute

exiting_text: str = field(default='Exiting...', kw_only=True) class-attribute instance-attribute

intro_text: str | None = field(default=None, kw_only=True) class-attribute instance-attribute

output_fn: Callable[[str], None] = field(default=Factory(lambda self: self.default_output_fn, takes_self=True), kw_only=True) class-attribute instance-attribute

processing_text: str = field(default='Thinking...', kw_only=True) class-attribute instance-attribute

prompt_prefix: str = field(default='User: ', kw_only=True) class-attribute instance-attribute

response_prefix: str = field(default='Assistant: ', kw_only=True) class-attribute instance-attribute

structure: Structure = field() class-attribute instance-attribute

default_output_fn(text)

Source code in griptape/griptape/utils/chat.py
def default_output_fn(self, text: str) -> None:
    """Print `text`, choosing the print mode from the prompt driver's `stream` flag.

    When streaming, the text is printed without a trailing newline and stdout
    is flushed; otherwise a plain `print` is used.
    """
    streaming = self.structure.prompt_driver.stream
    if not streaming:
        print(text)
        return
    print(text, end="", flush=True)

start()

Source code in griptape/griptape/utils/chat.py
def start(self) -> None:
    """Run the chat loop until the user enters an exit keyword.

    Prints `intro_text` (if set), then repeatedly reads a question with
    `input`. In streaming mode each chunk is forwarded to `output_fn` as it
    arrives, with `response_prefix` prepended to the first chunk only;
    otherwise the full answer is emitted in one call.
    """
    if self.intro_text:
        self.output_fn(self.intro_text)
    while True:
        question = input(self.prompt_prefix)

        if question.lower() in self.exit_keywords:
            self.output_fn(self.exiting_text)
            break

        if self.structure.prompt_driver.stream:
            self.output_fn(self.processing_text + "\n")
            stream = Stream(self.structure).run(question)
            # Iterating (instead of calling next()) keeps an empty stream
            # from raising StopIteration out of the chat loop.
            is_first_chunk = True
            for chunk in stream:
                if is_first_chunk:
                    self.output_fn(self.response_prefix + chunk.value)
                    is_first_chunk = False
                else:
                    self.output_fn(chunk.value)
        else:
            self.output_fn(self.processing_text)
            self.output_fn(f"{self.response_prefix}{self.structure.run(question).output_task.output.to_text()}")

CommandRunner

Source code in griptape/griptape/utils/command_runner.py
@define
class CommandRunner:
    """Runs a shell command and wraps its output in an artifact."""

    def run(self, command: str) -> BaseArtifact:
        """Execute `command` through the shell and return its output.

        Args:
            command: The command line to execute.

        Returns:
            A TextArtifact with the stripped, decoded stdout when nothing was
            written to stderr; otherwise an ErrorArtifact carrying the stderr text.
        """
        # NOTE(review): shell=True passes `command` to the shell verbatim; callers
        # must not hand untrusted input to this method.
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        stdout, stderr = process.communicate()

        if len(stderr) == 0:
            return TextArtifact(stdout.strip().decode())
        else:
            # Decode stderr so the message holds text rather than a bytes repr (b'...').
            return ErrorArtifact(f"error: {stderr.strip().decode()}")

run(command)

Source code in griptape/griptape/utils/command_runner.py
def run(self, command: str) -> BaseArtifact:
    """Execute `command` through the shell and return its output.

    Args:
        command: The command line to execute.

    Returns:
        A TextArtifact with the stripped, decoded stdout when nothing was
        written to stderr; otherwise an ErrorArtifact carrying the stderr text.
    """
    # NOTE(review): shell=True passes `command` to the shell verbatim; callers
    # must not hand untrusted input to this method.
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    stdout, stderr = process.communicate()

    if len(stderr) == 0:
        return TextArtifact(stdout.strip().decode())
    else:
        # Decode stderr so the message holds text rather than a bytes repr (b'...').
        return ErrorArtifact(f"error: {stderr.strip().decode()}")

Conversation

Source code in griptape/griptape/utils/conversation.py
@define(frozen=True)
class Conversation:
    """Read-only text view over a ConversationMemory."""

    memory: ConversationMemory = field()

    def lines(self) -> list[str]:
        """Return a "Q: ..." line followed by an "A: ..." line for every run in memory."""
        rendered: list[str] = []

        for run in self.memory.runs:
            rendered.extend((f"Q: {run.input}", f"A: {run.output}"))

        return rendered

    def prompt_stack(self) -> list[str]:
        """Return one "role: content" line per input of the memory's prompt stack."""
        return [f"{entry.role}: {entry.content}" for entry in self.memory.to_prompt_stack().inputs]

    def __str__(self) -> str:
        """Join the output of `lines()` with newlines."""
        return "\n".join(self.lines())

memory: ConversationMemory = field() class-attribute instance-attribute

__str__()

Source code in griptape/griptape/utils/conversation.py
def __str__(self) -> str:
    """Join the output of `lines()` with newlines."""
    return "\n".join(self.lines())

lines()

Source code in griptape/griptape/utils/conversation.py
def lines(self) -> list[str]:
    """Return a "Q: ..." line followed by an "A: ..." line for every run in memory."""
    rendered: list[str] = []

    for run in self.memory.runs:
        rendered.extend((f"Q: {run.input}", f"A: {run.output}"))

    return rendered

prompt_stack()

Source code in griptape/griptape/utils/conversation.py
def prompt_stack(self) -> list[str]:
    """Return one "role: content" line per input of the memory's prompt stack."""
    stack_inputs = self.memory.to_prompt_stack().inputs
    return [f"{entry.role}: {entry.content}" for entry in stack_inputs]

J2

Source code in griptape/griptape/utils/j2.py
@define(frozen=True)
class J2:
    """Thin wrapper around a Jinja2 Environment rooted at `templates_dir`."""

    template_name: Optional[str] = field(default=None)
    templates_dir: str = field(default=abs_path("templates"), kw_only=True)
    environment: Environment = field(
        default=Factory(
            lambda self: Environment(loader=FileSystemLoader(self.templates_dir), trim_blocks=True, lstrip_blocks=True),
            takes_self=True,
        ),
        kw_only=True,
    )

    def render(self, **kwargs) -> str:
        """Render the template named `template_name` with `kwargs` as its context.

        Returns:
            The rendered text with trailing whitespace stripped.

        Raises:
            ValueError: If `template_name` was not set.
        """
        # Fail early with a clear message instead of an obscure error from the loader.
        if self.template_name is None:
            raise ValueError("template_name is required.")
        return self.environment.get_template(self.template_name).render(kwargs).rstrip()

    def render_from_string(self, value: str, **kwargs) -> str:
        """Render the template source `value` with `kwargs` as its context."""
        return self.environment.from_string(value).render(kwargs)

environment: Environment = field(default=Factory(lambda self: Environment(loader=FileSystemLoader(self.templates_dir), trim_blocks=True, lstrip_blocks=True), takes_self=True), kw_only=True) class-attribute instance-attribute

template_name: Optional[str] = field(default=None) class-attribute instance-attribute

templates_dir: str = field(default=abs_path('templates'), kw_only=True) class-attribute instance-attribute

render(**kwargs)

Source code in griptape/griptape/utils/j2.py
def render(self, **kwargs) -> str:
    """Render the template named `template_name` with `kwargs` as its context.

    Returns:
        The rendered text with trailing whitespace stripped.

    Raises:
        ValueError: If `template_name` was not set.
    """
    # Fail early with a clear message instead of an obscure error from the loader.
    if self.template_name is None:
        raise ValueError("template_name is required.")
    return self.environment.get_template(self.template_name).render(kwargs).rstrip()

render_from_string(value, **kwargs)

Source code in griptape/griptape/utils/j2.py
def render_from_string(self, value: str, **kwargs) -> str:
    """Render the template source `value` with `kwargs` as its context."""
    template = self.environment.from_string(value)
    return template.render(kwargs)

ManifestValidator

Source code in griptape/griptape/utils/manifest_validator.py
4
5
6
7
8
9
class ManifestValidator:
    """Checks a manifest dict against the schema built by `schema()`."""

    def schema(self) -> Schema:
        """Build the Schema describing the expected manifest keys."""
        expected_shape = {
            "version": "v1",
            "name": str,
            "description": str,
            "contact_email": str,
            "legal_info_url": str,
        }
        return Schema(expected_shape)

    def validate(self, manifest: dict) -> dict:
        """Validate `manifest` with `schema()` and return the result of `Schema.validate`."""
        manifest_schema = self.schema()
        return manifest_schema.validate(manifest)

schema()

Source code in griptape/griptape/utils/manifest_validator.py
def schema(self) -> Schema:
    """Build the Schema describing the expected manifest keys."""
    expected_shape = {
        "version": "v1",
        "name": str,
        "description": str,
        "contact_email": str,
        "legal_info_url": str,
    }
    return Schema(expected_shape)

validate(manifest)

Source code in griptape/griptape/utils/manifest_validator.py
def validate(self, manifest: dict) -> dict:
    """Validate `manifest` with `schema()` and return the result of its `validate` call."""
    manifest_schema = self.schema()
    return manifest_schema.validate(manifest)

PromptStack

Source code in griptape/griptape/utils/prompt_stack.py
@define
class PromptStack:
    """Ordered list of role-tagged inputs that make up a prompt."""

    GENERIC_ROLE = "generic"
    USER_ROLE = "user"
    ASSISTANT_ROLE = "assistant"
    SYSTEM_ROLE = "system"

    @dataclass
    class Input:
        """A single prompt entry: its content and the role it is attributed to."""

        content: str | list[dict]
        role: str

        def is_generic(self) -> bool:
            """Return True when the role is GENERIC_ROLE."""
            return self.role == PromptStack.GENERIC_ROLE

        def is_system(self) -> bool:
            """Return True when the role is SYSTEM_ROLE."""
            return self.role == PromptStack.SYSTEM_ROLE

        def is_user(self) -> bool:
            """Return True when the role is USER_ROLE."""
            return self.role == PromptStack.USER_ROLE

        def is_assistant(self) -> bool:
            """Return True when the role is ASSISTANT_ROLE."""
            return self.role == PromptStack.ASSISTANT_ROLE

    inputs: list[Input] = field(factory=list, kw_only=True)

    def add_input(self, content: str, role: str) -> Input:
        """Append a new Input with the given content and role, and return it."""
        self.inputs.append(self.Input(content=content, role=role))

        return self.inputs[-1]

    def add_generic_input(self, content: str) -> Input:
        """Append `content` with the generic role."""
        return self.add_input(content, self.GENERIC_ROLE)

    def add_system_input(self, content: str) -> Input:
        """Append `content` with the system role."""
        return self.add_input(content, self.SYSTEM_ROLE)

    def add_user_input(self, content: str) -> Input:
        """Append `content` with the user role."""
        return self.add_input(content, self.USER_ROLE)

    def add_assistant_input(self, content: str) -> Input:
        """Append `content` with the assistant role."""
        return self.add_input(content, self.ASSISTANT_ROLE)

    def add_conversation_memory(self, memory: ConversationMemory, index: int | None = None) -> list[Input]:
        """Add the Conversation Memory runs to the Prompt Stack.

        If autoprune is enabled, this will fit as many Conversation Memory runs into the Prompt Stack
        as possible without exceeding the token limit.

        Args:
            memory: The Conversation Memory whose runs are added to the Prompt Stack.
            index: Optional index to insert the Conversation Memory runs at.
                   Defaults to appending to the end of the Prompt Stack.

        Returns:
            The Prompt Stack's full list of inputs after the runs were added.
        """
        num_runs_to_fit_in_prompt = len(memory.runs)

        if memory.autoprune and hasattr(memory, "structure"):
            should_prune = True
            prompt_driver = memory.structure.prompt_driver
            temp_stack = PromptStack()

            # Try to determine how many Conversation Memory runs we can
            # fit into the Prompt Stack without exceeding the token limit.
            while should_prune and num_runs_to_fit_in_prompt > 0:
                temp_stack.inputs = self.inputs.copy()

                # Add n runs from Conversation Memory.
                # Where we insert into the Prompt Stack doesn't matter here
                # since we only care about the total token count.
                memory_inputs = memory.to_prompt_stack(num_runs_to_fit_in_prompt).inputs
                temp_stack.inputs.extend(memory_inputs)

                # Convert the prompt stack into tokens left.
                prompt_string = prompt_driver.prompt_stack_to_string(temp_stack)
                tokens_left = prompt_driver.tokenizer.count_tokens_left(prompt_string)
                if tokens_left > 0:
                    # There are still tokens left, no need to prune.
                    should_prune = False
                else:
                    # There were not any tokens left, prune one run and try again.
                    num_runs_to_fit_in_prompt -= 1

        if num_runs_to_fit_in_prompt:
            memory_inputs = memory.to_prompt_stack(num_runs_to_fit_in_prompt).inputs
            # Compare against None explicitly: index 0 is a valid insertion
            # point (front of the stack) but is falsy.
            if index is not None:
                self.inputs[index:index] = memory_inputs
            else:
                self.inputs.extend(memory_inputs)
        return self.inputs

ASSISTANT_ROLE = 'assistant' class-attribute instance-attribute

GENERIC_ROLE = 'generic' class-attribute instance-attribute

SYSTEM_ROLE = 'system' class-attribute instance-attribute

USER_ROLE = 'user' class-attribute instance-attribute

inputs: list[Input] = field(factory=list, kw_only=True) class-attribute instance-attribute

Input dataclass

Source code in griptape/griptape/utils/prompt_stack.py
@dataclass
class Input:
    """A single prompt entry: its content and the role it is attributed to."""

    content: str | list[dict]
    role: str

    def is_generic(self) -> bool:
        """Return True when the role is PromptStack.GENERIC_ROLE."""
        expected_role = PromptStack.GENERIC_ROLE
        return self.role == expected_role

    def is_system(self) -> bool:
        """Return True when the role is PromptStack.SYSTEM_ROLE."""
        expected_role = PromptStack.SYSTEM_ROLE
        return self.role == expected_role

    def is_user(self) -> bool:
        """Return True when the role is PromptStack.USER_ROLE."""
        expected_role = PromptStack.USER_ROLE
        return self.role == expected_role

    def is_assistant(self) -> bool:
        """Return True when the role is PromptStack.ASSISTANT_ROLE."""
        expected_role = PromptStack.ASSISTANT_ROLE
        return self.role == expected_role
content: str | list[dict] instance-attribute
role: str instance-attribute
is_assistant()
Source code in griptape/griptape/utils/prompt_stack.py
def is_assistant(self) -> bool:
    """Return True when the role is PromptStack.ASSISTANT_ROLE."""
    expected_role = PromptStack.ASSISTANT_ROLE
    return self.role == expected_role
is_generic()
Source code in griptape/griptape/utils/prompt_stack.py
def is_generic(self) -> bool:
    """Return True when the role is PromptStack.GENERIC_ROLE."""
    expected_role = PromptStack.GENERIC_ROLE
    return self.role == expected_role
is_system()
Source code in griptape/griptape/utils/prompt_stack.py
def is_system(self) -> bool:
    """Return True when the role is PromptStack.SYSTEM_ROLE."""
    expected_role = PromptStack.SYSTEM_ROLE
    return self.role == expected_role
is_user()
Source code in griptape/griptape/utils/prompt_stack.py
def is_user(self) -> bool:
    """Return True when the role is PromptStack.USER_ROLE."""
    expected_role = PromptStack.USER_ROLE
    return self.role == expected_role

add_assistant_input(content)

Source code in griptape/griptape/utils/prompt_stack.py
def add_assistant_input(self, content: str) -> Input:
    """Append `content` with the assistant role via `add_input` and return the new Input."""
    role = self.ASSISTANT_ROLE
    return self.add_input(content, role)

add_conversation_memory(memory, index=None)

Add the Conversation Memory runs to the Prompt Stack.

If autoprune is enabled, this will fit as many Conversation Memory runs into the Prompt Stack as possible without exceeding the token limit.

Parameters:

Name Type Description Default
memory ConversationMemory

The Conversation Memory to add the Prompt Stack to.

required
index int | None

Optional index to insert the Conversation Memory runs at. Defaults to appending to the end of the Prompt Stack.

None
Source code in griptape/griptape/utils/prompt_stack.py
def add_conversation_memory(self, memory: ConversationMemory, index: int | None = None) -> list[Input]:
    """Add the Conversation Memory runs to the Prompt Stack.

    If autoprune is enabled, this will fit as many Conversation Memory runs into the Prompt Stack
    as possible without exceeding the token limit.

    Args:
        memory: The Conversation Memory whose runs are added to the Prompt Stack.
        index: Optional index to insert the Conversation Memory runs at.
               Defaults to appending to the end of the Prompt Stack.

    Returns:
        The Prompt Stack's full list of inputs after the runs were added.
    """
    num_runs_to_fit_in_prompt = len(memory.runs)

    if memory.autoprune and hasattr(memory, "structure"):
        should_prune = True
        prompt_driver = memory.structure.prompt_driver
        temp_stack = PromptStack()

        # Try to determine how many Conversation Memory runs we can
        # fit into the Prompt Stack without exceeding the token limit.
        while should_prune and num_runs_to_fit_in_prompt > 0:
            temp_stack.inputs = self.inputs.copy()

            # Add n runs from Conversation Memory.
            # Where we insert into the Prompt Stack doesn't matter here
            # since we only care about the total token count.
            memory_inputs = memory.to_prompt_stack(num_runs_to_fit_in_prompt).inputs
            temp_stack.inputs.extend(memory_inputs)

            # Convert the prompt stack into tokens left.
            prompt_string = prompt_driver.prompt_stack_to_string(temp_stack)
            tokens_left = prompt_driver.tokenizer.count_tokens_left(prompt_string)
            if tokens_left > 0:
                # There are still tokens left, no need to prune.
                should_prune = False
            else:
                # There were not any tokens left, prune one run and try again.
                num_runs_to_fit_in_prompt -= 1

    if num_runs_to_fit_in_prompt:
        memory_inputs = memory.to_prompt_stack(num_runs_to_fit_in_prompt).inputs
        # Compare against None explicitly: index 0 is a valid insertion
        # point (front of the stack) but is falsy.
        if index is not None:
            self.inputs[index:index] = memory_inputs
        else:
            self.inputs.extend(memory_inputs)
    return self.inputs

add_generic_input(content)

Source code in griptape/griptape/utils/prompt_stack.py
def add_generic_input(self, content: str) -> Input:
    """Append `content` with the generic role via `add_input` and return the new Input."""
    role = self.GENERIC_ROLE
    return self.add_input(content, role)

add_input(content, role)

Source code in griptape/griptape/utils/prompt_stack.py
def add_input(self, content: str, role: str) -> Input:
    """Append a new Input built from `content` and `role`, and return the appended Input."""
    new_input = self.Input(content=content, role=role)
    self.inputs.append(new_input)

    return self.inputs[-1]

add_system_input(content)

Source code in griptape/griptape/utils/prompt_stack.py
def add_system_input(self, content: str) -> Input:
    """Append `content` with the system role via `add_input` and return the new Input."""
    role = self.SYSTEM_ROLE
    return self.add_input(content, role)

add_user_input(content)

Source code in griptape/griptape/utils/prompt_stack.py
def add_user_input(self, content: str) -> Input:
    """Append `content` with the user role via `add_input` and return the new Input."""
    role = self.USER_ROLE
    return self.add_input(content, role)

PythonRunner

Source code in griptape/griptape/utils/python_runner.py
@define
class PythonRunner:
    """Evaluates a Python expression and captures what it prints.

    `libs` maps a module name to the alias under which that module is made
    available to the evaluated code.
    """

    libs: dict[str, str] = field(factory=dict, kw_only=True)

    def run(self, code: str) -> str:
        """Evaluate `print(<code>)` and return the captured stdout, stripped.

        Args:
            code: A Python expression.

        Returns:
            The printed result, or the exception message if importing a lib or
            evaluating the code raised.
        """
        global_stdout = sys.stdout
        sys.stdout = local_stdout = StringIO()

        try:
            # Build the namespace directly instead of writing the modules into this
            # module's globals() and reading them back with eval().
            namespace = {alias: importlib.import_module(lib) for lib, alias in self.libs.items()}

            # NOTE(review): exec runs arbitrary code; only pass trusted input.
            exec(f"print({code})", {}, namespace)

            output = local_stdout.getvalue()
        except Exception as e:
            output = str(e)
        finally:
            sys.stdout = global_stdout

        return output.strip()

libs: dict[str, str] = field(factory=dict, kw_only=True) class-attribute instance-attribute

run(code)

Source code in griptape/griptape/utils/python_runner.py
def run(self, code: str) -> str:
    """Evaluate `print(<code>)` and return the captured stdout, stripped.

    Args:
        code: A Python expression.

    Returns:
        The printed result, or the exception message if importing a lib or
        evaluating the code raised.
    """
    global_stdout = sys.stdout
    sys.stdout = local_stdout = StringIO()

    try:
        # Build the namespace directly instead of writing the modules into this
        # module's globals() and reading them back with eval().
        namespace = {alias: importlib.import_module(lib) for lib, alias in self.libs.items()}

        # NOTE(review): exec runs arbitrary code; only pass trusted input.
        exec(f"print({code})", {}, namespace)

        output = local_stdout.getvalue()
    except Exception as e:
        output = str(e)
    finally:
        sys.stdout = global_stdout

    return output.strip()

Stream

A wrapper for Structures that converts CompletionChunkEvents into an iterator of TextArtifacts.

It achieves this by running the Structure in a separate thread, listening for events from the Structure, and yielding those events.

See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators

Attributes:

Name Type Description
structure Structure

The Structure to wrap.

_event_queue Queue[BaseEvent]

A queue to hold events from the Structure.

Source code in griptape/griptape/utils/stream.py
@define
class Stream:
    """A wrapper for Structures that converts `CompletionChunkEvent`s into an iterator of TextArtifacts.

    It achieves this by running the Structure in a separate thread, listening for events from the Structure,
    and yielding those events.

    See relevant Stack Overflow post: https://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators

    Attributes:
        structure: The Structure to wrap.
        _event_queue: A queue to hold events from the Structure.
    """

    structure: Structure = field()

    @structure.validator  # pyright: ignore
    def validate_structure(self, _, structure: Structure):
        """Reject Structures whose prompt driver does not have streaming enabled."""
        if structure and not structure.prompt_driver.stream:
            raise ValueError("prompt driver does not have streaming enabled, enable with stream=True")

    _event_queue: Queue[BaseEvent] = field(default=Factory(lambda: Queue()))

    def run(self, *args) -> Iterator[TextArtifact]:
        """Run the Structure in a worker thread and yield its output as TextArtifacts.

        Yields the token of every CompletionChunkEvent and a newline for every
        FinishPromptEvent, and stops at the first FinishStructureRunEvent.
        """
        t = Thread(target=self._run_structure, args=args)
        t.start()

        # NOTE(review): this loop only ends on a FinishStructureRunEvent; confirm the
        # Structure publishes one even when its run fails, otherwise get() blocks forever.
        while True:
            event = self._event_queue.get()
            if isinstance(event, FinishStructureRunEvent):
                break
            elif isinstance(event, FinishPromptEvent):
                yield TextArtifact(value="\n")
            elif isinstance(event, CompletionChunkEvent):
                yield TextArtifact(value=event.token)
        t.join()

    def _run_structure(self, *args):
        """Run the Structure with a temporary listener that forwards events to the queue."""

        def event_handler(event: BaseEvent):
            self._event_queue.put(event)

        stream_event_listener = EventListener(
            event_handler, event_types=[CompletionChunkEvent, FinishPromptEvent, FinishStructureRunEvent]
        )
        self.structure.add_event_listener(stream_event_listener)

        try:
            self.structure.run(*args)
        finally:
            # Always detach the listener, even when the run raises, so it does not
            # stay registered on the Structure.
            self.structure.remove_event_listener(stream_event_listener)

structure: Structure = field() class-attribute instance-attribute

run(*args)

Source code in griptape/griptape/utils/stream.py
def run(self, *args) -> Iterator[TextArtifact]:
    """Run the Structure in a worker thread and yield its output as TextArtifacts.

    Yields the token of every CompletionChunkEvent and a newline for every
    FinishPromptEvent, and stops at the first FinishStructureRunEvent.
    """
    worker = Thread(target=self._run_structure, args=args)
    worker.start()

    # Drain the queue until the Structure reports that its run has finished.
    while not isinstance(event := self._event_queue.get(), FinishStructureRunEvent):
        if isinstance(event, FinishPromptEvent):
            yield TextArtifact(value="\n")
        elif isinstance(event, CompletionChunkEvent):
            yield TextArtifact(value=event.token)

    worker.join()

validate_structure(_, structure)

Source code in griptape/griptape/utils/stream.py
@structure.validator  # pyright: ignore
def validate_structure(self, _, structure: Structure):
    """Reject Structures whose prompt driver does not have streaming enabled."""
    if not structure:
        return
    if not structure.prompt_driver.stream:
        raise ValueError("prompt driver does not have streaming enabled, enable with stream=True")

TokenCounter

Source code in griptape/griptape/utils/token_counter.py
@define
class TokenCounter:
    """Running total of tokens."""

    tokens: int = field(default=0, kw_only=True)

    def add_tokens(self, new_tokens: int) -> int:
        """Add `new_tokens` to the running total and return the updated total."""
        updated_total = self.tokens + new_tokens
        self.tokens = updated_total

        return self.tokens

tokens: int = field(default=0, kw_only=True) class-attribute instance-attribute

add_tokens(new_tokens)

Source code in griptape/griptape/utils/token_counter.py
def add_tokens(self, new_tokens: int) -> int:
    """Add `new_tokens` to the running total and return the updated total."""
    updated_total = self.tokens + new_tokens
    self.tokens = updated_total

    return self.tokens

execute_futures_dict(fs_dict)

Source code in griptape/griptape/utils/futures.py
def execute_futures_dict(fs_dict: dict[str, futures.Future[T]]) -> dict[str, T]:
    """Wait for every future in `fs_dict` and return a dict of their results under the same keys."""
    pending = fs_dict.values()
    futures.wait(pending, timeout=None, return_when=futures.ALL_COMPLETED)

    results = {}
    for key, future in fs_dict.items():
        results[key] = future.result()
    return results

import_optional_dependency(name)

Import an optional dependency.

If a dependency is missing, an ImportError with a nice message will be raised.

Parameters:

Name Type Description Default
name str

The module name.

required

Returns: The imported module, when found. If the module cannot be imported, an ImportError is raised instead.

Source code in griptape/griptape/utils/import_utils.py
def import_optional_dependency(name: str) -> Optional[ModuleType]:
    """Import an optional dependency.

    If a dependency is missing, an ImportError with a nice message will be raised.

    Args:
        name: The module name.

    Returns:
        The imported module.

    Raises:
        ImportError: If the module cannot be imported. The message names the
            package to install: the INSTALL_MAPPING entry for `name` when there
            is one, otherwise `name` itself.
    """

    package_name = INSTALL_MAPPING.get(name)
    install_name = package_name if package_name is not None else name

    try:
        return import_module(name)
    except ImportError as exc:
        msg = f"Missing optional dependency: '{install_name}'. " f"Use poetry or pip to install '{install_name}'."
        # Chain the original error so the underlying import failure stays visible.
        raise ImportError(msg) from exc

minify_json(value)

Source code in griptape/griptape/utils/__init__.py
def minify_json(value: str) -> str:
    """Re-serialize the JSON document `value` with no whitespace around separators."""
    parsed = json.loads(value)
    return json.dumps(parsed, separators=(",", ":"))

remove_null_values_in_dict_recursively(d)

Source code in griptape/griptape/utils/dict_utils.py
1
2
3
4
5
def remove_null_values_in_dict_recursively(d):
    """Return a copy of `d` without None-valued keys, descending into nested dicts.

    Non-dict values (including lists) are returned unchanged.
    """
    if not isinstance(d, dict):
        return d

    cleaned = {}
    for key, value in d.items():
        if value is not None:
            cleaned[key] = remove_null_values_in_dict_recursively(value)
    return cleaned

str_to_hash(text, hash_algorithm='sha256')

Source code in griptape/griptape/utils/hash.py
4
5
6
7
8
9
def str_to_hash(text: str, hash_algorithm: str = "sha256") -> str:
    """Return the hex digest of the encoded `text` under the named hashlib algorithm."""
    encoded = text.encode()
    return hashlib.new(hash_algorithm, encoded).hexdigest()