Package ai_shell

Filesystem shell tools for OpenAI Assistant

ai_shell

OpenAI-centric shell for giving safe, chat-optimized, filesystem access to an Assistant as a "tool".

Even if you trust the bot to run bash directly on your machine or in a docker container, standard tools will run up your bill with excess tokens in the reply, or a command will generate too few tokens and the bot won't know what is going on.

This is an alternative to code_interpreter, to tools that run code in a docker container locally, or to tools that run arbitrary shell code locally.

Installation

pip install ai_shell

Usage

See these full examples. As long as the OPENAI_API_KEY environment variable is set, you can run these examples.

To execute the demo bots, run these commands and follow the initialization instructions if needed. They all expect to manipulate python code in a /src/ folder.

python -m ai_shell.demo_bots.docs_writer_bot
python -m ai_shell.demo_bots.pylint_bot
python -m ai_shell.demo_bots.test_writer_bot
python -m ai_shell.demo_bots.tool_tester_bot
python -m ai_shell.demo_bots.todo_bot

This is the Python interface to the tools, i.e. how you're expected to wire up the tools to your bot.

import ai_shell

cat = ai_shell.CatTool(".", config=ai_shell.Config())
print("".join(cat.cat(["file.py"])))
print(cat.cat_markdown(["file.py"]))

ls = ai_shell.LsTool(".", config=ai_shell.Config())
print(ls.ls("docs"))
print(ls.ls_markdown("docs"))

This is the smallest example to illustrate the basic capabilities; also see here.

import asyncio
import ai_shell


async def main():
    def static_keep_going(toolkit: ai_shell.ToolKit):
        usage = toolkit.get_tool_usage_for("ls")
        if usage["count"] > 0:
            return (
                "Great job! You've used ls. Summarize in paragraph form and we're done."
            )
        return (
            "You haven't used the ls tool yet. Do you have access to the ls tool? If"
            " there is a problem report it to the report_text tool to end the session."
        )

    # Creates temporary bots
    bot = ai_shell.TaskBot(
        ai_shell.Config(),
        name="Folder inspection bot.",
        bot_instructions="Run the ls tool and tell me what you see.",
        model="gpt-3.5-turbo-1106",
        dialog_logger_md=ai_shell.DialogLoggerWithMarkdown("./tmp"),
    )
    await bot.initialize()
    the_ask = f"""You are in the './' folder. You do not need to guess the pwd, it is './'. 
    Run ls and tell me what you see in paragraph format."""
    await bot.basic_tool_loop(
        the_ask=the_ask,
        root_folder="./src",
        tool_names=[
            "ls",
            "report_text",
        ],
        keep_going_prompt=static_keep_going,
    )


if __name__ == "__main__":
    asyncio.run(main())

This is the CLI interface, which is intended for testing, not for bot usage.

ais cat_markdown --file-paths pyproject.toml

Features in Brief

  • Many CLI-like tool interfaces, such as ls, cat, grep, head, tail, and git.
  • OpenAI glue for all CLI tools.
  • UX with a bot in mind.
  • Security with a mischievous, but not especially malicious, bot in mind.
  • Bot (Assistant) boilerplate help.
  • Support for bots doing one-shot tool use and goal-function-driven tool use.
  • Bots have extensibility points.
  • TODO: plugin system for tools.

Analogues supported today

Directories: ls, find

Files: cat, grep, head, tail

Editing: sed, ed, edlin, patch, replace, insert, rewrite, write new

Data: cut

Other: pycat, token counter, git

Tasking: todo

n.b. Every file is read and written as utf-8 strings.

Prior Art

ai_shell draws inspiration from various command-line interface (CLI) tools and shell environments, integrating features from traditional shells with OpenAI's language models. It is designed to provide an easy and secure interface for AI-assisted file system interactions, keeping in mind both usability and safety.

Documentation

Document Editing

The bot has a hard time with editing files.

Some top-level strategies:

  • editor-like tools, e.g. ed and edlin
  • text replacement tools, e.g. sed, regex, replace
  • full document replacement
  • write only new files
  • multiline insert tool
  • diff/patch tool
  • well known tools vs simpler ad-hoc tools
  • validated format
  • Guided edit

Editors

The bot cannot consistently edit a document with ed or edlin. It gets confused about the state of the document, the syntax of the editor, etc.

Text Replacers

It gets confused about the syntax of sed and regex.

It also has to keep straight complex escaping patterns as it writes JSON: the payload has to be escaped for JSON, which wraps editor syntax, which can include regex.

Full Text Replacement

It constantly confuses rewriting an entire document with modifications with writing just the modified lines, creating non-stop, massive, accidental deletions.

Writing only new files

This shifts the burden of merging documents to a human.

Multiline Insert

This is similar to the sed and regex tools, but with simpler syntax. The cost of simplicity is that the bot has no background knowledge of how to use the multiline insert tool, while it has lots of knowledge of sed and regex.
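
Below is a minimal, hypothetical sketch of what context-anchored insertion looks like; it illustrates the idea only and is not the actual InsertTool API.

# Hypothetical sketch of context-anchored insertion; not the actual InsertTool API.
# The caller supplies an anchor line and the new lines; no line numbers are needed.
def insert_after_context(text: str, context: str, new_lines: list[str]) -> str:
    lines = text.splitlines()
    for index, line in enumerate(lines):
        if context in line:
            return "\n".join(lines[: index + 1] + new_lines + lines[index + 1 :])
    raise ValueError(f"Context not found: {context!r}")

print(insert_after_context("a\nb\nc", "b", ["b2"]))  # a, b, b2, c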

Success vs Failure

When the bot gets a success flag, it thinks the job is done and stops early. When the bot gets a failure, it sometimes assumes the task is impossible and gives up. This is mitigated a little bit by returning the entire document after each edit so it can see what happened.

Diff/Patch

To create a well-known patch format, the bot has to be able to count lines perfectly. It often can't, so the patch tool rejects the patches, and after a few attempts the bot gives up. Common patch tools provide almost no feedback, because the bot fails in ways that normal tools don't anticipate, so why would they provide helpful feedback?
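
A sketch of friendlier feedback for miscounted hunks follows. It assumes a plain unified diff and only compares each @@ header's claimed line counts against the hunk body; it is not the library's PatchTool.

import re

# Sketch: tell the bot which hunk header disagrees with its body, instead of
# rejecting the whole patch with no explanation. Assumes a plain unified diff.
HUNK_HEADER = re.compile(r"^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@")

def describe_bad_hunks(diff_text: str) -> list[str]:
    problems = []
    lines = diff_text.splitlines()
    for i, line in enumerate(lines):
        match = HUNK_HEADER.match(line)
        if not match:
            continue
        claimed_old = int(match.group(1) or 1)
        claimed_new = int(match.group(2) or 1)
        body = []
        for later in lines[i + 1 :]:
            if later.startswith("@@") or later.startswith("--- "):
                break
            body.append(later)
        actual_old = sum(1 for b in body if b.startswith((" ", "-")))
        actual_new = sum(1 for b in body if b.startswith((" ", "+")))
        if (actual_old, actual_new) != (claimed_old, claimed_new):
            problems.append(
                f"Hunk at diff line {i + 1} claims -{claimed_old}/+{claimed_new} "
                f"but its body has {actual_old} old and {actual_new} new lines."
            )
    return problems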

Validated Format

In the case of, say, python code, the syntax can be validated, and if the bot messed up the document, it can be reverted and the bot can try again.
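
A minimal sketch of that round trip for python, using only the standard library: keep the original text and accept the bot's edit only if it still parses.

import ast

# Minimal sketch of the validated-format strategy for python files.
def apply_if_valid_python(original: str, edited: str) -> str:
    try:
        ast.parse(edited)
        return edited    # the edit still parses, keep it
    except SyntaxError:
        return original  # revert so the bot can try again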

Guided Edit

An example of guided edit is feeding the bot one line or paragraph of text and asking it to transform it or otherwise do something with it. Then ordinary code merges it in a predictable format. For example, show the bot a function, then ask for a docstring.
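
A sketch of the merge side of a guided edit: the bot returns only the docstring text, and ordinary code splices it in. It assumes the function header fits on one line; this is an illustration, not the library's implementation.

# Sketch: merge a bot-supplied docstring into a function with ordinary code.
# Assumes a single-line `def ...:` header; `docstring` is the bot's answer.
def add_docstring(function_source: str, docstring: str) -> str:
    lines = function_source.splitlines()
    body_indent = " " * (len(lines[1]) - len(lines[1].lstrip())) if len(lines) > 1 else "    "
    return "\n".join([lines[0], f'{body_indent}"""{docstring}"""'] + lines[1:])

print(add_docstring("def add(a, b):\n    return a + b", "Add two numbers."))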

Features

General Features

  • Tools are similar to common shell commands like ls, grep, head, tail, and git for easy use by the bot.
  • Handle bot "accessibility". The bot can't see, hear, or interact with the console display.
  • Designed to minimize token usage in both input and output
  • Supports capping the number of rows in a response.
  • Supports mime_types
  • Outputs in Markdown to reduce token repetition through sections and subsection headers.
  • Implementation is simplified with no command chaining.

Text and Size Management

  • TODO: Maximum output limits, both by config and by bot.
  • Capability to count tokens, with a maximum limit and fallbacks to word or byte count (see the sketch after this list).
  • Implements whitespace and other lossless/lossy text compressions.
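
The sketch below shows one way to do the counting with a fallback; it assumes the optional tiktoken dependency and drops to a word count when it is missing, which is not necessarily the library's exact fallback order.

# Sketch: token counting with a fallback, assuming tiktoken may not be installed.
def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    try:
        import tiktoken

        return len(tiktoken.get_encoding(encoding_name).encode(text))
    except ImportError:
        return len(text.split())  # crude fallback: word count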

Source Code Display and Analysis

  • Converts code to Markdown (.md) files, including minified versions.
  • TODO: AST (Abstract Syntax Tree) display and search capabilities using pycodesearch.
  • TODO: Features for displaying source code of functions and classes in imported modules.
  • TODO: Simplified directory listings (dir(module)) and module help (help(module)) focusing on key elements.

Security Constraints

General Security Measures

  • Provides multiple output formats: plain text, light markdown, and JSON (structured objects).
  • TODO: Ignores files specified in .gitignore.
  • TODO: Skips files hidden by the operating system.
  • Prevents parent directory traversal and disallows file system modification outside of specified folders (see the sketch after this list).
  • No direct shell access, and write permissions are restricted.
  • Limited write access to source code, isolated within a specific git branch.
  • File writes are permitted only in a designated branch.
  • No network access.
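
A sketch of the containment check, in the spirit of the is_file_in_root_folder helper that the tools call; the library's exact implementation may differ.

import os

# Sketch: refuse any path that resolves outside the configured root folder.
def is_inside_root(file_path: str, root_folder: str) -> bool:
    root = os.path.abspath(root_folder)
    target = os.path.abspath(os.path.join(root, file_path))
    return os.path.commonpath([root, target]) == root

assert is_inside_root("src/main.py", ".")
assert not is_inside_root("../secrets.txt", ".")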

Bots and Subbots

(in progress)

I created this so I could create a swarm of bots to collaboratively work on a task. As it turns out, sub-bots, or subbots, are also useful for tool usage.

Subbots

  • Tool selection. Too many tools or extraneous tools confuse the bot.
  • Prompt improvement. The bot can often write a better prompt.
  • Todo management. Having one bot split up tasks, manage them, and also do them makes the task-execution part worse.

Subbot usage requires an editable config file for storing assistant IDs.

Programmer's Manual

Possible goals

  • Write a single-shot bot that uses tools but does not loop.
  • Write a tool-using bot that uses tools, loops, and has a goal function.
  • Use the tools in the toolkit with your own bot framework.

Extension Points

  • Goal checking functions
  • New tools with plugins
  • External tools (with merge request)

Use Cases for ai_shell

Good uses

It is a library for building more bots, especially when a bot needs shell access and either that access is dangerous or the bot has a hard time with the complexities of a real shell.

Bad uses

It is in no way trying to compete with unix tools, faithfully reproduce their behavior, etc.

Bots to add types

  • Add typehints
  • Check whether the typehints make sense

Bots to add documentation

  • Add docstrings
  • Convert docstrings to particular format
  • Check if docstrings still match the content of the function
  • Add intra-code comments

Bots to do tests

  • Add doctests
  • Add unit tests
  • Add examples (e.g. an if __name__ == "__main__" block at the end of the file)

Bots to work lint issues

  • Run pylint, bandit, mypy, etc., and work the reported issues

Code Generation

A code generation tool generates all of the OpenAI assistant code:

  • jsonschema, as generated from the method signatures
  • toolkit, the python tool invocation code, as generated from the jsonschema
  • a CLI interface in __main__.py that invokes the tool for human testing, not intended for bot use.

JSON

The tools expect chaotic JSON from the bot.

  • Truncated JSON
  • Invalid escape codes
  • Function args that are the wrong type, e.g. a string instead of a list of strings (tolerant coercion is sketched below)
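
A sketch of that kind of coercion, in the spirit of the convert_to_list helper used by the tools (its real behavior may differ):

# Sketch: coerce a lone string into the list of strings the schema expects.
def convert_to_list(value) -> list[str]:
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        # the bot often sends "a.py, b.py" or "a.py" where a list is expected
        return [part.strip() for part in value.split(",") if part.strip()]
    raise TypeError(f"Expected str or list[str], got {type(value).__name__}")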

All messages returned are JSON; even if the return value is plaintext markdown, it is wrapped in a layer of JSON.

Error messages for the bot also need to be JSON, so I used RFC 7807 as inspiration.
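
For illustration, an RFC 7807-inspired error payload might look like the following; the field values here are made up for the example and are not the library's wire format.

import json

# Example shape of an RFC 7807-inspired tool error; the values are illustrative.
error = {
    "type": "tool-error/file-not-found",
    "title": "File not found",
    "detail": "file.py does not exist under the root folder.",
    "instance": "cat",
}
print(json.dumps(error))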

Toolkit Safety

The bot sometimes attempts to call tools that don't exist, e.g. parallel. Sometimes this means the bot knows about a feature not yet supported by the openai client. To be safe, you have to whitelist the tools that the bot can use; otherwise it could potentially call a tool by guessing its name.
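
With the documented initializers, whitelisting looks like this; "ls" and "report_text" come from the example above, while "cat_markdown" is assumed to match the CLI command of the same name.

import ai_shell

# Whitelist only the tools the bot may call.
ai_shell.initialize_all_tools(keeps=["ls", "cat_markdown", "report_text"])

# Or let the library pick a recommended set for a root folder:
# ai_shell.initialize_recommended_tools("./src", ai_shell.Config())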

Mime Types

"Mime types for the API". Rather than give the bot two or more formats, I will implement a mime type, so that the APIs are configured to return either markdown or structured data, depending on config. The mime type result is still wrapped in JSON! Just less of it.

The bot sometimes fills in a mime type when it is submitting a file, thinking that the mime type is for classifying the input.

Editing

The bot finds it difficult to edit files. For many techniques, it can get close. In chat dialogs, the bot wants to regenerate the whole document to edit even a single character. Or it wants to use ad hoc diffs. This is problematic for programmatic use.

Linux-Like Editing Strategies

The bot knows unix tools and often can get really close to using a function that behaves like that tool.

  • ed. Ed is such a bad editor that even the bot has a hard time with it.
  • edlin/dedlin. Edlin's commands can almost be used as an executable script.
  • patch. git diffs/unidiffs. The bot can sometimes generate a diff. Sometimes the diff is corrupt because it requires careful line counting. Sometimes the bot creates a diff for a hallucinated target file.
  • sed. Fancy replace.

Simpler Ad Hoc Tools

  • replace. This has no equivalent in unix, it is a replacer that doesn't require line numbers.
  • insert. This has no equivalent in unix, it is an inserter that doesn't require line numbers.
  • rewrite. This has no equivalent in unix, it is a tool to rewrite a file, or create a new one, e.g. a corrected copy.

A surprising thing about all of these approaches is that the bot doesn't check its work (it won't run a cat after an edit), and it often just assumes everything worked because it didn't get an error message. It assumes that the success message means the user really is happy and the task is done.

Viewing Strategies

  • Cat.
  • Head/Tail.
  • PyCat. Work in progress; views python code either aggregated or compressed.

File Browsing Strategies

  • Ls. Returns a directory tree if the bot tries to browse a directory that doesn't exist.
  • Grep.
  • Find.
  • Cut. Primitive CSV browser

Safety Tools

  • Git. Particularly to allow for commit/revert
  • Diff tool.

Logging

There are at least three kinds of logging:

  • REST API calls. Each API call to OpenAI is written as a JSON document.
  • Tool commands. This is a nearly executable bash script to replay what the bot did.
  • The dialog. This is a chat log style log.
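
configure_logging() returns a standard dictConfig-style dictionary, so wiring it up is one call:

import logging.config

import ai_shell

# Apply the library's logging configuration with the stdlib machinery.
logging.config.dictConfig(ai_shell.configure_logging())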

Config

Config is intended to bound the behavior of certain tools, to persist bots, and to enable helper bots.

Helper bots attempt to help the main bot use tools by letting a specialized bot focus on a narrow part of the problem of using a tool.
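
A minimal sketch of working with Config, using the methods documented below: read a flag with a default, then enable one of the helper bots.

import ai_shell

# Read a flag with a default, then enable the prompt-improver helper bot.
config = ai_shell.Config()
if not config.get_flag("enable_prompt_improver_bot", False):
    config.set_flag("enable_prompt_improver_bot", True)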

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

[1.0.4] - 2024-01-20

Added

  • Started tracking changes.
Expand source code
"""
Filesystem shell tools for OpenAI Assistant


.. include:: ../README.md

.. include:: ../docs/DocumentEditing.md

.. include:: ../docs/Features.md

.. include:: ../docs/Manual.md

.. include:: ../docs/UseCases.md

.. include:: ../docs/Design.md

.. include:: ../CHANGELOG.md
"""
from ai_shell.ai_logs.log_to_markdown import DialogLoggerWithMarkdown
from ai_shell.ai_logs.logging_utils import configure_logging
from ai_shell.answer_tool import AnswerCollectorTool
from ai_shell.bot_glue.bot import TaskBot
from ai_shell.cat_tool import CatTool
from ai_shell.cut_tool import CutTool
from ai_shell.ed_tool import EdTool
from ai_shell.edlin_tool import EdlinTool
from ai_shell.externals import pytest_call
from ai_shell.externals.black_call import invoke_black
from ai_shell.externals.pygount_call import count_lines_of_code
from ai_shell.externals.pylint_call import invoke_pylint
from ai_shell.find_tool import FindTool
from ai_shell.git_tool import GitTool
from ai_shell.grep_tool import GrepTool
from ai_shell.head_tail_tool import HeadTailTool
from ai_shell.insert_tool import InsertTool
from ai_shell.ls_tool import LsTool
from ai_shell.openai_toolkit import ToolKit
from ai_shell.openai_tools import ALL_TOOLS, initialize_all_tools, initialize_recommended_tools
from ai_shell.patch_tool import PatchTool
from ai_shell.pycat_tool import PyCatTool
from ai_shell.pytest_tool import PytestTool
from ai_shell.replace_tool import ReplaceTool
from ai_shell.rewrite_tool import RewriteTool
from ai_shell.sed_tool import SedTool
from ai_shell.todo_tool import TodoTool
from ai_shell.token_tool import TokenCounterTool
from ai_shell.utils.config_manager import Config
from ai_shell.utils.cwd_utils import change_directory

__all__ = [
    "CatTool",
    "CutTool",
    "FindTool",
    "GrepTool",
    "HeadTailTool",
    "LsTool",
    "GitTool",
    "TokenCounterTool",
    "PatchTool",
    "RewriteTool",
    "PyCatTool",
    "EdTool",
    "EdlinTool",
    "ToolKit",
    "SedTool",
    "ReplaceTool",
    "InsertTool",
    "TodoTool",
    "AnswerCollectorTool",
    "PytestTool",
    # Tool and general config
    "initialize_all_tools",
    "initialize_recommended_tools",
    "Config",
    "ALL_TOOLS",
    # logging support
    "configure_logging",
    "DialogLoggerWithMarkdown",
    # bot support
    "TaskBot",
    # goal checker tools
    "invoke_pylint",
    "pytest_call",
    "invoke_black",
    "count_lines_of_code",
    # misc that could have been 3rd party
    "change_directory",
]

Sub-modules

ai_shell.ai_logs
ai_shell.answer_tool

Call a tool, but actually the tool is just a way to get a structured response …

ai_shell.backup_restore
ai_shell.bot_glue

Just enough bot code to exercise the tools. Bots are not the focus of this project, but without a bot you can't check if the tools are any good for a …

ai_shell.cat_tool

Cat optimized for AI prompts.

ai_shell.code_generate

Generate code for AI Shell using the docstrings as source data …

ai_shell.cut_tool

AI optimized cut tool …

ai_shell.demo_bots
ai_shell.diff_tool

Apply patch with unidiff instead of Git

ai_shell.ed_tool

Ed is a bad editor, but it is documented …

ai_shell.edlin_tool

Give bot access to an edlin clone …

ai_shell.externals

Wrappers for external CLI tools that require subprocess calls.

ai_shell.find_tool

AI Optimized version of find, but much simpler …

ai_shell.git_tool

Wrapper around GitPython and shell commands to git.

ai_shell.grep_tool

AI optimized grep tool

ai_shell.head_tail_tool

AI optimized head/tail tool

ai_shell.import_plugins
ai_shell.insert_tool

Text editor for simple text insertion at line or context.

ai_shell.ls_tool

Directory listing tool, optimized for AI.

ai_shell.openai_schemas

jsonschema for functions

ai_shell.openai_support

All the tools are optimized for LLMs, but not openai specifically …

ai_shell.openai_toolkit

Generate code, do not edit.

ai_shell.openai_tools

All the tools are optimized for LLMs, but not openai specifically …

ai_shell.patch_tool

Let the bot patch files instead of full rewrite. Also it is an alternative to editing with ed or edlin …

ai_shell.plugins
ai_shell.pycat_tool

Cat, except optimized for python files.

ai_shell.pytest_tool

Optimized for AI version of pytest.

ai_shell.pyutils

When the bot views a file, we might want to save tokens. If it is a python file, we can compress it …

ai_shell.read_py_source

Read source with tools that understand the structure of python

ai_shell.regex_tester_tool

The bot needs to be able to test regex expressions before it uses them.

ai_shell.replace_tool

Optimized for AI version of sed. For file editing …

ai_shell.rewrite_tool

For short files with lots of edits, just let the bot rewrite the file …

ai_shell.sed_tool

Optimized for AI version of sed. For file editing …

ai_shell.subbots

Bots that help other bots achieve their goals.

ai_shell.todo_tool

AI Optimized TODO tool. Alternative to JIRA or the like …

ai_shell.token_tool

Token Counting …

ai_shell.utils

Miscellaneous utilities that cluttered up the other namespaces.

Functions

def change_directory(new_path: str) ‑> collections.abc.Iterator[None]

Change the current working directory to a new path.

Args

new_path : str
The new path to change to.
Expand source code
@contextmanager
def change_directory(new_path: str) -> Iterator[None]:
    """Change the current working directory to a new path.

    Args:
        new_path (str): The new path to change to.
    """
    original_directory = os.getcwd()
    try:
        os.chdir(new_path)
        yield None
    finally:
        os.chdir(original_directory)
def configure_logging() ‑> dict[str, typing.Any]

Basic style

Expand source code
def configure_logging() -> dict[str, Any]:
    """Basic style"""
    logging_config: dict[str, Any] = {
        "version": 1,
        "disable_existing_loggers": True,
        "formatters": {
            "standard": {"format": "[%(levelname)s] %(name)s: %(message)s"},
        },
        "handlers": {
            "default": {
                "level": "DEBUG",
                "formatter": "standard",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",  # Default is stderr
            },
            # "bug_trail": {
            #     "level": "DEBUG",
            #     # "formatter": "standard",
            #     "class": "bug_trail_core.BugTrailHandler",
            #     "db_path": bug_trail_config.database_path,
            #     "minimum_level": logging.DEBUG,
            # },
            # "json": {
            #     # "()": "json_file_handler_factory",
            #     "level": "DEBUG",
            #     "class": "ai_shell.utils.json_log_handler.JSONFileHandler",
            #     "directory": "api_logs",
            #     "module_name": "openai",
            # },
        },
        "loggers": {
            # root logger can capture too much
            "": {  # root logger
                "handlers": ["default"],  # the "bug_trail" handler above is commented out
                "level": "DEBUG",
                "propagate": False,
            },
        },
    }

    debug_level_modules: list[str] = ["__main__", "ai_shell", "minimal_example"]

    info_level_modules: list[str] = []
    warn_level_modules: list[str] = []

    # json handler
    for name in ["openai"]:
        logging_config["loggers"][name] = {
            "handlers": [],  # ["json"],
            "level": "DEBUG",
            "propagate": False,
        }

    for name in debug_level_modules:
        logging_config["loggers"][name] = {
            "handlers": [
                "default",
                # "bug_trail"
            ],
            "level": "DEBUG",
            "propagate": False,
        }

    for name in info_level_modules:
        logging_config["loggers"][name] = {
            "handlers": [
                "default",
                # "bug_trail"
            ],
            "level": "INFO",
            "propagate": False,
        }

    for name in warn_level_modules:
        logging_config["loggers"][name] = {
            "handlers": [
                "default",
                # "bug_trail"
            ],
            "level": "WARNING",
            "propagate": False,
        }
    return logging_config
def count_lines_of_code(file_path: str) ‑> pygount.analysis.SourceAnalysis

Check the lines of code in a file. File must exist.

Args

file_path : str
The path to the file.

Returns

SourceAnalysis
The analysis of the file, including line counts.
Expand source code
def count_lines_of_code(file_path: str) -> SourceAnalysis:
    """
    Check the lines of code in a file. File must exist.
    Args:
        file_path (str): The path to the file.

    Returns:
        SourceAnalysis: The analysis of the file, including line counts.
    """
    return SourceAnalysis.from_file(file_path, "pygount", encoding="utf-8")
def initialize_all_tools(skips: Optional[list[str]] = None, keeps: Optional[list[str]] = None) ‑> None

Initialize all tools

Args

skips : Optional[list[str]], optional
Tools to skip. Defaults to None.
keeps : Optional[list[str]], optional
Tools to keep. Defaults to None.
Expand source code
def initialize_all_tools(skips: Optional[list[str]] = None, keeps: Optional[list[str]] = None) -> None:
    """Initialize all tools

    Args:
        skips (Optional[list[str]], optional): Tools to skip. Defaults to None.
        keeps (Optional[list[str]], optional): Tools to keep. Defaults to None.
    """
    if keeps is not None:
        keep = keeps
    elif skips is None:
        keep = just_tool_names()
    else:
        keep = [name for name in just_tool_names() if name not in skips]

    for _ns, tools in _SCHEMAS.items():
        for name, schema in tools.items():
            function_style: dict[str, Union[str, Collection[str]]] = {"name": name}
            parameters = {"type": "object", "properties": schema["properties"], "required": schema["required"]}
            function_style["parameters"] = parameters
            function_style["description"] = schema["description"]
            if name in keep:
                ALL_TOOLS.append(function_style)
    active_tools_string = ", ".join(tool["name"] for tool in ALL_TOOLS)
    logger.info(f"Active tools {active_tools_string}")

def initialize_recommended_tools(root_folder: str, config: Config) ‑> None

Initialize recommended tools

Args

root_folder : str
The root folder to recommend tools for.
config : Config
The developer input that bot shouldn't set.
Expand source code
def initialize_recommended_tools(root_folder: str, config: Config) -> None:
    """Initialize recommended tools

    Args:
        root_folder (str): The root folder to recommend tools for.
        config (Config): The developer input that bot shouldn't set.
    """
    initialize_all_tools(keeps=recommendations(root_folder, config))
def invoke_black(file_path: str) ‑> CommandResult

Runs black on the file or folder. Code 128 means the file is hosed.

Args

file_path : str
The name of the module to run black on.

Returns

CommandResult
The result of the command.
Expand source code
def invoke_black(file_path: str) -> CommandResult:
    """
    Runs black on the file or folder. Code 128 means the file is hosed.

    Args:
        file_path (str): The name of the module to run black on.

    Returns:
        CommandResult: The result of the command.
    """
    command_name = "black"
    arg_string = f"'{file_path}' --check"

    return safe_subprocess(command_name, arg_string)
def invoke_pylint(module_name: str, minimum_score: float) ‑> CommandResult

Runs pylint on the module.

Args

module_name : str
The name of the module to run pylint on.
minimum_score : float
The minimum score to pass.

Returns

CommandResult
The result of the command.
Expand source code
def invoke_pylint(module_name: str, minimum_score: float) -> CommandResult:
    """
    Runs pylint on the module.

    Args:
        module_name (str): The name of the module to run pylint on.
        minimum_score (float): The minimum score to pass.

    Returns:
        CommandResult: The result of the command.
    """
    command_name = "pylint"
    arg_string = f"'{module_name}' --fail-under {minimum_score}"

    # generic response.
    return safe_subprocess(command_name, arg_string)

Classes

class AnswerCollectorTool (root_folder: str, config: Config)

Initialize the AnswerCollectorTool class.

Args

root_folder : str
The root folder path for file operations. (Not used yet)
config : Config
The developer input that bot shouldn't set.
Expand source code
class AnswerCollectorTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the AnswerCollectorTool class.

        Args:
            root_folder (str): The root folder path for file operations. (Not used yet)
            config (Config): The developer input that bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.comment: Optional[str] = None
        self.bool_answer: Optional[bool] = None
        self.json_answer: Optional[str] = None
        self.xml_answer: Optional[str] = None
        self.toml_answer: Optional[str] = None
        self.tuple_answer: Optional[tuple] = None
        self.set_answer: Optional[set] = None
        self.text_answer: Optional[str] = None
        self.list_answer: Optional[list[str]] = None
        self.int_answer: Optional[int] = None
        self.float_answer: Optional[float] = None
        self.dict_answer: Optional[dict[str, Any]] = None
        self.response_received = "Response received."

    def _answered(self) -> None:
        """Check if this tool has been used.

        Raises:
            TypeError: If the tool has been used. Recreate a new one after each usage.
        """
        if any(
            [
                self.comment,
                self.bool_answer is not None,
                self.json_answer,
                self.xml_answer,
                self.toml_answer,
                self.tuple_answer,
                self.set_answer,
                self.text_answer,
                self.list_answer,
                self.int_answer,
                self.float_answer,
                self.dict_answer,
            ]
        ):
            raise TypeError("This Answer tool has been used. Please create a new one for another answer.")

    @log()
    def report_list(self, answer: list[str], comment: str = "") -> str:
        """Report answer in list format.

        Args:
            answer (list[str]): The answer to be reported in list format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.list_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_int(self, answer: int, comment: str = "") -> str:
        """Report answer in integer format
        Args:
            answer (int): The answer to be reported in integer format.
            comment (str): Any comments, supplemental info about the answer.


        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.int_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_float(self, answer: float, comment: str = "") -> str:
        """Report answer in string format.

        Args:
            answer (float): The answer to be reported in float format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.float_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_dict(self, answer: dict[str, Any], comment: str = "") -> str:
        """Report answer in dict format.

        Args:
            answer (dict[str, Any]): The answer to be reported in dict format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.dict_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_text(self, answer: str, comment: str = "") -> str:
        """Report answer in string format.

        Args:
            answer (str): The answer to be reported in string format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.text_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_bool(self, answer: bool, comment: str = "") -> str:
        """Report answer in bool format.

        Args:
            answer (bool): The answer to be reported in bool format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.bool_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_tuple(self, answer: tuple, comment: str = "") -> str:
        """Report answer in tuple format.

        Args:
            answer (tuple): The answer to be reported in tuple format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.tuple_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_set(self, answer: set, comment: str = "") -> str:
        """Report answer in set format.

        Args:
            answer (set): The answer to be reported in set format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.set_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_json(self, answer: str, comment: str = "") -> str:
        """Report answer in json format.

        Args:
            answer (str): The answer to be reported in json format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.json_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_xml(self, answer: str, comment: str = "") -> str:
        """Report answer in xml format.

        Args:
            answer (str): The answer to be reported in xml format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.xml_answer = answer
        self.comment = comment
        return self.response_received

    @log()
    def report_toml(self, answer: str, comment: str = "") -> str:
        """Report answer in toml format.

        Args:
            answer (str): The answer to be reported in toml format.
            comment (str): Any comments, supplemental info about the answer.

        Returns:
            str: A string indicating that the response has been received.
        """
        self._answered()
        self.toml_answer = answer
        self.comment = comment
        return self.response_received

Methods

def report_bool(self, answer: bool, comment: str = '') ‑> str

Report answer in bool format.

Args

answer : bool
The answer to be reported in bool format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_bool(self, answer: bool, comment: str = "") -> str:
    """Report answer in bool format.

    Args:
        answer (bool): The answer to be reported in bool format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.bool_answer = answer
    self.comment = comment
    return self.response_received
def report_dict(self, answer: dict[str, typing.Any], comment: str = '') ‑> str

Report answer in dict format.

Args

answer : dict[str, Any]
The answer to be reported in dict format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_dict(self, answer: dict[str, Any], comment: str = "") -> str:
    """Report answer in dict format.

    Args:
        answer (dict[str, Any]): The answer to be reported in dict format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.dict_answer = answer
    self.comment = comment
    return self.response_received
def report_float(self, answer: float, comment: str = '') ‑> str

Report answer in string format.

Args

answer : float
The answer to be reported in float format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_float(self, answer: float, comment: str = "") -> str:
    """Report answer in string format.

    Args:
        answer (float): The answer to be reported in float format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.float_answer = answer
    self.comment = comment
    return self.response_received
def report_int(self, answer: int, comment: str = '') ‑> str

Report answer in integer format

Args

answer : int
The answer to be reported in integer format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_int(self, answer: int, comment: str = "") -> str:
    """Report answer in integer format
    Args:
        answer (int): The answer to be reported in integer format.
        comment (str): Any comments, supplemental info about the answer.


    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.int_answer = answer
    self.comment = comment
    return self.response_received
def report_json(self, answer: str, comment: str = '') ‑> str

Report answer in json format.

Args

answer : str
The answer to be reported in json format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_json(self, answer: str, comment: str = "") -> str:
    """Report answer in json format.

    Args:
        answer (str): The answer to be reported in json format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.json_answer = answer
    self.comment = comment
    return self.response_received
def report_list(self, answer: list[str], comment: str = '') ‑> str

Report answer in list format.

Args

answer : list[str]
The answer to be reported in list format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_list(self, answer: list[str], comment: str = "") -> str:
    """Report answer in list format.

    Args:
        answer (list[str]): The answer to be reported in list format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.list_answer = answer
    self.comment = comment
    return self.response_received
def report_set(self, answer: set, comment: str = '') ‑> str

Report answer in set format.

Args

answer : set
The answer to be reported in set format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_set(self, answer: set, comment: str = "") -> str:
    """Report answer in set format.

    Args:
        answer (set): The answer to be reported in set format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.set_answer = answer
    self.comment = comment
    return self.response_received
def report_text(self, answer: str, comment: str = '') ‑> str

Report answer in string format.

Args

answer : str
The answer to be reported in string format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_text(self, answer: str, comment: str = "") -> str:
    """Report answer in string format.

    Args:
        answer (str): The answer to be reported in string format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.text_answer = answer
    self.comment = comment
    return self.response_received
def report_toml(self, answer: str, comment: str = '') ‑> str

Report answer in toml format.

Args

answer : str
The answer to be reported in toml format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_toml(self, answer: str, comment: str = "") -> str:
    """Report answer in toml format.

    Args:
        answer (str): The answer to be reported in toml format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.toml_answer = answer
    self.comment = comment
    return self.response_received
def report_tuple(self, answer: tuple, comment: str = '') ‑> str

Report answer in tuple format.

Args

answer : tuple
The answer to be reported in tuple format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_tuple(self, answer: tuple, comment: str = "") -> str:
    """Report answer in tuple format.

    Args:
        answer (tuple): The answer to be reported in tuple format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.tuple_answer = answer
    self.comment = comment
    return self.response_received
def report_xml(self, answer: str, comment: str = '') ‑> str

Report answer in xml format.

Args

answer : str
The answer to be reported in xml format.
comment : str
Any comments, supplemental info about the answer.

Returns

str
A string indicating that the response has been received.
Expand source code
@log()
def report_xml(self, answer: str, comment: str = "") -> str:
    """Report answer in xml format.

    Args:
        answer (str): The answer to be reported in xml format.
        comment (str): Any comments, supplemental info about the answer.

    Returns:
        str: A string indicating that the response has been received.
    """
    self._answered()
    self.xml_answer = answer
    self.comment = comment
    return self.response_received
class CatTool (root_folder: str, config: Config)

Simulates cat cli tool.

Initialize the CatTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that bot shouldn't set.
Expand source code
class CatTool:
    """
    Simulates `cat` cli tool.
    """

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the CatTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config

    @log()
    def cat_markdown(
        self,
        file_paths: list[str],
        number_lines: bool = True,
        squeeze_blank: bool = False,
    ) -> str:
        """
        Concatenates the content of given file paths and formats them as markdown.

        Args:
            file_paths (list[str]): List of file paths to concatenate.
            number_lines (bool): If True, number all output lines.
            squeeze_blank (bool): If True, consecutive blank lines are squeezed to one.

        Returns:
            str: The concatenated and formatted content as a string.
        """
        output = StringIO()
        for line in self.cat(file_paths, number_lines, squeeze_blank):
            output.write(line)
            # output.write("\n")
        output.seek(0)
        return output.read()

    @log()
    def cat(
        self,
        file_paths: list[str],
        number_lines: bool = True,
        squeeze_blank: bool = False,
    ) -> Generator[str, None, None]:
        """
        Mimics the basic functionalities of the 'cat' command in Unix.

        Args:
            file_paths (list[str]): A list of file paths to concatenate.
            number_lines (bool): If True, number all output lines.
            squeeze_blank (bool): If True, consecutive blank lines are squeezed to one.

        Returns:
            Generator[str, None, None]

        Yields:
            str: Each line of the concatenated files.
        """
        file_paths = convert_to_list(file_paths)
        for location, file_path in enumerate(file_paths):
            if file_path.startswith("./"):
                file_paths[location] = file_path[2:]

        logger.info(f"cat --file_paths {file_paths} " f"--number_lines {number_lines} --squeeze_blank {squeeze_blank}")
        for file_path in file_paths:
            if not is_file_in_root_folder(file_path, self.root_folder):
                raise TypeError("No parent folder traversals allowed")

        line_number = 1
        for glob_pattern in file_paths:
            for file_path in safe_glob(glob_pattern, self.root_folder):
                if not os.path.isabs(file_path):
                    file_path = self.root_folder + "/" + file_path
                try:
                    with open(file_path, "rb") as file:
                        for line in self._process_cat_file(file, line_number, number_lines, squeeze_blank):
                            yield line
                            line_number += 1
                except PermissionError:
                    logger.warning(f"Permission denied: {file_path}, suppressing from output.")

    def _process_cat_file(
        self,
        file: IO[bytes],
        line_number: int,
        number_lines: bool,
        squeeze_blank: bool,
    ) -> Generator[str, None, None]:
        """
        Processes a file for concatenation, applying the specified formatting.

        Args:
            file: The file object to process.
            line_number (int): Current line number for numbering lines.
            number_lines (bool): If True, number all output lines.
            squeeze_blank (bool): If True, consecutive blank lines are squeezed to one.

        Returns:
            Generator[str, None, None]: A generator of processed lines.

        Yields:
            str: Each processed line of the file.
        """
        was_blank = False
        for byte_lines in file:
            # if isinstance(byte_lines, bytes):
            line = byte_lines.decode("utf-8")  # Decode bytes to string

            # Use StringIO for memory-efficient line processing
            with StringIO() as line_buffer:
                # Normalize line endings to \n
                line = line.replace("\r\n", "\n")
                line_buffer.write(line)

                if squeeze_blank and was_blank and line.strip() == "":
                    continue  # Skip consecutive blank lines

                was_blank = line.strip() == ""

                if number_lines:
                    line_buffer.seek(0)
                    line = f"{line_number}\t{line_buffer.read()}"
                    line_number += 1
                else:
                    line = line_buffer.getvalue()

                yield line

Methods

def cat(self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False) ‑> collections.abc.Generator[str, None, None]

Mimics the basic functionalities of the 'cat' command in Unix.

Args

file_paths : list[str]
A list of file paths to concatenate.
number_lines : bool
If True, number all output lines.
squeeze_blank : bool
If True, consecutive blank lines are squeezed to one.

Returns

Generator[str, None, None]

Yields

str
Each line of the concatenated files.
Expand source code
@log()
def cat(
    self,
    file_paths: list[str],
    number_lines: bool = True,
    squeeze_blank: bool = False,
) -> Generator[str, None, None]:
    """
    Mimics the basic functionalities of the 'cat' command in Unix.

    Args:
        file_paths (list[str]): A list of file paths to concatenate.
        number_lines (bool): If True, number all output lines.
        squeeze_blank (bool): If True, consecutive blank lines are squeezed to one.

    Returns:
        Generator[str, None, None]

    Yields:
        str: Each line of the concatenated files.
    """
    file_paths = convert_to_list(file_paths)
    for location, file_path in enumerate(file_paths):
        if file_path.startswith("./"):
            file_paths[location] = file_path[2:]

    logger.info(f"cat --file_paths {file_paths} " f"--number_lines {number_lines} --squeeze_blank {squeeze_blank}")
    for file_path in file_paths:
        if not is_file_in_root_folder(file_path, self.root_folder):
            raise TypeError("No parent folder traversals allowed")

    line_number = 1
    for glob_pattern in file_paths:
        for file_path in safe_glob(glob_pattern, self.root_folder):
            if not os.path.isabs(file_path):
                file_path = self.root_folder + "/" + file_path
            try:
                with open(file_path, "rb") as file:
                    for line in self._process_cat_file(file, line_number, number_lines, squeeze_blank):
                        yield line
                        line_number += 1
            except PermissionError:
                logger.warning(f"Permission denied: {file_path}, suppressing from output.")
def cat_markdown(self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False) ‑> str

Concatenates the content of given file paths and formats them as markdown.

Args

file_paths : list[str]
List of file paths to concatenate.
number_lines : bool
If True, number all output lines.
squeeze_blank : bool
If True, consecutive blank lines are squeezed to one.

Returns

str
The concatenated and formatted content as a string.
Expand source code
@log()
def cat_markdown(
    self,
    file_paths: list[str],
    number_lines: bool = True,
    squeeze_blank: bool = False,
) -> str:
    """
    Concatenates the content of given file paths and formats them as markdown.

    Args:
        file_paths (list[str]): List of file paths to concatenate.
        number_lines (bool): If True, number all output lines.
        squeeze_blank (bool): If True, consecutive blank lines are squeezed to one.

    Returns:
        str: The concatenated and formatted content as a string.
    """
    output = StringIO()
    for line in self.cat(file_paths, number_lines, squeeze_blank):
        output.write(line)
        # output.write("\n")
    output.seek(0)
    return output.read()
class Config (config_path: str = '')

A class for managing the ai_shell.toml file.

This is for globally available things that shouldn't or can't be set by the bot.

Initialize the Config class.

Expand source code
class Config:
    """A class for managing the ai_shell.toml file.

    This is for globally available things that shouldn't or can't be set by the bot.
    """

    def __init__(self, config_path: str = "") -> None:
        """Initialize the Config class."""
        if config_path and config_path.endswith(".toml"):
            self.config_file = config_path
        elif config_path:
            self.config_file = os.path.join(config_path, "ai_shell.toml")
        else:
            self.config_file = os.getenv("CONFIG_PATH", "ai_shell.toml")
        # freeze the location of the config file
        self.config_file = os.path.abspath(self.config_file)
        self._list_data: dict[str, list[str]] = {}
        self._values_data: dict[str, str] = {}
        self._flags_data: dict[str, bool] = {
            "enable_tool_selector_bot": False,
            "enable_regex_tester_bot": False,
            "enable_prompt_improver_bot": False,
            "enable_dialog_log": False,
            "enable_shell_log": False,
            "enable_api_log": False,
            "enable_autocat": True,
        }
        self._bots_data: list = []
        self.load_config()

    def load_config(self) -> None:
        """Load the config from the config file."""
        if os.path.isfile(self.config_file):
            data = toml.load(self.config_file)
            self._flags_data = data["flags"]
            self._bots_data = data["bots"]
            self._values_data = data["values"]
            self._list_data = data["lists"]
        else:
            self.save_config()
        if len(self._bots_data) > 100:
            raise ValueError(
                f"You have too many bots. Bot persistence must be failing somewhere, see {self.config_file}"
            )

    def save_config(self):
        """Save the config to the config file."""
        if not os.path.isabs(self.config_file):
            raise ValueError("Config file path must be absolute.")
        with open(self.config_file, "w", encoding="utf-8") as f:
            toml.dump(
                {
                    "flags": self._flags_data,
                    "bots": self._bots_data,
                    "values": self._values_data,
                    "lists": self._list_data,
                },
                f,
            )

    def add_bot(self, assistant_id: str, name: str) -> None:
        """Add a bot to the config.
        Args:
            assistant_id (str): The ID of the bot.
            name (str): The name of the bot.
        """
        bot = Bot(assistant_id, name)
        self._bots_data.append(dataclasses.asdict(bot))
        self.save_config()

    def set_flag(self, flag_name: str, value: bool) -> None:
        """Set the value of the given flag.
        Args:
            flag_name (str): The name of the flag.
            value (str): The value of the flag.
        """
        self._flags_data[flag_name] = value
        self.save_config()

    def cleanup(self) -> None:
        """Remove bots that have been deleted on OpenAI's side."""
        openai.api_key = os.getenv("OPENAI_API_KEY")
        existing_bots = openai.beta.assistants.list()
        assistant_ids = [bot.id for bot in existing_bots.data]

        # Remove bots that no longer exist in OpenAI
        self._bots_data = [bot for bot in self._bots_data if bot["assistant_id"] in assistant_ids]
        self.save_config()

    def get_bots(self) -> list[Bot]:
        """Return a list of Bot objects."""
        return [Bot(**bot) for bot in self._bots_data]

    def get_bot(self, name: str) -> Optional[Bot]:
        """Return a Bot object with the given name.
        Args:
            name (str): The name of the bot.

        Returns:
            Optional[Bot]: The bot with the given name, or None if no bot with that name exists.
        """
        for bot in self._bots_data:
            if bot["name"] == name:
                return Bot(**bot)
        return None

    def get_flag(self, flag_name: str, default_value: Optional[bool] = None) -> Optional[bool]:
        """Return the value of the given flag.
        Args:
            flag_name (str): The name of the flag.
            default_value (Optional[bool], optional): The default value to return if the flag does not exist.
                                                      Defaults to None.

        Returns:
            Optional[bool]: The value of the flag, or None if the flag does not exist.
        """
        return self._flags_data.get(flag_name, default_value)

    def get_value(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """Return the value of the given named value.

        Args:
            name (str): The name of the config value.
            default (Optional[str], optional): The default value to return if the value does not exist.
                                               Defaults to None.

        Returns:
            Optional[str]: The value of the named value, or None if the name does not exist.
        """
        return self._values_data.get(name, default)

    def set_list(self, list_name: str, value: list[str]) -> None:
        """Set the value of the given list of values.

        Args:
            list_name (str): The name of the config value.
            value (list[str]): The value of the list.
        """
        self._list_data[list_name] = value
        self.save_config()

    def get_required_value(self, name: str) -> str:
        """Return the value of the given named value.

        Args:
            name (str): The name of the config value.

        Returns:
            str: The value.

        Raises:
            FatalConfigurationError: If the value does not exist.
        """
        value = self._values_data.get(name, None)

        if value is None:
            raise FatalConfigurationError(f"Need {name} in config file")
        return value

    def get_list(self, list_name: str) -> list[str]:
        """Return the value of the given list of values.

        Args:
            list_name (str): The name of the config value.

        Returns:
            list[str]: The list from the config file.
        """
        return self._list_data.get(list_name, [])

Methods

def add_bot(self, assistant_id: str, name: str) ‑> None

Add a bot to the config.

Args

assistant_id : str
The ID of the bot.
name : str
The name of the bot.
Expand source code
def add_bot(self, assistant_id: str, name: str) -> None:
    """Add a bot to the config.
    Args:
        assistant_id (str): The ID of the bot.
        name (str): The name of the bot.
    """
    bot = Bot(assistant_id, name)
    self._bots_data.append(dataclasses.asdict(bot))
    self.save_config()
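
A short sketch of the bot registry round trip. The assistant id and bot name are illustrative, and cleanup() calls the OpenAI API, so OPENAI_API_KEY must be set:

import ai_shell

config = ai_shell.Config()
config.add_bot("asst_abc123", "Folder inspection bot.")

bot = config.get_bot("Folder inspection bot.")
if bot is not None:
    print(bot.assistant_id, bot.name)

# Drop registry entries for assistants that no longer exist on OpenAI's side.
config.cleanup()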
def cleanup(self) ‑> None

Remove bots that have been deleted on OpenAI's side.

Expand source code
def cleanup(self) -> None:
    """Remove bots that have been deleted on OpenAI's side."""
    openai.api_key = os.getenv("OPENAI_API_KEY")
    existing_bots = openai.beta.assistants.list()
    assistant_ids = [bot.id for bot in existing_bots.data]

    # Remove bots that no longer exist in OpenAI
    self._bots_data = [bot for bot in self._bots_data if bot["assistant_id"] in assistant_ids]
    self.save_config()
def get_bot(self, name: str) ‑> Optional[Bot]

Return a Bot object with the given name.

Args

name : str
The name of the bot.

Returns

Optional[Bot]
The bot with the given name, or None if no bot with that name exists.
Expand source code
def get_bot(self, name: str) -> Optional[Bot]:
    """Return a Bot object with the given name.
    Args:
        name (str): The name of the bot.

    Returns:
        Optional[Bot]: The bot with the given name, or None if no bot with that name exists.
    """
    for bot in self._bots_data:
        if bot["name"] == name:
            return Bot(**bot)
    return None
def get_bots(self) ‑> list[Bot]

Return a list of Bot objects.

Expand source code
def get_bots(self) -> list[Bot]:
    """Return a list of Bot objects."""
    return [Bot(**bot) for bot in self._bots_data]
def get_flag(self, flag_name: str, default_value: Optional[bool] = None) ‑> Optional[bool]

Return the value of the given flag.

Args

flag_name : str
The name of the flag.
default_value : Optional[bool], optional
The default value to return if the flag does not exist. Defaults to None.

Returns

Optional[bool]
The value of the flag, or None if the flag does not exist.
Expand source code
def get_flag(self, flag_name: str, default_value: Optional[bool] = None) -> Optional[bool]:
    """Return the value of the given flag.
    Args:
        flag_name (str): The name of the flag.
        default_value (Optional[bool], optional): The default value to return if the flag does not exist.
                                                  Defaults to None.

    Returns:
        Optional[bool]: The value of the flag, or None if the flag does not exist.
    """
    return self._flags_data.get(flag_name, default_value)
def get_list(self, list_name: str) ‑> list[str]

Return the value of the given list of values.

Args

list_name : str
The name of the config value.

Returns

list[str]
The list from the config file.
Expand source code
def get_list(self, list_name: str) -> list[str]:
    """Return the value of the given list of values.

    Args:
        list_name (str): The name of the config value.

    Returns:
        list[str]: The list from the config file.
    """
    return self._list_data.get(list_name, [])
def get_required_value(self, name: str) ‑> str

Return the value of the given named value.

Args

name : str
The name of the config value.

Returns

str
The value.

Raises

FatalConfigurationError
If the value does not exist.
Expand source code
def get_required_value(self, name: str) -> str:
    """Return the value of the given named value.

    Args:
        name (str): The name of the config value.

    Returns:
        str: The value.

    Raises:
        FatalConfigurationError: If the value does not exist.
    """
    value = self._values_data.get(name, None)

    if value is None:
        raise FatalConfigurationError(f"Need {name} in config file")
    return value
def get_value(self, name: str, default: Optional[str] = None) ‑> Optional[str]

Return the value of the given named value.

Args

name : str
The name of the config value.
default : Optional[str], optional
The default value to return if the value does not exist. Defaults to None.

Returns

Optional[str]
The value of the named value, or None if the name does not exist.
Expand source code
def get_value(self, name: str, default: Optional[str] = None) -> Optional[str]:
    """Return the value of the given named value.

    Args:
        name (str): The name of the config value.
        default (Optional[str], optional): The default value to return if the value does not exist.
                                           Defaults to None.

    Returns:
        Optional[str]: The value of the named value, or None if the name does not exist.
    """
    return self._values_data.get(name, default)
def load_config(self) ‑> None

Load the config from the config file.

Expand source code
def load_config(self) -> None:
    """Load the config from the config file."""
    if os.path.isfile(self.config_file):
        data = toml.load(self.config_file)
        self._flags_data = data["flags"]
        self._bots_data = data["bots"]
        self._values_data = data["values"]
        self._list_data = data["lists"]
    else:
        self.save_config()
    if len(self._bots_data) > 100:
        raise ValueError(
            f"You have too many bots. Bot persistence must be failing somewhere, see {self.config_file}"
        )
def save_config(self)

Save the config to the config file.

Expand source code
def save_config(self):
    """Save the config to the config file."""
    if not os.path.isabs(self.config_file):
        raise ValueError("Config file path must be absolute.")
    with open(self.config_file, "w", encoding="utf-8") as f:
        toml.dump(
            {
                "flags": self._flags_data,
                "bots": self._bots_data,
                "values": self._values_data,
                "lists": self._list_data,
            },
            f,
        )
def set_flag(self, flag_name: str, value: bool) ‑> None

Set the value of the given flag.

Args

flag_name : str
The name of the flag.
value : bool
The value of the flag.
Expand source code
def set_flag(self, flag_name: str, value: bool) -> None:
    """Set the value of the given flag.
    Args:
        flag_name (str): The name of the flag.
        value (bool): The value of the flag.
    """
    self._flags_data[flag_name] = value
    self.save_config()
def set_list(self, list_name: str, value: list[str]) ‑> None

Set the value of the given list of values.

Args

list_name : str
The name of the config value.
value : list[str]
The value of the list.
Expand source code
def set_list(self, list_name: str, value: list[str]) -> None:
    """Set the value of the given list of values.

    Args:
        list_name (str): The name of the config value.
        value (list[str]): The value of the list.
    """
    self._list_data[list_name] = value
    self.save_config()
class CutTool (root_folder: str, config: Config)

Simulates cut cli tool.

Initialize the CutTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
@dataclasses.dataclass
class CutTool:
    """
    Simulates `cut` cli tool.
    """

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the CutTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def cut_characters(self, file_path: str, character_ranges: str) -> str:
        """Reads a file and extracts characters based on specified ranges.

        Args:
            file_path: The name of the file to process.
            character_ranges: A string representing character ranges, e.g., "1-5,10".

        Returns:
            A string containing the selected characters from the file.
        """
        if not is_file_in_root_folder(file_path, self.root_folder):
            raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.")
        ranges = parse_ranges(character_ranges)
        output = io.StringIO()

        try:
            with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
                for line in file:
                    for i, char in enumerate(line, start=1):
                        if is_in_ranges(i, ranges):
                            output.write(char)

                    # Optionally add a newline character after each line
                    output.write("\n")
        except FileNotFoundError:
            tree_text = tree(Path(os.getcwd()))
            markdown_content = f"# File {file_path} not found. Here are all the files you can see\n\n{tree_text}"
            return markdown_content

        return output.getvalue()

    @log()
    def cut_fields(self, filename: str, field_ranges: str, delimiter: str = ",") -> str:
        """Reads a file and extracts fields based on specified ranges using the given delimiter.

        Args:
            filename: The name of the file to process.
            field_ranges: A string representing field ranges, e.g., "1-3,5".
            delimiter: A single character used as the field delimiter.

        Returns:
            A string containing the selected fields from the file.
        """
        if not is_file_in_root_folder(filename, self.root_folder):
            raise ValueError(f"File {filename} is not in root folder {self.root_folder}.")
        ranges = parse_ranges(field_ranges)
        output = io.StringIO()
        try:
            with open(filename, encoding="utf-8", errors=self.utf8_errors) as file:
                reader = csv.reader(file, delimiter=delimiter)

                for row in reader:
                    selected_fields = [field for i, field in enumerate(row, start=1) if is_in_ranges(i, ranges)]
                    output.write(delimiter.join(selected_fields) + "\n")
        except FileNotFoundError:
            # Host app should always have cwd == root dir.
            tree_text = tree(Path(os.getcwd()))
            markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}"
            return markdown_content

        return output.getvalue()

    @log()
    def cut_fields_by_name(self, filename: str, field_names: list[str], delimiter: str = ",") -> str:
        """Reads a file and extracts fields based on specified field names using the given delimiter.

        Args:
            filename(str): The name of the file to process.
            field_names(list[str]): A list of field names to extract.
            delimiter(str): A single character used as the field delimiter.

        Returns:
            A string containing the selected fields from the file.
        """
        if not is_file_in_root_folder(filename, self.root_folder):
            raise ValueError(f"File {filename} is not in root folder {self.root_folder}.")
        output = io.StringIO()

        try:
            with open(filename, encoding="utf-8", errors=self.utf8_errors) as file:
                reader = csv.DictReader(file, delimiter=delimiter)
                # field_indices = {field: i for i, field in enumerate(reader.fieldnames)}

                for row in reader:
                    selected_fields = [row[field] for field in field_names if field in row]
                    output.write(delimiter.join(selected_fields) + "\n")
        except FileNotFoundError:
            tree_text = tree(Path(os.getcwd()))
            markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}"
            return markdown_content

        return output.getvalue()
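
A short usage sketch, assuming CutTool is exported at the package level like the other tools; the file name and column names are illustrative:

import ai_shell

config = ai_shell.Config()
cut = ai_shell.CutTool(".", config)

# Characters 1-5 and 10 of each line, like `cut -c 1-5,10`.
print(cut.cut_characters("data.csv", "1-5,10"))

# Fields 1-3 and 5 of a comma-delimited file, like `cut -d, -f 1-3,5`.
print(cut.cut_fields("data.csv", "1-3,5", delimiter=","))

# Select columns by header name via csv.DictReader.
print(cut.cut_fields_by_name("data.csv", ["name", "email"], delimiter=","))

If the file is missing, each method returns a markdown tree of the files the bot can see instead of raising, which keeps the conversation going.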

Methods

def cut_characters(self, file_path: str, character_ranges: str) ‑> str

Reads a file and extracts characters based on specified ranges.

Args

file_path
The name of the file to process.
character_ranges
A string representing character ranges, e.g., "1-5,10".

Returns

A string containing the selected characters from the file.

Expand source code
@log()
def cut_characters(self, file_path: str, character_ranges: str) -> str:
    """Reads a file and extracts characters based on specified ranges.

    Args:
        file_path: The name of the file to process.
        character_ranges: A string representing character ranges, e.g., "1-5,10".

    Returns:
        A string containing the selected characters from the file.
    """
    if not is_file_in_root_folder(file_path, self.root_folder):
        raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.")
    ranges = parse_ranges(character_ranges)
    output = io.StringIO()

    try:
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            for line in file:
                for i, char in enumerate(line, start=1):
                    if is_in_ranges(i, ranges):
                        output.write(char)

                # Optionally add a newline character after each line
                output.write("\n")
    except FileNotFoundError:
        tree_text = tree(Path(os.getcwd()))
        markdown_content = f"# File {file_path} not found. Here are all the files you can see\n\n{tree_text}"
        return markdown_content

    return output.getvalue()
def cut_fields(self, filename: str, field_ranges: str, delimiter: str = ',') ‑> str

Reads a file and extracts fields based on specified ranges using the given delimiter.

Args

filename
The name of the file to process.
field_ranges
A string representing field ranges, e.g., "1-3,5".
delimiter
A single character used as the field delimiter.

Returns

A string containing the selected fields from the file.

Expand source code
@log()
def cut_fields(self, filename: str, field_ranges: str, delimiter: str = ",") -> str:
    """Reads a file and extracts fields based on specified ranges using the given delimiter.

    Args:
        filename: The name of the file to process.
        field_ranges: A string representing field ranges, e.g., "1-3,5".
        delimiter: A single character used as the field delimiter.

    Returns:
        A string containing the selected fields from the file.
    """
    if not is_file_in_root_folder(filename, self.root_folder):
        raise ValueError(f"File {filename} is not in root folder {self.root_folder}.")
    ranges = parse_ranges(field_ranges)
    output = io.StringIO()
    try:
        with open(filename, encoding="utf-8", errors=self.utf8_errors) as file:
            reader = csv.reader(file, delimiter=delimiter)

            for row in reader:
                selected_fields = [field for i, field in enumerate(row, start=1) if is_in_ranges(i, ranges)]
                output.write(delimiter.join(selected_fields) + "\n")
    except FileNotFoundError:
        # Host app should always have cwd == root dir.
        tree_text = tree(Path(os.getcwd()))
        markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}"
        return markdown_content

    return output.getvalue()
def cut_fields_by_name(self, filename: str, field_names: list[str], delimiter: str = ',') ‑> str

Reads a file and extracts fields based on specified field names using the given delimiter.

Args

filename : str
The name of the file to process.
field_names : list[str]
A list of field names to extract.
delimiter : str
A single character used as the field delimiter.

Returns

A string containing the selected fields from the file.

Expand source code
@log()
def cut_fields_by_name(self, filename: str, field_names: list[str], delimiter: str = ",") -> str:
    """Reads a file and extracts fields based on specified field names using the given delimiter.

    Args:
        filename(str): The name of the file to process.
        field_names(list[str]): A list of field names to extract.
        delimiter(str): A single character used as the field delimiter.

    Returns:
        A string containing the selected fields from the file.
    """
    if not is_file_in_root_folder(filename, self.root_folder):
        raise ValueError(f"File {filename} is not in root folder {self.root_folder}.")
    output = io.StringIO()

    try:
        with open(filename, encoding="utf-8", errors=self.utf8_errors) as file:
            reader = csv.DictReader(file, delimiter=delimiter)
            # field_indices = {field: i for i, field in enumerate(reader.fieldnames)}

            for row in reader:
                selected_fields = [row[field] for field in field_names if field in row]
                output.write(delimiter.join(selected_fields) + "\n")
    except FileNotFoundError:
        tree_text = tree(Path(os.getcwd()))
        markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}"
        return markdown_content

    return output.getvalue()
class DialogLoggerWithMarkdown (base_directory: str)

A class for logging dialog interactions in Markdown format.

Attributes

bot_name : str
Name of the bot.
model : str
Model used by the bot.
bot_instructions : str
Instructions or description for the bot.
base_directory : str
Base directory for storing log files.

Methods

write_header: Writes the header information to the log file.
add_user: Logs the user's input text.
add_bot: Logs the bot's response text.
add_toolkit: Logs the tools used in the dialog.
add_tool: Logs a single tool and its arguments used in the dialog.
add_tool_result: Logs the results from a tool.
add_error: Logs an error that occurred during the dialog.
ensure_log: Context manager to ensure the log file is closed properly.

Initializes the DialogLoggerWithMarkdown object.

Args

base_directory : str
Base directory for storing log files. Must be a non-empty path; raises ValueError if empty.
Expand source code
class DialogLoggerWithMarkdown:
    """
    A class for logging dialog interactions in Markdown format.

    Attributes:
        bot_name (str): Name of the bot.
        model (str): Model used by the bot.
        bot_instructions (str): Instructions or description for the bot.
        base_directory (str): Base directory for storing log files.

    Methods:
        write_header: Writes the header information to the log file.
        add_user: Logs the user's input text.
        add_bot: Logs the bot's response text.
        add_toolkit: Logs the tools used in the dialog.
        add_tool: Logs a single tool and its arguments used in the dialog.
        add_tool_result: Logs the results from a tool.
        add_error: Logs an error that occurred during the dialog.
        ensure_log: Context manager to ensure the log file is closed properly.
    """

    def __init__(self, base_directory: str) -> None:
        """
        Initializes the DialogLoggerWithMarkdown object.

        Args:
            base_directory (str): Base directory for storing log files. Must be a non-empty path; raises ValueError if empty.
        """
        if not base_directory:
            raise ValueError("base_directory must be provided.")
        os.makedirs(base_directory, exist_ok=True)
        self.bot_name: Optional[str] = None
        self.model: Optional[str] = None
        self.bot_instructions: Optional[str] = None
        self.base_directory = base_directory
        log_files = [f for f in os.listdir(self.base_directory) if f.endswith(".md")]
        log_number = len(log_files) + 1
        self.log_file_path = os.path.join(self.base_directory, f"dialog_{log_number}.md")
        os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)

        # Context manager handles this, I think.
        # pylint: disable=consider-using-with
        self.log_file = open(self.log_file_path, "a", buffering=1, encoding="utf-8")
        self.pending_tools: dict[str, dict[str, str]] = {}

    def write_header(self, bot_name: str, model: str, bot_instructions: str) -> None:
        """
        Writes the header information to the log file.

        Args:
            bot_name (str): Name of the bot.
            model (str): Model used by the bot.
            bot_instructions (str): Instructions or description for the bot.
        """
        self.bot_name = bot_name
        self.model = model
        self.bot_instructions = bot_instructions
        header = f"# Bot Name: {self.bot_name}\n## Model: {self.model}\n### Instructions: {self.bot_instructions}\n\n"
        self.log_file.write(header)
        logger.info(header)

    def add_user(self, text: str) -> None:
        """
        Logs the user's input text.

        Args:
            text (str): The text input by the user.
        """
        users_message = f"**User**: {text}"
        self.log_file.write(f"{users_message}\n\n")
        logger.info(users_message)

    def add_bot(self, text: str) -> None:
        """
        Logs the bot's response text.

        Args:
            text (str): The text response from the bot.
        """
        bots_message = f"**Bot**: {text}"
        self.log_file.write(f"{bots_message}\n\n")
        logger.info(bots_message)

    def add_toolkit(self, tools: list[str]) -> None:
        """
        Logs the tools used in the dialog.

        Args:
            tools (List[str]): A list of tool names used in the dialog.
        """
        toolkit_str = "\n- ".join([f"`{tool}`" for tool in tools])
        toolkit_message = f"**Toolkit**: \n\n- {toolkit_str}"
        self.log_file.write(f"{toolkit_message}\n\n")
        logger.info(toolkit_message.replace("\n", ""))

    def add_tool(self, tool_call_id: str, tool_name: str, tool_args: str) -> None:
        """
        Logs a single tool and its arguments used in the dialog.

        Args:
            tool_call_id (str): The unique identifier for the tool call.
            tool_name (str): The name of the tool.
            tool_args (str): The arguments passed to the tool, in JSON string format.
        """
        bot_wants = f"Bot wants to use `{tool_name}`\n"
        self.log_file.write(bot_wants)
        logger.info(bot_wants)
        try:
            json_bits = json.loads(tool_args)
        except BaseException:
            self.log_file.write(f"Bot gave us Bad JSON: {tool_args}")
            self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": tool_args}
            return
        args_lines: list[str] = []
        for name, value in json_bits.items():
            if value is not None:
                pair = f"{name} : {value}"
                args_lines.append(f" - {pair}\n")
                # logger.info(pair)
        self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": "\n".join(args_lines)}

    def add_tool_result(self, tool_results: list[dict[str, Any]]) -> None:
        """
        Logs the results from a tool.

        Args:
            tool_results (List[Dict[str, Any]]): A list of dictionaries containing the tool results.
        """
        # result should always be of the same dict type
        # tool_result = {"tool_call_id": tool_call.id, "output": json_result}
        for result in tool_results:
            self.log_file.write("### Result\n\n")
            self.log_file.write(f"Tool call Id: {result['tool_call_id']}\n")
            self.log_file.write(f"Tool name: {self.pending_tools[result['tool_call_id']]['tool_name']}\n")
            self.log_file.write(f"Tool args:\n {self.pending_tools[result['tool_call_id']]['tool_args']}\n")
            del self.pending_tools[result["tool_call_id"]]
            json_string = result["output"]
            # json.loads here should work, it isn't bot-json
            any_type = json.loads(json_string)
            if isinstance(any_type, dict):
                if "type" in any_type and "title" in any_type and "status" in any_type and "detail" in any_type:
                    self.log_file.write(f"ERROR: {any_type['type']} : {any_type['detail']}\n")
                else:
                    for key, value in any_type.items():
                        self.log_file.write(f" - {key} : {value}\n")
            else:
                self.log_file.write(f"{any_type}\n")

    def add_error(self, error: Exception) -> None:
        """
        Logs an error that occurred during the dialog.

        Args:
            error (Exception): The exception that was raised.
        """
        error_message = f"**Error**: {error}"
        self.log_file.write(f"{error_message}\n\n")
        logger.error(error_message)

    @contextmanager
    def ensure_log(self) -> Iterator[None]:
        """
        A context manager to ensure that the log file is closed properly.

        Yields:
            None: Yields control to the block of code using this context manager.

        Ensures:
            The log file is closed properly upon exiting the block of code.
        """
        try:
            yield
        finally:
            self.log_file.close()
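
A minimal sketch of logging one round of dialog. The bot name, model, messages, and tool-call id are illustrative; the id passed to add_tool must match the tool_call_id later passed to add_tool_result:

import json

import ai_shell

log = ai_shell.DialogLoggerWithMarkdown("./tmp")
with log.ensure_log():
    log.write_header("Folder inspection bot.", "gpt-3.5-turbo-1106", "Run ls and report.")
    log.add_toolkit(["ls", "report_text"])
    log.add_user("What is in the docs folder?")
    log.add_tool("call_1", "ls", json.dumps({"path": "docs"}))
    log.add_tool_result([{"tool_call_id": "call_1", "output": json.dumps({"files": ["index.md"]})}])
    log.add_bot("The docs folder contains index.md.")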

Methods

def add_bot(self, text: str) ‑> None

Logs the bot's response text.

Args

text : str
The text response from the bot.
Expand source code
def add_bot(self, text: str) -> None:
    """
    Logs the bot's response text.

    Args:
        text (str): The text response from the bot.
    """
    bots_message = f"**Bot**: {text}"
    self.log_file.write(f"{bots_message}\n\n")
    logger.info(bots_message)
def add_error(self, error: Exception) ‑> None

Logs an error that occurred during the dialog.

Args

error : Exception
The exception that was raised.
Expand source code
def add_error(self, error: Exception) -> None:
    """
    Logs an error that occurred during the dialog.

    Args:
        error (Exception): The exception that was raised.
    """
    error_message = f"**Error**: {error}"
    self.log_file.write(f"{error_message}\n\n")
    logger.error(error_message)
def add_tool(self, tool_call_id: str, tool_name: str, tool_args: str) ‑> None

Logs a single tool and its arguments used in the dialog.

Args

tool_call_id : str
The unique identifier for the tool call.
tool_name : str
The name of the tool.
tool_args : str
The arguments passed to the tool, in JSON string format.
Expand source code
def add_tool(self, tool_call_id: str, tool_name: str, tool_args: str) -> None:
    """
    Logs a single tool and its arguments used in the dialog.

    Args:
        tool_call_id (str): The unique identifier for the tool call.
        tool_name (str): The name of the tool.
        tool_args (str): The arguments passed to the tool, in JSON string format.
    """
    bot_wants = f"Bot wants to use `{tool_name}`\n"
    self.log_file.write(bot_wants)
    logger.info(bot_wants)
    try:
        json_bits = json.loads(tool_args)
    except BaseException:
        self.log_file.write(f"Bot gave us Bad JSON: {tool_args}")
        self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": tool_args}
        return
    args_lines: list[str] = []
    for name, value in json_bits.items():
        if value is not None:
            pair = f"{name} : {value}"
            args_lines.append(f" - {pair}\n")
            # logger.info(pair)
    self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": "\n".join(args_lines)}
def add_tool_result(self, tool_results: list[dict[str, typing.Any]]) ‑> None

Logs the results from a tool.

Args

tool_results : List[Dict[str, Any]]
A list of dictionaries containing the tool results.
Expand source code
def add_tool_result(self, tool_results: list[dict[str, Any]]) -> None:
    """
    Logs the results from a tool.

    Args:
        tool_results (List[Dict[str, Any]]): A list of dictionaries containing the tool results.
    """
    # result should always be of the same dict type
    # tool_result = {"tool_call_id": tool_call.id, "output": json_result}
    for result in tool_results:
        self.log_file.write("### Result\n\n")
        self.log_file.write(f"Tool call Id: {result['tool_call_id']}\n")
        self.log_file.write(f"Tool name: {self.pending_tools[result['tool_call_id']]['tool_name']}\n")
        self.log_file.write(f"Tool args:\n {self.pending_tools[result['tool_call_id']]['tool_args']}\n")
        del self.pending_tools[result["tool_call_id"]]
        json_string = result["output"]
        # json.loads here should work, it isn't bot-json
        any_type = json.loads(json_string)
        if isinstance(any_type, dict):
            if "type" in any_type and "title" in any_type and "status" in any_type and "detail" in any_type:
                self.log_file.write(f"ERROR: {any_type['type']} : {any_type['detail']}\n")
            else:
                for key, value in any_type.items():
                    self.log_file.write(f" - {key} : {value}\n")
        else:
            self.log_file.write(f"{any_type}\n")
def add_toolkit(self, tools: list[str]) ‑> None

Logs the tools used in the dialog.

Args

tools : List[str]
A list of tool names used in the dialog.
Expand source code
def add_toolkit(self, tools: list[str]) -> None:
    """
    Logs the tools used in the dialog.

    Args:
        tools (List[str]): A list of tool names used in the dialog.
    """
    toolkit_str = "\n- ".join([f"`{tool}`" for tool in tools])
    toolkit_message = f"**Toolkit**: \n\n- {toolkit_str}"
    self.log_file.write(f"{toolkit_message}\n\n")
    logger.info(toolkit_message.replace("\n", ""))
def add_user(self, text: str) ‑> None

Logs the user's input text.

Args

text : str
The text input by the user.
Expand source code
def add_user(self, text: str) -> None:
    """
    Logs the user's input text.

    Args:
        text (str): The text input by the user.
    """
    users_message = f"**User**: {text}"
    self.log_file.write(f"{users_message}\n\n")
    logger.info(users_message)
def ensure_log(self) ‑> collections.abc.Iterator[None]

A context manager to ensure that the log file is closed properly.

Yields

None
Yields control to the block of code using this context manager.

Ensures

The log file is closed properly upon exiting the block of code.

Expand source code
@contextmanager
def ensure_log(self) -> Iterator[None]:
    """
    A context manager to ensure that the log file is closed properly.

    Yields:
        None: Yields control to the block of code using this context manager.

    Ensures:
        The log file is closed properly upon exiting the block of code.
    """
    try:
        yield
    finally:
        self.log_file.close()
def write_header(self, bot_name: str, model: str, bot_instructions: str) ‑> None

Writes the header information to the log file.

Args

bot_name : str
Name of the bot.
model : str
Model used by the bot.
bot_instructions : str
Instructions or description for the bot.
Expand source code
def write_header(self, bot_name: str, model: str, bot_instructions: str) -> None:
    """
    Writes the header information to the log file.

    Args:
        bot_name (str): Name of the bot.
        model (str): Model used by the bot.
        bot_instructions (str): Instructions or description for the bot.
    """
    self.bot_name = bot_name
    self.model = model
    self.bot_instructions = bot_instructions
    header = f"# Bot Name: {self.bot_name}\n## Model: {self.model}\n### Instructions: {self.bot_instructions}\n\n"
    self.log_file.write(header)
    logger.info(header)
class EdTool (root_folder: str, config: Config)

A python version of ed.

Initialize the EdTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class EdTool:
    """A python version of ed."""

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the EdTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder if root_folder.endswith("/") else root_folder + "/"
        self.buffer = Buffer()
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def ed(self, script: str, file_name: str) -> list[str]:
        """A python version of ed.

        Args:
            script (str): Ed commands to run.
            file_name (str): Script creates or edits this file.

        Returns:
            list[str]: The output of the script.
        """
        commands = script.split("\n")

        file_name = sanitize_path(file_name)

        file_path = self.root_folder + file_name
        if os.path.isfile(file_path):
            with open(file_path, encoding="utf-8", errors=self.utf8_errors) as f:
                self.buffer = Buffer(f.readlines())

        command_lines: list[tuple[Any, list[str]]] = []
        current_command = None
        current_list: list[str] = []

        for command, upcoming in zip(commands, commands[1:], strict=False):
            command = command.strip()
            if command.startswith("# "):
                # comments, because this is unreadable without them
                continue
            if not command:
                # blank commands are noise
                continue
            if command.startswith("e "):
                # can't really load right now
                continue
            if command == ".":
                if current_command is not None:
                    command_lines.append((current_command, current_list))

                self._run_commands(command_lines)
                current_command = None
                current_list = []
                command_lines = []
                continue

            if command == "q":
                # stop processing commands
                break
            if command == "w":
                with open(file_path, "w", encoding="utf-8") as f:
                    f.writelines(self.buffer.lines)
                continue

            if command.startswith("> "):
                # always a line
                current_list.append(f"{command[2:]}\n")
            else:
                if upcoming is None or not upcoming.startswith("> "):
                    current_command = command
                    command_lines.append((current_command, current_list))
                current_command = command
                current_list = []

            # always be running commands unless we're adding/editing a line next.
            if upcoming is None:
                # last command, probably a q
                self._run_commands(command_lines)
            elif current_list or upcoming.startswith("> "):
                # waiting for . or expecting more lines
                pass
            else:
                self._run_commands(command_lines)

        return self.buffer.lines

    def _run_commands(self, command_lines: (list[tuple[Any, list[str]]])) -> None:
        """Run the commands.

        Args:
            command_lines (list[tuple[Any, list[str]]]): The commands to run.
        """
        for command_now, lines_now in command_lines:
            print(command_now, "\n".join(lines_now))
            self.buffer.run(command_now, lines_now)
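
A small sketch of the script format ed() expects: lines to insert are prefixed with "> ", a lone "." ends input mode, "w" writes the buffer, and "q" stops processing. It assumes EdTool is exported at the package level and that the underlying Buffer accepts classic ed commands such as a (append); the file name is illustrative:

import ai_shell

config = ai_shell.Config()
ed = ai_shell.EdTool(".", config)

script = "\n".join([
    "a",               # append (classic ed command)
    "> first line",    # "> " marks literal text to insert
    "> second line",
    ".",               # end of input mode, run the pending command
    "w",               # write the buffer back to the file
    "q",               # stop processing
])
print(ed.ed(script, "notes.txt"))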

Methods

def ed(self, script: str, file_name: str) ‑> list[str]

A python version of ed.

Args

script : str
Ed commands to run.
file_name : str
Script creates or edits this file.

Returns

list[str]
The output of the script.
Expand source code
@log()
def ed(self, script: str, file_name: str) -> list[str]:
    """A python version of ed.

    Args:
        script (str): Ed commands to run.
        file_name (str): Script creates or edits this file.

    Returns:
        list[str]: The output of the script.
    """
    commands = script.split("\n")

    file_name = sanitize_path(file_name)

    file_path = self.root_folder + file_name
    if os.path.isfile(file_path):
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as f:
            self.buffer = Buffer(f.readlines())

    command_lines: list[tuple[Any, list[str]]] = []
    current_command = None
    current_list: list[str] = []

    for command, upcoming in zip(commands, commands[1:], strict=False):
        command = command.strip()
        if command.startswith("# "):
            # comments, because this is unreadable without them
            continue
        if not command:
            # blank commands are noise
            continue
        if command.startswith("e "):
            # can't really load right now
            continue
        if command == ".":
            if current_command is not None:
                command_lines.append((current_command, current_list))

            self._run_commands(command_lines)
            current_command = None
            current_list = []
            command_lines = []
            continue

        if command == "q":
            # stop processing commands
            break
        if command == "w":
            with open(file_path, "w", encoding="utf-8") as f:
                f.writelines(self.buffer.lines)
            continue

        if command.startswith("> "):
            # always a line
            current_list.append(f"{command[2:]}\n")
        else:
            if upcoming is None or not upcoming.startswith("> "):
                current_command = command
                command_lines.append((current_command, current_list))
            current_command = command
            current_list = []

        # always be running commands unless we're adding/editing a line next.
        if upcoming is None:
            # last command, probably a q
            self._run_commands(command_lines)
        elif current_list or upcoming.startswith("> "):
            # waiting for . or expecting more lines
            pass
        else:
            self._run_commands(command_lines)

    return self.buffer.lines
class EdlinTool (root_folder: str, config: Config)

Initialize the EdlinTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class EdlinTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the EdlinTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder if root_folder.endswith("/") else root_folder + "/"
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)

    @log()
    def edlin(self, script: str, file_name: str) -> list[str]:
        r"""An improved version of the edlin.

        Args:
            script (str): Edlin commands to run.
            file_name (str): Script creates or edits this file.

        Returns:
            list[str]: The output of the script.
        """
        file_name = sanitize_path(file_name)

        if VERBOSE:
            config = dedlin.configure_logging()
            logging.config.dictConfig(config)
            logger.info("Verbose mode enabled")

        logger.info("Plain mode. UI should be dull.")
        output: list[str] = []

        the_command_generator = StringCommandGenerator(script)

        editor = dedlin.Dedlin(
            inputter=the_command_generator,
            # These should be blank, insert and edit only from the commands.
            insert_document_inputter=EmptyStringGenerator(),
            edit_document_inputter=EmptyStringGenerator(),
            outputter=lambda text, end=None: output.append(text),
            headless=True,
            untrusted_user=True,
            history=True,
        )

        # No interaction, bot can't recover, answer questions, see realtime trace!
        editor.halt_on_error = True
        editor.quit_safety = False
        editor.echo = False
        # joke!
        editor.vim_mode = False
        # This logging is not for a bot!
        editor.verbose = VERBOSE

        # pylint: disable=broad-except
        try:
            # Too broad? Don't register this hook.
            # sys.excepthook = editor.save_on_crash
            editor.entry_point(
                file_name,
            )
            # Bot often forgets to save.
            editor.save_document_safe()
            # must have quit.
        except Exception as the_exception:
            editor.save_on_crash(type(the_exception), the_exception, None)

            logger.error(traceback.format_exc())
            # output.append(traceback.format_exc())
            editor.save_document_safe()
            raise
        editor.save_document_safe()
        editor.final_report()
        if self.auto_cat:
            feedback = "\n".join(output)
            contents = CatTool(self.root_folder, self.config).cat_markdown([file_name])
            return [f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"]
        return output
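
A construction sketch only, since the edlin command syntax comes from the dedlin package and is not documented here; the script below is a placeholder, not a verified command sequence, and the file name is illustrative:

import ai_shell

config = ai_shell.Config()
edlin = ai_shell.EdlinTool(".", config)

# `script` must be a valid dedlin/edlin command script; see dedlin's documentation.
script = "..."
feedback = edlin.edlin(script, "notes.txt")
print("\n".join(feedback))

With auto_cat enabled (the default), the return value is a one-element list combining the tool feedback and the current file contents, so the bot can immediately see the result of its edit.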

Methods

def edlin(self, script: str, file_name: str) ‑> list[str]

An improved version of edlin.

Args

script : str
Edlin commands to run.
file_name : str
Script creates or edits this file.

Returns

list[str]
The output of the script.
Expand source code
@log()
def edlin(self, script: str, file_name: str) -> list[str]:
    r"""An improved version of the edlin.

    Args:
        script (str): Edlin commands to run.
        file_name (str): Script creates or edits this file.

    Returns:
        list[str]: The output of the script.
    """
    file_name = sanitize_path(file_name)

    if VERBOSE:
        config = dedlin.configure_logging()
        logging.config.dictConfig(config)
        logger.info("Verbose mode enabled")

    logger.info("Plain mode. UI should be dull.")
    output: list[str] = []

    the_command_generator = StringCommandGenerator(script)

    editor = dedlin.Dedlin(
        inputter=the_command_generator,
        # These should be blank, insert and edit only from the commands.
        insert_document_inputter=EmptyStringGenerator(),
        edit_document_inputter=EmptyStringGenerator(),
        outputter=lambda text, end=None: output.append(text),
        headless=True,
        untrusted_user=True,
        history=True,
    )

    # No interaction, bot can't recover, answer questions, see realtime trace!
    editor.halt_on_error = True
    editor.quit_safety = False
    editor.echo = False
    # joke!
    editor.vim_mode = False
    # This logging is not for a bot!
    editor.verbose = VERBOSE

    # pylint: disable=broad-except
    try:
        # Too broad? Don't register this hook.
        # sys.excepthook = editor.save_on_crash
        editor.entry_point(
            file_name,
        )
        # Bot often forgets to save.
        editor.save_document_safe()
        # must have quit.
    except Exception as the_exception:
        editor.save_on_crash(type(the_exception), the_exception, None)

        logger.error(traceback.format_exc())
        # output.append(traceback.format_exc())
        editor.save_document_safe()
        raise
    editor.save_document_safe()
    editor.final_report()
    if self.auto_cat:
        feedback = "\n".join(output)
        contents = CatTool(self.root_folder, self.config).cat_markdown([file_name])
        return [f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"]
    return output
class FindTool (root_folder: str, config: Config)

Initialize the FindTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class FindTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the FindTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)

    @log()
    def find_files(
        self,
        name: Optional[str] = None,
        regex: Optional[str] = None,
        file_type: Optional[str] = None,
        size: Optional[str] = None,
    ) -> list[str]:
        """
        Recursively search for files or directories matching given criteria in a directory and its subdirectories.

        Args:
            name (Optional[str]): The exact name to match filenames against.
            regex (Optional[str]): The regex pattern to match filenames against.
            file_type (Optional[str]): The type to filter ('file' or 'directory').
            size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes.

        Returns:
            list[str]: A list of paths to files or directories that match the criteria.
        """
        logger.info(f"find --name {name} --regex {regex} --type {file_type} --size {size}")
        matching_files = []
        for root, dirs, files in os.walk(os.getcwd()):
            # Combine files and directories for type filtering
            combined = files
            if file_type == "directory":
                combined += dirs

            for entry in combined:
                full_path = os.path.join(root, entry)
                # TODO: handle this differently
                if "__pycache__" not in full_path:
                    # TODO: handle differently. The bot
                    # is put into the root_folder as cwd, so as long as there isn't .. in path we should be good.
                    # if is_file_in_root_folder(full_path, self.root_folder):
                    short_path = remove_root_folder(full_path, self.root_folder)
                    # Check for name, regex, and size match
                    if (name and fnmatch.fnmatch(entry, name)) or name is None:
                        if self._match_type_and_size(full_path, file_type, size):
                            matching_files.append(short_path)
                    elif regex and re.search(regex, entry):
                        if self._match_type_and_size(full_path, file_type, size):
                            matching_files.append(short_path)

        # Not the best way to remove hidden.
        return list(sorted(_ for _ in matching_files if not _.startswith(".")))

    def _match_type_and_size(self, path: str, file_type: Optional[str], size: Optional[str]) -> bool:
        """
        Check if a file/directory matches the specified type and size criteria.

        Args:
            path (str): The path to the file/directory.
            file_type (Optional[str]): The type to filter ('file' or 'directory').
            size (Optional[str]): The size to filter files by.

        Returns:
            bool: True if the file/directory matches the criteria, False otherwise.
        """
        if file_type:
            if file_type == "file" and not os.path.isfile(path):
                return False
            if file_type == "directory" and not os.path.isdir(path):
                return False

        if size:
            size_prefix = size[0]
            size_value = int(size[1:])
            file_size = os.path.getsize(path)

            if size_prefix == "+" and file_size <= size_value:
                return False
            if size_prefix == "-" and file_size >= size_value:
                return False
        return True

    @log()
    def find_files_markdown(
        self,
        name: Optional[str] = None,
        regex: Optional[str] = None,
        file_type: Optional[str] = None,
        size: Optional[str] = None,
    ) -> str:
        """
        Recursively search for files or directories matching given criteria in a directory and its subdirectories.

        Args:
            name (Optional[str]): The exact name to match filenames against.
            regex (Optional[str]): The regex pattern to match filenames against.
            file_type (Optional[str]): The type to filter ('file' or 'directory').
            size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes.

        Returns:
            str: Markdown of paths to files or directories that match the criteria.
        """
        output = StringIO()
        results = self.find_files(name, regex, file_type, size)
        for item in results:
            output.write(item)
            output.write("\n")
        output.seek(0)
        return output.read()
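
A short usage sketch. It assumes FindTool is exported at the package level; note that find_files walks os.getcwd(), so the host application is expected to run with the root folder as the current working directory. The glob and size below are illustrative:

import ai_shell

config = ai_shell.Config()
finder = ai_shell.FindTool(".", config)

# Glob-style name match, files only, larger than 100 bytes
# (roughly `find . -name "*.py" -type f -size +100c`).
print(finder.find_files(name="*.py", file_type="file", size="+100"))

# The same search rendered one path per line for the bot.
print(finder.find_files_markdown(name="*.py", file_type="file", size="+100"))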

Methods

def find_files(self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None) ‑> list[str]

Recursively search for files or directories matching given criteria in a directory and its subdirectories.

Args

name : Optional[str]
The exact name to match filenames against.
regex : Optional[str]
The regex pattern to match filenames against.
file_type : Optional[str]
The type to filter ('file' or 'directory').
size : Optional[str]
The size to filter files by, e.g., '+100' for files larger than 100 bytes.

Returns

list[str]
A list of paths to files or directories that match the criteria.
Expand source code
@log()
def find_files(
    self,
    name: Optional[str] = None,
    regex: Optional[str] = None,
    file_type: Optional[str] = None,
    size: Optional[str] = None,
) -> list[str]:
    """
    Recursively search for files or directories matching given criteria in a directory and its subdirectories.

    Args:
        name (Optional[str]): The exact name to match filenames against.
        regex (Optional[str]): The regex pattern to match filenames against.
        file_type (Optional[str]): The type to filter ('file' or 'directory').
        size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes.

    Returns:
        list[str]: A list of paths to files or directories that match the criteria.
    """
    logger.info(f"find --name {name} --regex {regex} --type {file_type} --size {size}")
    matching_files = []
    for root, dirs, files in os.walk(os.getcwd()):
        # Combine files and directories for type filtering
        combined = files
        if file_type == "directory":
            combined += dirs

        for entry in combined:
            full_path = os.path.join(root, entry)
            # TODO: handle this differently
            if "__pycache__" not in full_path:
                # TODO: handle differently. The bot
                # is put into the root_folder as cwd, so as long as there isn't .. in path we should be good.
                # if is_file_in_root_folder(full_path, self.root_folder):
                short_path = remove_root_folder(full_path, self.root_folder)
                # Check for name, regex, and size match
                if (name and fnmatch.fnmatch(entry, name)) or name is None:
                    if self._match_type_and_size(full_path, file_type, size):
                        matching_files.append(short_path)
                elif regex and re.search(regex, entry):
                    if self._match_type_and_size(full_path, file_type, size):
                        matching_files.append(short_path)

    # Not the best way to remove hidden.
    return list(sorted(_ for _ in matching_files if not _.startswith(".")))
def find_files_markdown(self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None) ‑> str

Recursively search for files or directories matching given criteria in a directory and its subdirectories.

Args

name : Optional[str]
The exact name to match filenames against.
regex : Optional[str]
The regex pattern to match filenames against.
file_type : Optional[str]
The type to filter ('file' or 'directory').
size : Optional[str]
The size to filter files by, e.g., '+100' for files larger than 100 bytes.

Returns

str
Markdown of paths to files or directories that match the criteria.
Expand source code
@log()
def find_files_markdown(
    self,
    name: Optional[str] = None,
    regex: Optional[str] = None,
    file_type: Optional[str] = None,
    size: Optional[str] = None,
) -> str:
    """
    Recursively search for files or directories matching given criteria in a directory and its subdirectories.

    Args:
        name (Optional[str]): The exact name to match filenames against.
        regex (Optional[str]): The regex pattern to match filenames against.
        file_type (Optional[str]): The type to filter ('file' or 'directory').
        size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes.

    Returns:
        str: Markdown of paths to files or directories that match the criteria.
    """
    output = StringIO()
    results = self.find_files(name, regex, file_type, size)
    for item in results:
        output.write(item)
        output.write("\n")
    output.seek(0)
    return output.read()
class GitTool (root_folder: str, config: Config)

Initialize the GitTool class.

Args

root_folder : str
The root folder path for repo operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class GitTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the GitTool class.

        Args:
            root_folder (str): The root folder path for repo operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        # Initialize the repository
        self.repo_path = root_folder
        self.repo = Repo(root_folder)
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    def is_ignored_by_gitignore(self, file_path: str, gitignore_path: str = ".gitignore") -> bool:
        """
        Check if a file is ignored by .gitignore.

        Args:
            file_path (str): The path of the file to check.
            gitignore_path (str): The path to the .gitignore file. Defaults to '.gitignore' in the current directory.

        Returns:
            bool: True if the file is ignored, False otherwise.

        Raises:
            FileNotFoundError: If the .gitignore file is not found.
        """
        # Resolve the full path to the .gitignore file
        full_gitignore_path = os.path.join(self.repo_path, gitignore_path)

        if not os.path.isfile(full_gitignore_path):
            raise FileNotFoundError(f"No .gitignore file found at {full_gitignore_path}")

        # Normalize file path
        file_path = os.path.abspath(file_path)

        with open(full_gitignore_path, encoding="utf-8", errors=self.utf8_errors) as gitignore:
            for line in gitignore:
                line = line.strip()
                # Skip empty lines and comments
                if not line or line.startswith("#"):
                    continue

                # Convert the .gitignore pattern to a glob pattern
                gitignore_pattern = os.path.join(os.path.dirname(gitignore_path), line)

                if fnmatch.fnmatch(file_path, gitignore_pattern):
                    return True

        return False

    @log()
    def git_status(self) -> dict[str, Any]:
        """Returns the status of the repository.

        Returns:
            dict[str, Any]: Structured `git status` response
        """
        logger.info("git status")
        changed_files = [item.a_path for item in self.repo.index.diff(None)]
        untracked_files = self.repo.untracked_files
        return {"changed_files": changed_files, "untracked_files": untracked_files}

    @log()
    def get_current_branch(
        self,
    ) -> str:
        """
        Retrieves the current branch name of the repository.

        Returns:
            str: The current branch name.
        """
        logger.info("git branch --show-current")
        return self.repo.active_branch.name

    @log()
    def get_recent_commits(self, n: int = 10, short_hash: bool = False) -> list[dict[str, Any]]:
        """
        Retrieves the most recent commit hashes from the current branch.

        Args:
            n (int, optional): The number of recent commits to retrieve. Defaults to 10.
            short_hash (bool, optional): If True, return short hashes; otherwise, return full hashes. Defaults to False.

        Returns:
            list[dict[str, Any]]: A list of dictionaries, each containing 'short_hash' and 'full_hash' keys (if short_hash is True),
                          or only 'full_hash' (if short_hash is False), representing the commit hashes.
        """
        logger.info(f"git log --pretty=format:%H -n {n}")
        current_branch = self.get_current_branch()
        commits = list(self.repo.iter_commits(current_branch, max_count=n))
        if short_hash:
            return [{"short_hash": commit.hexsha[:7], "full_hash": commit.hexsha} for commit in commits]
        return [{"full_hash": commit.hexsha} for commit in commits]

    @log()
    def git_diff(self) -> list[dict[str, Any]]:
        """Returns the differences in the working directory.

        Returns:
            list[dict[str, Any]]: Structured `git diff` response
        """
        logger.info("git diff --name-only")
        diffs = self.repo.git.diff("HEAD", name_only=True).splitlines()
        return [{"file": diff} for diff in diffs]

    @log()
    def git_log_file(self, filename: str) -> list[dict[str, Any]]:
        """Returns the commit history for a specific file.

        Args:
            filename (str): The path to the file.

        Returns:
            list[dict[str, Any]]: Structured `git log` response
        """
        logger.info(f"git log --pretty=format:%H -n 1 {filename}")
        commits = self.repo.git.log("--pretty=format:%H - %an, %ar : %s", filename).splitlines()
        return [{"commit": commit} for commit in commits]

    @log()
    def git_log_search(self, search_string: str) -> list[dict[str, Any]]:
        """Returns the commit history that matches the search string.

        Args:
            search_string (str): The search string.

        Returns:
            list of dict: Structured `git log` response
        """
        logger.info(f"git log --pretty=format:%H -S {search_string}")
        commits = self.repo.git.log("-S", search_string, "--pretty=format:%H - %an, %ar : %s").splitlines()
        return [{"commit": commit} for commit in commits]

    @log()
    def git_show(self) -> list[dict[str, Any]]:
        """Shows various types of objects (commits, tags, etc.).

        Returns:
            list[dict[str, Any]]: Structured `git show` response
        """
        logger.info("git show --pretty=format:%H -n 1")
        show_data = self.repo.git.show("--pretty=format:%H - %an, %ar : %s", n=1).splitlines()
        return [{"data": data} for data in show_data]

    @log()
    def git_diff_commit(self, commit1: str, commit2: str) -> list[dict[str, Any]]:
        """Shows changes between two commits.

        Args:
            commit1 (str): First commit
            commit2 (str): Second commit

        Returns:
            list[dict[str, Any]]: Structured `git diff` response
        """
        logger.info(f"git diff --name-only {commit1} {commit2}")
        diffs = self.repo.git.diff(commit1, commit2, name_only=True).splitlines()
        return [{"file": diff} for diff in diffs]

Methods

def get_current_branch(self) ‑> str

Retrieves the current branch name of the repository.

Returns

str
The current branch name.
def get_recent_commits(self, n: int = 10, short_hash: bool = False) ‑> list[dict[str, typing.Any]]

Retrieves the most recent commit hashes from the current branch.

Args

n : int, optional
The number of recent commits to retrieve. Defaults to 10.
short_hash : bool, optional
If True, return short hashes; otherwise, return full hashes. Defaults to False.

Returns

list[dict[str, Any]]
A list of dictionaries, each containing 'short_hash' and 'full_hash' keys (if short_hash is True), or only 'full_hash' (if short_hash is False), representing the commit hashes.
def git_diff(self) ‑> list[dict[str, typing.Any]]

Returns the differences in the working directory.

Returns

list[dict[str, Any]]
Structured git diff response
def git_diff_commit(self, commit1: str, commit2: str) ‑> list[dict[str, typing.Any]]

Shows changes between two commits.

Args

commit1 : str
First commit
commit2 : str
Second commit

Returns

list[dict[str, Any]]
Structured git diff response
def git_log_file(self, filename: str) ‑> list[dict[str, typing.Any]]

Returns the commit history for a specific file.

Args

filename : str
The path to the file.

Returns

list[dict[str, Any]]
Structured git log response

def git_log_search(self, search_string: str) ‑> list[dict[str, typing.Any]]

Returns the commit history that matches the search string.

Args

search_string : str
The search string.

Returns

list of dict
Structured git log response
def git_show(self) ‑> list[dict[str, typing.Any]]

Shows various types of objects (commits, tags, etc.).

Returns

list[dict[str, Any]]
Structured git show response
def git_status(self) ‑> dict[str, typing.Any]

Returns the status of the repository.

Returns

dict[str, Any]
Structured git status response
def is_ignored_by_gitignore(self, file_path: str, gitignore_path: str = '.gitignore') ‑> bool

Check if a file is ignored by .gitignore.

Args

file_path : str
The path of the file to check.
gitignore_path : str
The path to the .gitignore file. Defaults to '.gitignore' in the current directory.

Returns

bool
True if the file is ignored, False otherwise.

Raises

FileNotFoundError
If the .gitignore file is not found.
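A minimal usage sketch of the git tool documented above. It assumes the class is exported as ai_shell.GitTool (a hypothetical name, mirroring how the other tools are exposed), that it takes the same (root_folder, config) constructor, and that the root folder is a git repository with a .gitignore; file paths and search strings are placeholders.

import ai_shell

# Hypothetical export name; the constructor follows the (root_folder, config) pattern used by the other tools.
git = ai_shell.GitTool(".", ai_shell.Config())

status = git.git_status()            # {"changed_files": [...], "untracked_files": [...]}
branch = git.get_current_branch()    # e.g. "main"
recent = git.get_recent_commits(n=5, short_hash=True)

print(f"On {branch} with {len(status['changed_files'])} changed files")
print([commit["short_hash"] for commit in recent])

# History for one file, and commits whose patches contain a search string.
print(git.git_log_file("pyproject.toml"))
print(git.git_log_search("TODO"))

# Raises FileNotFoundError if the repository has no .gitignore.
print(git.is_ignored_by_gitignore("dist/build.log"))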
class GrepTool (root_folder: str, config: Config)

A tool for searching files using regular expressions.

Initialize the GrepTool with a root folder.

Args

root_folder : str
The root folder to search within.
config : Config
Developer-supplied configuration that the bot shouldn't set.
class GrepTool:
    """A tool for searching files using regular expressions."""

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the GrepTool with a root folder.

        Args:
            root_folder (str): The root folder to search within.
            config (Config): Developer-supplied configuration that the bot shouldn't set.
        """
        self.root_folder: str = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def grep_markdown(
        self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches: int = -1
    ) -> str:
        """
        Search for lines matching a regular expression in files and returns markdown formatted results.

        Args:
            regex (str): A regular expression string to search for.
            glob_pattern (str): A glob pattern string to specify files.
            skip_first_matches (int): Number of initial matches to skip.
            maximum_matches (int): Maximum number of matches to return.

        Returns:
            str: Markdown formatted string of grep results.
        """
        results = self.grep(regex, glob_pattern, skip_first_matches, maximum_matches)
        matches_found = results.matches_found

        output = StringIO()
        for file_match in results.data:
            output.write(file_match.filename + "\n")
            for match in file_match.found:
                output.write(f"line {match.line_number}: {match.line}\n")
        output.write(
            f"{matches_found} matches found and {min(matches_found, maximum_matches) if maximum_matches != -1 else matches_found} displayed. "
            f"Skipped {skip_first_matches}\n"
        )
        output.seek(0)
        return output.read()

    @log()
    def grep(
        self,
        regex: str,
        glob_pattern: str,
        skip_first_matches: int = -1,
        maximum_matches_per_file: int = -1,
        maximum_matches_total: int = -1,
    ) -> GrepResults:
        """
        Search for lines matching a regular expression in files specified by a glob pattern.

        Args:
            regex (str): A regular expression string to search for.
            glob_pattern (str): A glob pattern string to specify files.
            skip_first_matches (int): Number of initial matches to skip.
            maximum_matches_per_file (int): Maximum number of matches to return for one file.
            maximum_matches_total (int): Maximum number of matches to return total.

        Returns:
            GrepResults: The results of the grep operation.
        """
        logger.info(
            f"grep --regex {regex} --glob_pattern {glob_pattern} "
            f"--skip_first_matches {skip_first_matches} "
            f"--maximum_matches_total {maximum_matches_total} "
            f"--maximum_matches_per_file {maximum_matches_per_file}"
        )
        pattern = re.compile(regex)
        matches_total = 0
        skip_count = 0 if skip_first_matches < 0 else skip_first_matches

        results = GrepResults(matches_found=-1)

        for filename in glob.glob(glob_pattern, root_dir=self.root_folder, recursive=True):
            matches_per_file = 0
            if os.path.isdir(filename):
                logging.warning(f"Skipping directory {filename}, because it isn't a file.")
                continue
            if not os.path.exists(filename):
                # What a hack
                open_path = self.root_folder + "/" + filename
            else:
                open_path = filename
            with open(open_path, encoding="utf-8", errors=self.utf8_errors) as file:
                if not is_file_in_root_folder(filename, self.root_folder):
                    logging.warning(f"Skipping file {filename}, because it isn't in the root folder.")
                    continue
                line_number = 0
                for line in file:
                    below_maximum = matches_per_file < maximum_matches_per_file
                    maximum_not_set = maximum_matches_per_file == -1
                    if below_maximum or maximum_not_set:
                        line_number += 1
                        if pattern.search(line):
                            matches_total += 1
                            matches_per_file += 1

                            if matches_total <= (matches_total + skip_count) or matches_total == -1:
                                if (0 < skip_first_matches < matches_total) or skip_first_matches == -1:
                                    # This creates names like \..\..\..\ etc.
                                    minimal_filename = remove_root_folder(filename, self.root_folder)
                                    # avoid double count
                                    found = next((fm for fm in results.data if fm.filename == minimal_filename), None)
                                    if not found:
                                        found = FileMatches(filename=minimal_filename)
                                        results.data.append(found)

                                    found.found.append(Match(line_number=line_number, line=line.strip()))
        results.data = list(sorted(results.data, key=lambda x: x.filename))
        results.matches_found = matches_total
        return results

Methods

def grep(self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches_per_file: int = -1, maximum_matches_total: int = -1) ‑> GrepResults

Search for lines matching a regular expression in files specified by a glob pattern.

Args

regex : str
A regular expression string to search for.
glob_pattern : str
A glob pattern string to specify files.
skip_first_matches : int
Number of initial matches to skip.
maximum_matches_per_file : int
Maximum number of matches to return for one file.
maximum_matches_total : int
Maximum number of matches to return total.

Returns

GrepResults
The results of the grep operation.
def grep_markdown(self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches: int = -1) ‑> str

Search for lines matching a regular expression in files and returns markdown formatted results.

Args

regex : str
A regular expression string to search for.
glob_pattern : str
A glob pattern string to specify files.
skip_first_matches : int
Number of initial matches to skip.
maximum_matches : int
Maximum number of matches to return.

Returns

str
Markdown formatted string of grep results.
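A short sketch of the two search entry points documented above, assuming GrepTool and Config are importable from the ai_shell package like the other tools; the regex and glob pattern are placeholders.

import ai_shell

grep = ai_shell.GrepTool(".", ai_shell.Config())

# Structured results: a GrepResults with matches_found and per-file FileMatches entries.
results = grep.grep(regex=r"def \w+", glob_pattern="**/*.py", maximum_matches_per_file=3)
print(results.matches_found)
for file_match in results.data:
    for match in file_match.found:
        print(file_match.filename, match.line_number, match.line)

# Chat-friendly markdown rendering of a similar search.
print(grep.grep_markdown(regex="TODO", glob_pattern="**/*.py", maximum_matches=10))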
class HeadTailTool (root_folder: str, config: Config)

Initialize the HeadTailTool with a root folder.

Args

root_folder : str
The root folder where files will be checked.
config : Config
Developer-supplied configuration that the bot shouldn't set.
class HeadTailTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """Initialize the HeadTailTool with a root folder.

        Args:
            root_folder (str): The root folder where files will be checked.
            config (Config): Developer-supplied configuration that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)

    @log()
    def head_markdown(self, file_path: str, lines: int = 10) -> str:
        """Return the first 'lines' lines of a file formatted as markdown.

        Args:
            file_path (str): Path to the file.
            lines (int): Number of lines to return. Defaults to 10.

        Returns:
            str: String containing the first 'lines' lines of the file.
        """
        return "\n".join(self.head(file_path, lines))

    @log()
    def head(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) -> list[str]:
        """Return the first 'lines' or 'byte_count' from a file.

        Args:
            file_path (str): Path to the file.
            lines (int): Number of lines to return. Ignored if byte_count is specified. Defaults to 10.
            byte_count (Optional[int]): Number of bytes to return. If specified, overrides lines.

        Returns:
            list[str]: Lines or bytes from the start of the file.
        """
        return self.head_tail(file_path, lines, "head", byte_count)

    @log()
    def tail_markdown(self, file_path: str, lines: int = 10) -> str:
        """Return the last 'lines' lines of a file formatted as markdown.

        Args:
            file_path (str): Path to the file.
            lines (int): Number of lines to return. Defaults to 10.

        Returns:
            str: String containing the last 'lines' lines of the file.
        """
        return "\n".join(self.tail(file_path, lines))

    @log()
    def tail(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) -> list[str]:
        """Return the last 'lines' or 'bytes' from a file.

        Args:
            file_path (str): Path to the file.
            lines (int): Number of lines to return. Ignored if byte_count is specified. Defaults to 10.
            byte_count (Optional[int]): Number of bytes to return. If specified, overrides lines.

        Returns:
            list[str]: Lines or bytes from the end of the file.
        """
        return self.head_tail(file_path, lines, "tail", byte_count)

    def head_tail(
        self, file_path: str, lines: int = 10, mode: str = "head", byte_count: Optional[int] = None
    ) -> list[str]:
        """Read lines or bytes from the start ('head') or end ('tail') of a file.

        Args:
            file_path (str): Path to the file.
            lines (int): Number of lines to read. Ignored if byte_count is specified. Defaults to 10.
            mode (str): Operation mode, either 'head' or 'tail'. Defaults to 'head'.
            byte_count (Optional[int]): Number of bytes to read. If specified, overrides lines.

        Returns:
            list[str]: Requested lines or bytes from the file.

        Raises:
            ValueError: If mode is not 'head' or 'tail'.
            FileNotFoundError: If the file is not found in the root folder.
        """
        if mode == "head":
            logger.info(f"head --file_path {file_path} --lines {lines}")
        else:
            logger.info(f"tail --file_path {file_path} --lines {lines}")
        if mode not in ["head", "tail"]:
            raise ValueError("Mode must be 'head' or 'tail'")

        if not is_file_in_root_folder(file_path, self.root_folder):
            raise FileNotFoundError(f"File {file_path} not found in root folder {self.root_folder}")

        with open(file_path, "rb") as file:
            if byte_count is not None:
                if mode == "head":
                    return [file.read(byte_count).decode()]
                # mode == 'tail'
                file.seek(-byte_count, 2)  # Seek from end of file
                return [file.read(byte_count).decode()]

            # Read by lines if byte_count is not specified
            if mode == "head":
                head_lines = []
                for _ in range(lines):
                    try:
                        line = next(file).decode("utf-8")
                        head_lines.append(line.rstrip("\r\n"))
                    except StopIteration:
                        break
                return head_lines
                # return [next(file).decode("utf-8").rstrip("\r\n") for _ in range(lines)]
            # mode == 'tail'
            return [line.decode("utf-8").rstrip("\r\n") for line in list(file)[-lines:]]

Methods

def head(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) ‑> list[str]

Return the first 'lines' lines or 'byte_count' bytes from a file.

Args

file_path : str
Path to the file.
lines : int
Number of lines to return. Ignored if byte_count is specified. Defaults to 10.
byte_count : Optional[int]
Number of bytes to return. If specified, overrides lines.

Returns

list[str]
Lines or bytes from the start of the file.
def head_markdown(self, file_path: str, lines: int = 10) ‑> str

Return the first 'lines' lines of a file formatted as markdown.

Args

file_path : str
Path to the file.
lines : int
Number of lines to return. Defaults to 10.

Returns

str
String containing the first 'lines' lines of the file.
def head_tail(self, file_path: str, lines: int = 10, mode: str = 'head', byte_count: Optional[int] = None) ‑> list[str]

Read lines or bytes from the start ('head') or end ('tail') of a file.

Args

file_path : str
Path to the file.
lines : int
Number of lines to read. Ignored if byte_count is specified. Defaults to 10.
mode : str
Operation mode, either 'head' or 'tail'. Defaults to 'head'.
byte_count : Optional[int]
Number of bytes to read. If specified, overrides lines.

Returns

list[str]
Requested lines or bytes from the file.

Raises

ValueError
If mode is not 'head' or 'tail'.
FileNotFoundError
If the file is not found in the root folder.
def tail(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) ‑> list[str]

Return the last 'lines' lines or 'byte_count' bytes from a file.

Args

file_path : str
Path to the file.
lines : int
Number of lines to return. Ignored if byte_count is specified. Defaults to 10.
byte_count : Optional[int]
Number of bytes to return. If specified, overrides lines.

Returns

list[str]
Lines or bytes from the end of the file.
def tail_markdown(self, file_path: str, lines: int = 10) ‑> str

Return the last 'lines' lines of a file formatted as markdown.

Args

file_path : str
Path to the file.
lines : int
Number of lines to return. Defaults to 10.

Returns

str
String containing the last 'lines' lines of the file.
class InsertTool (root_folder: str, config: Config)

Initialize the InsertTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
Developer-supplied configuration that the bot shouldn't set.
class InsertTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the InsertTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): Developer-supplied configuration that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.python_module = config.get_value("python_module")
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def insert_text_after_context(self, file_path: str, context: str, text_to_insert: str) -> str:
        """Inserts a given text immediately after a specified context in a file.

        This method opens the file, finds the line containing the specified context,
        and inserts the provided text immediately after this line. If the context
        matches multiple lines, it raises a ValueError due to ambiguity.

        Args:
            file_path (str): The path of the file in which the text is to be inserted.
            context (str): The context string to search for in the file. The text is
                           inserted after the line containing this context.
            text_to_insert (str): The text to insert into the file.

        Returns:
            str: A message for the bot with the result of the insert.

        Raises:
            ValueError: If the provided context matches multiple lines in the file.
        """
        if not file_path:
            raise TypeError("No file_path, please provide file_path for each request.")
        if not context:
            raise TypeError("No context, please context so I can find where to insert the text.")
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            lines = file.readlines()
        original_lines = list(lines)

        context_line_indices = [i for i, line in enumerate(lines) if context in line]

        if len(context_line_indices) == 0:
            with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
                plain_text = file.read()
            raise ValueError(
                f"No matches found, no changes made, context is not a substring of any row. "
                f"For reference, here is the contents of the file:\n{plain_text}"
            )

        # Check for ambiguity in the context match
        if len(context_line_indices) > 1:
            with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
                plain_text = file.read()
            found_at = ", ".join([str(i) for i in context_line_indices])
            raise ValueError(
                f"Ambiguous context: The provided context matches multiple lines, namely {found_at}. A context line the "
                "string or substring of the line just before your desired insertion point. It must "
                "uniquely identify a location. Either use a longer substring to match or switch to using"
                "the insert_text_after_multiline_context tool.\n"
                f"For reference, here is the contents of the file:\n{plain_text}"
            )

        # Index of the line after the context line
        insert_index = context_line_indices[0] + 1

        # Insert the text
        lines.insert(insert_index, text_to_insert + "\n")

        return self._save_if_changed(file_path, original_lines, lines)

    @log()
    def insert_text_at_start_or_end(self, file_path: str, text_to_insert: str, position: str = "end") -> str:
        """Inserts text at the start or end of a file.

        Opens the file and inserts the specified text either at the beginning or the
        end of the file, based on the 'position' argument. If the position argument
        is neither 'start' nor 'end', it raises a ValueError.

        Args:
            file_path (str): The path of the file in which the text is to be inserted.
            text_to_insert (str): The text to insert into the file.
            position (str, optional): The position where the text should be inserted.
                                      Should be either 'start' or 'end'. Defaults to 'end'.

        Raises:
            ValueError: If the 'position' argument is not 'start' or 'end'.

        """
        if not file_path:
            raise TypeError("No file_path, please provide file_path for each request.")
        if not text_to_insert:
            raise TypeError("No text_to_insert, please provide so I have something to insert.")
        if position not in ("start", "end"):
            raise ValueError("position must be start or end, so I know where to insert text.")
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            lines = file.readlines()
        original_lines = list(lines)
        if position == "start":
            lines.insert(0, text_to_insert + "\n")
        elif position == "end":
            lines.append(text_to_insert + "\n")
        else:
            raise ValueError("Invalid position: choose 'start' or 'end'.")

        return self._save_if_changed(file_path, original_lines, lines)

    @log()
    def insert_text_after_multiline_context(self, file_path: str, context_lines: list[str], text_to_insert: str) -> str:
        """Inserts text immediately after a specified multiline context in a file.

        Opens the file and searches for a sequence of lines (context). Once the context
        is found, it inserts the specified text immediately after this context. If the
        context is not found, it raises a ValueError.

        Args:
            file_path (str): The path of the file in which the text is to be inserted.
            context_lines (list of str): A list of strings representing the multiline
                                         context to search for in the file.
            text_to_insert (str): The text to insert into the file after the context.

        Raises:
            ValueError: If the multiline context is not found in the file.

        """
        if not file_path:
            raise TypeError("No file_path, please provide file_path for each request.")
        if not context_lines:
            raise TypeError("No context_lines, please context lines so I can find where to insert the new lines.")
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            lines = file.readlines()

        try:
            # Remember whether the file originally ends with a newline so it can be restored later.
            ends_with_n = lines[-1].endswith("\n")
        except IndexError:
            ends_with_n = False

        # this is going to make it hard to preserve whitespace.
        # Convert context_lines to a string for easier matching
        context_string = "".join([line + "\n" for line in context_lines]).rstrip("\n")

        # Convert file lines to a string
        file_string = "".join(lines)

        starts_at = file_string.find(context_string)
        if starts_at == -1:
            with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
                plain_text = file.read()
            raise ValueError(
                f"No matches found, no changes made, context_lines are not found in this document. "
                f"For reference, here is the contents of the file:\n{plain_text}"
            )
        # Find the index where the context ends
        context_end_index = starts_at + len(context_string)

        # Split the file_string back into lines at the context end
        before_context = file_string[:context_end_index]
        after_context = file_string[context_end_index:]

        # Insert the new text
        new_file_string = before_context + "\n" + text_to_insert + "\n" + after_context.strip("\n")

        if ends_with_n:
            new_file_string += "\n"

        return self._save_if_changed(file_path, lines, new_file_string)

    def _save_if_changed(self, file_path: str, original_lines, new_file_string: Union[str, list[str]]) -> str:
        """
        Save the file if it has changed.

        Args:
            file_path: The path of the file to save.
            original_lines: The original file contents.
            new_file_string: The new file contents.

        Returns:
            A message for the bot with the result of the save.
        """
        if not new_file_string:
            raise TypeError("Something went wrong in insert and all text disappeared. Cancelling.")

        if isinstance(new_file_string, str) and "\n".join(original_lines) == new_file_string:
            return (
                "File not changed this means the old file contents are the same as the new. This has nothing "
                "to do with file permissions."
            )
        if isinstance(new_file_string, list) and original_lines == new_file_string:
            return (
                "File not changed, this means the old file contents are the same as the new. This has nothing "
                "to do with file permissions."
            )
        # if is_python_file(file_path):
        #     is_valid, error = is_valid_python_source(source)
        #     if not is_valid and error:
        #         return f"Invalid Python source code. No changes made. {error.lineno} {error.msg} {error.text}"
        #     if not is_valid:
        #         return f"Invalid Python source code. No changes made. {error}."

        # Write back to the file
        BackupRestore.backup_file(file_path)
        with open(file_path, "w", encoding="utf-8", errors=self.utf8_errors) as file:
            if isinstance(new_file_string, str):
                file.write(new_file_string)
            else:
                file.writelines(new_file_string)

        validation = self._validate_code(file_path)

        if validation:
            BackupRestore.revert_to_latest_backup(file_path)
            return f"File not rewritten because of problems.\n{validation.message}"

        if self.auto_cat:
            feedback = "Insert completed and no exceptions thrown."
            contents = CatTool(self.root_folder, self.config).cat_markdown([file_path])
            return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"
        return "Insert completed and no exceptions thrown. Please verify by other means."

    def _validate_code(self, full_path: str) -> Optional[ValidationMessageForBot]:
        """
        Validate python

        Args:
            full_path (str): The path to the file to validate.

        Returns:
            Optional[ValidationMessageForBot]: A validation message if the file is invalid, otherwise None.
        """
        if not is_python_file(full_path):
            return None
        if not self.python_module:
            logger.warning("No python module set, skipping validation.")
            return None
        validator = ValidateModule(self.python_module)
        results = validator.validate()
        explanation = validator.explain_to_bot(results)
        if explanation.is_valid:
            return None
        return explanation

Methods

def insert_text_after_context(self, file_path: str, context: str, text_to_insert: str) ‑> str

Inserts a given text immediately after a specified context in a file.

This method opens the file, finds the line containing the specified context, and inserts the provided text immediately after this line. If the context matches multiple lines, it raises a ValueError due to ambiguity.

Args

file_path : str
The path of the file in which the text is to be inserted.
context : str
The context string to search for in the file. The text is inserted after the line containing this context.
text_to_insert : str
The text to insert into the file.

Returns

str
A message for the bot with the result of the insert.

Raises

ValueError
If the provided context matches multiple lines in the file.
def insert_text_after_multiline_context(self, file_path: str, context_lines: list[str], text_to_insert: str) ‑> str

Inserts text immediately after a specified multiline context in a file.

Opens the file and searches for a sequence of lines (context). Once the context is found, it inserts the specified text immediately after this context. If the context is not found, it raises a ValueError.

Args

file_path : str
The path of the file in which the text is to be inserted.
context_lines : list of str
A list of strings representing the multiline context to search for in the file.
text_to_insert : str
The text to insert into the file after the context.

Raises

ValueError
If the multiline context is not found in the file.
def insert_text_at_start_or_end(self, file_path: str, text_to_insert: str, position: str = 'end') ‑> str

Inserts text at the start or end of a file.

Opens the file and inserts the specified text either at the beginning or the end of the file, based on the 'position' argument. If the position argument is neither 'start' nor 'end', it raises a ValueError.

Args

file_path : str
The path of the file in which the text is to be inserted.
text_to_insert : str
The text to insert into the file.
position : str, optional
The position where the text should be inserted. Should be either 'start' or 'end'. Defaults to 'end'.

Raises

ValueError
If the 'position' argument is not 'start' or 'end'.
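A minimal sketch of the three insertion methods above, assuming InsertTool and Config are importable from ai_shell and that notes.md is a placeholder file inside the root folder; each call returns a feedback string intended for the bot.

import ai_shell

insert = ai_shell.InsertTool(".", ai_shell.Config())

# Insert after the single line containing this context; raises ValueError if the context is ambiguous.
print(insert.insert_text_after_context("notes.md", context="## Open questions", text_to_insert="- review the grep tool"))

# Use a multi-line context when a single line is not unique enough.
print(insert.insert_text_after_multiline_context(
    "notes.md",
    context_lines=["## Done", "- wired up the git tool"],
    text_to_insert="- added head/tail support",
))

# Append or prepend without any context.
print(insert.insert_text_at_start_or_end("notes.md", "Reviewed by the docs bot.", position="end"))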
class LsTool (root_folder: str, config: Config)

Initialize the LsTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
Developer-supplied configuration that the bot shouldn't set.
class LsTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the LsTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): Developer-supplied configuration that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)

    @log()
    def ls_markdown(self, path: Optional[str] = ".", all_files: bool = False, long: bool = False) -> str:
        """List directory contents, with options to include all files and detailed view.

        Args:
            path (str, optional): The directory path to list. Defaults to the current directory '.'.
            all_files (bool): If True, include hidden files. Defaults to False.
            long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False.

        Returns:
            str: The markdown representation of the ls command output.
        """
        try:
            entries_info = self.ls(path, all_files, long)
        except (FileNotFoundError, NotADirectoryError):
            tree_text = tree(Path(os.getcwd()))
            markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}"
            return markdown_content

        output = StringIO()

        is_first = True
        for line in entries_info:
            if not is_first:
                output.write("\n")
            is_first = False
            output.write(line)

        output.seek(0)
        return output.read()

    @log()
    def ls(self, path: Optional[str] = None, all_files: bool = False, long: bool = False) -> Union[list[str], str]:
        """
        List directory contents, with options to include all files and detailed view.

        Args:
            path (str, optional): The directory path to list. Defaults to the current directory '.'.
            all_files (bool): If True, include hidden files. Defaults to False.
            long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False.

        Returns:
            List[str]: List of files and directories, optionally with details.
        """
        logger.info(f"ls --path {path} --all_files {all_files} --long  {long}")

        if path is None:
            path = ""

        if path is not None and ("?" in path or "*" in path or "[" in path or "]" in path):
            # Globs behave very different from non-globs. :(
            #  or "{" in path or "}"  <-- is this a glob pattern?
            entries = safe_glob(path, self.root_folder)
        else:
            try:
                # enumerate list to check if the path exists
                entries = list(
                    (_ for _ in os.listdir(path))
                    if all_files
                    else (entry for entry in os.listdir(path) if not entry.startswith("."))
                )
            except (FileNotFoundError, NotADirectoryError):
                # if not, just tell the bot everything.
                tree_text = tree(Path(os.getcwd()))
                markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}"
                return markdown_content
        entries_info = []

        for entry in entries:
            # is this None-safety here correct?
            full_path = entry if path is None else os.path.join(path, entry)
            if not is_file_in_root_folder(full_path, self.root_folder):
                continue
            if os.path.isdir(full_path) and entry.endswith("__pycache__"):
                continue
            if long:
                stats = os.stat(full_path)
                # Always human readable, too many tokens for byte count.
                size = human_readable_size(stats.st_size)
                mod_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(stats.st_mtime))
                entries_info.append(f"{size:} {mod_time} {entry}")
            else:
                entries_info.append(entry)
        if logger.level == logging.DEBUG:
            for line in entries_info:
                logger.debug(line)
        return entries_info

Methods

def ls(self, path: Optional[str] = None, all_files: bool = False, long: bool = False) ‑> Union[list[str], str]

List directory contents, with options to include all files and detailed view.

Args

path : str, optional
The directory path to list. Defaults to the current directory '.'.
all_files : bool
If True, include hidden files. Defaults to False.
long : bool
If True, include details like permissions, owner, size, and modification date. Defaults to False.

Returns

List[str]
List of files and directories, optionally with details.
Expand source code
@log()
def ls(self, path: Optional[str] = None, all_files: bool = False, long: bool = False) -> Union[list[str], str]:
    """
    List directory contents, with options to include all files and detailed view.

    Args:
        path (str, optional): The directory path to list. Defaults to the current directory '.'.
        all_files (bool): If True, include hidden files. Defaults to False.
        long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False.

    Returns:
        List[str]: List of files and directories, optionally with details.
    """
    logger.info(f"ls --path {path} --all_files {all_files} --long  {long}")

    if path is None:
        path = ""

    if path is not None and ("?" in path or "*" in path or "[" in path or "]" in path):
        # Globs behave very differently from non-globs. :(
        #  or "{" in path or "}"  <-- is this a glob pattern?
        entries = safe_glob(path, self.root_folder)
    else:
        try:
            # enumerate list to check if the path exists
            entries = list(
                (_ for _ in os.listdir(path))
                if all_files
                else (entry for entry in os.listdir(path) if not entry.startswith("."))
            )
        except (FileNotFoundError, NotADirectoryError):
            # if not, just tell the bot everything.
            tree_text = tree(Path(os.getcwd()))
            markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}"
            return markdown_content
    entries_info = []

    for entry in entries:
        # is this None-safety here correct?
        full_path = entry if path is None else os.path.join(path, entry)
        if not is_file_in_root_folder(full_path, self.root_folder):
            continue
        if os.path.isdir(full_path) and entry.endswith("__pycache__"):
            continue
        if long:
            stats = os.stat(full_path)
            # Always human readable, too many tokens for byte count.
            size = human_readable_size(stats.st_size)
            mod_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(stats.st_mtime))
            entries_info.append(f"{size:} {mod_time} {entry}")
        else:
            entries_info.append(entry)
    if logger.level == logging.DEBUG:
        for line in entries_info:
            logger.debug(line)
    return entries_info
def ls_markdown(self, path: Optional[str] = '.', all_files: bool = False, long: bool = False) ‑> str

List directory contents, with options to include all files and detailed view.

Args

path : str, optional
The directory path to list. Defaults to the current directory '.'.
all_files : bool
If True, include hidden files. Defaults to False.
long : bool
If True, include details like permissions, owner, size, and modification date. Defaults to False.

Returns

str
The markdown representation of the ls command output.
Expand source code
@log()
def ls_markdown(self, path: Optional[str] = ".", all_files: bool = False, long: bool = False) -> str:
    """List directory contents, with options to include all files and detailed view.

    Args:
        path (str, optional): The directory path to list. Defaults to the current directory '.'.
        all_files (bool): If True, include hidden files. Defaults to False.
        long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False.

    Returns:
        str: The markdown representation of the ls command output.
    """
    try:
        entries_info = self.ls(path, all_files, long)
    except (FileNotFoundError, NotADirectoryError):
        tree_text = tree(Path(os.getcwd()))
        markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}"
        return markdown_content

    output = StringIO()

    is_first = True
    for line in entries_info:
        if not is_first:
            output.write("\n")
        is_first = False
        output.write(line)

    output.seek(0)
    return output.read()
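A minimal usage sketch of the ls tools above, assuming LsTool is importable from the ai_shell package the same way CatTool is, and using the (root_folder, config) constructor documented here; the "docs" path is illustrative.

import ai_shell

# Sketch only: explicit Config() and the "docs" folder are assumptions.
config = ai_shell.Config()
ls_tool = ai_shell.LsTool(".", config)

# Plain listing (hidden files excluded by default).
print(ls_tool.ls("docs"))

# Markdown-friendly listing with sizes and modification times, for the bot.
print(ls_tool.ls_markdown("docs", all_files=True, long=True))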
class PatchTool (root_folder: str, config: Config)

Edit a file by applying a git patch.

Initialize the PatchTool with a root folder.

Args

root_folder : str
The root folder for valid patchable files.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class PatchTool:
    """Edit a file by applying a git patch."""

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the PatchTool with a root folder.

        Args:
            root_folder (str): The root folder for valid patchable files.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder: str = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)

    @log()
    def apply_git_patch(self, patch_content: str) -> str:
        """
        Apply a git patch to the files in the root folder.

        Args:
            patch_content (str): The content of the git patch.

        Returns:
            str: A message indicating successful patch application.

        Raises:
            RuntimeError: If the patch application fails.
        """
        # Create a temporary file to store the patch content
        with tempfile.NamedTemporaryFile(suffix=".patch", delete=False) as tmp_patch:
            tmp_patch_name = tmp_patch.name
            tmp_patch.write(patch_content.encode("utf-8"))
            tmp_patch.flush()

        _patch = PatchSet.from_filename(tmp_patch_name, encoding="utf-8")

        cmd = ["git", "apply", tmp_patch_name, "--reject", "--verbose"]

        # Execute the command and capture stdout and stderr
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True, shell=False)  # nosec
            # Log stdout and stderr
            logger.info("STDOUT:\n%s", result.stdout.replace(" ", ".").replace("\n", "\\n"))
            logger.info("STDERR:\n%s", result.stderr.replace(" ", ".").replace("\n", "\\n"))

            # Check for errors and return the result
            if result.returncode != 0:
                raise RuntimeError(f"Failed to apply patch: {result.stderr}")

        except subprocess.CalledProcessError as cpe:
            print(cpe)
            print(cpe.stdout)
            print(cpe.stderr)
            raise

        return "Patch applied without exception, please verify by other means to see if it was successful."

    def _extract_files_from_patch(self, patch_content: str) -> set[str]:
        """
        Extract file names from the patch content.

        Args:
            patch_content (str): The content of the git patch.

        Returns:
            set[str]: A set of file names extracted from the patch.
        """
        file_names = set()
        lines = patch_content.split("\n")

        for line in lines:
            if line.startswith("--- a/") or line.startswith("+++ b/"):
                # Extract the file name and add it to the set
                parts = line.split()
                if len(parts) > 1:
                    file_name = parts[1]
                    if file_name.startswith("a/") or file_name.startswith("b/"):
                        file_name = file_name[2:]
                    file_names.add(file_name)

        return file_names

Methods

def apply_git_patch(self, patch_content: str) ‑> str

Apply a git patch to the files in the root folder.

Args

patch_content : str
The content of the git patch.

Returns

str
A message indicating successful patch application.

Raises

RuntimeError
If the patch application fails.
Expand source code
@log()
def apply_git_patch(self, patch_content: str) -> str:
    """
    Apply a git patch to the files in the root folder.

    Args:
        patch_content (str): The content of the git patch.

    Returns:
        str: A message indicating successful patch application.

    Raises:
        RuntimeError: If the patch application fails.
    """
    # Create a temporary file to store the patch content
    with tempfile.NamedTemporaryFile(suffix=".patch", delete=False) as tmp_patch:
        tmp_patch_name = tmp_patch.name
        tmp_patch.write(patch_content.encode("utf-8"))
        tmp_patch.flush()

    _patch = PatchSet.from_filename(tmp_patch_name, encoding="utf-8")

    cmd = ["git", "apply", tmp_patch_name, "--reject", "--verbose"]

    # Execute the command and capture stdout and stderr
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, shell=False)  # nosec
        # Log stdout and stderr
        logger.info("STDOUT:\n%s", result.stdout.replace(" ", ".").replace("\n", "\\n"))
        logger.info("STDERR:\n%s", result.stderr.replace(" ", ".").replace("\n", "\\n"))

        # Check for errors and return the result
        if result.returncode != 0:
            raise RuntimeError(f"Failed to apply patch: {result.stderr}")

    except subprocess.CalledProcessError as cpe:
        print(cpe)
        print(cpe.stdout)
        print(cpe.stderr)
        raise

    return "Patch applied without exception, please verify by other means to see if it was successful."
class PyCatTool (root_folder: str, config: Config)

Initialize the PyCatTool with a root folder.

Args

root_folder : str
The root folder path to start the file traversal from.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class PyCatTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the PyCatTool with a root folder.

        Args:
            root_folder (str): The root folder path to start the file traversal from.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def format_code_as_markdown(
        self,
        base_path: str,
        header: str,
        no_docs: bool = False,
        no_comments: bool = False,
    ) -> str:
        """
        Combine all Python files in a directory into a single Markdown file.

        This method traverses the directory starting from base_path, and for each Python file found,
        its contents are formatted and appended to the returned Markdown string.

        Args:
            base_path (str): The base path of the directory to start traversing.
            header (str): A header string to be included at the beginning of the Markdown file.
            no_docs (bool): Whether to exclude docstrings from the output. Defaults to False.
            no_comments (bool): Whether to exclude comments from the output. Defaults to False.

        Returns:
            str: The Markdown file contents.
        """
        output_file = StringIO()
        if header == "tree":
            tree_text = tree(Path(base_path))
            markdown_content = f"# Source Code Filesystem Tree\n\n{tree_text}"
            output_file.write(markdown_content)

        markdown_content = f"# {header} Source Code\n\n"

        for root, _dirs, files in os.walk(base_path):
            for file in files:
                if not is_file_in_root_folder(file, self.root_folder):
                    continue
                if is_python_file(file):
                    full_path = os.path.join(root, file)
                    relative_path = os.path.relpath(full_path, base_path)
                    markdown_content += format_path_as_header(relative_path)
                    markdown_content += "```python\n"
                    with open(full_path, encoding="utf-8", errors=self.utf8_errors) as handle:
                        text = handle.read()
                    markdown_content += text
                    markdown_content += "\n```\n\n"
        output_file.write(markdown_content)
        return output_file.getvalue()

Methods

def format_code_as_markdown(self, base_path: str, header: str, no_docs: bool = False, no_comments: bool = False) ‑> str

Combine all Python files in a directory into a single Markdown file.

This method traverses the directory starting from base_path, and for each Python file found, its contents are formatted and appended to the returned Markdown string.

Args

base_path : str
The base path of the directory to start traversing.
header : str
A header string to be included at the beginning of the Markdown file.
no_docs : bool
Whether to exclude docstrings from the output. Defaults to False.
no_comments : bool
Whether to exclude comments from the output. Defaults to False.

Returns

str
The Markdown file contents.
Expand source code
@log()
def format_code_as_markdown(
    self,
    base_path: str,
    header: str,
    no_docs: bool = False,
    no_comments: bool = False,
) -> str:
    """
    Combine all Python files in a directory into a single Markdown file.

    This method traverses the directory starting from base_path, and for each Python file found,
    its contents are formatted and appended to the returned Markdown string.

    Args:
        base_path (str): The base path of the directory to start traversing.
        header (str): A header string to be included at the beginning of the Markdown file.
        no_docs (bool): Whether to exclude docstrings from the output. Defaults to False.
        no_comments (bool): Whether to exclude comments from the output. Defaults to False.

    Returns:
        str: The Markdown file contents.
    """
    output_file = StringIO()
    if header == "tree":
        tree_text = tree(Path(base_path))
        markdown_content = f"# Source Code Filesystem Tree\n\n{tree_text}"
        output_file.write(markdown_content)

    markdown_content = f"# {header} Source Code\n\n"

    for root, _dirs, files in os.walk(base_path):
        for file in files:
            if not is_file_in_root_folder(file, self.root_folder):
                continue
            if is_python_file(file):
                full_path = os.path.join(root, file)
                relative_path = os.path.relpath(full_path, base_path)
                markdown_content += format_path_as_header(relative_path)
                markdown_content += "```python\n"
                with open(full_path, encoding="utf-8", errors=self.utf8_errors) as handle:
                    text = handle.read()
                markdown_content += text
                markdown_content += "\n```\n\n"
    output_file.write(markdown_content)
    return output_file.getvalue()
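A sketch of combining a source tree into one Markdown string, assuming PyCatTool is importable from the ai_shell package; the ./src path is an assumption. Passing header="tree" also prepends a filesystem tree, per the source above.

import ai_shell

pycat = ai_shell.PyCatTool("./src", ai_shell.Config())
# Returns one Markdown string with each .py file in a fenced python block.
markdown = pycat.format_code_as_markdown(base_path="./src", header="tree")
print(markdown[:500])  # preview the start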
class PytestTool (root_folder: str, config: Config)

AI-optimized version of pytest.

Initialize the PytestTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class PytestTool:
    """Optimized for AI version of pytest."""

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the PytestTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder

        self.config = config
        self.module = config.get_value("pytest_module")
        self.tests_folder = config.get_value("pytest_folder")

        self.min_coverage = float(config.get_value("pytest_min_coverage") or 0.0)

    @log()
    def pytest(
        self,
    ) -> str:
        """
        Run pytest against the configured tests folder.

        Returns:
            str: Output from pytest.
        """
        # Host script must set env vars, temp folder location and pwd!
        # with change_directory(self.root_folder):
        # What is -rA
        if not self.module or not self.tests_folder or not self.min_coverage:
            raise FatalConfigurationError(
                "Please set pytest_module, pytest_folder and pytest_min_coverage in ai_config."
            )
        _passed_tests, _failed_tests, _coverage, command_result = count_pytest_results(
            self.module, self.tests_folder, self.min_coverage
        )
        markdown_output = f"""## Pytest Output
### Standard Output
{command_result.stdout}
### Standard Error
{command_result.stderr}
### Return Code
`{command_result.return_code}`"""
        return markdown_output

Methods

def pytest(self) ‑> str

Run pytest against the configured tests folder.

Returns

str
Output from pytest.
Expand source code
    @log()
    def pytest(
        self,
    ) -> str:
        """
        Run pytest against the configured tests folder.

        Returns:
            str: Output from pytest.
        """
        # Host script must set env vars, temp folder location and pwd!
        # with change_directory(self.root_folder):
        # What is -rA
        if not self.module or not self.tests_folder or not self.min_coverage:
            raise FatalConfigurationError(
                "Please set pytest_module, pytest_folder and pytest_min_coverage in ai_config."
            )
        _passed_tests, _failed_tests, _coverage, command_result = count_pytest_results(
            self.module, self.tests_folder, self.min_coverage
        )
        markdown_output = f"""## Pytest Output
### Standard Output
{command_result.stdout}
### Standard Error
{command_result.stderr}
### Return Code
`{command_result.return_code}`"""
        return markdown_output
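A sketch of running the pytest wrapper, assuming PytestTool is importable from the ai_shell package. It assumes pytest_module, pytest_folder and pytest_min_coverage are already present in the Config; how those values are populated (for example, an ai_config file) is outside this snippet.

import ai_shell

runner = ai_shell.PytestTool("./src", ai_shell.Config())
# Returns a Markdown report with stdout, stderr and the return code.
print(runner.pytest())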
class ReplaceTool (root_folder: str, config: Config)

Initialize the ReplaceTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class ReplaceTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the ReplaceTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.python_module = config.get_value("python_module")
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def replace_line_by_line(
        self, file_path: str, old_text: str, new_text: str, line_start: int = 0, line_end: int = -1
    ) -> str:
        """Replaces occurrences of a specified text with new text in a range of lines in a file.

        Opens the file and replaces occurrences of 'old_text' with 'new_text' within the specified
        line range. If 'line_end' is -1, it defaults to the end of the file. Returns a message
        indicating whether changes were successfully applied or not.

        Args:
            file_path (str): The path to the file.
            old_text (str): The text to be replaced.
            new_text (str): The new text to replace the old text.
            line_start (int, optional): The starting line number (0-indexed) for the replacement.
                                        Defaults to 0.
            line_end (int, optional): The ending line number (0-indexed) for the replacement.
                                      If -1, it goes to the end of the file. Defaults to -1.

        Returns:
            str: A message indicating the success of the operation.

        Raises:
            TypeError: If file_path or old_text is None, or if no lines are left after replacement.
        """
        if not file_path:
            raise TypeError("No file_path, please provide file_path for each request.")
        if not old_text:
            raise TypeError("No old_text, please context so I can find the text to replace.")
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            input_text = file.read()
        lines = []
        input_lines = input_text.splitlines()
        if line_end == -1:
            line_end = len(input_lines)
        for line_no, line in enumerate(input_lines):
            if line_start <= line_no < line_end and old_text in line:
                line = line.replace(old_text, new_text)
            lines.append(line)
        if not lines:
            raise TypeError("Nothing left after replace, something went wrong, cancelling.")
        final = "\n".join(lines)
        return self._save_if_changed(file_path, final, input_text)

    @log()
    def replace_all(self, file_path: str, old_text: str, new_text: str) -> str:
        """Replaces all occurrences of a specified text with new text in a file.

        Opens the file and replaces all occurrences of 'old_text' with 'new_text'. Returns a
        message indicating whether changes were successfully applied or not.

        Args:
            file_path (str): The path to the file.
            old_text (str): The text to be replaced.
            new_text (str): The new text to replace the old text.

        Returns:
            str: A message indicating the success of the operation.

        Raises:
            TypeError: If file_path or old_text is None.
        """
        if new_text is None:
            new_text = ""
        if not file_path:
            raise TypeError("No file_path, please provide file_path for each request.")
        if not old_text:
            raise TypeError("No old_text, please context so I can find the text to replace.")
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            input_text = file.read()
        final = input_text.replace(old_text, new_text)
        return self._save_if_changed(file_path, final, input_text)

    @log()
    def replace_with_regex(self, file_path: str, regex_match_expression: str, replacement: str) -> str:
        """Replaces text in a file based on a regular expression match.

        Opens the file and replaces text that matches the regular expression 'regex_match_expression'
        with the 'replacement' text. Returns a message indicating whether changes were successfully
        applied or not.

        Args:
            file_path (str): The path to the file.
            regex_match_expression (str): The regular expression pattern to match.
            replacement (str): The text to replace the matched pattern.

        Returns:
            str: A message indicating the success of the operation.

        Raises:
            TypeError: If file_path or regex_match_expression is None.
        """
        if not file_path:
            raise TypeError("No file_path, please provide file_path for each request.")
        if not regex_match_expression:
            raise TypeError("No regex_match_expression, please context so I can find the text to replace.")
        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            input_text = file.read()
        final = re.sub(regex_match_expression, replacement, input_text)
        return self._save_if_changed(file_path, final, input_text)

    def _save_if_changed(self, file_path: str, final: str, input_text: str) -> str:
        """Saves the modified text to the file if changes have been made.

        Compares the original text with the modified text and writes the modified text
        to the file if there are changes. Returns a message indicating whether any changes
        were made.

        Args:
            file_path (str): The path to the file.
            final (str): The modified text.
            input_text (str): The original text.

        Returns:
            str: A message indicating whether changes were made or not.

        Raises:
            TypeError: If the replacement result is empty.
        """
        if not final:
            raise TypeError("Something went wrong in replace and all text disappeared. Cancelling.")

        if input_text != final:
            BackupRestore.backup_file(file_path)
            with open(file_path, "w", encoding="utf-8", errors=self.utf8_errors) as output_file:
                output_file.write(final)

            validation = self._validate_code(file_path)

            if validation:
                BackupRestore.revert_to_latest_backup(file_path)
                return f"File not written because of problems.\n{validation.message}"

            if self.auto_cat:
                feedback = "Changes applied without exception, please verify by other means.\n"
                contents = CatTool(self.root_folder, self.config).cat_markdown([file_path])
                return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"
            return "Changes applied without exception, please verify by other means."
        return (
            "No changes made, this means the old file contents are the same as the new. This has nothing "
            "to do with file permissions. Try again with a different match pattern."
        )

    def _validate_code(self, full_path: str) -> Optional[ValidationMessageForBot]:
        """
        Validate python

        Args:
            full_path (str): The path to the file to validate.

        Returns:
            Optional[ValidationMessageForBot]: A validation message if the file is invalid, otherwise None.
        """
        if not is_python_file(full_path):
            return None
        if not self.python_module:
            logger.warning("No python module set, skipping validation.")
            return None
        validator = ValidateModule(self.python_module)
        results = validator.validate()
        explanation = validator.explain_to_bot(results)
        if explanation.is_valid:
            return None
        return explanation

Methods

def replace_all(self, file_path: str, old_text: str, new_text: str) ‑> str

Replaces all occurrences of a specified text with new text in a file.

Opens the file and replaces all occurrences of 'old_text' with 'new_text'. Returns a message indicating whether changes were successfully applied or not.

Args

file_path : str
The path to the file.
old_text : str
The text to be replaced.
new_text : str
The new text to replace the old text.

Returns

str
A message indicating the success of the operation.

Raises

TypeError
If file_path or old_text is None.
Expand source code
@log()
def replace_all(self, file_path: str, old_text: str, new_text: str) -> str:
    """Replaces all occurrences of a specified text with new text in a file.

    Opens the file and replaces all occurrences of 'old_text' with 'new_text'. Returns a
    message indicating whether changes were successfully applied or not.

    Args:
        file_path (str): The path to the file.
        old_text (str): The text to be replaced.
        new_text (str): The new text to replace the old text.

    Returns:
        str: A message indicating the success of the operation.

    Raises:
        TypeError: If file_path or old_text is None.
    """
    if new_text is None:
        new_text = ""
    if not file_path:
        raise TypeError("No file_path, please provide file_path for each request.")
    if not old_text:
        raise TypeError("No old_text, please context so I can find the text to replace.")
    with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
        input_text = file.read()
    final = input_text.replace(old_text, new_text)
    return self._save_if_changed(file_path, final, input_text)
def replace_line_by_line(self, file_path: str, old_text: str, new_text: str, line_start: int = 0, line_end: int = -1) ‑> str

Replaces occurrences of a specified text with new text in a range of lines in a file.

Opens the file and replaces occurrences of 'old_text' with 'new_text' within the specified line range. If 'line_end' is -1, it defaults to the end of the file. Returns a message indicating whether changes were successfully applied or not.

Args

file_path : str
The path to the file.
old_text : str
The text to be replaced.
new_text : str
The new text to replace the old text.
line_start : int, optional
The starting line number (0-indexed) for the replacement. Defaults to 0.
line_end : int, optional
The ending line number (0-indexed) for the replacement. If -1, it goes to the end of the file. Defaults to -1.

Returns

str
A message indicating the success of the operation.

Raises

TypeError
If file_path or old_text is None, or if no lines are left after replacement.
Expand source code
@log()
def replace_line_by_line(
    self, file_path: str, old_text: str, new_text: str, line_start: int = 0, line_end: int = -1
) -> str:
    """Replaces occurrences of a specified text with new text in a range of lines in a file.

    Opens the file and replaces occurrences of 'old_text' with 'new_text' within the specified
    line range. If 'line_end' is -1, it defaults to the end of the file. Returns a message
    indicating whether changes were successfully applied or not.

    Args:
        file_path (str): The path to the file.
        old_text (str): The text to be replaced.
        new_text (str): The new text to replace the old text.
        line_start (int, optional): The starting line number (0-indexed) for the replacement.
                                    Defaults to 0.
        line_end (int, optional): The ending line number (0-indexed) for the replacement.
                                  If -1, it goes to the end of the file. Defaults to -1.

    Returns:
        str: A message indicating the success of the operation.

    Raises:
        TypeError: If file_path or old_text is None, or if no lines are left after replacement.
    """
    if not file_path:
        raise TypeError("No file_path, please provide file_path for each request.")
    if not old_text:
        raise TypeError("No old_text, please context so I can find the text to replace.")
    with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
        input_text = file.read()
    lines = []
    input_lines = input_text.splitlines()
    if line_end == -1:
        line_end = len(input_lines)
    for line_no, line in enumerate(input_lines):
        if line_start <= line_no < line_end and old_text in line:
            line = line.replace(old_text, new_text)
        lines.append(line)
    if not lines:
        raise TypeError("Nothing left after replace, something went wrong, cancelling.")
    final = "\n".join(lines)
    return self._save_if_changed(file_path, final, input_text)
def replace_with_regex(self, file_path: str, regex_match_expression: str, replacement: str) ‑> str

Replaces text in a file based on a regular expression match.

Opens the file and replaces text that matches the regular expression 'regex_match_expression' with the 'replacement' text. Returns a message indicating whether changes were successfully applied or not.

Args

file_path : str
The path to the file.
regex_match_expression : str
The regular expression pattern to match.
replacement : str
The text to replace the matched pattern.

Returns

str
A message indicating the success of the operation.

Raises

TypeError
If file_path or regex_match_expression is None.
Expand source code
@log()
def replace_with_regex(self, file_path: str, regex_match_expression: str, replacement: str) -> str:
    """Replaces text in a file based on a regular expression match.

    Opens the file and replaces text that matches the regular expression 'regex_match_expression'
    with the 'replacement' text. Returns a message indicating whether changes were successfully
    applied or not.

    Args:
        file_path (str): The path to the file.
        regex_match_expression (str): The regular expression pattern to match.
        replacement (str): The text to replace the matched pattern.

    Returns:
        str: A message indicating the success of the operation.

    Raises:
        TypeError: If file_path or regex_match_expression is None.
    """
    if not file_path:
        raise TypeError("No file_path, please provide file_path for each request.")
    if not regex_match_expression:
        raise TypeError("No regex_match_expression, please context so I can find the text to replace.")
    with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
        input_text = file.read()
    final = re.sub(regex_match_expression, replacement, input_text)
    return self._save_if_changed(file_path, final, input_text)
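A sketch of the three replace variants, assuming ReplaceTool is importable from the ai_shell package; notes.txt and the search strings are hypothetical. Note that line_end is exclusive, per the line_start <= line_no < line_end check in the source.

import ai_shell

replacer = ai_shell.ReplaceTool(".", ai_shell.Config())

# Replace every occurrence in the file (hypothetical file name).
print(replacer.replace_all("notes.txt", old_text="colour", new_text="color"))

# Replace only within lines 0-9 (0-indexed, end-exclusive).
print(replacer.replace_line_by_line("notes.txt", "TODO", "DONE", line_start=0, line_end=10))

# Regex-based replacement via re.sub.
print(replacer.replace_with_regex("notes.txt", r"v\d+\.\d+", "v2.0"))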
class RewriteTool (root_folder: str, config: Config)

Initialize the RewriteTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class RewriteTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the RewriteTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.python_module = config.get_value("python_module")
        self.only_add_text = config.get_flag("only_add_text", False)

    @log()
    def write_new_file(self, file_path: str, text: str) -> str:
        """
        Write a new file at file_path within the root_folder.

        Args:
            file_path (str): The relative path to the file to be written.
            text (str): The content to write into the file.

        Returns:
            str: A success message with the file path.

        Raises:
            ValueError: If the file already exists or if the file_path is outside the root_folder.
        """
        file_path = sanitize_path(file_path)
        # Don't prepend root folder, we will have already cd'd to it.
        full_path = file_path
        if not is_file_in_root_folder(full_path, self.root_folder):
            raise ValueError("File path must be within the root folder.")

        try:
            if os.path.exists(full_path):
                raise FileExistsError("File already exists.")

            with open(full_path, "w", encoding="utf-8") as file:
                file.write(text)

            validation = self._validate_code(full_path)

            if validation:
                os.remove(full_path)
                return f"File not written because of problems.\n{validation.message}"

            return f"File written to {full_path}"
        except FileExistsError as e:
            tree_text = tree(Path(os.getcwd()))
            markdown_content = f"# File {full_path} already exists. Here are all the files you can see\n\n{tree_text}"
            raise ValueError(
                str(e) + f" {markdown_content}\n Consider using rewrite_file method if you want to overwrite."
            ) from e

    @log()
    def rewrite_file(self, file_path: str, text: str) -> str:
        """
        Backup and rewrite an existing file at file_path within the root_folder.
        This will completely replace the contents of the file with the new text.

        Args:
            file_path (str): The relative path to the file to be rewritten.
            text (str): The new content to write into the file.

        Returns:
            str: A success message with the file path.

        Raises:
            TypeError: If text is empty.
            ValueError: If the file_path is outside the root_folder.
            FileNotFoundError: If the file does not exist.
        """
        if not text:
            raise TypeError("This would delete everything in the file. This is probably not what you want.")

        file_path = sanitize_path(file_path)

        # Don't prepend root folder, we will have already cd'd to it.
        full_path = file_path
        if not is_file_in_root_folder(full_path, self.root_folder):
            raise ValueError("File path must be within the root folder.")

        # not sure this is working right.
        _unchanged_proportion, initial, unchanged, added, removed = file_similarity(full_path, text.split("\n"))
        if self.only_add_text and removed > 0:
            raise TypeError("This would delete lines. Only add lines, do not remove them.")
        if self.only_add_text and len(text.split("\n")) < initial:
            raise TypeError("Line count decreased. Only add text, do not remove it.")
        # if 5 < initial <= removed:
        #     # concern is taking a large file, and deleting everything (ie. confusing full rewrite for an insert or edit)
        #     raise TypeError(
        #         "Removed lines is equal initial number of lines. "
        #         "When rewriting files, you have to re-write the previous lines, too."
        #     )
        # if unchanged > 0 and initial > 0 and added == 0 and removed == 0:
        #     raise TypeError(
        #         "Nothing changed, nothing was added or removed. "
        #         "When rewriting files, you have to re-write the whole file "
        #         "with lines changed, added or removed."
        #     )

        try:
            if not os.path.exists(full_path):
                raise FileNotFoundError("File does not exist, use ls tool to see what files there are.")

            BackupRestore.backup_file(full_path)

            with open(full_path, "w", encoding="utf-8") as file:
                file.write(text)

            validation = self._validate_code(full_path)

            if validation:
                BackupRestore.revert_to_latest_backup(full_path)
                return f"File not rewritten because of problems.\n{validation.message}"

            feedback = f"File rewritten to {full_path}"
            if self.auto_cat:
                feedback = "Changes made without exception, please verify by other means.\n"
                contents = CatTool(self.root_folder, self.config).cat_markdown([file_path])
                return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"
            return feedback + ", please view to verify contents."
        except FileNotFoundError as e:
            raise FileNotFoundError(
                str(e) + " Consider using write_new_file method if you want to create a new file."
            ) from e

    def _validate_code(self, full_path: str) -> Optional[ValidationMessageForBot]:
        """
        Validate python

        Args:
            full_path (str): The path to the file to validate.

        Returns:
            Optional[ValidationMessageForBot]: A validation message if the file is invalid, otherwise None.
        """
        if not is_python_file(full_path):
            return None
        if not self.python_module:
            logger.warning("No python module set, skipping validation.")
            return None
        validator = ValidateModule(self.python_module)
        results = validator.validate()
        explanation = validator.explain_to_bot(results)
        if explanation.is_valid:
            return None
        return explanation

Methods

def rewrite_file(self, file_path: str, text: str) ‑> str

Backup and rewrite an existing file at file_path within the root_folder. This will completely replace the contents of the file with the new text.

Args

file_path : str
The relative path to the file to be rewritten.
text : str
The new content to write into the file.

Returns

str
A success message with the file path.

Raises

TypeError
If text is empty.
ValueError
If the file_path is outside the root_folder.
FileNotFoundError
If the file does not exist.
Expand source code
@log()
def rewrite_file(self, file_path: str, text: str) -> str:
    """
    Backup and rewrite an existing file at file_path within the root_folder.
    This will completely replace the contents of the file with the new text.

    Args:
        file_path (str): The relative path to the file to be rewritten.
        text (str): The new content to write into the file.

    Returns:
        str: A success message with the file path.

    Raises:
        TypeError: If text is empty.
        ValueError: If the file_path is outside the root_folder.
        FileNotFoundError: If the file does not exist.
    """
    if not text:
        raise TypeError("This would delete everything in the file. This is probably not what you want.")

    file_path = sanitize_path(file_path)

    # Don't prepend root folder, we will have already cd'd to it.
    full_path = file_path
    if not is_file_in_root_folder(full_path, self.root_folder):
        raise ValueError("File path must be within the root folder.")

    # not sure this is working right.
    _unchanged_proportion, initial, unchanged, added, removed = file_similarity(full_path, text.split("\n"))
    if self.only_add_text and removed > 0:
        raise TypeError("This would delete lines. Only add lines, do not remove them.")
    if self.only_add_text and len(text.split("\n")) < initial:
        raise TypeError("Line count decreased. Only add text, do not remove it.")
    # if 5 < initial <= removed:
    #     # concern is taking a large file, and deleting everything (ie. confusing full rewrite for an insert or edit)
    #     raise TypeError(
    #         "Removed lines is equal initial number of lines. "
    #         "When rewriting files, you have to re-write the previous lines, too."
    #     )
    # if unchanged > 0 and initial > 0 and added == 0 and removed == 0:
    #     raise TypeError(
    #         "Nothing changed, nothing was added or removed. "
    #         "When rewriting files, you have to re-write the whole file "
    #         "with lines changed, added or removed."
    #     )

    try:
        if not os.path.exists(full_path):
            raise FileNotFoundError("File does not exist, use ls tool to see what files there are.")

        BackupRestore.backup_file(full_path)

        with open(full_path, "w", encoding="utf-8") as file:
            file.write(text)

        validation = self._validate_code(full_path)

        if validation:
            BackupRestore.revert_to_latest_backup(full_path)
            return f"File not rewritten because of problems.\n{validation.message}"

        feedback = f"File rewritten to {full_path}"
        if self.auto_cat:
            feedback = "Changes made without exception, please verify by other means.\n"
            contents = CatTool(self.root_folder, self.config).cat_markdown([file_path])
            return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"
        return feedback + ", please view to verify contents."
    except FileNotFoundError as e:
        raise FileNotFoundError(
            str(e) + " Consider using write_new_file method if you want to create a new file."
        ) from e
def write_new_file(self, file_path: str, text: str) ‑> str

Write a new file at file_path within the root_folder.

Args

file_path : str
The relative path to the file to be written.
text : str
The content to write into the file.

Returns

str
A success message with the file path.

Raises

ValueError
If the file already exists or if the file_path is outside the root_folder.
Expand source code
@log()
def write_new_file(self, file_path: str, text: str) -> str:
    """
    Write a new file at file_path within the root_folder.

    Args:
        file_path (str): The relative path to the file to be written.
        text (str): The content to write into the file.

    Returns:
        str: A success message with the file path.

    Raises:
        ValueError: If the file already exists or if the file_path is outside the root_folder.
    """
    file_path = sanitize_path(file_path)
    # Don't prepend root folder, we will have already cd'd to it.
    full_path = file_path
    if not is_file_in_root_folder(full_path, self.root_folder):
        raise ValueError("File path must be within the root folder.")

    try:
        if os.path.exists(full_path):
            raise FileExistsError("File already exists.")

        with open(full_path, "w", encoding="utf-8") as file:
            file.write(text)

        validation = self._validate_code(full_path)

        if validation:
            os.remove(full_path)
            return f"File not written because of problems.\n{validation.message}"

        return f"File written to {full_path}"
    except FileExistsError as e:
        tree_text = tree(Path(os.getcwd()))
        markdown_content = f"# File {full_path} already exists. Here are all the files you can see\n\n{tree_text}"
        raise ValueError(
            str(e) + f" {markdown_content}\n Consider using rewrite_file method if you want to overwrite."
        ) from e
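A sketch of creating and then rewriting a file, assuming RewriteTool is importable from the ai_shell package; the file name and contents are hypothetical. write_new_file refuses to overwrite an existing file, and rewrite_file refuses to create a missing one.

import ai_shell

writer = ai_shell.RewriteTool(".", ai_shell.Config())

# Create a new file; raises if it already exists.
print(writer.write_new_file("example_notes.md", "# Notes\n\nFirst draft.\n"))

# Replace its contents wholesale; the previous version is backed up first.
print(writer.rewrite_file("example_notes.md", "# Notes\n\nSecond draft.\n"))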
class SedTool (root_folder: str, config: Config)

Initialize the SedTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class SedTool:
    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the SedTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        self.auto_cat = config.get_flag("auto_cat", True)
        self.utf8_errors = config.get_value("utf8_errors", "surrogateescape")

    @log()
    def sed(self, file_path: str, commands: list[str]) -> str:
        r"""
        Transform the contents of a file located at file_path as per the provided sed-like commands.

        Args:
            file_path (str): The path of the file to be transformed.
            commands (list[str]): A list of sed-like commands for text transformation.

        Returns:
            str: The transformed text from the file.

        Supported command syntax:
            - s/regex/replacement/flags: Regex substitution.
            - p: Print the current line.
            - a\text: Append text after the current line.
            - i\text: Insert text before the current line.
            - [number]c\text: Change the text of a specific line number.
            - [number]d: Delete a specific line number.

        Note: This function reads from a file and returns the transformed text. It does not modify the file in-place.
        """
        if not is_file_in_root_folder(file_path, self.root_folder):
            raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.")

        with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
            input_text = file.read()
        output_text = SedTool._process_sed(input_text, commands)
        if is_python_file(file_path):
            is_valid, error = is_valid_python_source(output_text)
            if not is_valid and error is not None:
                return f"Invalid Python source code. No changes made. {error.lineno} {error.msg} {error.text}"

        if input_text != output_text:
            with open(file_path, "w", encoding="utf-8") as output_file:
                output_file.write(output_text)

            if self.auto_cat:
                feedback = "Changes without exception, please verify by other means.\n"
                contents = CatTool(self.root_folder, self.config).cat_markdown([file_path])
                return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"
            return "Changes without exception, please verify by other means."
        return "No changes made."

    @classmethod
    def _process_sed(cls, input_text: str, commands: list[str]) -> str:
        r"""
        Transform input_text as per the provided sed-like commands.

        Args:
            input_text (str): The input text to be transformed.
            commands (list[str]): A list of sed-like commands for text transformation.

        Returns:
            str: The transformed text.

        Supported command syntax:
            - s/regex/replacement/flags: Regex substitution.
            - a\text: Append text after the current line.
            - i\text: Insert text before the current line.
            - [number]c\text: Change the text of a specific line number.
            - [number]d: Delete a specific line number.

        Example:
            >>> SedTool._process_sed("Hello World\\nThis is a test", ["s/World/Universe/", "a\\Appended text"])
            'Hello Universe\\nThis is a test\nAppended text'
            >>> SedTool._process_sed("First Line\\nSecond Line", ["2d", "i\\Inserted at Start"])
            'Inserted at Start\nFirst Line\\nSecond Line'
        """
        if isinstance(commands, str):
            commands = [commands]

        # don't know how to fix the covariant/invariant typing issue here
        lines: list[str] = input_text.split("\n")

        for i in range(len(lines)):
            for command in commands:
                if command.startswith("s/") and re.match(r"s/.+/.*/", command):
                    # Regex substitution: s/regex/replacement/flags
                    parts = command[2:].rsplit("/", 2)
                    regex, replacement, flags = parts[0], parts[1], parts[2] if len(parts) > 2 else ""
                    count = 1 if "g" not in flags else 0  # replace all if 'g' is present
                    lines[i] = re.sub(regex, replacement, lines[i], count=count)
                elif command.startswith("a\\"):
                    # Append: a\text
                    append_text = command[2:]
                    lines[i] += "\n" + append_text
                elif re.match(r"\d+a\\", command):
                    # insert after the specified line. a for after.
                    target_line, change_text = command.split("a\\")
                    if i + 1 == int(target_line):
                        lines[i] = change_text
                elif command.startswith("i\\") and i == 0:
                    # Insert: i\text (only at the beginning of the text)
                    insert_text = command[2:]
                    lines[i] = insert_text + "\n" + lines[i]
                elif re.match(r"\d+c\\", command):
                    # Change specific line: [number]c\text
                    target_line, change_text = command.split("c\\")
                    if i + 1 == int(target_line):
                        lines[i] = change_text
                elif re.match(r"\d+d", command):
                    # Delete specific line: [number]d
                    delete_line = int(command[:-1])
                    if i + 1 == delete_line:
                        # None was a better deletion marker, but messes with mypy.
                        lines[i] = "None  # Mark for deletion"
                elif command == "p":
                    # print? No action?
                    pass
                else:
                    raise TypeError(
                        "Unknown command, expected prefix of s/ or a\\ or digit + c or digit + d for replace, append, change, or delete respectively"
                    )

        # Rebuild the output from modified lines, excluding ones marked for deletion
        output = [line for line in lines if line != "None  # Mark for deletion"]

        return "\n".join(output)

    # # Rerun the regex substitution test with the corrected function
    # test_regex_substitution_corrected = lambda: simulate_sed_corrected(input_text, commands) == expected_output
    # test_regex_substitution_corrected()

Methods

def sed(self, file_path: str, commands: list[str]) ‑> str

Transform the contents of a file located at file_path as per the provided sed-like commands.

Args

file_path : str
The path of the file to be transformed.
commands : list[str]
A list of sed-like commands for text transformation.

Returns

str
The transformed text from the file.

Supported command syntax:

- s/regex/replacement/flags: Regex substitution.
- p: Print the current line.
- a\text: Append text after the current line.
- i\text: Insert text before the current line.
- [number]c\text: Change the text of a specific line number.
- [number]d: Delete a specific line number.

Note: This function reads from a file and returns the transformed text. It does not modify the file in-place.

Expand source code
@log()
def sed(self, file_path: str, commands: list[str]) -> str:
    r"""
    Transform the contents of a file located at file_path as per the provided sed-like commands.

    Args:
        file_path (str): The path of the file to be transformed.
        commands (list[str]): A list of sed-like commands for text transformation.

    Returns:
        str: The transformed text from the file.

    Supported command syntax:
        - s/regex/replacement/flags: Regex substitution.
        - p: Print the current line.
        - a\text: Append text after the current line.
        - i\text: Insert text before the current line.
        - [number]c\text: Change the text of a specific line number.
        - [number]d: Delete a specific line number.

    Note: This function reads from a file and returns the transformed text. It does not modify the file in-place.
    """
    if not is_file_in_root_folder(file_path, self.root_folder):
        raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.")

    with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file:
        input_text = file.read()
    output_text = SedTool._process_sed(input_text, commands)
    if is_python_file(file_path):
        is_valid, error = is_valid_python_source(output_text)
        if not is_valid and error is not None:
            return f"Invalid Python source code. No changes made. {error.lineno} {error.msg} {error.text}"

    if input_text != output_text:
        with open(file_path, "w", encoding="utf-8") as output_file:
            output_file.write(output_text)

        if self.auto_cat:
            feedback = "Changes without exception, please verify by other means.\n"
            contents = CatTool(self.root_folder, self.config).cat_markdown([file_path])
            return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"
        return "Changes without exception, please verify by other means."
    return "No changes made."
class TaskBot (config: Config, name: str, bot_instructions: str, model: str, dialog_logger_md: DialogLoggerWithMarkdown, persist_bots: bool = False, persist_threads: bool = False, maximum_loops: int = 10)

Minimal bot management code.

Expand source code
class TaskBot:
    """Minimal bot management code."""

    def __init__(
        self,
        config: Config,
        name: str,
        bot_instructions: str,
        model: str,
        dialog_logger_md: DialogLoggerWithMarkdown,
        persist_bots: bool = False,
        persist_threads: bool = False,
        maximum_loops: int = 10,
    ):
        self.model = model
        """Model, name and instructions uniquely identify a bot."""
        self.name = name
        """Model, name and instructions uniquely identify a bot."""
        self.bot_instructions = bot_instructions
        """Model, name and instructions uniquely identify a bot."""

        self.client: openai.AsyncOpenAI = openai.AsyncOpenAI()
        self.thread: Optional[Thread] = None
        self.assistant: Optional[Assistant] = None

        self.dialog_logger_md = dialog_logger_md
        """Conversation style logger"""

        self.persist_bots = persist_bots
        """Keep bots or attempt to delete them at end of session"""

        self.persist_threads = persist_threads
        """Keep thread or attempt to delete them"""

        self.config = config
        """Stores bot, thread config and other global config."""

        self.maximum_loops = maximum_loops
        """Prevent infinite loops and money wastage."""

        self.toolkit: Optional[ToolKit] = None
        """Reference to toolkit so that goal checkers can check if any tools were used."""

        self.allow_self_certification = False
        """Do you want to trust the bot when it says it has achieved the goal?"""

        self.conversation_over_marker = "DONE"
        """Goal function checker returns this when done."""

    async def initialize(self) -> None:
        """Get or create a bot and store it in the config."""
        bot = await self.get_create_bot()
        logger.debug(f"Assistant id: {bot.id}")
        self.assistant = bot
        self.dialog_logger_md.write_header(bot_name=self.name, model=self.model, bot_instructions=self.bot_instructions)

    async def get_create_bot(self) -> Assistant:
        """Get or create a bot and store it in the config."""
        current_bot = self.config.get_bot(self.name)
        if not current_bot:
            await self.create_bot()
        else:
            try:
                self.assistant = await self.client.beta.assistants.retrieve(current_bot.assistant_id)
                logger.debug(f"Assistant retrieved: {self.assistant.id}")
            except openai.NotFoundError:
                await self.create_bot()
        if not self.assistant:
            raise TypeError("Assistant not found or created.")
        logger.debug(f"Assistant id: {self.assistant.id}")
        return self.assistant

    async def create_bot(self):
        """Create a bot and store it in the config."""
        self.assistant = await self.client.beta.assistants.create(
            name=self.name,
            instructions=self.bot_instructions,
            model=self.model,
        )
        self.config.add_bot(self.assistant.id, self.name)
        logger.debug(f"Assistant created: {self.assistant.id}")

    def toolkit_factory(
        self, root_folder: str, model: str, tool_names: list[str]
    ) -> tuple[ToolKit, list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction]]:
        self.toolkit = ToolKit(root_folder, model, 500, permitted_tools=tool_names, config=self.config)
        # sync the conversation-over marker (COM) with the toolkit
        self.toolkit.conversation_over_marker = self.conversation_over_marker
        initialize_all_tools(keeps=tool_names)
        tools_schema: list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction] = [
            ToolAssistantToolsFunction(**{"function": cast(FunctionDefinition, schema), "type": "function"})
            for schema in ALL_TOOLS
        ]
        if not tools_schema:
            raise Exception("Not enough tools!")
        return self.toolkit, tools_schema

    async def one_shot_ask(self, the_ask: str) -> Any:
        """Free-form request, structured response.

        Args:
            the_ask (str): The request.

        Returns:
            Any: The response.
        """
        if not self.toolkit:
            raise TypeError("Missing toolkit before one_shot_ask")
        if not self.assistant:
            raise TypeError("Missing assistant before one_shot_ask")
        try:
            _, tool_schemas = self.toolkit_factory(
                ".",
                self.model,
                [
                    # "report_bool",
                    "report_dict",
                    "report_float",
                    "report_int",
                    # "report_json",
                    "report_list",
                    # "report_set",
                    # "report_text", Why? Just do an unstructured query.
                    # "report_toml",
                    # "report_tuple",
                    # "report_xml",
                ],
            )
            thread = await self.client.beta.threads.create()
            logger.info(the_ask)
            _message = await self.client.beta.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=the_ask,
            )
            # pydantic_tools =[run_create_params.Tool(_) for _ in tool_schemas]
            run = await self.client.beta.threads.runs.create(
                thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas
            )
            tool_use_count = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md)
            if tool_use_count == 0:
                logger.warning("No tool usage, something went wrong.")

            messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc")
            # logger.info(messages)
            last_words = parse_message(messages)
            self.dialog_logger_md.add_bot(last_words)

        except Exception as exception:
            self.dialog_logger_md.add_error(exception)
            raise
        finally:
            # clean up thread
            if self.thread and not self.persist_threads:
                await self.client.beta.threads.delete(self.thread.id)
            if self.assistant and not self.persist_bots:
                # clean up assistant
                await self.client.beta.assistants.delete(self.assistant.id)

    async def basic_tool_loop(
        self,
        the_ask: str,
        root_folder: str,
        tool_names: list[Any],
        keep_going_prompt: Callable[[ToolKit], Awaitable[str]],
        stop_on_no_tool_use: bool,
    ) -> int:
        """
        Loop through tool requests.

        Args:
            the_ask (str): The initial request.
            root_folder (str): The root folder for file operations.
            tool_names (list[Any]): The tools to use.
            keep_going_prompt (Callable[[ToolKit], Awaitable[str]]): Goal function that returns the next prompt, or the conversation-over marker to stop.
            stop_on_no_tool_use (bool): Stop if no tools are used.

        Returns:
            int: The total number of tool calls made across all rounds.
        """
        if not self.assistant:
            raise TypeError("Missing assistant before basic_tool_loop")

        if self.dialog_logger_md:
            self.dialog_logger_md.add_user(the_ask)
            self.dialog_logger_md.add_toolkit(tool_names)

        tool_loops = 0
        total_tool_use_count = 0
        try:
            if self.allow_self_certification:
                tool_names.append("report_text")
            tool_names = list(set(tool_names))
            _, tool_schemas = self.toolkit_factory(root_folder, self.model, tool_names)
            if not self.toolkit:
                raise TypeError("Missing toolkit before basic_tool_loop")
            thread = await self.client.beta.threads.create()
            logger.info(the_ask)
            _message = await self.client.beta.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=the_ask,
            )
            run = await self.client.beta.threads.runs.create(
                thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas
            )
            tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md)
            tool_loops += 1
            if tool_loops > self.maximum_loops:
                raise TypeError("Too many tool loops")

            total_tool_use_count += tools_used_this_round

            messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc")
            initial_bot_response = parse_message(messages)
            self.dialog_logger_md.add_bot(initial_bot_response)

            # Did you use any tools? (maybe move this to goal function)
            # if tools_used_this_round == 0:
            #     initial_user_response = (
            #         "I see you didn't use any tools.  "
            #         "Please list what tools you have available, and if there are some available, "
            #         "why they were not useful."
            #     )
            # else:
            if not self.toolkit:
                raise TypeError("Missing toolkit before keep_going_prompt")
            initial_user_response = await keep_going_prompt(self.toolkit)

            # TODO: make into method.
            await self.client.beta.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=initial_user_response,
            )
            run = await self.client.beta.threads.runs.create(thread_id=thread.id, assistant_id=self.assistant.id)

            self.dialog_logger_md.add_user(initial_user_response)

            # "keep going/done" loop
            done = "NOPE"
            tools_used_this_round = -1

            # TODO: initialize this in constructor
            if (
                self.allow_self_certification
                and hasattr(self.toolkit, "tool_answer_collector")
                and self.toolkit.tool_answer_collector
            ):
                final_report = self.toolkit.tool_answer_collector.text_answer
                final_comment = self.toolkit.tool_answer_collector.comment
                self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}")
                return total_tool_use_count

            # Bot has at least 3 ways to stop
            # - return message of DONE
            # - use answer tool to submit DONE, or IMPOSSIBLE
            # - stop using tools
            if tools_used_this_round == 0 and stop_on_no_tool_use:
                logger.info("No tools used this round, conversation will end.")
            while done != self.conversation_over_marker or (tools_used_this_round == 0 and stop_on_no_tool_use):
                tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md)
                # Did we use any tools
                total_tool_use_count += tools_used_this_round

                # infinite loop protection
                tool_loops += 1
                if tool_loops > self.maximum_loops:
                    raise TypeError("Too many tool loops")

                messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc")
                successive_response = parse_message(messages)
                self.dialog_logger_md.add_bot(successive_response)
                if self.allow_self_certification:
                    # TODO: move this to goal checker?
                    done = capture_done_message(messages, self.conversation_over_marker)
                    if done == self.conversation_over_marker:
                        break

                # Did bot use tool to submit final report. Wow. Can't trust all bots.
                if (
                    self.allow_self_certification
                    and hasattr(self.toolkit, "tool_answer_collector")
                    and self.toolkit.tool_answer_collector
                ):
                    final_report = str(self.toolkit.tool_answer_collector.text_answer).upper().strip()
                    final_comment = self.toolkit.tool_answer_collector.comment
                    self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}")
                    break

                if done != self.conversation_over_marker:
                    # Replace with 2nd bot?
                    keep_going_text = await keep_going_prompt(self.toolkit)
                    # This is *not* self certification
                    if keep_going_text == self.conversation_over_marker:
                        # The bot did a good job and we can certify that.
                        break
                    self.dialog_logger_md.add_user(keep_going_text)
                    logger.info(keep_going_text)
                    await self.client.beta.threads.messages.create(
                        thread_id=thread.id,
                        role="user",
                        content=keep_going_text,
                    )
                    run = await self.client.beta.threads.runs.create(
                        thread_id=thread.id, assistant_id=self.assistant.id
                    )

        except Exception as exception:
            self.dialog_logger_md.add_error(exception)
            raise
        finally:
            # clean up thread
            if self.thread and not self.persist_threads:
                await self.client.beta.threads.delete(self.thread.id)
            if self.assistant and not self.persist_bots:
                # clean up assistant
                await self.client.beta.assistants.delete(self.assistant.id)
        return total_tool_use_count

Instance variables

var allow_self_certification

Do you want to trust the bot when it says it has achieved the goal?

var bot_instructions

Model, name and instructions uniquely identify a bot.

var config

Stores bot, thread config and other global config.

var conversation_over_marker

Goal function checker returns this when done.

var dialog_logger_md

Conversation style logger

var maximum_loops

Prevent infinite loops and money wastage.

var model

Model, name and instructions uniquely identify a bot.

var name

Model, name and instructions uniquely identify a bot.

var persist_bots

Keep bots or attempt to delete them at end of session

var persist_threads

Keep threads or attempt to delete them at end of session

var toolkit

Reference to toolkit so that goal checkers can check if any tools were used.

Methods

async def basic_tool_loop(self, the_ask: str, root_folder: str, tool_names: list[typing.Any], keep_going_prompt: collections.abc.Callable[[ToolKit], collections.abc.Awaitable[str]], stop_on_no_tool_use: bool) ‑> int

Loop through tool requests.

Args

the_ask : str
The initial request.
root_folder : str
The root folder for file operations.
tool_names : list[Any]
The tools to use.
keep_going_prompt : Callable[[ToolKit], Awaitable[str]]
Goal function that returns the next prompt, or the conversation-over marker to stop.
stop_on_no_tool_use : bool
Stop if no tools are used.

Returns

int
The total number of tool calls made across all rounds.
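
For reference, a minimal sketch of a goal function matching this signature: it inspects tool usage through the ToolKit and returns the conversation-over marker (by default "DONE") once the goal is met. The tool name and prompts below are illustrative only.

import ai_shell

async def keep_going(toolkit: ai_shell.ToolKit) -> str:
    # Inspect what the bot has done so far via the toolkit's usage stats.
    usage = toolkit.get_tool_usage_for("cat")
    if usage["count"] > 0:
        # Returning the conversation-over marker ends the loop without self-certification.
        return "DONE"
    return "Please read at least one file with the cat tool before finishing."
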

Expand source code
async def basic_tool_loop(
    self,
    the_ask: str,
    root_folder: str,
    tool_names: list[Any],
    keep_going_prompt: Callable[[ToolKit], Awaitable[str]],
    stop_on_no_tool_use: bool,
) -> int:
    """
    Loop through tool requests.

    Args:
        the_ask (str): The initial request.
        root_folder (str): The root folder for file operations.
        tool_names (list[Any]): The tools to use.
        keep_going_prompt (Callable[[ToolKit], Awaitable[str]]): Goal function that returns the next prompt, or the conversation-over marker to stop.
        stop_on_no_tool_use (bool): Stop if no tools are used.

    Returns:
        int: The total number of tool calls made across all rounds.
    """
    if not self.assistant:
        raise TypeError("Missing assistant before basic_tool_loop")

    if self.dialog_logger_md:
        self.dialog_logger_md.add_user(the_ask)
        self.dialog_logger_md.add_toolkit(tool_names)

    tool_loops = 0
    total_tool_use_count = 0
    try:
        if self.allow_self_certification:
            tool_names.append("report_text")
        tool_names = list(set(tool_names))
        _, tool_schemas = self.toolkit_factory(root_folder, self.model, tool_names)
        if not self.toolkit:
            raise TypeError("Missing toolkit before basic_tool_loop")
        thread = await self.client.beta.threads.create()
        logger.info(the_ask)
        _message = await self.client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=the_ask,
        )
        run = await self.client.beta.threads.runs.create(
            thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas
        )
        tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md)
        tool_loops += 1
        if tool_loops > self.maximum_loops:
            raise TypeError("Too many tool loops")

        total_tool_use_count += tools_used_this_round

        messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc")
        initial_bot_response = parse_message(messages)
        self.dialog_logger_md.add_bot(initial_bot_response)

        # Did you use any tools? (maybe move this to goal function)
        # if tools_used_this_round == 0:
        #     initial_user_response = (
        #         "I see you didn't use any tools.  "
        #         "Please list what tools you have available, and if there are some available, "
        #         "why they were not useful."
        #     )
        # else:
        if not self.toolkit:
            raise TypeError("Missing toolkit before keep_going_prompt")
        initial_user_response = await keep_going_prompt(self.toolkit)

        # TODO: make into method.
        await self.client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=initial_user_response,
        )
        run = await self.client.beta.threads.runs.create(thread_id=thread.id, assistant_id=self.assistant.id)

        self.dialog_logger_md.add_user(initial_user_response)

        # "keep going/done" loop
        done = "NOPE"
        tools_used_this_round = -1

        # TODO: initialize this in constructor
        if (
            self.allow_self_certification
            and hasattr(self.toolkit, "tool_answer_collector")
            and self.toolkit.tool_answer_collector
        ):
            final_report = self.toolkit.tool_answer_collector.text_answer
            final_comment = self.toolkit.tool_answer_collector.comment
            self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}")
            return total_tool_use_count

        # Bot has at least 3 ways to stop
        # - return message of DONE
        # - use answer tool to submit DONE, or IMPOSSIBLE
        # - stop using tools
        if tools_used_this_round == 0 and stop_on_no_tool_use:
            logger.info("No tools used this round, conversation will end.")
        while done != self.conversation_over_marker or (tools_used_this_round == 0 and stop_on_no_tool_use):
            tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md)
            # Did we use any tools
            total_tool_use_count += tools_used_this_round

            # infinite loop protection
            tool_loops += 1
            if tool_loops > self.maximum_loops:
                raise TypeError("Too many tool loops")

            messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc")
            successive_response = parse_message(messages)
            self.dialog_logger_md.add_bot(successive_response)
            if self.allow_self_certification:
                # TODO: move this to goal checker?
                done = capture_done_message(messages, self.conversation_over_marker)
                if done == self.conversation_over_marker:
                    break

            # Did bot use tool to submit final report. Wow. Can't trust all bots.
            if (
                self.allow_self_certification
                and hasattr(self.toolkit, "tool_answer_collector")
                and self.toolkit.tool_answer_collector
            ):
                final_report = str(self.toolkit.tool_answer_collector.text_answer).upper().strip()
                final_comment = self.toolkit.tool_answer_collector.comment
                self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}")
                break

            if done != self.conversation_over_marker:
                # Replace with 2nd bot?
                keep_going_text = await keep_going_prompt(self.toolkit)
                # This is *not* self certification
                if keep_going_text == self.conversation_over_marker:
                    # The bot did a good job and we can certify that.
                    break
                self.dialog_logger_md.add_user(keep_going_text)
                logger.info(keep_going_text)
                await self.client.beta.threads.messages.create(
                    thread_id=thread.id,
                    role="user",
                    content=keep_going_text,
                )
                run = await self.client.beta.threads.runs.create(
                    thread_id=thread.id, assistant_id=self.assistant.id
                )

    except Exception as exception:
        self.dialog_logger_md.add_error(exception)
        raise
    finally:
        # clean up thread
        if self.thread and not self.persist_threads:
            await self.client.beta.threads.delete(self.thread.id)
        if self.assistant and not self.persist_bots:
            # clean up assistant
            await self.client.beta.assistants.delete(self.assistant.id)
    return total_tool_use_count
async def create_bot(self)

Create a bot and store it in the config.

Expand source code
async def create_bot(self):
    """Create a bot and store it in the config."""
    self.assistant = await self.client.beta.assistants.create(
        name=self.name,
        instructions=self.bot_instructions,
        model=self.model,
    )
    self.config.add_bot(self.assistant.id, self.name)
    logger.debug(f"Assistant created: {self.assistant.id}")
async def get_create_bot(self) ‑> openai.types.beta.assistant.Assistant

Get or create a bot and store it in the config.

Expand source code
async def get_create_bot(self) -> Assistant:
    """Get or create a bot and store it in the config."""
    current_bot = self.config.get_bot(self.name)
    if not current_bot:
        await self.create_bot()
    else:
        try:
            self.assistant = await self.client.beta.assistants.retrieve(current_bot.assistant_id)
            logger.debug(f"Assistant retrieved: {self.assistant.id}")
        except openai.NotFoundError:
            await self.create_bot()
    if not self.assistant:
        raise TypeError("Assistant not found or created.")
    logger.debug(f"Assistant id: {self.assistant.id}")
    return self.assistant
async def initialize(self) ‑> None

Get or create a bot and store it in the config.

Expand source code
async def initialize(self) -> None:
    """Get or create a bot and store it in the config."""
    bot = await self.get_create_bot()
    logger.debug(f"Assistant id: {bot.id}")
    self.assistant = bot
    self.dialog_logger_md.write_header(bot_name=self.name, model=self.model, bot_instructions=self.bot_instructions)
async def one_shot_ask(self, the_ask: str) ‑> Any

Free-form request, structured response.

Args

the_ask : str
The request.

Returns

Any
The response.
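
A minimal sketch of a call, assuming the bot has been initialized and its toolkit already created (one_shot_ask raises TypeError otherwise); the prompt is illustrative, and the structured answer arrives through the report_* tools and is written to the dialog log.

import ai_shell

async def ask_once(bot: ai_shell.TaskBot) -> None:
    # `bot` must already be initialized and have a toolkit, or TypeError is raised.
    await bot.one_shot_ask(
        "How many Python files are under './src'? Report the count with report_int."
    )
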
Expand source code
async def one_shot_ask(self, the_ask: str) -> Any:
    """Free-form request, structured response.

    Args:
        the_ask (str): The request.

    Returns:
        Any: The response.
    """
    if not self.toolkit:
        raise TypeError("Missing toolkit before one_shot_ask")
    if not self.assistant:
        raise TypeError("Missing assistant before one_shot_ask")
    try:
        _, tool_schemas = self.toolkit_factory(
            ".",
            self.model,
            [
                # "report_bool",
                "report_dict",
                "report_float",
                "report_int",
                # "report_json",
                "report_list",
                # "report_set",
                # "report_text", Why? Just do an unstructured query.
                # "report_toml",
                # "report_tuple",
                # "report_xml",
            ],
        )
        thread = await self.client.beta.threads.create()
        logger.info(the_ask)
        _message = await self.client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=the_ask,
        )
        # pydantic_tools =[run_create_params.Tool(_) for _ in tool_schemas]
        run = await self.client.beta.threads.runs.create(
            thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas
        )
        tool_use_count = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md)
        if tool_use_count == 0:
            logger.warning("No tool usage, something went wrong.")

        messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc")
        # logger.info(messages)
        last_words = parse_message(messages)
        self.dialog_logger_md.add_bot(last_words)

    except Exception as exception:
        self.dialog_logger_md.add_error(exception)
        raise
    finally:
        # clean up thread
        if self.thread and not self.persist_threads:
            await self.client.beta.threads.delete(self.thread.id)
        if self.assistant and not self.persist_bots:
            # clean up assistant
            await self.client.beta.assistants.delete(self.assistant.id)
def toolkit_factory(self, root_folder: str, model: str, tool_names: list[str]) ‑> tuple[ToolKit, list[openai.types.beta.threads.run_create_params.ToolAssistantToolsCode | openai.types.beta.threads.run_create_params.ToolAssistantToolsRetrieval | openai.types.beta.threads.run_create_params.ToolAssistantToolsFunction]]
Expand source code
def toolkit_factory(
    self, root_folder: str, model: str, tool_names: list[str]
) -> tuple[ToolKit, list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction]]:
    self.toolkit = ToolKit(root_folder, model, 500, permitted_tools=tool_names, config=self.config)
    # sync the conversation-over marker (COM) with the toolkit
    self.toolkit.conversation_over_marker = self.conversation_over_marker
    initialize_all_tools(keeps=tool_names)
    tools_schema: list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction] = [
        ToolAssistantToolsFunction(**{"function": cast(FunctionDefinition, schema), "type": "function"})
        for schema in ALL_TOOLS
    ]
    if not tools_schema:
        raise Exception("Not enough tools!")
    return self.toolkit, tools_schema
class TodoTool (root_folder: str, config: Config)

Keep track of tasks.

Initialize the TodoTool with a root folder.

Args

root_folder : str
The root folder for valid files.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class TodoTool:
    """Keep track of tasks."""

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the TodoTool with a root folder.

        Args:
            root_folder (str): The root folder for valid files.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder: str = root_folder
        self.config = config
        self.roles = config.get_list("todo_roles")
        self.task_manager = ai_todo.TaskManager(self.root_folder, self.roles)

    @log()
    def add_todo(
        self, title: str, description: str, category: str, source_code_ref: str, assignee: Optional[str] = None
    ) -> str:
        """
        Adds a new task to the task manager.

        Args:
            title (str): The title of the task.
            description (str): A description of the task.
            category (str): The category of the task (e.g., 'bug', 'feature').
            source_code_ref (str): Reference to the source code related to the task.
            assignee (str, optional): The name of the assignee. Defaults to None.

        Returns:
            str: A confirmation message indicating successful addition of the task.
        """
        self.task_manager.add_task(title, description, category, source_code_ref, assignee)
        summary = self.task_manager.get_stats()
        return f"Successful added task {title}\n{summary}"

    @log()
    def remove_todo(self, title: str) -> str:
        """
        Marks a task as finished based on its title.

        Args:
            title (str): The title of the task to be marked as finished.

        Returns:
            str: A confirmation message indicating the task was successfully marked as finished.
        """
        self.task_manager.finish_task(title)
        summary = self.task_manager.get_stats()
        return f"Successful removed task {title}\n{summary}"

    @log()
    def query_todos_by_regex(self, regex_pattern: str = r"[\s\S]+") -> str:
        r"""
        Queries tasks by a keyword in their title, using a regular expression pattern.

        Args:
            regex_pattern (str, optional): The regular expression pattern to match in task titles.
                                           Defaults to "[\s\S]+", which matches any title.

        Returns:
            str: The rendered Markdown string of tasks matching the given pattern.
        """
        return self.task_manager.query_by_title_keyword(regex_pattern)

    @log()
    def query_todos_by_assignee(self, assignee_name: str) -> str:
        """
        Queries tasks assigned to a specific assignee. Currently, the assignee is hard-coded as 'Developer'.

        Args:
            assignee_name (str): The name of the assignee to query tasks for.

        Returns:
            str: The rendered Markdown string of tasks assigned to the specified assignee.
        """
        return self.task_manager.query_by_assignee(assignee_name)

    @log()
    def list_valid_assignees(
        self,
    ) -> list[str]:
        """
        Lists the valid assignees for tasks.

        Returns:
            list[str]: The list of valid assignees.
        """
        return self.task_manager.valid_assignees
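
A minimal usage sketch, assuming TodoTool is exported at the package level like the other tools and that the config's todo_roles list is populated; the task fields below are illustrative only.

import ai_shell

todo = ai_shell.TodoTool(".", ai_shell.Config())

print(todo.list_valid_assignees())      # roles come from the config's todo_roles
print(todo.add_todo(
    title="Add docstrings",
    description="Document the public functions in the module.",
    category="documentation",
    source_code_ref="src/example.py",   # illustrative reference
))
print(todo.query_todos_by_regex(r"docstring"))
print(todo.remove_todo("Add docstrings"))
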

Methods

def add_todo(self, title: str, description: str, category: str, source_code_ref: str, assignee: Optional[str] = None) ‑> str

Adds a new task to the task manager.

Args

title : str
The title of the task.
description : str
A description of the task.
category : str
The category of the task (e.g., 'bug', 'feature').
source_code_ref : str
Reference to the source code related to the task.
assignee : str, optional
The name of the assignee. Defaults to None.

Returns

str
A confirmation message indicating successful addition of the task.
Expand source code
@log()
def add_todo(
    self, title: str, description: str, category: str, source_code_ref: str, assignee: Optional[str] = None
) -> str:
    """
    Adds a new task to the task manager.

    Args:
        title (str): The title of the task.
        description (str): A description of the task.
        category (str): The category of the task (e.g., 'bug', 'feature').
        source_code_ref (str): Reference to the source code related to the task.
        assignee (str, optional): The name of the assignee. Defaults to None.

    Returns:
        str: A confirmation message indicating successful addition of the task.
    """
    self.task_manager.add_task(title, description, category, source_code_ref, assignee)
    summary = self.task_manager.get_stats()
    return f"Successful added task {title}\n{summary}"
def list_valid_assignees(self) ‑> list[str]

Lists the valid assignees for tasks.

Returns

list[str]
The list of valid assignees.
Expand source code
@log()
def list_valid_assignees(
    self,
) -> list[str]:
    """
    Lists the valid assignees for tasks.

    Returns:
        list[str]: The list of valid assignees.
    """
    return self.task_manager.valid_assignees
def query_todos_by_assignee(self, assignee_name: str) ‑> str

Queries tasks assigned to a specific assignee. Currently, the assignee is hard-coded as 'Developer'.

Args

assignee_name : str
The name of the assignee to query tasks for.

Returns

str
The rendered Markdown string of tasks assigned to the specified assignee.
Expand source code
@log()
def query_todos_by_assignee(self, assignee_name: str) -> str:
    """
    Queries tasks assigned to a specific assignee. Currently, the assignee is hard-coded as 'Developer'.

    Args:
        assignee_name (str): The name of the assignee to query tasks for.

    Returns:
        str: The rendered Markdown string of tasks assigned to the specified assignee.
    """
    return self.task_manager.query_by_assignee(assignee_name)
def query_todos_by_regex(self, regex_pattern: str = '[\\s\\S]+') ‑> str

Queries tasks by a keyword in their title, using a regular expression pattern.

Args

regex_pattern : str, optional
The regular expression pattern to match in task titles. Defaults to "[\s\S]+", which matches any title.

Returns

str
The rendered Markdown string of tasks matching the given pattern.
Expand source code
@log()
def query_todos_by_regex(self, regex_pattern: str = r"[\s\S]+") -> str:
    r"""
    Queries tasks by a keyword in their title, using a regular expression pattern.

    Args:
        regex_pattern (str, optional): The regular expression pattern to match in task titles.
                                       Defaults to "[\s\S]+", which matches any title.

    Returns:
        str: The rendered Markdown string of tasks matching the given pattern.
    """
    return self.task_manager.query_by_title_keyword(regex_pattern)
def remove_todo(self, title: str) ‑> str

Marks a task as finished based on its title.

Args

title : str
The title of the task to be marked as finished.

Returns

str
A confirmation message indicating the task was successfully marked as finished.
Expand source code
@log()
def remove_todo(self, title: str) -> str:
    """
    Marks a task as finished based on its title.

    Args:
        title (str): The title of the task to be marked as finished.

    Returns:
        str: A confirmation message indicating the task was successfully marked as finished.
    """
    self.task_manager.finish_task(title)
    summary = self.task_manager.get_stats()
    return f"Successful removed task {title}\n{summary}"
class TokenCounterTool (root_folder: str, config: Config)

Count the number of tokens in a string.

Initialize the TokenCounterTool class.

Args

root_folder : str
The root folder path for file operations.
config : Config
The developer input that the bot shouldn't set.
Expand source code
class TokenCounterTool:
    """Count the number of tokens in a string."""

    def __init__(self, root_folder: str, config: Config) -> None:
        """
        Initialize the TokenCounterTool class.

        Args:
            root_folder (str): The root folder path for file operations.
            config (Config): The developer input that the bot shouldn't set.
        """
        self.root_folder = root_folder
        self.config = config
        model = config.get_value("token_model")
        if not model:
            raise ValueError("token_model must be set in the config")
        self.token_model = model

    def count_tokens(self, text: str) -> int:
        """Count the number of tokens in a string.

        Args:
            text (str): The text to count the tokens in.

        Returns:
            int: The number of tokens.
        """
        if not text:
            return 0
        # gpt3 turbo - cl100k_base
        # gpt2 (or r50k_base)   Most GPT-3 models
        # p50k_base     Code models, text-davinci-002, text-davinci-003
        # cl100k_base   text-embedding-ada-002
        # enc = tiktoken.get_encoding("cl100k_base")

        encoding = tiktoken.encoding_for_model(self.token_model)
        tokens = encoding.encode(text)
        token_count = len(tokens)
        return token_count
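
A minimal usage sketch, assuming token_model (for example "gpt-3.5-turbo") is already set in your ai_shell config; the constructor raises ValueError if it is not.

import ai_shell

counter = ai_shell.TokenCounterTool(".", ai_shell.Config())
print(counter.count_tokens("How many tokens will this sentence cost?"))
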

Methods

def count_tokens(self, text: str) ‑> int

Count the number of tokens in a string.

Args

text : str
The text to count the tokens in.

Returns

int
The number of tokens.
Expand source code
def count_tokens(self, text: str) -> int:
    """Count the number of tokens in a string.

    Args:
        text (str): The text to count the tokens in.

    Returns:
        int: The number of tokens.
    """
    if not text:
        return 0
    # gpt3 turbo - cl100k_base
    # gpt2 (or r50k_base)   Most GPT-3 models
    # p50k_base     Code models, text-davinci-002, text-davinci-003
    # cl100k_base   text-embedding-ada-002
    # enc = tiktoken.get_encoding("cl100k_base")

    encoding = tiktoken.encoding_for_model(self.token_model)
    tokens = encoding.encode(text)
    token_count = len(tokens)
    return token_count
class ToolKit (root_folder: str, token_model: str, global_max_lines: int, permitted_tools: list[str], config: Config)

AI Shell Toolkit

Initialize the ToolKitBase class.

Args

root_folder : str
The root folder path for file operations.
token_model : str
The token model to use for the toolkit
global_max_lines : int
The global max lines to use for the toolkit
permitted_tools : list[str]
The permitted tools for the toolkit
config : Config
The developer input that the bot shouldn't set.
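
A sketch of driving the toolkit directly, outside the OpenAI tool loop: each dispatch method takes the tool-call arguments as a plain dict, mirroring what the Assistant sends for a function call. The argument values are illustrative, and the default Config is assumed to be sufficient here.

import ai_shell

toolkit = ai_shell.ToolKit(
    root_folder=".",
    token_model="gpt-3.5-turbo",
    global_max_lines=500,
    permitted_tools=["ls", "cat_markdown"],
    config=ai_shell.Config(),
)

# Argument keys follow the dispatch methods shown in the source below.
print(toolkit.ls({"path": "docs", "long": True}))
print(toolkit.cat_markdown({"file_paths": ["pyproject.toml"]}))
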
Expand source code
class ToolKit(ToolKitBase):
    """AI Shell Toolkit"""

    def __init__(
        self, root_folder: str, token_model: str, global_max_lines: int, permitted_tools: list[str], config: Config
    ) -> None:
        super().__init__(root_folder, token_model, global_max_lines, permitted_tools, config)
        self._lookup: dict[str, Callable[[dict[str, Any]], Any]] = {
            "report_bool": self.report_bool,
            "report_dict": self.report_dict,
            "report_float": self.report_float,
            "report_int": self.report_int,
            "report_json": self.report_json,
            "report_list": self.report_list,
            "report_set": self.report_set,
            "report_text": self.report_text,
            "report_toml": self.report_toml,
            "report_tuple": self.report_tuple,
            "report_xml": self.report_xml,
            "cat": self.cat,
            "cat_markdown": self.cat_markdown,
            "cut_characters": self.cut_characters,
            "cut_fields": self.cut_fields,
            "cut_fields_by_name": self.cut_fields_by_name,
            "ed": self.ed,
            "edlin": self.edlin,
            "find_files": self.find_files,
            "find_files_markdown": self.find_files_markdown,
            "get_current_branch": self.get_current_branch,
            "get_recent_commits": self.get_recent_commits,
            "git_diff": self.git_diff,
            "git_diff_commit": self.git_diff_commit,
            "git_log_file": self.git_log_file,
            "git_log_search": self.git_log_search,
            "git_show": self.git_show,
            "git_status": self.git_status,
            "is_ignored_by_gitignore": self.is_ignored_by_gitignore,
            "grep": self.grep,
            "grep_markdown": self.grep_markdown,
            "head": self.head,
            "head_markdown": self.head_markdown,
            "head_tail": self.head_tail,
            "tail": self.tail,
            "tail_markdown": self.tail_markdown,
            "insert_text_after_context": self.insert_text_after_context,
            "insert_text_after_multiline_context": self.insert_text_after_multiline_context,
            "insert_text_at_start_or_end": self.insert_text_at_start_or_end,
            "ls": self.ls,
            "ls_markdown": self.ls_markdown,
            "apply_git_patch": self.apply_git_patch,
            "format_code_as_markdown": self.format_code_as_markdown,
            "pytest": self.pytest,
            "replace_all": self.replace_all,
            "replace_line_by_line": self.replace_line_by_line,
            "replace_with_regex": self.replace_with_regex,
            "rewrite_file": self.rewrite_file,
            "write_new_file": self.write_new_file,
            "sed": self.sed,
            "add_todo": self.add_todo,
            "list_valid_assignees": self.list_valid_assignees,
            "query_todos_by_assignee": self.query_todos_by_assignee,
            "query_todos_by_regex": self.query_todos_by_regex,
            "remove_todo": self.remove_todo,
            "count_tokens": self.count_tokens,
        }
        # Stateful tool support. Useless assignment to make mypy happy
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

    def report_bool(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            bool,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_bool(answer=answer, comment=comment)

    def report_dict(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            Any,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_dict(answer=answer, comment=comment)

    def report_float(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            float,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_float(answer=answer, comment=comment)

    def report_int(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            int,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_int(answer=answer, comment=comment)

    def report_json(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            str,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_json(answer=answer, comment=comment)

    def report_list(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            str,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_list(answer=answer, comment=comment)

    def report_set(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            list[Any],
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_set(answer=answer, comment=comment)

    def report_text(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            str,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_text(answer=answer, comment=comment)

    def report_toml(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            str,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_toml(answer=answer, comment=comment)

    def report_tuple(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            list[Any],
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_tuple(answer=answer, comment=comment)

    def report_xml(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config)

        answer = cast(
            str,
            arguments.get(
                "answer",
            ),
        )
        comment = cast(
            str,
            arguments.get(
                "comment",
            ),
        )
        return self.tool_answer_collector.report_xml(answer=answer, comment=comment)

    def cat(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = CatTool(self.root_folder, self.config)

        file_paths = cast(
            str,
            arguments.get(
                "file_paths",
            ),
        )
        number_lines = cast(bool, arguments.get("number_lines", True))
        squeeze_blank = cast(bool, arguments.get("squeeze_blank", False))
        return tool.cat(file_paths=file_paths, number_lines=number_lines, squeeze_blank=squeeze_blank)

    def cat_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = CatTool(self.root_folder, self.config)

        file_paths = cast(
            str,
            arguments.get(
                "file_paths",
            ),
        )
        number_lines = cast(bool, arguments.get("number_lines", True))
        squeeze_blank = cast(bool, arguments.get("squeeze_blank", False))
        return tool.cat_markdown(file_paths=file_paths, number_lines=number_lines, squeeze_blank=squeeze_blank)

    def cut_characters(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = CutTool(self.root_folder, self.config)

        character_ranges = cast(
            str,
            arguments.get(
                "character_ranges",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        return tool.cut_characters(character_ranges=character_ranges, file_path=file_path)

    def cut_fields(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = CutTool(self.root_folder, self.config)

        delimiter = cast(str, arguments.get("delimiter", ","))
        field_ranges = cast(
            str,
            arguments.get(
                "field_ranges",
            ),
        )
        filename = cast(
            str,
            arguments.get(
                "filename",
            ),
        )
        return tool.cut_fields(delimiter=delimiter, field_ranges=field_ranges, filename=filename)

    def cut_fields_by_name(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = CutTool(self.root_folder, self.config)

        delimiter = cast(str, arguments.get("delimiter", ","))
        field_names = cast(
            str,
            arguments.get(
                "field_names",
            ),
        )
        filename = cast(
            str,
            arguments.get(
                "filename",
            ),
        )
        return tool.cut_fields_by_name(delimiter=delimiter, field_names=field_names, filename=filename)

    def ed(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = EdTool(self.root_folder, self.config)

        file_name = cast(
            str,
            arguments.get(
                "file_name",
            ),
        )
        script = cast(
            str,
            arguments.get(
                "script",
            ),
        )
        return tool.ed(file_name=file_name, script=script)

    def edlin(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = EdlinTool(self.root_folder, self.config)

        file_name = cast(
            str,
            arguments.get(
                "file_name",
            ),
        )
        script = cast(
            str,
            arguments.get(
                "script",
            ),
        )
        return tool.edlin(file_name=file_name, script=script)

    def find_files(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = FindTool(self.root_folder, self.config)

        file_type = cast(
            Optional[str],
            arguments.get(
                "file_type",
            ),
        )
        name = cast(
            Optional[str],
            arguments.get(
                "name",
            ),
        )
        regex = cast(
            Optional[str],
            arguments.get(
                "regex",
            ),
        )
        size = cast(
            Optional[str],
            arguments.get(
                "size",
            ),
        )
        return tool.find_files(file_type=file_type, name=name, regex=regex, size=size)

    def find_files_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = FindTool(self.root_folder, self.config)

        file_type = cast(
            Optional[str],
            arguments.get(
                "file_type",
            ),
        )
        name = cast(
            Optional[str],
            arguments.get(
                "name",
            ),
        )
        regex = cast(
            Optional[str],
            arguments.get(
                "regex",
            ),
        )
        size = cast(
            Optional[str],
            arguments.get(
                "size",
            ),
        )
        return tool.find_files_markdown(file_type=file_type, name=name, regex=regex, size=size)

    def get_current_branch(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        return tool.get_current_branch()

    def get_recent_commits(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        n = cast(int, arguments.get("n", 10))
        short_hash = cast(bool, arguments.get("short_hash", False))
        return tool.get_recent_commits(n=n, short_hash=short_hash)

    def git_diff(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        return tool.git_diff()

    def git_diff_commit(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        commit1 = cast(
            str,
            arguments.get(
                "commit1",
            ),
        )
        commit2 = cast(
            str,
            arguments.get(
                "commit2",
            ),
        )
        return tool.git_diff_commit(commit1=commit1, commit2=commit2)

    def git_log_file(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        filename = cast(
            str,
            arguments.get(
                "filename",
            ),
        )
        return tool.git_log_file(filename=filename)

    def git_log_search(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        search_string = cast(
            str,
            arguments.get(
                "search_string",
            ),
        )
        return tool.git_log_search(search_string=search_string)

    def git_show(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        return tool.git_show()

    def git_status(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        return tool.git_status()

    def is_ignored_by_gitignore(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GitTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        gitignore_path = cast(str, arguments.get("gitignore_path", ".gitignore"))
        return tool.is_ignored_by_gitignore(file_path=file_path, gitignore_path=gitignore_path)

    def grep(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GrepTool(self.root_folder, self.config)

        glob_pattern = cast(
            str,
            arguments.get(
                "glob_pattern",
            ),
        )
        maximum_matches_per_file = cast(int, arguments.get("maximum_matches_per_file", -1))
        maximum_matches_total = cast(int, arguments.get("maximum_matches_total", -1))
        regex = cast(
            str,
            arguments.get(
                "regex",
            ),
        )
        skip_first_matches = cast(int, arguments.get("skip_first_matches", -1))
        return tool.grep(
            glob_pattern=glob_pattern,
            maximum_matches_per_file=maximum_matches_per_file,
            maximum_matches_total=maximum_matches_total,
            regex=regex,
            skip_first_matches=skip_first_matches,
        )

    def grep_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = GrepTool(self.root_folder, self.config)

        glob_pattern = cast(
            str,
            arguments.get(
                "glob_pattern",
            ),
        )
        maximum_matches = cast(int, arguments.get("maximum_matches", -1))
        regex = cast(
            str,
            arguments.get(
                "regex",
            ),
        )
        skip_first_matches = cast(int, arguments.get("skip_first_matches", -1))
        return tool.grep_markdown(
            glob_pattern=glob_pattern,
            maximum_matches=maximum_matches,
            regex=regex,
            skip_first_matches=skip_first_matches,
        )

    def head(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = HeadTailTool(self.root_folder, self.config)

        byte_count = cast(
            Optional[int],
            arguments.get(
                "byte_count",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        lines = cast(int, arguments.get("lines", 10))
        return tool.head(byte_count=byte_count, file_path=file_path, lines=lines)

    def head_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = HeadTailTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        lines = cast(int, arguments.get("lines", 10))
        return tool.head_markdown(file_path=file_path, lines=lines)

    def head_tail(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = HeadTailTool(self.root_folder, self.config)

        byte_count = cast(
            Optional[int],
            arguments.get(
                "byte_count",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        lines = cast(int, arguments.get("lines", 10))
        mode = cast(str, arguments.get("mode", "head"))
        return tool.head_tail(byte_count=byte_count, file_path=file_path, lines=lines, mode=mode)

    def tail(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = HeadTailTool(self.root_folder, self.config)

        byte_count = cast(
            Optional[int],
            arguments.get(
                "byte_count",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        lines = cast(int, arguments.get("lines", 10))
        return tool.tail(byte_count=byte_count, file_path=file_path, lines=lines)

    def tail_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = HeadTailTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        lines = cast(int, arguments.get("lines", 10))
        return tool.tail_markdown(file_path=file_path, lines=lines)

    def insert_text_after_context(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = InsertTool(self.root_folder, self.config)

        context = cast(
            str,
            arguments.get(
                "context",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        text_to_insert = cast(
            str,
            arguments.get(
                "text_to_insert",
            ),
        )
        return tool.insert_text_after_context(context=context, file_path=file_path, text_to_insert=text_to_insert)

    def insert_text_after_multiline_context(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = InsertTool(self.root_folder, self.config)

        context_lines = cast(
            str,
            arguments.get(
                "context_lines",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        text_to_insert = cast(
            str,
            arguments.get(
                "text_to_insert",
            ),
        )
        return tool.insert_text_after_multiline_context(
            context_lines=context_lines, file_path=file_path, text_to_insert=text_to_insert
        )

    def insert_text_at_start_or_end(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = InsertTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        position = cast(str, arguments.get("position", "end"))
        text_to_insert = cast(
            str,
            arguments.get(
                "text_to_insert",
            ),
        )
        return tool.insert_text_at_start_or_end(file_path=file_path, position=position, text_to_insert=text_to_insert)

    def ls(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = LsTool(self.root_folder, self.config)

        all_files = cast(bool, arguments.get("all_files", False))
        long = cast(bool, arguments.get("long", False))
        path = cast(
            Optional[str],
            arguments.get(
                "path",
            ),
        )
        return tool.ls(all_files=all_files, long=long, path=path)

    def ls_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = LsTool(self.root_folder, self.config)

        all_files = cast(bool, arguments.get("all_files", False))
        long = cast(bool, arguments.get("long", False))
        path = cast(Optional[str], arguments.get("path", "."))
        return tool.ls_markdown(all_files=all_files, long=long, path=path)

    def apply_git_patch(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = PatchTool(self.root_folder, self.config)

        patch_content = cast(
            str,
            arguments.get(
                "patch_content",
            ),
        )
        return tool.apply_git_patch(patch_content=patch_content)

    def format_code_as_markdown(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = PyCatTool(self.root_folder, self.config)

        base_path = cast(
            str,
            arguments.get(
                "base_path",
            ),
        )
        header = cast(
            str,
            arguments.get(
                "header",
            ),
        )
        no_comments = cast(bool, arguments.get("no_comments", False))
        no_docs = cast(bool, arguments.get("no_docs", False))
        return tool.format_code_as_markdown(
            base_path=base_path, header=header, no_comments=no_comments, no_docs=no_docs
        )

    def pytest(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = PytestTool(self.root_folder, self.config)

        return tool.pytest()

    def replace_all(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = ReplaceTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        new_text = cast(
            str,
            arguments.get(
                "new_text",
            ),
        )
        old_text = cast(
            str,
            arguments.get(
                "old_text",
            ),
        )
        return tool.replace_all(file_path=file_path, new_text=new_text, old_text=old_text)

    def replace_line_by_line(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = ReplaceTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        line_end = cast(int, arguments.get("line_end", -1))
        line_start = cast(int, arguments.get("line_start", 0))
        new_text = cast(
            str,
            arguments.get(
                "new_text",
            ),
        )
        old_text = cast(
            str,
            arguments.get(
                "old_text",
            ),
        )
        return tool.replace_line_by_line(
            file_path=file_path, line_end=line_end, line_start=line_start, new_text=new_text, old_text=old_text
        )

    def replace_with_regex(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = ReplaceTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        regex_match_expression = cast(
            str,
            arguments.get(
                "regex_match_expression",
            ),
        )
        replacement = cast(
            str,
            arguments.get(
                "replacement",
            ),
        )
        return tool.replace_with_regex(
            file_path=file_path, regex_match_expression=regex_match_expression, replacement=replacement
        )

    def rewrite_file(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = RewriteTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        text = cast(
            str,
            arguments.get(
                "text",
            ),
        )
        return tool.rewrite_file(file_path=file_path, text=text)

    def write_new_file(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = RewriteTool(self.root_folder, self.config)

        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        text = cast(
            str,
            arguments.get(
                "text",
            ),
        )
        return tool.write_new_file(file_path=file_path, text=text)

    def sed(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = SedTool(self.root_folder, self.config)

        commands = cast(
            str,
            arguments.get(
                "commands",
            ),
        )
        file_path = cast(
            str,
            arguments.get(
                "file_path",
            ),
        )
        return tool.sed(commands=commands, file_path=file_path)

    def add_todo(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = TodoTool(self.root_folder, self.config)

        assignee = cast(
            Optional[str],
            arguments.get(
                "assignee",
            ),
        )
        category = cast(
            str,
            arguments.get(
                "category",
            ),
        )
        description = cast(
            str,
            arguments.get(
                "description",
            ),
        )
        source_code_ref = cast(
            str,
            arguments.get(
                "source_code_ref",
            ),
        )
        title = cast(
            str,
            arguments.get(
                "title",
            ),
        )
        return tool.add_todo(
            assignee=assignee, category=category, description=description, source_code_ref=source_code_ref, title=title
        )

    def list_valid_assignees(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = TodoTool(self.root_folder, self.config)

        return tool.list_valid_assignees()

    def query_todos_by_assignee(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = TodoTool(self.root_folder, self.config)

        assignee_name = cast(
            str,
            arguments.get(
                "assignee_name",
            ),
        )
        return tool.query_todos_by_assignee(assignee_name=assignee_name)

    def query_todos_by_regex(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = TodoTool(self.root_folder, self.config)

        regex_pattern = cast(str, arguments.get("regex_pattern", r"[\s\S]+"))
        return tool.query_todos_by_regex(regex_pattern=regex_pattern)

    def remove_todo(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = TodoTool(self.root_folder, self.config)

        title = cast(
            str,
            arguments.get(
                "title",
            ),
        )
        return tool.remove_todo(title=title)

    def count_tokens(self, arguments: dict[str, Any]) -> Any:
        """Generated Do Not Edit"""
        tool = TokenCounterTool(self.root_folder, self.config)

        text = cast(
            str,
            arguments.get(
                "text",
            ),
        )
        return tool.count_tokens(text=text)
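
Each generated method above takes the raw arguments dict from an OpenAI tool call, casts the expected fields (falling back to the defaults shown), and forwards them as keyword arguments to the matching tool class. A minimal sketch of driving a dispatcher by hand, assuming the enclosing class is the one exposed as ai_shell.ToolKit and that it can be built from a root folder and an ai_shell.Config (the constructor arguments here are an assumption, not a documented signature):

import json

import ai_shell

# Hypothetical construction; check the real ToolKit signature before relying on it.
toolkit = ai_shell.ToolKit(root_folder=".", config=ai_shell.Config())

# Simulate the JSON arguments an Assistant would attach to a tool call.
raw_arguments = json.loads('{"path": "src", "long": true, "all_files": false}')

# The generated dispatcher casts each field and calls LsTool.ls(...) with keyword arguments.
print(toolkit.ls(raw_arguments))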

Ancestors

Methods

def add_todo(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def apply_git_patch(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def cat(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def cat_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def count_tokens(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def cut_characters(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def cut_fields(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def cut_fields_by_name(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def ed(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def edlin(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def find_files(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def find_files_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def format_code_as_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def get_current_branch(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def get_recent_commits(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def git_diff(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def git_diff_commit(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def git_log_file(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def git_log_search(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def git_show(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def git_status(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def grep(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def grep_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit
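
The grep dispatchers above expose paging controls: maximum_matches_per_file and maximum_matches_total cap the number of matches returned, and skip_first_matches lets a follow-up call pick up where the previous one stopped. A hedged sketch of a paged pair of calls (toolkit constructed as in the earlier sketch; treating -1 as the "no limit" sentinel is an inference from the generated defaults, and the exact paging semantics should be confirmed against GrepTool):

import ai_shell

# Hypothetical construction, as in the earlier sketch.
toolkit = ai_shell.ToolKit(root_folder=".", config=ai_shell.Config())

# First page: at most five matches across all Python files.
first_page = toolkit.grep(
    {"glob_pattern": "*.py", "regex": "TODO", "maximum_matches_total": 5}
)

# Second page: same query, skipping the matches already shown.
second_page = toolkit.grep(
    {"glob_pattern": "*.py", "regex": "TODO", "maximum_matches_total": 5, "skip_first_matches": 5}
)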

def head(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def head_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def head_tail(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def insert_text_after_context(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def insert_text_after_multiline_context(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def insert_text_at_start_or_end(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def is_ignored_by_gitignore(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def list_valid_assignees(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def ls(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def ls_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def pytest(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def query_todos_by_assignee(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def query_todos_by_regex(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def remove_todo(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def replace_all(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def replace_line_by_line(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def replace_with_regex(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_bool(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_dict(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_float(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_int(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_json(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_list(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_set(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_text(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_toml(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_tuple(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def report_xml(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit
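
Unlike the filesystem dispatchers, the report_* methods above do not wrap a read-only tool: each one creates an AnswerCollectorTool and stores it on self.tool_answer_collector, which is how a structured final answer from the Assistant is captured for the calling code. A short sketch (same hypothetical ToolKit construction as above; how the collected value is read back out depends on the AnswerCollectorTool API, which is not shown here):

import ai_shell

# Hypothetical construction, as in the earlier sketch.
toolkit = ai_shell.ToolKit(root_folder=".", config=ai_shell.Config())

# The bot ends its session by reporting a plain-text answer.
toolkit.report_text(
    {"answer": "The src folder holds the ai_shell package and its demo bots.", "comment": "Final summary."}
)

# The collector instance is now retained on the toolkit for the caller to inspect.
collector = toolkit.tool_answer_collector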

def rewrite_file(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def sed(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def tail(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def tail_markdown(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

def write_new_file(self, arguments: dict[str, typing.Any]) ‑> Any

Generated Do Not Edit

Inherited members