Package ai_shell
Filesystem shell tools for OpenAI Assistant
ai_shell
OpenAI-centric shell for giving safe, chat-optimized, filesystem access to an Assistant as a "tool".
Even if you trust the bot to run bash directly on your machine or docker container, standard tools will run up your bill with excess tokens in the reply, or a command generates too few tokens and the bot doesn't know what is going on.
This is an alternative to code_interpreter
, tools running code in docker container locally, or tools running arbitrary
shell code locally.
Installation
pip install ai_shell
Usage
See these full examples. As long as the OPENAI_API_KEY environment variable is set, you can run these examples.
- Pylint bot will attempt to fix python code lint issues.
- Test writer bot will attempt to write unit tests for python code.
- Tool tester bot tries out tools to see if they basically work.
To execute demo bots, run these commands and follow initialization instructions if needed. They all expect to manipulate python code in an /src/ folder.
python -m ai_shell.demo_bots.docs_writer_bot
python -m ai_shell.demo_bots.pylint_bot
python -m ai_shell.demo_bots.test_writer_bot
python -m ai_shell.demo_bots.tool_tester_bot
python -m ai_shell.demo_bots.todo_bot
This is the python interface to the tools, how you're expected to wire up the tool to your bot.
import ai_shell
cat = ai_shell.CatTool(".")
print(cat.cat(["file.py"]))
print(cat.cat_markdown(["file.py"]))
ls = ai_shell.LsTool(".")
print(ls.ls("docs"))
print(ls.ls_markdown("docs"))
This is the smallest example to illustrate basic capabilities, also see here.
import asyncio
import ai_shell
async def main():
def static_keep_going(toolkit: ai_shell.ToolKit):
usage = toolkit.get_tool_usage_for("ls")
if usage["count"] > 0:
return (
"Great job! You've used ls. Summarize in paragraph form and we're done."
)
return (
"You haven't used the ls tool yet. Do you have access to the ls tool? If"
" there is a problem report it to the report_text tool to end the session."
)
# Creates temporary bots
bot = ai_shell.TaskBot(
ai_shell.Config(),
name="Folder inspection bot.",
bot_instructions="Run the ls tool and tell me what you see.",
model="gpt-3.5-turbo-1106",
dialog_logger_md=ai_shell.DialogLoggerWithMarkdown("./tmp"),
)
await bot.initialize()
the_ask = f"""You are in the './' folder. You do not need to guess the pwd, it is './'.
Run ls and tell me what you see in paragraph format."""
await bot.basic_tool_loop(
the_ask=the_ask,
root_folder="./src",
tool_names=[
"ls",
"report_text",
],
keep_going_prompt=static_keep_going,
)
if __name__ == "__main__":
asyncio.run(main())
This is the cli interface, which is intended for testing, not for bot usage.
ais cat_markdown --file-paths pyproject.toml
Features in Brief
- Many cli-like tools interfaces, such as ls, cat, grep, head, tail, and git.
- OpenAI glue for all cli tools.
- UX with a bot in mind.
- Security with mischievous but not especially malicious bot in mind.
- Bot (Assistant) boilerplate help
- Support for bots doing one shot tool use and goal function driven tool use.
- Bot have extensibility points.
- TODO: plugin system for tools.
Analogues supported today
Directories: ls, find
Files: cat, grep, head, tail
Editing: sed, ed, edlin, patch, replace, insert, rewrite, write new
Data: cut
Other: pycat, token counter, git
Tasking: todo
n.b. Every file is read and written as utf-8 strings.
Prior Art
ai_shell draws inspiration from various command-line interface (CLI) tools and shell environments, integrating features from traditional shells with OpenAI's language models. It is designed to provide an easy and secure interface for AI-assisted file system interactions, keeping in mind both usability and safety.
Documentation
Document Editing
The bot has a hard time with editing files.
Some top level strategies
- editor-like tools, eg ed and edlin
- text replacement tools, e.g. sed, regex, replace
- full document replacement
- write only new files
- multiline insert tool
- diff/patch tool
- well known tools vs simpler ad-hoc tools
- validated format
- Guided edit
Editors
The bot cannot consistently edit a document with ed or edlin. It gets confused about the state of the document, the syntax of editor, etc.
Text Replacers
It gets confused about the syntax of sed and regex.
It also has to keep straight complex escaping patterns as it write JSON that has to be escaped for JSON, which wraps editor syntax, which can included regex.
Full Text replacement
It constantly confused rewriting an entire document with modifications with writing just the modified lines, creating non-stop mass, accidental deletions.
Writing only new files
This shift the burden of merging document to a human.
Multiline Insert
This is similar to the sed and regex tool, but with simpler syntax. The cost for simplicity is that the bot has no background knowledge of how to use the multiline insert tool, while it has lots of knowledge of sed and regex.
Success vs Failure
When the bot gets a success flag, it thinks the job is done and stops early. When the bot gets a failure, it sometimes assumes the task is impossible and gives up. This is mitigated a little bit by returning the entire document after each edit so it can see what happened.
Diff/Patch
To create a well known Patch format, the bot has to be able to count lines perfectly. It often can't so the Patch tool rejects the patches and after a few attempts the bot gives up. Common patch tools provide almost no feedback because the bot is failing in a way that normal tools don't, so why would they provide helpful feedback?
Validated Format
In the case of, say python code, the syntax can be validated and if the bot messed up the document, it can be reverted adn the bot can try again.
Guided Edit
An example of guided edit is feeding the bot one line or paragraph of text and asking it to transform it or otherwise do something with it. Then ordinary code merges it in a predictable format. For example, show the bot a function, then ask for a docstring.
Features
General Features
- Tools are similar to common shell commands like ls, grep, head, tail, and git for easy use by the bot.
- Handle bot "accessibility". The bot can't see, hear, or interact with the console display.
- Designed to minimize token usage in both input and output
- Supports maximum row response for output.
- Supports mime_types
- Outputs in Markdown to reduce token repetition through sections and subsection headers.
- Implementation is simplified with no command chaining.
Text and Size Management
- TODO: Maximum output limits, both by config and by bot.
- Capability to count tokens, with a maximum limit and fallbacks to word or byte count.
- Implements whitespace and other lossless/lossy text compressions.
Source Code Display and Analysis
- Converts code to Markdown (.md) files, including minified versions.
- TODO: AST (Abstract Syntax Tree) display and search capabilities using pycodesearch.
- TODO: Features for displaying source code of functions, classes in imported modules.
- TODO: Simplified directory listings (dir(module)) and module help (help(module)) focusing on key elements.
Security Constraints
General Security Measures
- Provides multiple output formats: plain text, light markdown, and JSON (structured objects).
- TODO: Ignores files specified in .gitignore.
- TODO: Skips files hidden by the operating system.
- Prevents parent directory traversal. Disallows file system modification outside of specified folders.
- No direct shell access and write permissions are restricted.
- Limited write access to source code, isolated within a specific git branch.
- File writes are permitted only in a designated branch.
- No network access.
Bots and Subbots
(in progress)
I created this so I could create a swarm of bots to collaboratively work on a task. As it turns out, sub-bots or subbots, would be useful for tool usage.
Subbots
- tool selection. Too many tools or extraneous tools confuse the bot.
- prompt improvement. The bot can often write a better prompt.
- todo management. Splitting up tasks and managing them and doing the task makes the task execution part worse.
Subbot usage requires an editable config file for storing assistant ids.
Programmer's Manual
Possible goals
- Write a single shot bot, that uses tools but does not loop.
- Write a tool using bot. It uses tools, loops and has a goal function.
- Use the tools in the toolkit with your own bot framework
Extension Points
- Goal checking functions
- New tools with plugins
- External tools (with merge request)
Use Cases for ai_shell
Good uses
It is a library for making more bots, especially if the bot needs shell access and either that is dangerous or the bot has a hard time with the complexities of a real shell.
Bad uses
It in no way is trying to compete with unix tools, faithfully reproduce their behavior, etc.
Bots to add types
- Add typehints
- Do typehints make sense
Bots to add documentation
- Add docstrings
- Convert docstrings to particular format
- Check if docstrings still match the content of the function
- Add intra-code comments
Bot to do tests
- Add doctests
- Add unit tests
- Add examples (e.g. if name == main, at end of file)
Work lint issues
- Run pylint, bandit, mypy, etc, work issues
Code Generation
A code generation tool generates all the OpenAI assistant code
- jsonschema, as generated from the method signatures
- toolkit, the python tool invocation code, as generated from the jsonschema
- a CLI interface in
__main__.py
that invokes the tool for human testing, not intended for bot use.
JSON
The tools expect chaotic JSON from the bot.
- truncated JSON
- Invalid escape codes
- Function args sometimes are the wrong type, e.g. string instead of list of strings.
All messages returned are JSON, even if the return value is plaintext markdown, it is wrapped in a layer of JSON.
Error messages for the bot also need to be json, so I used RFC-7807 as inspiration.
Toolkit Safety
The bot sometimes attempts to call tools that don't exist, e.g. parallel
Sometimes this means the bot knows about a
feature not
yet known to the openai client. To be safe, you have to whitelist tools that the bot can use. Otherwise it could
potentially call a tool by guessing its name.
Mime Types
"Mime types for API". Rather than give the bot two or more formats, I will implement a mimetype, so that the APIs are configured to either return markdown or structure, depending on config. The mime type is still wrapped in JSON! Just less of it.
The bot sometimes fills in mime type when it is submitting a file thinking that the mime type is for classifying input.
Editing
The bot finds it difficult to edit files. For many techniques, it can get close. In chat dialogs, the bot wants to regenerate the whole document to edit even a single character. Or it wants to use ad hoc diffs. This is problematic for programatic use.
Linux-Like Editing Strategies
The bot knows unix tools and often can get really close to using a function that behaves like that tool.
- ed. Ed is such a bad editor even the bot has hard time with it.
- edlin/dedlin. Edlin's commands almost can be used as an executable script.
- patch. git diffs/unidiffs. The bot can sometimes generate a diff. Sometimes the diff is corrupt because it requires careful line counting. Sometimes the bot create a diff for a hallucinated target file.
- sed. Fancy replace.
Simpler Ad Hoc Tools.
- replace. This has no equivalent in unix, it is a replacer that doesn't require line numbers.
- insert. This has no equivalent in unix, it is an inserter that doesn't require line numbers.
- rewrite. This has no equivalent in unix, it is a tool to rewrite a file, or create a new one, e.g. a corrected copy.
A surprising with all of these approaches is that the bot doesn't check its work (won't run a cat after an edit), and often just assumes everything worked because it didn't get an error message. It assumes that the success message means the user really is happy and the task is done.
Viewing Strategies
- Cat.
- Head/Tail.
- PyCat. Work in progress, view python code either aggregated or compressed.
File Browsing Strategies
- Ls. Returns directory tree if bot tries to browse a directory that doesn't exist.
- Grep.
- Find.
- Cut. Primitive CSV browser
Safety Tools
- Git. Particularly to allow for commit/revert
- Diff tool.
Logging
At least three kinds of logging
- REST API calls. Each API call to OpenAPI written as JSON document.
- Tool commands. This is a nearly executable bash script to replay what the bot did.
- The dialog. This is a chat log style log.
Config
Config is intended to bound the behavior of certain tools, to persist a bot and to enable helper bots.
Helper bots attempt to help the bot use tools by letting specialized bot focus on a narrow part of the problem of using tool.
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
[1.0.4] - 2024-01-20
Added
- Started tracking changes.
Expand source code
"""
Filesystem shell tools for OpenAI Assistant
.. include:: ../README.md
.. include:: ../docs/DocumentEditing.md
.. include:: ../docs/Features.md
.. include:: ../docs/Manual.md
.. include:: ../docs/UseCases.md
.. include:: ../docs/Design.md
.. include:: ../CHANGELOG.md
"""
from ai_shell.ai_logs.log_to_markdown import DialogLoggerWithMarkdown
from ai_shell.ai_logs.logging_utils import configure_logging
from ai_shell.answer_tool import AnswerCollectorTool
from ai_shell.bot_glue.bot import TaskBot
from ai_shell.cat_tool import CatTool
from ai_shell.cut_tool import CutTool
from ai_shell.ed_tool import EdTool
from ai_shell.edlin_tool import EdlinTool
from ai_shell.externals import pytest_call
from ai_shell.externals.black_call import invoke_black
from ai_shell.externals.pygount_call import count_lines_of_code
from ai_shell.externals.pylint_call import invoke_pylint
from ai_shell.find_tool import FindTool
from ai_shell.git_tool import GitTool
from ai_shell.grep_tool import GrepTool
from ai_shell.head_tail_tool import HeadTailTool
from ai_shell.insert_tool import InsertTool
from ai_shell.ls_tool import LsTool
from ai_shell.openai_toolkit import ToolKit
from ai_shell.openai_tools import ALL_TOOLS, initialize_all_tools, initialize_recommended_tools
from ai_shell.patch_tool import PatchTool
from ai_shell.pycat_tool import PyCatTool
from ai_shell.pytest_tool import PytestTool
from ai_shell.replace_tool import ReplaceTool
from ai_shell.rewrite_tool import RewriteTool
from ai_shell.sed_tool import SedTool
from ai_shell.todo_tool import TodoTool
from ai_shell.token_tool import TokenCounterTool
from ai_shell.utils.config_manager import Config
from ai_shell.utils.cwd_utils import change_directory
__all__ = [
"CatTool",
"CutTool",
"FindTool",
"GrepTool",
"HeadTailTool",
"LsTool",
"GitTool",
"TokenCounterTool",
"PatchTool",
"RewriteTool",
"PyCatTool",
"EdTool",
"EdlinTool",
"ToolKit",
"SedTool",
"ReplaceTool",
"InsertTool",
"TodoTool",
"AnswerCollectorTool",
"PytestTool",
# Tool and general config
"initialize_all_tools",
"initialize_recommended_tools",
"Config",
"ALL_TOOLS",
# logging support
"configure_logging",
"DialogLoggerWithMarkdown",
# bot support
"TaskBot",
# goal checker tools
"invoke_pylint",
"pytest_call",
"invoke_black",
"count_lines_of_code",
# misc that could have been 3rd party
"change_directory",
]
Sub-modules
ai_shell.ai_logs
ai_shell.answer_tool
-
Call a tool, but actually the tool is just a way to get a structured response …
ai_shell.backup_restore
ai_shell.bot_glue
-
Just enough bot code to exercise the tools. Bots are not the focus of this project, but without a bot you can't check if the tools are any good for a …
ai_shell.cat_tool
-
Cat optimized for AI prompts.
ai_shell.code_generate
-
Generate code for AI Shell using the docstrings as source data …
ai_shell.cut_tool
-
AI optimized cut tool …
ai_shell.demo_bots
ai_shell.diff_tool
-
Apply patch with unidiff instead of Git
ai_shell.ed_tool
-
Ed is a bad editor, but it is documented …
ai_shell.edlin_tool
-
Give bot access to an edlin clone …
ai_shell.externals
-
Wrappers for external CLI tools that require subprocess calls.
ai_shell.find_tool
-
AI Optimized version of find, but much simpler …
ai_shell.git_tool
-
Wrapper around GitPython and shell commands to git.
ai_shell.grep_tool
-
AI optimized grep tool
ai_shell.head_tail_tool
-
AI optimized head/tail tool
ai_shell.import_plugins
ai_shell.insert_tool
-
Text editor for simple text insertion at line or context.
ai_shell.ls_tool
-
Directory listing tool, optimized for AI.
ai_shell.openai_schemas
-
jsonschema for functions
ai_shell.openai_support
-
All the tools are optimized for LLMs, but not openai specifically …
ai_shell.openai_toolkit
-
Generate code, do not edit.
ai_shell.openai_tools
-
All the tools are optimized for LLMs, but not openai specifically …
ai_shell.patch_tool
-
Let the bot patch files instead of full rewrite. Also it is an alternative to editing with ed or edlin …
ai_shell.plugins
ai_shell.pycat_tool
-
Cat, except optimized for python files.
ai_shell.pytest_tool
-
Optimized for AI version of pytest.
ai_shell.pyutils
-
When the bot views a file, we might want to save tokens. If it is python file, we can compress it …
ai_shell.read_py_source
-
Read source with tools that understand the structure of python
ai_shell.regex_tester_tool
-
The bot needs to be able to test regex expressions before it uses them.
ai_shell.replace_tool
-
Optimized for AI version of sed. For file editing …
ai_shell.rewrite_tool
-
For short files with lots of edits, just let the bot rewrite the file …
ai_shell.sed_tool
-
Optimized for AI version of sed. For file editing …
ai_shell.subbots
-
Bots that help other bots achieve its goals.
ai_shell.todo_tool
-
AI Optimized TODO tool. Alternative to JIRA or the like …
ai_shell.token_tool
-
Token Counting …
ai_shell.utils
-
Miscellaneous utilities that cluttered up the other namespaces.
Functions
def change_directory(new_path: str) ‑> collections.abc.Iterator[None]
-
Change the current working directory to a new path.
Args
new_path
:str
- The new path to change to.
Expand source code
@contextmanager def change_directory(new_path: str) -> Iterator[None]: """Change the current working directory to a new path. Args: new_path (str): The new path to change to. """ original_directory = os.getcwd() try: os.chdir(new_path) yield None finally: os.chdir(original_directory)
def configure_logging() ‑> dict[str, typing.Any]
-
Basic style
Expand source code
def configure_logging() -> dict[str, Any]: """Basic style""" logging_config: dict[str, Any] = { "version": 1, "disable_existing_loggers": True, "formatters": { "standard": {"format": "[%(levelname)s] %(name)s: %(message)s"}, }, "handlers": { "default": { "level": "DEBUG", "formatter": "standard", "class": "logging.StreamHandler", "stream": "ext://sys.stdout", # Default is stderr }, # "bug_trail": { # "level": "DEBUG", # # "formatter": "standard", # "class": "bug_trail_core.BugTrailHandler", # "db_path": bug_trail_config.database_path, # "minimum_level": logging.DEBUG, # }, # "json": { # # "()": "json_file_handler_factory", # "level": "DEBUG", # "class": "ai_shell.utils.json_log_handler.JSONFileHandler", # "directory": "api_logs", # "module_name": "openai", # }, }, "loggers": { # root logger can capture too much "": { # root logger "handlers": ["default", "bug_trail"], "level": "DEBUG", "propagate": False, }, }, } debug_level_modules: list[str] = ["__main__", "ai_shell", "minimal_example"] info_level_modules: list[str] = [] warn_level_modules: list[str] = [] # json handler for name in ["openai"]: logging_config["loggers"][name] = { "handlers": [], # ["json"], "level": "DEBUG", "propagate": False, } for name in debug_level_modules: logging_config["loggers"][name] = { "handlers": [ "default", # "bug_trail" ], "level": "DEBUG", "propagate": False, } for name in info_level_modules: logging_config["loggers"][name] = { "handlers": [ "default", # "bug_trail" ], "level": "INFO", "propagate": False, } for name in warn_level_modules: logging_config["loggers"][name] = { "handlers": [ "default", # "bug_trail" ], "level": "WARNING", "propagate": False, } return logging_config
def count_lines_of_code(file_path: str) ‑> pygount.analysis.SourceAnalysis
-
Check the lines of code in a file. File must exist.
Args
file_path
:str
- The path to the file.
Returns
SourceAnalysis
- The analysis of the file, including line counts.
Expand source code
def count_lines_of_code(file_path: str) -> SourceAnalysis: """ Check the lines of code in a file. File must exist. Args: file_path (str): The path to the file. Returns: SourceAnalysis: The analysis of the file, including line counts. """ return SourceAnalysis.from_file(file_path, "pygount", encoding="utf-8")
def initialize_all_tools(skips: Optional[list[str]] = None, keeps: Optional[list[str]] = None) ‑> None
-
Initialize all tools
Args
skips
:Optional[list[str]]
, optional- Tools to skip. Defaults to None.
keeps
:Optional[list[str]]
, optional- Tools to keep. Defaults to None.
Expand source code
def initialize_all_tools(skips: Optional[list[str]] = None, keeps: Optional[list[str]] = None) -> None: """Initialize all tools Args: skips (Optional[list[str]], optional): Tools to skip. Defaults to None. keeps (Optional[list[str]], optional): Tools to keep. Defaults to None. """ if keeps is not None: keep = keeps elif skips is None: keep = just_tool_names() else: keep = [name for name in just_tool_names() if name not in skips] for _ns, tools in _SCHEMAS.items(): for name, schema in tools.items(): function_style: dict[str, Union[str, Collection[str]]] = {"name": name} parameters = {"type": "object", "properties": schema["properties"], "required": schema["required"]} function_style["parameters"] = parameters function_style["description"] = schema["description"] if name in keep: ALL_TOOLS.append(function_style) active_tools_string = ", ".join(tool["name"] for tool in ALL_TOOLS) logger.info(f"Active tools {active_tools_string}")
def initialize_recommended_tools(root_folder: str, config: Config) ‑> None
-
Initialize recommended tools
Args
root_folder
:str
- The root folder to recommend tools for.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
def initialize_recommended_tools(root_folder: str, config: Config) -> None: """Initialize recommended tools Args: root_folder (str): The root folder to recommend tools for. config (Config): The developer input that bot shouldn't set. """ initialize_all_tools(keeps=recommendations(root_folder, config))
def invoke_black(file_path: str) ‑> CommandResult
-
Runs black on the file or folder. Code 128 means the file is hosed.
Args
file_path
:str
- The name of the module to run black on.
Returns
CommandResult
- The result of the command.
Expand source code
def invoke_black(file_path: str) -> CommandResult: """ Runs black on the file or folder. Code 128 means the file is hosed. Args: file_path (str): The name of the module to run black on. Returns: CommandResult: The result of the command. """ command_name = "black" arg_string = f"'{file_path}' --check" return safe_subprocess(command_name, arg_string)
def invoke_pylint(module_name: str, minimum_score: float) ‑> CommandResult
-
Runs pylint on the module.
Args
module_name
:str
- The name of the module to run pylint on.
minimum_score
:float
- The minimum score to pass.
Returns
CommandResult
- The result of the command.
Expand source code
def invoke_pylint(module_name: str, minimum_score: float) -> CommandResult: """ Runs pylint on the module. Args: module_name (str): The name of the module to run pylint on. minimum_score (float): The minimum score to pass. Returns: CommandResult: The result of the command. """ command_name = "pylint" arg_string = f"'{module_name}' --fail-under {minimum_score}" # generic response. return safe_subprocess(command_name, arg_string)
Classes
class AnswerCollectorTool (root_folder: str, config: Config)
-
Initialize the PytestTool class.
Args
root_folder
:str
- The root folder path for file operations. (Not used yet)
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class AnswerCollectorTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the PytestTool class. Args: root_folder (str): The root folder path for file operations. (Not used yet) config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.comment: Optional[str] = None self.bool_answer: Optional[bool] = None self.json_answer: Optional[str] = None self.xml_answer: Optional[str] = None self.toml_answer: Optional[str] = None self.tuple_answer: Optional[tuple] = None self.set_answer: Optional[set] = None self.text_answer: Optional[str] = None self.list_answer: Optional[list[str]] = None self.int_answer: Optional[int] = None self.float_answer: Optional[float] = None self.dict_answer: Optional[dict[str, Any]] = None self.response_received = "Response received." def _answered(self) -> None: """Check if this tool has been used. Raises: TypeError: If the tool has been used. Recreate a new one after each usage. """ if any( [ self.comment, self.bool_answer is not None, self.json_answer, self.xml_answer, self.toml_answer, self.tuple_answer, self.set_answer, self.text_answer, self.list_answer, self.int_answer, self.float_answer, self.dict_answer, ] ): raise TypeError("This Answer tool has been used. Please create a new one for another answer.") @log() def report_list(self, answer: list[str], comment: str = "") -> str: """Report answer in list format. Args: answer (list[str]): The answer to be reported in list format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.list_answer = answer self.comment = comment return self.response_received @log() def report_int(self, answer: int, comment: str = "") -> str: """Report answer in integer format Args: answer (int): The answer to be reported in integer format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.int_answer = answer self.comment = comment return self.response_received @log() def report_float(self, answer: float, comment: str = "") -> str: """Report answer in string format. Args: answer (float): The answer to be reported in float format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.float_answer = answer self.comment = comment return self.response_received @log() def report_dict(self, answer: dict[str, Any], comment: str = "") -> str: """Report answer in dict format. Args: answer (dict[str, Any]): The answer to be reported in dict format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.dict_answer = answer self.comment = comment return self.response_received @log() def report_text(self, answer: str, comment: str = "") -> str: """Report answer in string format. Args: answer (str): The answer to be reported in string format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.text_answer = answer self.comment = comment return self.response_received @log() def report_bool(self, answer: bool, comment: str = "") -> str: """Report answer in bool format. Args: answer (bool): The answer to be reported in bool format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.bool_answer = answer self.comment = comment return self.response_received @log() def report_tuple(self, answer: tuple, comment: str = "") -> str: """Report answer in tuple format. Args: answer (tuple): The answer to be reported in tuple format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.tuple_answer = answer self.comment = comment return self.response_received @log() def report_set(self, answer: set, comment: str = "") -> str: """Report answer in set format. Args: answer (set): The answer to be reported in set format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.set_answer = answer self.comment = comment return self.response_received @log() def report_json(self, answer: str, comment: str = "") -> str: """Report answer in json format. Args: answer (str): The answer to be reported in json format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.json_answer = answer self.comment = comment return self.response_received @log() def report_xml(self, answer: str, comment: str = "") -> str: """Report answer in xml format. Args: answer (str): The answer to be reported in xml format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.xml_answer = answer self.comment = comment return self.response_received @log() def report_toml(self, answer: str, comment: str = "") -> str: """Report answer in toml format. Args: answer (str): The answer to be reported in toml format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.toml_answer = answer self.comment = comment return self.response_received
Methods
def report_bool(self, answer: bool, comment: str = '') ‑> str
-
Report answer in bool format.
Args
answer
:bool
- The answer to be reported in bool format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_bool(self, answer: bool, comment: str = "") -> str: """Report answer in bool format. Args: answer (bool): The answer to be reported in bool format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.bool_answer = answer self.comment = comment return self.response_received
def report_dict(self, answer: dict[str, typing.Any], comment: str = '') ‑> str
-
Report answer in dict format.
Args
answer
:dict[str, Any]
- The answer to be reported in dict format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_dict(self, answer: dict[str, Any], comment: str = "") -> str: """Report answer in dict format. Args: answer (dict[str, Any]): The answer to be reported in dict format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.dict_answer = answer self.comment = comment return self.response_received
def report_float(self, answer: float, comment: str = '') ‑> str
-
Report answer in string format.
Args
answer
:float
- The answer to be reported in float format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_float(self, answer: float, comment: str = "") -> str: """Report answer in string format. Args: answer (float): The answer to be reported in float format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.float_answer = answer self.comment = comment return self.response_received
def report_int(self, answer: int, comment: str = '') ‑> str
-
Report answer in integer format
Args
answer
:int
- The answer to be reported in integer format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_int(self, answer: int, comment: str = "") -> str: """Report answer in integer format Args: answer (int): The answer to be reported in integer format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.int_answer = answer self.comment = comment return self.response_received
def report_json(self, answer: str, comment: str = '') ‑> str
-
Report answer in json format.
Args
answer
:str
- The answer to be reported in json format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_json(self, answer: str, comment: str = "") -> str: """Report answer in json format. Args: answer (str): The answer to be reported in json format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.json_answer = answer self.comment = comment return self.response_received
def report_list(self, answer: list[str], comment: str = '') ‑> str
-
Report answer in list format.
Args
answer
:list[str]
- The answer to be reported in list format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_list(self, answer: list[str], comment: str = "") -> str: """Report answer in list format. Args: answer (list[str]): The answer to be reported in list format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.list_answer = answer self.comment = comment return self.response_received
def report_set(self, answer: set, comment: str = '') ‑> str
-
Report answer in set format.
Args
answer
:set
- The answer to be reported in set format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_set(self, answer: set, comment: str = "") -> str: """Report answer in set format. Args: answer (set): The answer to be reported in set format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.set_answer = answer self.comment = comment return self.response_received
def report_text(self, answer: str, comment: str = '') ‑> str
-
Report answer in string format.
Args
answer
:str
- The answer to be reported in string format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_text(self, answer: str, comment: str = "") -> str: """Report answer in string format. Args: answer (str): The answer to be reported in string format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.text_answer = answer self.comment = comment return self.response_received
def report_toml(self, answer: str, comment: str = '') ‑> str
-
Report answer in toml format.
Args
answer
:str
- The answer to be reported in toml format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_toml(self, answer: str, comment: str = "") -> str: """Report answer in toml format. Args: answer (str): The answer to be reported in toml format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.toml_answer = answer self.comment = comment return self.response_received
def report_tuple(self, answer: tuple, comment: str = '') ‑> str
-
Report answer in tuple format.
Args
answer
:tuple
- The answer to be reported in tuple format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_tuple(self, answer: tuple, comment: str = "") -> str: """Report answer in tuple format. Args: answer (tuple): The answer to be reported in tuple format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.tuple_answer = answer self.comment = comment return self.response_received
def report_xml(self, answer: str, comment: str = '') ‑> str
-
Report answer in xml format.
Args
answer
:str
- The answer to be reported in xml format.
comment
:str
- Any comments, supplemental info about the answer.
Returns
str
- A string indicating that the response has been received.
Expand source code
@log() def report_xml(self, answer: str, comment: str = "") -> str: """Report answer in xml format. Args: answer (str): The answer to be reported in xml format. comment (str): Any comments, supplemental info about the answer. Returns: str: A string indicating that the response has been received. """ self._answered() self.xml_answer = answer self.comment = comment return self.response_received
class CatTool (root_folder: str, config: Config)
-
Simulates
cat
cli tool.Initialize the CatTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class CatTool: """ Simulates `cat` cli tool. """ def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the CatTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config @log() def cat_markdown( self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False, ) -> str: """ Concatenates the content of given file paths and formats them as markdown. Args: file_paths (list[str]): List of file paths to concatenate. number_lines (bool): If True, number all output lines. squeeze_blank (bool): If True, consecutive blank lines are squeezed to one. Returns: str: The concatenated and formatted content as a string. """ output = StringIO() for line in self.cat(file_paths, number_lines, squeeze_blank): output.write(line) # output.write("\n") output.seek(0) return output.read() @log() def cat( self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False, ) -> Generator[str, None, None]: """ Mimics the basic functionalities of the 'cat' command in Unix. Args: file_paths (list[str]): A list of file paths to concatenate. number_lines (bool): If True, number all output lines. squeeze_blank (bool): If True, consecutive blank lines are squeezed to one. Returns: Generator[str, None, None] Yields: str: Each line of the concatenated files. """ file_paths = convert_to_list(file_paths) for location, file_path in enumerate(file_paths): if file_path.startswith("./"): file_paths[location] = file_path[2:] logger.info(f"cat --file_paths {file_paths} " f"--number_lines {number_lines} --squeeze_blank {squeeze_blank}") for file_path in file_paths: if not is_file_in_root_folder(file_path, self.root_folder): raise TypeError("No parent folder traversals allowed") line_number = 1 for glob_pattern in file_paths: for file_path in safe_glob(glob_pattern, self.root_folder): if not os.path.isabs(file_path): file_path = self.root_folder + "/" + file_path try: with open(file_path, "rb") as file: for line in self._process_cat_file(file, line_number, number_lines, squeeze_blank): yield line line_number += 1 except PermissionError: logger.warning(f"Permission denied: {file_path}, suppressing from output.") def _process_cat_file( self, file: IO[bytes], line_number: int, number_lines: bool, squeeze_blank: bool, ) -> Generator[str, None, None]: """ Processes a file for concatenation, applying the specified formatting. Args: file: The file object to process. line_number (int): Current line number for numbering lines. number_lines (bool): If True, number all output lines. squeeze_blank (bool): If True, consecutive blank lines are squeezed to one. Returns: Generator[str, None, None]: A generator of processed lines. Yields: str: Each processed line of the file. """ was_blank = False for byte_lines in file: # if isinstance(byte_lines, bytes): line = byte_lines.decode("utf-8") # Decode bytes to string # Use StringIO for memory-efficient line processing with StringIO() as line_buffer: # Normalize line endings to \n line = line.replace("\r\n", "\n") line_buffer.write(line) if squeeze_blank and was_blank and line.strip() == "": continue # Skip consecutive blank lines was_blank = line.strip() == "" if number_lines: line_buffer.seek(0) line = f"{line_number}\t{line_buffer.read()}" line_number += 1 else: line = line_buffer.getvalue() yield line
Methods
def cat(self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False) ‑> collections.abc.Generator[str, None, None]
-
Mimics the basic functionalities of the 'cat' command in Unix.
Args
file_paths
:list[str]
- A list of file paths to concatenate.
number_lines
:bool
- If True, number all output lines.
squeeze_blank
:bool
- If True, consecutive blank lines are squeezed to one.
Returns
Generator[str, None, None]
Yields
str
- Each line of the concatenated files.
Expand source code
@log() def cat( self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False, ) -> Generator[str, None, None]: """ Mimics the basic functionalities of the 'cat' command in Unix. Args: file_paths (list[str]): A list of file paths to concatenate. number_lines (bool): If True, number all output lines. squeeze_blank (bool): If True, consecutive blank lines are squeezed to one. Returns: Generator[str, None, None] Yields: str: Each line of the concatenated files. """ file_paths = convert_to_list(file_paths) for location, file_path in enumerate(file_paths): if file_path.startswith("./"): file_paths[location] = file_path[2:] logger.info(f"cat --file_paths {file_paths} " f"--number_lines {number_lines} --squeeze_blank {squeeze_blank}") for file_path in file_paths: if not is_file_in_root_folder(file_path, self.root_folder): raise TypeError("No parent folder traversals allowed") line_number = 1 for glob_pattern in file_paths: for file_path in safe_glob(glob_pattern, self.root_folder): if not os.path.isabs(file_path): file_path = self.root_folder + "/" + file_path try: with open(file_path, "rb") as file: for line in self._process_cat_file(file, line_number, number_lines, squeeze_blank): yield line line_number += 1 except PermissionError: logger.warning(f"Permission denied: {file_path}, suppressing from output.")
def cat_markdown(self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False) ‑> str
-
Concatenates the content of given file paths and formats them as markdown.
Args
file_paths
:list[str]
- List of file paths to concatenate.
number_lines
:bool
- If True, number all output lines.
squeeze_blank
:bool
- If True, consecutive blank lines are squeezed to one.
Returns
str
- The concatenated and formatted content as a string.
Expand source code
@log() def cat_markdown( self, file_paths: list[str], number_lines: bool = True, squeeze_blank: bool = False, ) -> str: """ Concatenates the content of given file paths and formats them as markdown. Args: file_paths (list[str]): List of file paths to concatenate. number_lines (bool): If True, number all output lines. squeeze_blank (bool): If True, consecutive blank lines are squeezed to one. Returns: str: The concatenated and formatted content as a string. """ output = StringIO() for line in self.cat(file_paths, number_lines, squeeze_blank): output.write(line) # output.write("\n") output.seek(0) return output.read()
class Config (config_path: str = '')
-
A class for managing the ai_shell.toml file.
This is for globally available things that shouldn't or can't be set by the bot.
Initialize the Config class.
Expand source code
class Config: """A class for managing the ai_shell.toml file. This is for globally available things that shouldn't or can't be set by the bot. """ def __init__(self, config_path: str = "") -> None: """Initialize the Config class.""" if config_path and config_path.endswith(".toml"): self.config_file = config_path elif config_path: self.config_file = os.path.join(config_path, "ai_shell.toml") else: self.config_file = os.getenv("CONFIG_PATH", "ai_shell.toml") # freeze the location of the config file self.config_file = os.path.abspath(self.config_file) self._list_data: dict[str, list[str]] = {} self._values_data: dict[str, str] = {} self._flags_data: dict[str, bool] = { "enable_tool_selector_bot": False, "enable_regex_tester_bot": False, "enable_prompt_improver_bot": False, "enable_dialog_log": False, "enable_shell_log": False, "enable_api_log": False, "enable_autocat": True, } self._bots_data: list = [] self.load_config() def load_config(self) -> None: """Load the config from the config file.""" if os.path.isfile(self.config_file): data = toml.load(self.config_file) self._flags_data = data["flags"] self._bots_data = data["bots"] self._values_data = data["values"] self._list_data = data["lists"] else: self.save_config() if len(self._bots_data) > 100: raise ValueError( f"You have too many bots. Bot persistence must be failing somewhere, see {self.config_file}" ) def save_config(self): """Save the config to the config file.""" if not os.path.isabs(self.config_file): raise ValueError("Config file path must be absolute.") with open(self.config_file, "w", encoding="utf-8") as f: toml.dump( { "flags": self._flags_data, "bots": self._bots_data, "values": self._values_data, "lists": self._list_data, }, f, ) def add_bot(self, assistant_id: str, name: str) -> None: """Add a bot to the config. Args: assistant_id (str): The ID of the bot. name (str): The name of the bot. """ bot = Bot(assistant_id, name) self._bots_data.append(dataclasses.asdict(bot)) self.save_config() def set_flag(self, flag_name: str, value: bool) -> None: """Set the value of the given flag. Args: flag_name (str): The name of the flag. value (str): The value of the flag. """ self._flags_data[flag_name] = value self.save_config() def cleanup(self) -> None: """Remove bots that have been deleted on OpenAI's side.""" openai.api_key = os.getenv("OPENAI_API_KEY") existing_bots = openai.beta.assistants.list() assistant_ids = [bot.id for bot in existing_bots.data] # Remove bots that no longer exist in OpenAI self._bots_data = [bot for bot in self._bots_data if bot["assistant_id"] in assistant_ids] self.save_config() def get_bots(self) -> list[Bot]: """Return a list of Bot objects.""" return [Bot(**bot) for bot in self._bots_data] def get_bot(self, name: str) -> Optional[Bot]: """Return a Bot object with the given name. Args: name (str): The name of the bot. Returns: Optional[Bot]: The bot with the given name, or None if no bot with that name exists. """ for bot in self._bots_data: if bot["name"] == name: return Bot(**bot) return None def get_flag(self, flag_name: str, default_value: Optional[bool] = None) -> Optional[bool]: """Return the value of the given flag. Args: flag_name (str): The name of the flag. default_value (Optional[bool], optional): The default value to return if the flag does not exist. Defaults to None. Returns: Optional[bool]: The value of the flag, or None if the flag does not exist. """ return self._flags_data.get(flag_name, default_value) def get_value(self, name: str, default: Optional[str] = None) -> Optional[str]: """Return the value of the given named value. Args: name (str): The name of the config value. default (Optional[str], optional): The default value to return if the value does not exist. Defaults to None. Returns: Optional[str]: The value of the named value, or None if the name does not exist. """ return self._values_data.get(name, default) def set_list(self, list_name: str, value: list[str]) -> None: """Set the value of the given list of values. Args: list_name (str): The name of the config value. value (list[str]): The value of the list. """ self._list_data[list_name] = value self.save_config() def get_required_value(self, name: str) -> str: """Return the value of the given named value. Args: name (str): The name of the config value. Returns: str: The value. Raises: FatalConfigurationError: If the value does not exist. """ value = self._values_data.get(name, None) if value is None: raise FatalConfigurationError(f"Need {name} in config file") return value def get_list(self, list_name: str) -> list[str]: """Return the value of the given list of values. Args: list_name (str): The name of the config value. Returns: list[str]: The list from the config file. """ return self._list_data.get(list_name, [])
Methods
def add_bot(self, assistant_id: str, name: str) ‑> None
-
Add a bot to the config.
Args
assistant_id
:str
- The ID of the bot.
name
:str
- The name of the bot.
Expand source code
def add_bot(self, assistant_id: str, name: str) -> None: """Add a bot to the config. Args: assistant_id (str): The ID of the bot. name (str): The name of the bot. """ bot = Bot(assistant_id, name) self._bots_data.append(dataclasses.asdict(bot)) self.save_config()
def cleanup(self) ‑> None
-
Remove bots that have been deleted on OpenAI's side.
Expand source code
def cleanup(self) -> None: """Remove bots that have been deleted on OpenAI's side.""" openai.api_key = os.getenv("OPENAI_API_KEY") existing_bots = openai.beta.assistants.list() assistant_ids = [bot.id for bot in existing_bots.data] # Remove bots that no longer exist in OpenAI self._bots_data = [bot for bot in self._bots_data if bot["assistant_id"] in assistant_ids] self.save_config()
def get_bot(self, name: str) ‑> Optional[Bot]
-
Return a Bot object with the given name.
Args
name
:str
- The name of the bot.
Returns
Optional[Bot]
- The bot with the given name, or None if no bot with that name exists.
Expand source code
def get_bot(self, name: str) -> Optional[Bot]: """Return a Bot object with the given name. Args: name (str): The name of the bot. Returns: Optional[Bot]: The bot with the given name, or None if no bot with that name exists. """ for bot in self._bots_data: if bot["name"] == name: return Bot(**bot) return None
def get_bots(self) ‑> list[Bot]
-
Return a list of Bot objects.
Expand source code
def get_bots(self) -> list[Bot]: """Return a list of Bot objects.""" return [Bot(**bot) for bot in self._bots_data]
def get_flag(self, flag_name: str, default_value: Optional[bool] = None) ‑> Optional[bool]
-
Return the value of the given flag.
Args
flag_name
:str
- The name of the flag.
default_value
:Optional[bool]
, optional- The default value to return if the flag does not exist. Defaults to None.
Returns
Optional[bool]
- The value of the flag, or None if the flag does not exist.
Expand source code
def get_flag(self, flag_name: str, default_value: Optional[bool] = None) -> Optional[bool]: """Return the value of the given flag. Args: flag_name (str): The name of the flag. default_value (Optional[bool], optional): The default value to return if the flag does not exist. Defaults to None. Returns: Optional[bool]: The value of the flag, or None if the flag does not exist. """ return self._flags_data.get(flag_name, default_value)
def get_list(self, list_name: str) ‑> list[str]
-
Return the value of the given list of values.
Args
list_name
:str
- The name of the config value.
Returns
list[str]
- The list from the config file.
Expand source code
def get_list(self, list_name: str) -> list[str]: """Return the value of the given list of values. Args: list_name (str): The name of the config value. Returns: list[str]: The list from the config file. """ return self._list_data.get(list_name, [])
def get_required_value(self, name: str) ‑> str
-
Return the value of the given named value.
Args
name
:str
- The name of the config value.
Returns
str
- The value.
Raises
FatalConfigurationError
- If the value does not exist.
Expand source code
def get_required_value(self, name: str) -> str: """Return the value of the given named value. Args: name (str): The name of the config value. Returns: str: The value. Raises: FatalConfigurationError: If the value does not exist. """ value = self._values_data.get(name, None) if value is None: raise FatalConfigurationError(f"Need {name} in config file") return value
def get_value(self, name: str, default: Optional[str] = None) ‑> Optional[str]
-
Return the value of the given named value.
Args
name
:str
- The name of the config value.
default
:Optional[str]
, optional- The default value to return if the value does not exist. Defaults to None.
Returns
Optional[str]
- The value of the named value, or None if the name does not exist.
Expand source code
def get_value(self, name: str, default: Optional[str] = None) -> Optional[str]: """Return the value of the given named value. Args: name (str): The name of the config value. default (Optional[str], optional): The default value to return if the value does not exist. Defaults to None. Returns: Optional[str]: The value of the named value, or None if the name does not exist. """ return self._values_data.get(name, default)
def load_config(self) ‑> None
-
Load the config from the config file.
Expand source code
def load_config(self) -> None: """Load the config from the config file.""" if os.path.isfile(self.config_file): data = toml.load(self.config_file) self._flags_data = data["flags"] self._bots_data = data["bots"] self._values_data = data["values"] self._list_data = data["lists"] else: self.save_config() if len(self._bots_data) > 100: raise ValueError( f"You have too many bots. Bot persistence must be failing somewhere, see {self.config_file}" )
def save_config(self)
-
Save the config to the config file.
Expand source code
def save_config(self): """Save the config to the config file.""" if not os.path.isabs(self.config_file): raise ValueError("Config file path must be absolute.") with open(self.config_file, "w", encoding="utf-8") as f: toml.dump( { "flags": self._flags_data, "bots": self._bots_data, "values": self._values_data, "lists": self._list_data, }, f, )
def set_flag(self, flag_name: str, value: bool) ‑> None
-
Set the value of the given flag.
Args
flag_name
:str
- The name of the flag.
value
:str
- The value of the flag.
Expand source code
def set_flag(self, flag_name: str, value: bool) -> None: """Set the value of the given flag. Args: flag_name (str): The name of the flag. value (str): The value of the flag. """ self._flags_data[flag_name] = value self.save_config()
def set_list(self, list_name: str, value: list[str]) ‑> None
-
Set the value of the given list of values.
Args
list_name
:str
- The name of the config value.
value
:list[str]
- The value of the list.
Expand source code
def set_list(self, list_name: str, value: list[str]) -> None: """Set the value of the given list of values. Args: list_name (str): The name of the config value. value (list[str]): The value of the list. """ self._list_data[list_name] = value self.save_config()
class CutTool (root_folder: str, config: Config)
-
Simulates
cut
cli tool.Initialize the CatTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
@dataclasses.dataclass class CutTool: """ Simulates `cut` cli tool. """ def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the CatTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def cut_characters(self, file_path: str, character_ranges: str) -> str: """Reads a file and extracts characters based on specified ranges. Args: file_path: The name of the file to process. character_ranges: A string representing character ranges, e.g., "1-5,10". Returns: A string containing the selected characters from the file. """ if not is_file_in_root_folder(file_path, self.root_folder): raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.") ranges = parse_ranges(character_ranges) output = io.StringIO() try: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: for line in file: for i, char in enumerate(line, start=1): if is_in_ranges(i, ranges): output.write(char) # Optionally add a newline character after each line output.write("\n") except FileNotFoundError: tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {file_path} not found. Here are all the files you can see\n\n{tree_text}" return markdown_content return output.getvalue() @log() def cut_fields(self, filename: str, field_ranges: str, delimiter: str = ",") -> str: """Reads a file and extracts fields based on specified ranges using the given delimiter. Args: filename: The name of the file to process. field_ranges: A string representing field ranges, e.g., "1-3,5". delimiter: A single character used as the field delimiter. Returns: A string containing the selected fields from the file. """ if not is_file_in_root_folder(filename, self.root_folder): raise ValueError(f"File {filename} is not in root folder {self.root_folder}.") ranges = parse_ranges(field_ranges) output = io.StringIO() try: with open(filename, encoding="utf-8", errors=self.utf8_errors) as file: reader = csv.reader(file, delimiter=delimiter) for row in reader: selected_fields = [field for i, field in enumerate(row, start=1) if is_in_ranges(i, ranges)] output.write(delimiter.join(selected_fields) + "\n") except FileNotFoundError: # Host app should always have cwd == root dir. tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}" return markdown_content return output.getvalue() @log() def cut_fields_by_name(self, filename: str, field_names: list[str], delimiter: str = ",") -> str: """Reads a file and extracts fields based on specified field names using the given delimiter. Args: filename(str): The name of the file to process. field_names(list[str]): A list of field names to extract. delimiter(str): A single character used as the field delimiter. Returns: A string containing the selected fields from the file. """ if not is_file_in_root_folder(filename, self.root_folder): raise ValueError(f"File {filename} is not in root folder {self.root_folder}.") output = io.StringIO() try: with open(filename, encoding="utf-8", errors=self.utf8_errors) as file: reader = csv.DictReader(file, delimiter=delimiter) # field_indices = {field: i for i, field in enumerate(reader.fieldnames)} for row in reader: selected_fields = [row[field] for field in field_names if field in row] output.write(delimiter.join(selected_fields) + "\n") except FileNotFoundError: tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}" return markdown_content return output.getvalue()
Methods
def cut_characters(self, file_path: str, character_ranges: str) ‑> str
-
Reads a file and extracts characters based on specified ranges.
Args
file_path
- The name of the file to process.
character_ranges
- A string representing character ranges, e.g., "1-5,10".
Returns
A string containing the selected characters from the file.
Expand source code
@log() def cut_characters(self, file_path: str, character_ranges: str) -> str: """Reads a file and extracts characters based on specified ranges. Args: file_path: The name of the file to process. character_ranges: A string representing character ranges, e.g., "1-5,10". Returns: A string containing the selected characters from the file. """ if not is_file_in_root_folder(file_path, self.root_folder): raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.") ranges = parse_ranges(character_ranges) output = io.StringIO() try: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: for line in file: for i, char in enumerate(line, start=1): if is_in_ranges(i, ranges): output.write(char) # Optionally add a newline character after each line output.write("\n") except FileNotFoundError: tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {file_path} not found. Here are all the files you can see\n\n{tree_text}" return markdown_content return output.getvalue()
def cut_fields(self, filename: str, field_ranges: str, delimiter: str = ',') ‑> str
-
Reads a file and extracts fields based on specified ranges using the given delimiter.
Args
filename
- The name of the file to process.
field_ranges
- A string representing field ranges, e.g., "1-3,5".
delimiter
- A single character used as the field delimiter.
Returns
A string containing the selected fields from the file.
Expand source code
@log() def cut_fields(self, filename: str, field_ranges: str, delimiter: str = ",") -> str: """Reads a file and extracts fields based on specified ranges using the given delimiter. Args: filename: The name of the file to process. field_ranges: A string representing field ranges, e.g., "1-3,5". delimiter: A single character used as the field delimiter. Returns: A string containing the selected fields from the file. """ if not is_file_in_root_folder(filename, self.root_folder): raise ValueError(f"File {filename} is not in root folder {self.root_folder}.") ranges = parse_ranges(field_ranges) output = io.StringIO() try: with open(filename, encoding="utf-8", errors=self.utf8_errors) as file: reader = csv.reader(file, delimiter=delimiter) for row in reader: selected_fields = [field for i, field in enumerate(row, start=1) if is_in_ranges(i, ranges)] output.write(delimiter.join(selected_fields) + "\n") except FileNotFoundError: # Host app should always have cwd == root dir. tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}" return markdown_content return output.getvalue()
def cut_fields_by_name(self, filename: str, field_names: list[str], delimiter: str = ',') ‑> str
-
Reads a file and extracts fields based on specified field names using the given delimiter.
Args
filename(str): The name of the file to process. field_names(list[str]): A list of field names to extract. delimiter(str): A single character used as the field delimiter.
Returns
A string containing the selected fields from the file.
Expand source code
@log() def cut_fields_by_name(self, filename: str, field_names: list[str], delimiter: str = ",") -> str: """Reads a file and extracts fields based on specified field names using the given delimiter. Args: filename(str): The name of the file to process. field_names(list[str]): A list of field names to extract. delimiter(str): A single character used as the field delimiter. Returns: A string containing the selected fields from the file. """ if not is_file_in_root_folder(filename, self.root_folder): raise ValueError(f"File {filename} is not in root folder {self.root_folder}.") output = io.StringIO() try: with open(filename, encoding="utf-8", errors=self.utf8_errors) as file: reader = csv.DictReader(file, delimiter=delimiter) # field_indices = {field: i for i, field in enumerate(reader.fieldnames)} for row in reader: selected_fields = [row[field] for field in field_names if field in row] output.write(delimiter.join(selected_fields) + "\n") except FileNotFoundError: tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {filename} not found. Here are all the files you can see\n\n{tree_text}" return markdown_content return output.getvalue()
class DialogLoggerWithMarkdown (base_directory: str)
-
A class for logging dialog interactions in Markdown format.
Attributes
bot_name
:str
- Name of the bot.
model
:str
- Model used by the bot.
bot_instructions
:str
- Instructions or description for the bot.
base_directory
:str
- Base directory for storing log files.
Methods
write_header: Writes the header information to the log file. add_user: Logs the user's input text. add_bot: Logs the bot's response text. add_toolkit: Logs the tools used in the dialog. add_tool: Logs a single tool and its arguments used in the dialog. add_tool_result: Logs the results from a tool. add_error: Logs an error that occurred during the dialog. ensure_log: Context manager to ensure the log file is closed properly.
Initializes the DialogLoggerWithMarkdown object.
Args
base_directory
:str
, optional- Base directory for storing log files. Defaults to LOG_FOLDER if not provided.
Expand source code
class DialogLoggerWithMarkdown: """ A class for logging dialog interactions in Markdown format. Attributes: bot_name (str): Name of the bot. model (str): Model used by the bot. bot_instructions (str): Instructions or description for the bot. base_directory (str): Base directory for storing log files. Methods: write_header: Writes the header information to the log file. add_user: Logs the user's input text. add_bot: Logs the bot's response text. add_toolkit: Logs the tools used in the dialog. add_tool: Logs a single tool and its arguments used in the dialog. add_tool_result: Logs the results from a tool. add_error: Logs an error that occurred during the dialog. ensure_log: Context manager to ensure the log file is closed properly. """ def __init__(self, base_directory: str) -> None: """ Initializes the DialogLoggerWithMarkdown object. Args: base_directory (str, optional): Base directory for storing log files. Defaults to LOG_FOLDER if not provided. """ if not base_directory: raise ValueError("base_directory must be provided.") os.makedirs(base_directory, exist_ok=True) self.bot_name: Optional[str] = None self.model: Optional[str] = None self.bot_instructions: Optional[str] = None self.base_directory = base_directory log_files = [f for f in os.listdir(self.base_directory) if f.endswith(".md")] log_number = len(log_files) + 1 self.log_file_path = os.path.join(self.base_directory, f"dialog_{log_number}.md") os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True) # Context manager handles this, I think. # pylint: disable=consider-using-with self.log_file = open(self.log_file_path, "a", buffering=1, encoding="utf-8") self.pending_tools: dict[str, dict[str, str]] = {} def write_header(self, bot_name: str, model: str, bot_instructions: str) -> None: """ Writes the header information to the log file. Args: bot_name (str): Name of the bot. model (str): Model used by the bot. bot_instructions (str): Instructions or description for the bot. """ self.bot_name = bot_name self.model = model self.bot_instructions = bot_instructions header = f"# Bot Name: {self.bot_name}\n## Model: {self.model}\n### Instructions: {self.bot_instructions}\n\n" self.log_file.write(header) logger.info(header) def add_user(self, text: str) -> None: """ Logs the user's input text. Args: text (str): The text input by the user. """ users_message = f"**User**: {text}" self.log_file.write(f"{users_message}\n\n") logger.info(users_message) def add_bot(self, text: str) -> None: """ Logs the bot's response text. Args: text (str): The text response from the bot. """ bots_message = f"**Bot**: {text}" self.log_file.write(f"{bots_message}\n\n") logger.info(bots_message) def add_toolkit(self, tools: list[str]) -> None: """ Logs the tools used in the dialog. Args: tools (List[str]): A list of tool names used in the dialog. """ toolkit_str = "\n- ".join([f"`{tool}`" for tool in tools]) toolkit_message = f"**Toolkit**: \n\n- {toolkit_str}" self.log_file.write(f"{toolkit_message}\n\n") logger.info(toolkit_message.replace("\n", "")) def add_tool(self, tool_call_id: str, tool_name: str, tool_args: str) -> None: """ Logs a single tool and its arguments used in the dialog. Args: tool_call_id (str): The unique identifier for the tool call. tool_name (str): The name of the tool. tool_args (str): The arguments passed to the tool, in JSON string format. """ bot_wants = f"Bot wants to use `{tool_name}`\n" self.log_file.write(bot_wants) logger.info(bot_wants) try: json_bits = json.loads(tool_args) except BaseException: self.log_file.write(f"Bot gave us Bad JSON: {tool_args}") self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": tool_args} return args_lines: list[str] = [] for name, value in json_bits.items(): if value is not None: pair = f"{name} : {value}" args_lines.append(f" - {pair}\n") # logger.info(pair) self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": "\n".join(args_lines)} def add_tool_result(self, tool_results: list[dict[str, Any]]) -> None: """ Logs the results from a tool. Args: tool_results (List[Dict[str, Any]]): A list of dictionaries containing the tool results. """ # result should always be of the same dict type # tool_result = {"tool_call_id": tool_call.id, "output": json_result} for result in tool_results: self.log_file.write("### Result\n\n") self.log_file.write(f"Tool call Id: {result['tool_call_id']}\n") self.log_file.write(f"Tool name: {self.pending_tools[result['tool_call_id']]['tool_name']}\n") self.log_file.write(f"Tool args:\n {self.pending_tools[result['tool_call_id']]['tool_args']}\n") del self.pending_tools["tool_call_id"] json_string = result["output"] # json.loads here should work, it isn't bot-json any_type = json.loads(json_string) if isinstance(any_type, dict): if "type" in any_type and "title" in any_type and "status" in any_type and "detail" in any_type: self.log_file.write(f"ERROR: {any_type['type']} : {any_type['detail']}\n") else: for key, value in any_type.items(): self.log_file.write(f" - {key} : {value}\n") else: self.log_file.write(f"{any_type}\n") def add_error(self, error: Exception) -> None: """ Logs an error that occurred during the dialog. Args: error (Exception): The exception that was raised. """ error_message = f"**Error**: {error}" self.log_file.write(f"{error_message}\n\n") logger.error(error_message) @contextmanager def ensure_log(self) -> Iterator[None]: """ A context manager to ensure that the log file is closed properly. Yields: None: Yields control to the block of code using this context manager. Ensures: The log file is closed properly upon exiting the block of code. """ try: yield finally: self.log_file.close()
Methods
def add_bot(self, text: str) ‑> None
-
Logs the bot's response text.
Args
text
:str
- The text response from the bot.
Expand source code
def add_bot(self, text: str) -> None: """ Logs the bot's response text. Args: text (str): The text response from the bot. """ bots_message = f"**Bot**: {text}" self.log_file.write(f"{bots_message}\n\n") logger.info(bots_message)
def add_error(self, error: Exception) ‑> None
-
Logs an error that occurred during the dialog.
Args
error
:Exception
- The exception that was raised.
Expand source code
def add_error(self, error: Exception) -> None: """ Logs an error that occurred during the dialog. Args: error (Exception): The exception that was raised. """ error_message = f"**Error**: {error}" self.log_file.write(f"{error_message}\n\n") logger.error(error_message)
def add_tool(self, tool_call_id: str, tool_name: str, tool_args: str) ‑> None
-
Logs a single tool and its arguments used in the dialog.
Args
tool_call_id
:str
- The unique identifier for the tool call.
tool_name
:str
- The name of the tool.
tool_args
:str
- The arguments passed to the tool, in JSON string format.
Expand source code
def add_tool(self, tool_call_id: str, tool_name: str, tool_args: str) -> None: """ Logs a single tool and its arguments used in the dialog. Args: tool_call_id (str): The unique identifier for the tool call. tool_name (str): The name of the tool. tool_args (str): The arguments passed to the tool, in JSON string format. """ bot_wants = f"Bot wants to use `{tool_name}`\n" self.log_file.write(bot_wants) logger.info(bot_wants) try: json_bits = json.loads(tool_args) except BaseException: self.log_file.write(f"Bot gave us Bad JSON: {tool_args}") self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": tool_args} return args_lines: list[str] = [] for name, value in json_bits.items(): if value is not None: pair = f"{name} : {value}" args_lines.append(f" - {pair}\n") # logger.info(pair) self.pending_tools[tool_call_id] = {"tool_name": tool_name, "tool_args": "\n".join(args_lines)}
def add_tool_result(self, tool_results: list[dict[str, typing.Any]]) ‑> None
-
Logs the results from a tool.
Args
tool_results
:List[Dict[str, Any]]
- A list of dictionaries containing the tool results.
Expand source code
def add_tool_result(self, tool_results: list[dict[str, Any]]) -> None: """ Logs the results from a tool. Args: tool_results (List[Dict[str, Any]]): A list of dictionaries containing the tool results. """ # result should always be of the same dict type # tool_result = {"tool_call_id": tool_call.id, "output": json_result} for result in tool_results: self.log_file.write("### Result\n\n") self.log_file.write(f"Tool call Id: {result['tool_call_id']}\n") self.log_file.write(f"Tool name: {self.pending_tools[result['tool_call_id']]['tool_name']}\n") self.log_file.write(f"Tool args:\n {self.pending_tools[result['tool_call_id']]['tool_args']}\n") del self.pending_tools["tool_call_id"] json_string = result["output"] # json.loads here should work, it isn't bot-json any_type = json.loads(json_string) if isinstance(any_type, dict): if "type" in any_type and "title" in any_type and "status" in any_type and "detail" in any_type: self.log_file.write(f"ERROR: {any_type['type']} : {any_type['detail']}\n") else: for key, value in any_type.items(): self.log_file.write(f" - {key} : {value}\n") else: self.log_file.write(f"{any_type}\n")
def add_toolkit(self, tools: list[str]) ‑> None
-
Logs the tools used in the dialog.
Args
tools
:List[str]
- A list of tool names used in the dialog.
Expand source code
def add_toolkit(self, tools: list[str]) -> None: """ Logs the tools used in the dialog. Args: tools (List[str]): A list of tool names used in the dialog. """ toolkit_str = "\n- ".join([f"`{tool}`" for tool in tools]) toolkit_message = f"**Toolkit**: \n\n- {toolkit_str}" self.log_file.write(f"{toolkit_message}\n\n") logger.info(toolkit_message.replace("\n", ""))
def add_user(self, text: str) ‑> None
-
Logs the user's input text.
Args
text
:str
- The text input by the user.
Expand source code
def add_user(self, text: str) -> None: """ Logs the user's input text. Args: text (str): The text input by the user. """ users_message = f"**User**: {text}" self.log_file.write(f"{users_message}\n\n") logger.info(users_message)
def ensure_log(self) ‑> collections.abc.Iterator[None]
-
A context manager to ensure that the log file is closed properly.
Yields
None
- Yields control to the block of code using this context manager.
Ensures
The log file is closed properly upon exiting the block of code.
Expand source code
@contextmanager def ensure_log(self) -> Iterator[None]: """ A context manager to ensure that the log file is closed properly. Yields: None: Yields control to the block of code using this context manager. Ensures: The log file is closed properly upon exiting the block of code. """ try: yield finally: self.log_file.close()
def write_header(self, bot_name: str, model: str, bot_instructions: str) ‑> None
-
Writes the header information to the log file.
Args
bot_name
:str
- Name of the bot.
model
:str
- Model used by the bot.
bot_instructions
:str
- Instructions or description for the bot.
Expand source code
def write_header(self, bot_name: str, model: str, bot_instructions: str) -> None: """ Writes the header information to the log file. Args: bot_name (str): Name of the bot. model (str): Model used by the bot. bot_instructions (str): Instructions or description for the bot. """ self.bot_name = bot_name self.model = model self.bot_instructions = bot_instructions header = f"# Bot Name: {self.bot_name}\n## Model: {self.model}\n### Instructions: {self.bot_instructions}\n\n" self.log_file.write(header) logger.info(header)
class EdTool (root_folder: str, config: Config)
-
A python version of ed.
Initialize the EdTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class EdTool: """A python version of ed.""" def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the EdTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder if root_folder.endswith("/") else root_folder + "/" self.buffer = Buffer() self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def ed(self, script: str, file_name: str) -> list[str]: """A python version of ed. Args: script (str): Ed commands to run. file_name (str): Script creates or edits this file. Returns: list[str]: The output of the script. """ commands = script.split("\n") file_name = sanitize_path(file_name) file_path = self.root_folder + file_name if os.path.isfile(file_path): with open(file_path, encoding="utf-8", errors=self.utf8_errors) as f: self.buffer = Buffer(f.readlines()) command_lines: list[tuple[Any, list[str]]] = [] current_command = None current_list: list[str] = [] for command, upcoming in zip(commands, commands[1:], strict=False): command = command.strip() if command.startswith("# "): # comments, because this is unreadable without them continue if not command: # blank commands are noise continue if command.startswith("e "): # can't really load right now continue if command == ".": if current_command is not None: command_lines.append((current_command, current_list)) self._run_commands(command_lines) current_command = None current_list = [] command_lines = [] continue if command == "q": # stop processing commands break if command == "w": with open(file_path, "w", encoding="utf-8") as f: f.writelines(self.buffer.lines) continue if command.startswith("> "): # always a line current_list.append(f"{command[2:]}\n") else: if upcoming is None or not upcoming.startswith("> "): current_command = command command_lines.append((current_command, current_list)) current_command = command current_list = [] # always be running commands unless we're add/editing a line next. if upcoming is None: # last command, probably a q self._run_commands(command_lines) elif current_list or upcoming.startswith("> "): # waiting for . or expecting more lines pass else: self._run_commands(command_lines) return self.buffer.lines def _run_commands(self, command_lines: (list[tuple[Any, list[str]]])) -> None: """Run the commands. Args: command_lines (list[tuple[Any, list[str]]]): The commands to run. """ for command_now, lines_now in command_lines: print(command_now, "\n".join(lines_now)) self.buffer.run(command_now, lines_now)
Methods
def ed(self, script: str, file_name: str) ‑> list[str]
-
A python version of ed.
Args
script
:str
- Ed commands to run.
file_name
:str
- Script creates or edits this file.
Returns
list[str]
- The output of the script.
Expand source code
@log() def ed(self, script: str, file_name: str) -> list[str]: """A python version of ed. Args: script (str): Ed commands to run. file_name (str): Script creates or edits this file. Returns: list[str]: The output of the script. """ commands = script.split("\n") file_name = sanitize_path(file_name) file_path = self.root_folder + file_name if os.path.isfile(file_path): with open(file_path, encoding="utf-8", errors=self.utf8_errors) as f: self.buffer = Buffer(f.readlines()) command_lines: list[tuple[Any, list[str]]] = [] current_command = None current_list: list[str] = [] for command, upcoming in zip(commands, commands[1:], strict=False): command = command.strip() if command.startswith("# "): # comments, because this is unreadable without them continue if not command: # blank commands are noise continue if command.startswith("e "): # can't really load right now continue if command == ".": if current_command is not None: command_lines.append((current_command, current_list)) self._run_commands(command_lines) current_command = None current_list = [] command_lines = [] continue if command == "q": # stop processing commands break if command == "w": with open(file_path, "w", encoding="utf-8") as f: f.writelines(self.buffer.lines) continue if command.startswith("> "): # always a line current_list.append(f"{command[2:]}\n") else: if upcoming is None or not upcoming.startswith("> "): current_command = command command_lines.append((current_command, current_list)) current_command = command current_list = [] # always be running commands unless we're add/editing a line next. if upcoming is None: # last command, probably a q self._run_commands(command_lines) elif current_list or upcoming.startswith("> "): # waiting for . or expecting more lines pass else: self._run_commands(command_lines) return self.buffer.lines
class EdlinTool (root_folder: str, config: Config)
-
Initialize the EdlinTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class EdlinTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the EdlinTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder if root_folder.endswith("/") else root_folder + "/" self.config = config self.auto_cat = config.get_flag("auto_cat", True) @log() def edlin(self, script: str, file_name: str) -> list[str]: r"""An improved version of the edlin. Args: script (str): Edlin commands to run. file_name (str): Script creates or edits this file. Returns: list[str]: The output of the script. """ file_name = sanitize_path(file_name) if VERBOSE: config = dedlin.configure_logging() logging.config.dictConfig(config) logger.info("Verbose mode enabled") logger.info("Plain mode. UI should be dull.") output: list[str] = [] the_command_generator = StringCommandGenerator(script) editor = dedlin.Dedlin( inputter=the_command_generator, # These should be blank, insert and edit only from the commands. insert_document_inputter=EmptyStringGenerator(), edit_document_inputter=EmptyStringGenerator(), outputter=lambda text, end=None: output.append(text), headless=True, untrusted_user=True, history=True, ) # No interaction, bot can't recover, answer questions, see realtime trace! editor.halt_on_error = True editor.quit_safety = False editor.echo = False # joke! editor.vim_mode = False # This logging is not for a bot! editor.verbose = VERBOSE # pylint: disable=broad-except try: # to broad? Don't register this hook. # sys.excepthook = editor.save_on_crash editor.entry_point( file_name, ) # Bot often forgets to save. editor.save_document_safe() # must have quit. except Exception as the_exception: editor.save_on_crash(type(the_exception), the_exception, None) logger.error(traceback.format_exc()) # output.append(traceback.format_exc()) editor.save_document_safe() raise editor.save_document_safe() editor.final_report() if self.auto_cat: feedback = "\n".join(output) contents = CatTool(self.root_folder, self.config).cat_markdown([file_name]) return [f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"] return output
Methods
def edlin(self, script: str, file_name: str) ‑> list[str]
-
An improved version of the edlin.
Args
script
:str
- Edlin commands to run.
file_name
:str
- Script creates or edits this file.
Returns
list[str]
- The output of the script.
Expand source code
@log() def edlin(self, script: str, file_name: str) -> list[str]: r"""An improved version of the edlin. Args: script (str): Edlin commands to run. file_name (str): Script creates or edits this file. Returns: list[str]: The output of the script. """ file_name = sanitize_path(file_name) if VERBOSE: config = dedlin.configure_logging() logging.config.dictConfig(config) logger.info("Verbose mode enabled") logger.info("Plain mode. UI should be dull.") output: list[str] = [] the_command_generator = StringCommandGenerator(script) editor = dedlin.Dedlin( inputter=the_command_generator, # These should be blank, insert and edit only from the commands. insert_document_inputter=EmptyStringGenerator(), edit_document_inputter=EmptyStringGenerator(), outputter=lambda text, end=None: output.append(text), headless=True, untrusted_user=True, history=True, ) # No interaction, bot can't recover, answer questions, see realtime trace! editor.halt_on_error = True editor.quit_safety = False editor.echo = False # joke! editor.vim_mode = False # This logging is not for a bot! editor.verbose = VERBOSE # pylint: disable=broad-except try: # to broad? Don't register this hook. # sys.excepthook = editor.save_on_crash editor.entry_point( file_name, ) # Bot often forgets to save. editor.save_document_safe() # must have quit. except Exception as the_exception: editor.save_on_crash(type(the_exception), the_exception, None) logger.error(traceback.format_exc()) # output.append(traceback.format_exc()) editor.save_document_safe() raise editor.save_document_safe() editor.final_report() if self.auto_cat: feedback = "\n".join(output) contents = CatTool(self.root_folder, self.config).cat_markdown([file_name]) return [f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}"] return output
class FindTool (root_folder: str, config: Config)
-
Initialize the FindTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class FindTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the FindTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) @log() def find_files( self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None, ) -> list[str]: """ Recursively search for files or directories matching given criteria in a directory and its subdirectories. Args: name (Optional[str]): The exact name to match filenames against. regex (Optional[str]): The regex pattern to match filenames against. file_type (Optional[str]): The type to filter ('file' or 'directory'). size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes. Returns: list[str]: A list of paths to files or directories that match the criteria. """ logger.info(f"find --name {name} --regex {regex} --type {file_type} --size {size}") matching_files = [] for root, dirs, files in os.walk(os.getcwd()): # Combine files and directories for type filtering combined = files if file_type == "directory": combined += dirs for entry in combined: full_path = os.path.join(root, entry) # TODO: handle this differently if "__pycache__" not in full_path: # TODO: handle differently. The bot # is put into the root_folder as cwd, so as long as there isn't .. in path we should be good. # if is_file_in_root_folder(full_path, self.root_folder): short_path = remove_root_folder(full_path, self.root_folder) # Check for name, regex, and size match if (name and fnmatch.fnmatch(entry, name)) or name is None: if self._match_type_and_size(full_path, file_type, size): matching_files.append(short_path) elif regex and re.search(regex, entry): if self._match_type_and_size(full_path, file_type, size): matching_files.append(short_path) # Not the best way to remove hidden. return list(sorted(_ for _ in matching_files if not _.startswith("."))) def _match_type_and_size(self, path: str, file_type: Optional[str], size: Optional[str]) -> bool: """ Check if a file/directory matches the specified type and size criteria. Args: path (str): The path to the file/directory. file_type (Optional[str]): The type to filter ('file' or 'directory'). size (Optional[str]): The size to filter files by. Returns: bool: True if the file/directory matches the criteria, False otherwise. """ if file_type: if file_type == "file" and not os.path.isfile(path): return False if file_type == "directory" and not os.path.isdir(path): return False if size: size_prefix = size[0] size_value = int(size[1:]) file_size = os.path.getsize(path) if size_prefix == "+" and file_size <= size_value: return False if size_prefix == "-" and file_size >= size_value: return False return True @log() def find_files_markdown( self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None, ) -> str: """ Recursively search for files or directories matching given criteria in a directory and its subdirectories. Args: name (Optional[str]): The exact name to match filenames against. regex (Optional[str]): The regex pattern to match filenames against. file_type (Optional[str]): The type to filter ('file' or 'directory'). size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes. Returns: str: Markdown of paths to files or directories that match the criteria. """ output = StringIO() results = self.find_files(name, regex, file_type, size) for item in results: output.write(item) output.write("\n") output.seek(0) return output.read()
Methods
def find_files(self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None) ‑> list[str]
-
Recursively search for files or directories matching given criteria in a directory and its subdirectories.
Args
name
:Optional[str]
- The exact name to match filenames against.
regex
:Optional[str]
- The regex pattern to match filenames against.
file_type
:Optional[str]
- The type to filter ('file' or 'directory').
size
:Optional[str]
- The size to filter files by, e.g., '+100' for files larger than 100 bytes.
Returns
list[str]
- A list of paths to files or directories that match the criteria.
Expand source code
@log() def find_files( self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None, ) -> list[str]: """ Recursively search for files or directories matching given criteria in a directory and its subdirectories. Args: name (Optional[str]): The exact name to match filenames against. regex (Optional[str]): The regex pattern to match filenames against. file_type (Optional[str]): The type to filter ('file' or 'directory'). size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes. Returns: list[str]: A list of paths to files or directories that match the criteria. """ logger.info(f"find --name {name} --regex {regex} --type {file_type} --size {size}") matching_files = [] for root, dirs, files in os.walk(os.getcwd()): # Combine files and directories for type filtering combined = files if file_type == "directory": combined += dirs for entry in combined: full_path = os.path.join(root, entry) # TODO: handle this differently if "__pycache__" not in full_path: # TODO: handle differently. The bot # is put into the root_folder as cwd, so as long as there isn't .. in path we should be good. # if is_file_in_root_folder(full_path, self.root_folder): short_path = remove_root_folder(full_path, self.root_folder) # Check for name, regex, and size match if (name and fnmatch.fnmatch(entry, name)) or name is None: if self._match_type_and_size(full_path, file_type, size): matching_files.append(short_path) elif regex and re.search(regex, entry): if self._match_type_and_size(full_path, file_type, size): matching_files.append(short_path) # Not the best way to remove hidden. return list(sorted(_ for _ in matching_files if not _.startswith(".")))
def find_files_markdown(self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None) ‑> str
-
Recursively search for files or directories matching given criteria in a directory and its subdirectories.
Args
name
:Optional[str]
- The exact name to match filenames against.
regex
:Optional[str]
- The regex pattern to match filenames against.
file_type
:Optional[str]
- The type to filter ('file' or 'directory').
size
:Optional[str]
- The size to filter files by, e.g., '+100' for files larger than 100 bytes.
Returns
str
- Markdown of paths to files or directories that match the criteria.
Expand source code
@log() def find_files_markdown( self, name: Optional[str] = None, regex: Optional[str] = None, file_type: Optional[str] = None, size: Optional[str] = None, ) -> str: """ Recursively search for files or directories matching given criteria in a directory and its subdirectories. Args: name (Optional[str]): The exact name to match filenames against. regex (Optional[str]): The regex pattern to match filenames against. file_type (Optional[str]): The type to filter ('file' or 'directory'). size (Optional[str]): The size to filter files by, e.g., '+100' for files larger than 100 bytes. Returns: str: Markdown of paths to files or directories that match the criteria. """ output = StringIO() results = self.find_files(name, regex, file_type, size) for item in results: output.write(item) output.write("\n") output.seek(0) return output.read()
class GitTool (root_folder: str, config: Config)
-
Initialize the GitTool class.
Args
root_folder
:str
- The root folder path for repo operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class GitTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the GitTool class. Args: root_folder (str): The root folder path for repo operations. config (Config): The developer input that bot shouldn't set. """ # Initialize the repository self.repo_path = root_folder self.repo = Repo(root_folder) self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") def is_ignored_by_gitignore(self, file_path: str, gitignore_path: str = ".gitignore") -> bool: """ Check if a file is ignored by .gitignore. Args: file_path (str): The path of the file to check. gitignore_path (str): The path to the .gitignore file. Defaults to '.gitignore' in the current directory. Returns: bool: True if the file is ignored, False otherwise. Raises: FileNotFoundError: If the .gitignore file is not found. """ # Resolve the full path to the .gitignore file full_gitignore_path = os.path.join(self.repo_path, gitignore_path) if not os.path.isfile(full_gitignore_path): raise FileNotFoundError(f"No .gitignore file found at {full_gitignore_path}") # Normalize file path file_path = os.path.abspath(file_path) with open(full_gitignore_path, encoding="utf-8", errors=self.utf8_errors) as gitignore: for line in gitignore: line = line.strip() # Skip empty lines and comments if not line or line.startswith("#"): continue # Convert the .gitignore pattern to a glob pattern gitignore_pattern = os.path.join(os.path.dirname(gitignore_path), line) if fnmatch.fnmatch(file_path, gitignore_pattern): return True return False @log() def git_status(self) -> dict[str, Any]: """Returns the status of the repository. Returns: dict[str, Any]: Structured `git status` response """ logger.info("git status") changed_files = [item.a_path for item in self.repo.index.diff(None)] untracked_files = self.repo.untracked_files return {"changed_files": changed_files, "untracked_files": untracked_files} @log() def get_current_branch( self, ) -> str: """ Retrieves the current branch name of the repository. Returns: str: The current branch name. """ logger.info("git branch --show-current") return self.repo.active_branch.name @log() def get_recent_commits(self, n: int = 10, short_hash: bool = False) -> list[dict[str, Any]]: """ Retrieves the most recent commit hashes from the current branch. Args: n (int, optional): The number of recent commits to retrieve. Defaults to 10. short_hash (bool, optional): If True, return short hashes; otherwise, return full hashes. Defaults to False. Returns: list[dict[str, Any]]: A list of dictionaries, each containing 'short_hash' and 'full_hash' keys (if short_hash is True), or only 'full_hash' (if short_hash is False), representing the commit hashes. """ logger.info(f"git log --pretty=format:%H -n {n}") current_branch = self.get_current_branch() commits = list(self.repo.iter_commits(current_branch, max_count=n)) if short_hash: return [{"short_hash": commit.hexsha[:7], "full_hash": commit.hexsha} for commit in commits] return [{"full_hash": commit.hexsha} for commit in commits] @log() def git_diff(self) -> list[dict[str, Any]]: """Returns the differences in the working directory. Returns: list[dict[str, Any]]: Structured `git diff` response """ logger.info("git diff --name-only") diffs = self.repo.git.diff("HEAD", name_only=True).splitlines() return [{"file": diff} for diff in diffs] @log() def git_log_file(self, filename: str) -> list[dict[str, Any]]: """Returns the commit history for a specific file. Args: filename (str): The path to the file. Returns: list[dict[str, Any]]: Structured `git log` response """ logger.info(f"git log --pretty=format:%H -n 1 {filename}") commits = self.repo.git.log("--pretty=format:%H - %an, %ar : %s", filename).splitlines() return [{"commit": commit} for commit in commits] @log() def git_log_search(self, search_string: str) -> list[dict[str, Any]]: """Returns the commit history that matches the search string. Args: search_string (str): The search string. Returns: list of dict: Structured `git log` response """ logger.info(f"git log --pretty=format:%H -S {search_string}") commits = self.repo.git.log("-S", search_string, "--pretty=format:%H - %an, %ar : %s").splitlines() return [{"commit": commit} for commit in commits] @log() def git_show(self) -> list[dict[str, Any]]: """Shows various types of objects (commits, tags, etc.). Returns: list[dict[str, Any]]: Structured `git show` response """ logger.info("git show --pretty=format:%H -n 1") show_data = self.repo.git.show("--pretty=format:%H - %an, %ar : %s", n=1).splitlines() return [{"data": data} for data in show_data] @log() def git_diff_commit(self, commit1: str, commit2: str) -> list[dict[str, Any]]: """Shows changes between two commits. Args: commit1 (str): First commit commit2 (str): Second commit Returns: list[dict[str, Any]]: Structured `git diff` response """ logger.info(f"git diff --name-only {commit1} {commit2}") diffs = self.repo.git.diff(commit1, commit2, name_only=True).splitlines() return [{"file": diff} for diff in diffs]
Methods
def get_current_branch(self) ‑> str
-
Retrieves the current branch name of the repository.
Returns
str
- The current branch name.
Expand source code
@log() def get_current_branch( self, ) -> str: """ Retrieves the current branch name of the repository. Returns: str: The current branch name. """ logger.info("git branch --show-current") return self.repo.active_branch.name
def get_recent_commits(self, n: int = 10, short_hash: bool = False) ‑> list[dict[str, typing.Any]]
-
Retrieves the most recent commit hashes from the current branch.
Args
n
:int
, optional- The number of recent commits to retrieve. Defaults to 10.
short_hash
:bool
, optional- If True, return short hashes; otherwise, return full hashes. Defaults to False.
Returns
list[dict[str, Any]]
- A list of dictionaries, each containing 'short_hash' and 'full_hash' keys (if short_hash is True), or only 'full_hash' (if short_hash is False), representing the commit hashes.
Expand source code
@log() def get_recent_commits(self, n: int = 10, short_hash: bool = False) -> list[dict[str, Any]]: """ Retrieves the most recent commit hashes from the current branch. Args: n (int, optional): The number of recent commits to retrieve. Defaults to 10. short_hash (bool, optional): If True, return short hashes; otherwise, return full hashes. Defaults to False. Returns: list[dict[str, Any]]: A list of dictionaries, each containing 'short_hash' and 'full_hash' keys (if short_hash is True), or only 'full_hash' (if short_hash is False), representing the commit hashes. """ logger.info(f"git log --pretty=format:%H -n {n}") current_branch = self.get_current_branch() commits = list(self.repo.iter_commits(current_branch, max_count=n)) if short_hash: return [{"short_hash": commit.hexsha[:7], "full_hash": commit.hexsha} for commit in commits] return [{"full_hash": commit.hexsha} for commit in commits]
def git_diff(self) ‑> list[dict[str, typing.Any]]
-
Returns the differences in the working directory.
Returns
list[dict[str, Any]]
- Structured
git diff
response
Expand source code
@log() def git_diff(self) -> list[dict[str, Any]]: """Returns the differences in the working directory. Returns: list[dict[str, Any]]: Structured `git diff` response """ logger.info("git diff --name-only") diffs = self.repo.git.diff("HEAD", name_only=True).splitlines() return [{"file": diff} for diff in diffs]
def git_diff_commit(self, commit1: str, commit2: str) ‑> list[dict[str, typing.Any]]
-
Shows changes between two commits.
Args
commit1
:str
- First commit
commit2
:str
- Second commit
Returns
list[dict[str, Any]]
- Structured
git diff
response
Expand source code
@log() def git_diff_commit(self, commit1: str, commit2: str) -> list[dict[str, Any]]: """Shows changes between two commits. Args: commit1 (str): First commit commit2 (str): Second commit Returns: list[dict[str, Any]]: Structured `git diff` response """ logger.info(f"git diff --name-only {commit1} {commit2}") diffs = self.repo.git.diff(commit1, commit2, name_only=True).splitlines() return [{"file": diff} for diff in diffs]
def git_log_file(self, filename: str) ‑> list[dict[str, typing.Any]]
-
Returns the commit history for a specific file.
Args
filename
:str
- The path to the file.
Returns
list[dict[str, Any]]
- Structured
git log
response
Expand source code
@log() def git_log_file(self, filename: str) -> list[dict[str, Any]]: """Returns the commit history for a specific file. Args: filename (str): The path to the file. Returns: list[dict[str, Any]]: Structured `git log` response """ logger.info(f"git log --pretty=format:%H -n 1 {filename}") commits = self.repo.git.log("--pretty=format:%H - %an, %ar : %s", filename).splitlines() return [{"commit": commit} for commit in commits]
def git_log_search(self, search_string: str) ‑> list[dict[str, typing.Any]]
-
Returns the commit history that matches the search string.
Args
search_string
:str
- The search string.
Returns
list
ofdict
- Structured
git log
response
Expand source code
@log() def git_log_search(self, search_string: str) -> list[dict[str, Any]]: """Returns the commit history that matches the search string. Args: search_string (str): The search string. Returns: list of dict: Structured `git log` response """ logger.info(f"git log --pretty=format:%H -S {search_string}") commits = self.repo.git.log("-S", search_string, "--pretty=format:%H - %an, %ar : %s").splitlines() return [{"commit": commit} for commit in commits]
def git_show(self) ‑> list[dict[str, typing.Any]]
-
Shows various types of objects (commits, tags, etc.).
Returns
list[dict[str, Any]]
- Structured
git show
response
Expand source code
@log() def git_show(self) -> list[dict[str, Any]]: """Shows various types of objects (commits, tags, etc.). Returns: list[dict[str, Any]]: Structured `git show` response """ logger.info("git show --pretty=format:%H -n 1") show_data = self.repo.git.show("--pretty=format:%H - %an, %ar : %s", n=1).splitlines() return [{"data": data} for data in show_data]
def git_status(self) ‑> dict[str, typing.Any]
-
Returns the status of the repository.
Returns
dict[str, Any]
- Structured
git status
response
Expand source code
@log() def git_status(self) -> dict[str, Any]: """Returns the status of the repository. Returns: dict[str, Any]: Structured `git status` response """ logger.info("git status") changed_files = [item.a_path for item in self.repo.index.diff(None)] untracked_files = self.repo.untracked_files return {"changed_files": changed_files, "untracked_files": untracked_files}
def is_ignored_by_gitignore(self, file_path: str, gitignore_path: str = '.gitignore') ‑> bool
-
Check if a file is ignored by .gitignore.
Args
file_path
:str
- The path of the file to check.
gitignore_path
:str
- The path to the .gitignore file. Defaults to '.gitignore' in the current directory.
Returns
bool
- True if the file is ignored, False otherwise.
Raises
FileNotFoundError
- If the .gitignore file is not found.
Expand source code
def is_ignored_by_gitignore(self, file_path: str, gitignore_path: str = ".gitignore") -> bool: """ Check if a file is ignored by .gitignore. Args: file_path (str): The path of the file to check. gitignore_path (str): The path to the .gitignore file. Defaults to '.gitignore' in the current directory. Returns: bool: True if the file is ignored, False otherwise. Raises: FileNotFoundError: If the .gitignore file is not found. """ # Resolve the full path to the .gitignore file full_gitignore_path = os.path.join(self.repo_path, gitignore_path) if not os.path.isfile(full_gitignore_path): raise FileNotFoundError(f"No .gitignore file found at {full_gitignore_path}") # Normalize file path file_path = os.path.abspath(file_path) with open(full_gitignore_path, encoding="utf-8", errors=self.utf8_errors) as gitignore: for line in gitignore: line = line.strip() # Skip empty lines and comments if not line or line.startswith("#"): continue # Convert the .gitignore pattern to a glob pattern gitignore_pattern = os.path.join(os.path.dirname(gitignore_path), line) if fnmatch.fnmatch(file_path, gitignore_pattern): return True return False
class GrepTool (root_folder: str, config: Config)
-
A tool for searching files using regular expressions.
Initialize the GrepTool with a root folder.
Args
root_folder
:str
- The root folder to search within.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class GrepTool: """A tool for searching files using regular expressions.""" def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the GrepTool with a root folder. Args: root_folder (str): The root folder to search within. config (Config): The developer input that bot shouldn't set. """ self.root_folder: str = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def grep_markdown( self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches: int = -1 ) -> str: """ Search for lines matching a regular expression in files and returns markdown formatted results. Args: regex (str): A regular expression string to search for. glob_pattern (str): A glob pattern string to specify files. skip_first_matches (int): Number of initial matches to skip. maximum_matches (int): Maximum number of matches to return. Returns: str: Markdown formatted string of grep results. """ results = self.grep(regex, glob_pattern, skip_first_matches, maximum_matches) matches_found = results.matches_found output = StringIO() for file_match in results.data: output.write(file_match.filename + "\n") for match in file_match.found: output.write(f"line {match.line_number}: {match.line}\n") output.write( f"{matches_found} matches found and {min(matches_found, maximum_matches) if maximum_matches != -1 else matches_found} displayed. " f"Skipped {skip_first_matches}\n" ) output.seek(0) return output.read() @log() def grep( self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches_per_file: int = -1, maximum_matches_total: int = -1, ) -> GrepResults: """ Search for lines matching a regular expression in files specified by a glob pattern. Args: regex (str): A regular expression string to search for. glob_pattern (str): A glob pattern string to specify files. skip_first_matches (int): Number of initial matches to skip. maximum_matches_per_file (int): Maximum number of matches to return for one file. maximum_matches_total (int): Maximum number of matches to return total. Returns: GrepResults: The results of the grep operation. """ logger.info( f"grep --regex {regex} --glob_pattern {glob_pattern} " f"--skip_first_matches {skip_first_matches} " f"--maximum_matches_total {maximum_matches_total} " f"--maximum_matches_per_file {maximum_matches_per_file}" ) pattern = re.compile(regex) matches_total = 0 skip_count = 0 if skip_first_matches < 0 else skip_first_matches results = GrepResults(matches_found=-1) for filename in glob.glob(glob_pattern, root_dir=self.root_folder, recursive=True): matches_per_file = 0 if os.path.isdir(filename): logging.warning(f"Skipping directory {filename}, because it isn't a file.") continue if not os.path.exists(filename): # What a hack open_path = self.root_folder + "/" + filename else: open_path = filename with open(open_path, encoding="utf-8", errors=self.utf8_errors) as file: if not is_file_in_root_folder(filename, self.root_folder): logging.warning(f"Skipping file {filename}, because it isn't in the root folder.") continue line_number = 0 for line in file: below_maximum = matches_per_file < maximum_matches_per_file maximum_not_set = maximum_matches_per_file == -1 if below_maximum or maximum_not_set: line_number += 1 if pattern.search(line): matches_total += 1 matches_per_file += 1 if matches_total <= (matches_total + skip_count) or matches_total == -1: if (0 < skip_first_matches < matches_total) or skip_first_matches == -1: # This creates names like \..\..\..\ etc. minimal_filename = remove_root_folder(filename, self.root_folder) # avoid double count found = next((fm for fm in results.data if fm.filename == minimal_filename), None) if not found: found = FileMatches(filename=minimal_filename) results.data.append(found) found.found.append(Match(line_number=line_number, line=line.strip())) results.data = list(sorted(results.data, key=lambda x: x.filename)) results.matches_found = matches_total return results
Methods
def grep(self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches_per_file: int = -1, maximum_matches_total: int = -1) ‑> GrepResults
-
Search for lines matching a regular expression in files specified by a glob pattern.
Args
regex
:str
- A regular expression string to search for.
glob_pattern
:str
- A glob pattern string to specify files.
skip_first_matches
:int
- Number of initial matches to skip.
maximum_matches_per_file
:int
- Maximum number of matches to return for one file.
maximum_matches_total
:int
- Maximum number of matches to return total.
Returns
GrepResults
- The results of the grep operation.
Expand source code
@log() def grep( self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches_per_file: int = -1, maximum_matches_total: int = -1, ) -> GrepResults: """ Search for lines matching a regular expression in files specified by a glob pattern. Args: regex (str): A regular expression string to search for. glob_pattern (str): A glob pattern string to specify files. skip_first_matches (int): Number of initial matches to skip. maximum_matches_per_file (int): Maximum number of matches to return for one file. maximum_matches_total (int): Maximum number of matches to return total. Returns: GrepResults: The results of the grep operation. """ logger.info( f"grep --regex {regex} --glob_pattern {glob_pattern} " f"--skip_first_matches {skip_first_matches} " f"--maximum_matches_total {maximum_matches_total} " f"--maximum_matches_per_file {maximum_matches_per_file}" ) pattern = re.compile(regex) matches_total = 0 skip_count = 0 if skip_first_matches < 0 else skip_first_matches results = GrepResults(matches_found=-1) for filename in glob.glob(glob_pattern, root_dir=self.root_folder, recursive=True): matches_per_file = 0 if os.path.isdir(filename): logging.warning(f"Skipping directory {filename}, because it isn't a file.") continue if not os.path.exists(filename): # What a hack open_path = self.root_folder + "/" + filename else: open_path = filename with open(open_path, encoding="utf-8", errors=self.utf8_errors) as file: if not is_file_in_root_folder(filename, self.root_folder): logging.warning(f"Skipping file {filename}, because it isn't in the root folder.") continue line_number = 0 for line in file: below_maximum = matches_per_file < maximum_matches_per_file maximum_not_set = maximum_matches_per_file == -1 if below_maximum or maximum_not_set: line_number += 1 if pattern.search(line): matches_total += 1 matches_per_file += 1 if matches_total <= (matches_total + skip_count) or matches_total == -1: if (0 < skip_first_matches < matches_total) or skip_first_matches == -1: # This creates names like \..\..\..\ etc. minimal_filename = remove_root_folder(filename, self.root_folder) # avoid double count found = next((fm for fm in results.data if fm.filename == minimal_filename), None) if not found: found = FileMatches(filename=minimal_filename) results.data.append(found) found.found.append(Match(line_number=line_number, line=line.strip())) results.data = list(sorted(results.data, key=lambda x: x.filename)) results.matches_found = matches_total return results
def grep_markdown(self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches: int = -1) ‑> str
-
Search for lines matching a regular expression in files and returns markdown formatted results.
Args
regex
:str
- A regular expression string to search for.
glob_pattern
:str
- A glob pattern string to specify files.
skip_first_matches
:int
- Number of initial matches to skip.
maximum_matches
:int
- Maximum number of matches to return.
Returns
str
- Markdown formatted string of grep results.
Expand source code
@log() def grep_markdown( self, regex: str, glob_pattern: str, skip_first_matches: int = -1, maximum_matches: int = -1 ) -> str: """ Search for lines matching a regular expression in files and returns markdown formatted results. Args: regex (str): A regular expression string to search for. glob_pattern (str): A glob pattern string to specify files. skip_first_matches (int): Number of initial matches to skip. maximum_matches (int): Maximum number of matches to return. Returns: str: Markdown formatted string of grep results. """ results = self.grep(regex, glob_pattern, skip_first_matches, maximum_matches) matches_found = results.matches_found output = StringIO() for file_match in results.data: output.write(file_match.filename + "\n") for match in file_match.found: output.write(f"line {match.line_number}: {match.line}\n") output.write( f"{matches_found} matches found and {min(matches_found, maximum_matches) if maximum_matches != -1 else matches_found} displayed. " f"Skipped {skip_first_matches}\n" ) output.seek(0) return output.read()
class HeadTailTool (root_folder: str, config: Config)
-
Initialize the HeadTailTool with a root folder.
Args
root_folder
:str
- The root folder where files will be checked.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class HeadTailTool: def __init__(self, root_folder: str, config: Config) -> None: """Initialize the HeadTailTool with a root folder. Args: root_folder (str): The root folder where files will be checked. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) @log() def head_markdown(self, file_path: str, lines: int = 10) -> str: """Return the first 'lines' lines of a file formatted as markdown. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Defaults to 10. Returns: str: String containing the first 'lines' lines of the file. """ return "\n".join(self.head(file_path, lines)) @log() def head(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) -> list[str]: """Return the first 'lines' or 'byte_count' from a file. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Ignored if byte_count is specified. Defaults to 10. byte_count (Optional[int]): Number of bytes to return. If specified, overrides lines. Returns: list[str]: Lines or byte_count of bytes from the start of the file. """ return self.head_tail(file_path, lines, "head", byte_count) @log() def tail_markdown(self, file_path: str, lines: int = 10) -> str: """Return the last 'lines' lines of a file formatted as markdown. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Defaults to 10. Returns: str: String containing the last 'lines' lines of the file. """ return "\n".join(self.tail(file_path, lines)) @log() def tail(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) -> list[str]: """Return the last 'lines' or 'bytes' from a file. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Ignored if byte_count is specified. Defaults to 10. byte_count (Optional[int]): Number of bytes to return. If specified, overrides lines. Returns: list[str]: Lines or bytes from the end of the file. """ return self.head_tail(file_path, lines, "tail", byte_count) def head_tail( self, file_path: str, lines: int = 10, mode: str = "head", byte_count: Optional[int] = None ) -> list[str]: """Read lines or bytes from the start ('head') or end ('tail') of a file. Args: file_path (str): Path to the file. lines (int): Number of lines to read. Ignored if byte_count is specified. Defaults to 10. mode (str): Operation mode, either 'head' or 'tail'. Defaults to 'head'. byte_count (Optional[int]): Number of bytes to read. If specified, overrides lines. Returns: list[str]: Requested lines or bytes from the file. Raises: ValueError: If mode is not 'head' or 'tail'. FileNotFoundError: If the file is not found in the root folder. """ if mode == "head": logger.info(f"head --file_path {file_path} --lines {lines}") else: logger.info(f"tail --file_path {file_path} --lines {lines}") if mode not in ["head", "tail"]: raise ValueError("Mode must be 'head' or 'tail'") if not is_file_in_root_folder(file_path, self.root_folder): raise FileNotFoundError(f"File {file_path} not found in root folder {self.root_folder}") with open(file_path, "rb") as file: if byte_count is not None: if mode == "head": return [file.read(byte_count).decode()] # mode == 'tail' file.seek(-byte_count, 2) # Seek from end of file return [file.read(byte_count).decode()] # Read by lines if byte_count is not specified if mode == "head": head_lines = [] for _ in range(lines): try: line = next(file).decode("utf-8") head_lines.append(line.rstrip("\r\n")) except StopIteration: break return head_lines # return [next(file).decode("utf-8").rstrip("\r\n") for _ in range(lines)] # mode == 'tail' return [line.decode("utf-8").rstrip("\r\n") for line in list(file)[-lines:]]
Methods
def head(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) ‑> list[str]
-
Return the first 'lines' or 'byte_count' from a file.
Args
file_path
:str
- Path to the file.
lines
:int
- Number of lines to return. Ignored if byte_count is specified. Defaults to 10.
byte_count
:Optional[int]
- Number of bytes to return. If specified, overrides lines.
Returns
list[str]
- Lines or byte_count of bytes from the start of the file.
Expand source code
@log() def head(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) -> list[str]: """Return the first 'lines' or 'byte_count' from a file. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Ignored if byte_count is specified. Defaults to 10. byte_count (Optional[int]): Number of bytes to return. If specified, overrides lines. Returns: list[str]: Lines or byte_count of bytes from the start of the file. """ return self.head_tail(file_path, lines, "head", byte_count)
def head_markdown(self, file_path: str, lines: int = 10) ‑> str
-
Return the first 'lines' lines of a file formatted as markdown.
Args
file_path
:str
- Path to the file.
lines
:int
- Number of lines to return. Defaults to 10.
Returns
str
- String containing the first 'lines' lines of the file.
Expand source code
@log() def head_markdown(self, file_path: str, lines: int = 10) -> str: """Return the first 'lines' lines of a file formatted as markdown. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Defaults to 10. Returns: str: String containing the first 'lines' lines of the file. """ return "\n".join(self.head(file_path, lines))
def head_tail(self, file_path: str, lines: int = 10, mode: str = 'head', byte_count: Optional[int] = None) ‑> list[str]
-
Read lines or bytes from the start ('head') or end ('tail') of a file.
Args
file_path
:str
- Path to the file.
lines
:int
- Number of lines to read. Ignored if byte_count is specified. Defaults to 10.
mode
:str
- Operation mode, either 'head' or 'tail'. Defaults to 'head'.
byte_count
:Optional[int]
- Number of bytes to read. If specified, overrides lines.
Returns
list[str]
- Requested lines or bytes from the file.
Raises
ValueError
- If mode is not 'head' or 'tail'.
FileNotFoundError
- If the file is not found in the root folder.
Expand source code
def head_tail( self, file_path: str, lines: int = 10, mode: str = "head", byte_count: Optional[int] = None ) -> list[str]: """Read lines or bytes from the start ('head') or end ('tail') of a file. Args: file_path (str): Path to the file. lines (int): Number of lines to read. Ignored if byte_count is specified. Defaults to 10. mode (str): Operation mode, either 'head' or 'tail'. Defaults to 'head'. byte_count (Optional[int]): Number of bytes to read. If specified, overrides lines. Returns: list[str]: Requested lines or bytes from the file. Raises: ValueError: If mode is not 'head' or 'tail'. FileNotFoundError: If the file is not found in the root folder. """ if mode == "head": logger.info(f"head --file_path {file_path} --lines {lines}") else: logger.info(f"tail --file_path {file_path} --lines {lines}") if mode not in ["head", "tail"]: raise ValueError("Mode must be 'head' or 'tail'") if not is_file_in_root_folder(file_path, self.root_folder): raise FileNotFoundError(f"File {file_path} not found in root folder {self.root_folder}") with open(file_path, "rb") as file: if byte_count is not None: if mode == "head": return [file.read(byte_count).decode()] # mode == 'tail' file.seek(-byte_count, 2) # Seek from end of file return [file.read(byte_count).decode()] # Read by lines if byte_count is not specified if mode == "head": head_lines = [] for _ in range(lines): try: line = next(file).decode("utf-8") head_lines.append(line.rstrip("\r\n")) except StopIteration: break return head_lines # return [next(file).decode("utf-8").rstrip("\r\n") for _ in range(lines)] # mode == 'tail' return [line.decode("utf-8").rstrip("\r\n") for line in list(file)[-lines:]]
def tail(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) ‑> list[str]
-
Return the last 'lines' or 'bytes' from a file.
Args
file_path
:str
- Path to the file.
lines
:int
- Number of lines to return. Ignored if byte_count is specified. Defaults to 10.
byte_count
:Optional[int]
- Number of bytes to return. If specified, overrides lines.
Returns
list[str]
- Lines or bytes from the end of the file.
Expand source code
@log() def tail(self, file_path: str, lines: int = 10, byte_count: Optional[int] = None) -> list[str]: """Return the last 'lines' or 'bytes' from a file. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Ignored if byte_count is specified. Defaults to 10. byte_count (Optional[int]): Number of bytes to return. If specified, overrides lines. Returns: list[str]: Lines or bytes from the end of the file. """ return self.head_tail(file_path, lines, "tail", byte_count)
def tail_markdown(self, file_path: str, lines: int = 10) ‑> str
-
Return the last 'lines' lines of a file formatted as markdown.
Args
file_path
:str
- Path to the file.
lines
:int
- Number of lines to return. Defaults to 10.
Returns
str
- String containing the last 'lines' lines of the file.
Expand source code
@log() def tail_markdown(self, file_path: str, lines: int = 10) -> str: """Return the last 'lines' lines of a file formatted as markdown. Args: file_path (str): Path to the file. lines (int): Number of lines to return. Defaults to 10. Returns: str: String containing the last 'lines' lines of the file. """ return "\n".join(self.tail(file_path, lines))
class InsertTool (root_folder: str, config: Config)
-
Initialize the InsertTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class InsertTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the InsertTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.python_module = config.get_value("python_module") self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def insert_text_after_context(self, file_path: str, context: str, text_to_insert: str) -> str: """Inserts a given text immediately after a specified context in a file. This method opens the file, finds the line containing the specified context, and inserts the provided text immediately after this line. If the context matches multiple lines, it raises a ValueError due to ambiguity. Args: file_path (str): The path of the file in which the text is to be inserted. context (str): The context string to search for in the file. The text is inserted after the line containing this context. text_to_insert (str): The text to insert into the file. Returns: str: A message for the bot with the result of the insert. Raises: ValueError: If the provided context matches multiple lines in the file. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not context: raise TypeError("No context, please context so I can find where to insert the text.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: lines = file.readlines() original_lines = list(lines) context_line_indices = [i for i, line in enumerate(lines) if context in line] if len(context_line_indices) == 0: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: plain_text = file.read() raise ValueError( f"No matches found, no changes made, context is not a substring of any row. " f"For reference, here is the contents of the file:\n{plain_text}" ) # Check for ambiguity in the context match if len(context_line_indices) > 1: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: plain_text = file.read() found_at = ", ".join([str(i) for i in context_line_indices]) raise ValueError( f"Ambiguous context: The provided context matches multiple lines, namely {found_at}. A context line the " "string or substring of the line just before your desired insertion point. It must " "uniquely identify a location. Either use a longer substring to match or switch to using" "the insert_text_after_multiline_context tool.\n" f"For reference, here is the contents of the file:\n{plain_text}" ) # Index of the line after the context line insert_index = context_line_indices[0] + 1 # Insert the text lines.insert(insert_index, text_to_insert + "\n") return self._save_if_changed(file_path, original_lines, lines) @log() def insert_text_at_start_or_end(self, file_path: str, text_to_insert: str, position: str = "end") -> str: """Inserts text at the start or end of a file. Opens the file and inserts the specified text either at the beginning or the end of the file, based on the 'position' argument. If the position argument is neither 'start' nor 'end', it raises a ValueError. Args: file_path (str): The path of the file in which the text is to be inserted. text_to_insert (str): The text to insert into the file. position (str, optional): The position where the text should be inserted. Should be either 'start' or 'end'. Defaults to 'end'. Raises: ValueError: If the 'position' argument is not 'start' or 'end'. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not text_to_insert: raise TypeError("No text_to_insert, please provide so I have something to insert.") if position not in ("start", "end"): raise ValueError("position must be start or end, so I know where to insert text.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: lines = file.readlines() original_lines = list(lines) if position == "start": lines.insert(0, text_to_insert + "\n") elif position == "end": lines.append(text_to_insert + "\n") else: raise ValueError("Invalid position: choose 'start' or 'end'.") return self._save_if_changed(file_path, original_lines, lines) @log() def insert_text_after_multiline_context(self, file_path: str, context_lines: list[str], text_to_insert: str) -> str: """Inserts text immediately after a specified multiline context in a file. Opens the file and searches for a sequence of lines (context). Once the context is found, it inserts the specified text immediately after this context. If the context is not found, it raises a ValueError. Args: file_path (str): The path of the file in which the text is to be inserted. context_lines (list of str): A list of strings representing the multiline context to search for in the file. text_to_insert (str): The text to insert into the file after the context. Raises: ValueError: If the multiline context is not found in the file. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not context_lines: raise TypeError("No context_lines, please context lines so I can find where to insert the new lines.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: lines = file.readlines() try: ends_with_n = lines[:-1][0].endswith("\n") except IndexError: ends_with_n = False # this is going to make it hard to preserve whitespace. # Convert context_lines to a string for easier matching context_string = "".join([line + "\n" for line in context_lines]).rstrip("\n") # Convert file lines to a string file_string = "".join(lines) starts_at = file_string.find(context_string) if starts_at == -1: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: plain_text = file.read() raise ValueError( f"No matches found, no changes made, context_lines are not found in this document. " f"For reference, here is the contents of the file:\n{plain_text}" ) # Find the index where the context ends context_end_index = starts_at + len(context_string) # Split the file_string back into lines at the context end before_context = file_string[:context_end_index] after_context = file_string[context_end_index:] # Insert the new text new_file_string = before_context + "\n" + text_to_insert + "\n" + after_context.strip("\n") if ends_with_n: new_file_string += "\n" return self._save_if_changed(file_path, lines, new_file_string) def _save_if_changed(self, file_path: str, original_lines, new_file_string: Union[str, list[str]]) -> str: """ Save the file if it has changed. Args: file_path: The path of the file to save. original_lines: The original file contents. new_file_string: The new file contents. Returns: A message for the bot with the result of the save. """ if not new_file_string: raise TypeError("Something went wrong in insert and all text disappeared. Cancelling.") if isinstance(new_file_string, str) and "\n".join(original_lines) == new_file_string: return ( "File not changed this means the old file contents are the same as the new. This has nothing " "to do with file permissions." ) if isinstance(new_file_string, list) and original_lines == new_file_string: return ( "File not changed, this means the old file contents are the same as the new. This has nothing " "to do with file permissions." ) # if is_python_file(file_path): # is_valid, error = is_valid_python_source(source) # if not is_valid and error: # return f"Invalid Python source code. No changes made. {error.lineno} {error.msg} {error.text}" # if not is_valid: # return f"Invalid Python source code. No changes made. {error}." # Write back to the file BackupRestore.backup_file(file_path) with open(file_path, "w", encoding="utf-8", errors=self.utf8_errors) as file: if isinstance(new_file_string, str): file.write(new_file_string) else: file.writelines(new_file_string) validation = self._validate_code(file_path) if validation: BackupRestore.revert_to_latest_backup(file_path) return f"File not rewritten because of problems.\n{validation.message}" if self.auto_cat: feedback = "Insert completed and no exceptions thrown." contents = CatTool(self.root_folder, self.config).cat_markdown([file_path]) return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}" return "Insert completed and no exceptions thrown. Please verify by other means." def _validate_code(self, full_path: str) -> Optional[ValidationMessageForBot]: """ Validate python Args: full_path (str): The path to the file to validate. Returns: Optional[ValidationMessageForBot]: A validation message if the file is invalid, otherwise None. """ if not is_python_file(full_path): return None if not self.python_module: logger.warning("No python module set, skipping validation.") return None validator = ValidateModule(self.python_module) results = validator.validate() explanation = validator.explain_to_bot(results) if explanation.is_valid: return None return explanation
Methods
def insert_text_after_context(self, file_path: str, context: str, text_to_insert: str) ‑> str
-
Inserts a given text immediately after a specified context in a file.
This method opens the file, finds the line containing the specified context, and inserts the provided text immediately after this line. If the context matches multiple lines, it raises a ValueError due to ambiguity.
Args
file_path
:str
- The path of the file in which the text is to be inserted.
context
:str
- The context string to search for in the file. The text is inserted after the line containing this context.
text_to_insert
:str
- The text to insert into the file.
Returns
str
- A message for the bot with the result of the insert.
Raises
ValueError
- If the provided context matches multiple lines in the file.
Expand source code
@log() def insert_text_after_context(self, file_path: str, context: str, text_to_insert: str) -> str: """Inserts a given text immediately after a specified context in a file. This method opens the file, finds the line containing the specified context, and inserts the provided text immediately after this line. If the context matches multiple lines, it raises a ValueError due to ambiguity. Args: file_path (str): The path of the file in which the text is to be inserted. context (str): The context string to search for in the file. The text is inserted after the line containing this context. text_to_insert (str): The text to insert into the file. Returns: str: A message for the bot with the result of the insert. Raises: ValueError: If the provided context matches multiple lines in the file. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not context: raise TypeError("No context, please context so I can find where to insert the text.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: lines = file.readlines() original_lines = list(lines) context_line_indices = [i for i, line in enumerate(lines) if context in line] if len(context_line_indices) == 0: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: plain_text = file.read() raise ValueError( f"No matches found, no changes made, context is not a substring of any row. " f"For reference, here is the contents of the file:\n{plain_text}" ) # Check for ambiguity in the context match if len(context_line_indices) > 1: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: plain_text = file.read() found_at = ", ".join([str(i) for i in context_line_indices]) raise ValueError( f"Ambiguous context: The provided context matches multiple lines, namely {found_at}. A context line the " "string or substring of the line just before your desired insertion point. It must " "uniquely identify a location. Either use a longer substring to match or switch to using" "the insert_text_after_multiline_context tool.\n" f"For reference, here is the contents of the file:\n{plain_text}" ) # Index of the line after the context line insert_index = context_line_indices[0] + 1 # Insert the text lines.insert(insert_index, text_to_insert + "\n") return self._save_if_changed(file_path, original_lines, lines)
def insert_text_after_multiline_context(self, file_path: str, context_lines: list[str], text_to_insert: str) ‑> str
-
Inserts text immediately after a specified multiline context in a file.
Opens the file and searches for a sequence of lines (context). Once the context is found, it inserts the specified text immediately after this context. If the context is not found, it raises a ValueError.
Args
file_path
:str
- The path of the file in which the text is to be inserted.
context_lines
:list
ofstr
- A list of strings representing the multiline context to search for in the file.
text_to_insert
:str
- The text to insert into the file after the context.
Raises
ValueError
- If the multiline context is not found in the file.
Expand source code
@log() def insert_text_after_multiline_context(self, file_path: str, context_lines: list[str], text_to_insert: str) -> str: """Inserts text immediately after a specified multiline context in a file. Opens the file and searches for a sequence of lines (context). Once the context is found, it inserts the specified text immediately after this context. If the context is not found, it raises a ValueError. Args: file_path (str): The path of the file in which the text is to be inserted. context_lines (list of str): A list of strings representing the multiline context to search for in the file. text_to_insert (str): The text to insert into the file after the context. Raises: ValueError: If the multiline context is not found in the file. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not context_lines: raise TypeError("No context_lines, please context lines so I can find where to insert the new lines.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: lines = file.readlines() try: ends_with_n = lines[:-1][0].endswith("\n") except IndexError: ends_with_n = False # this is going to make it hard to preserve whitespace. # Convert context_lines to a string for easier matching context_string = "".join([line + "\n" for line in context_lines]).rstrip("\n") # Convert file lines to a string file_string = "".join(lines) starts_at = file_string.find(context_string) if starts_at == -1: with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: plain_text = file.read() raise ValueError( f"No matches found, no changes made, context_lines are not found in this document. " f"For reference, here is the contents of the file:\n{plain_text}" ) # Find the index where the context ends context_end_index = starts_at + len(context_string) # Split the file_string back into lines at the context end before_context = file_string[:context_end_index] after_context = file_string[context_end_index:] # Insert the new text new_file_string = before_context + "\n" + text_to_insert + "\n" + after_context.strip("\n") if ends_with_n: new_file_string += "\n" return self._save_if_changed(file_path, lines, new_file_string)
def insert_text_at_start_or_end(self, file_path: str, text_to_insert: str, position: str = 'end') ‑> str
-
Inserts text at the start or end of a file.
Opens the file and inserts the specified text either at the beginning or the end of the file, based on the 'position' argument. If the position argument is neither 'start' nor 'end', it raises a ValueError.
Args
file_path
:str
- The path of the file in which the text is to be inserted.
text_to_insert
:str
- The text to insert into the file.
position
:str
, optional- The position where the text should be inserted. Should be either 'start' or 'end'. Defaults to 'end'.
Raises
ValueError
- If the 'position' argument is not 'start' or 'end'.
Expand source code
@log() def insert_text_at_start_or_end(self, file_path: str, text_to_insert: str, position: str = "end") -> str: """Inserts text at the start or end of a file. Opens the file and inserts the specified text either at the beginning or the end of the file, based on the 'position' argument. If the position argument is neither 'start' nor 'end', it raises a ValueError. Args: file_path (str): The path of the file in which the text is to be inserted. text_to_insert (str): The text to insert into the file. position (str, optional): The position where the text should be inserted. Should be either 'start' or 'end'. Defaults to 'end'. Raises: ValueError: If the 'position' argument is not 'start' or 'end'. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not text_to_insert: raise TypeError("No text_to_insert, please provide so I have something to insert.") if position not in ("start", "end"): raise ValueError("position must be start or end, so I know where to insert text.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: lines = file.readlines() original_lines = list(lines) if position == "start": lines.insert(0, text_to_insert + "\n") elif position == "end": lines.append(text_to_insert + "\n") else: raise ValueError("Invalid position: choose 'start' or 'end'.") return self._save_if_changed(file_path, original_lines, lines)
class LsTool (root_folder: str, config: Config)
-
Initialize the FindTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class LsTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the FindTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) @log() def ls_markdown(self, path: Optional[str] = ".", all_files: bool = False, long: bool = False) -> str: """List directory contents, with options to include all files and detailed view. Args: path (str, optional): The directory path to list. Defaults to the current directory '.'. all_files (bool): If True, include hidden files. Defaults to False. long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False. Returns: str: The markdown representation of the ls command output. """ try: entries_info = self.ls(path, all_files, long) except (FileNotFoundError, NotADirectoryError): tree_text = tree(Path(os.getcwd())) markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}" return markdown_content output = StringIO() is_first = True for line in entries_info: if not is_first: output.write("\n") is_first = False output.write(line) output.seek(0) return output.read() @log() def ls(self, path: Optional[str] = None, all_files: bool = False, long: bool = False) -> Union[list[str], str]: """ List directory contents, with options to include all files and detailed view. Args: path (str, optional): The directory path to list. Defaults to the current directory '.'. all_files (bool): If True, include hidden files. Defaults to False. long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False. Returns: List[str]: List of files and directories, optionally with details. """ logger.info(f"ls --path {path} --all_files {all_files} --long {long}") if path is None: path = "" if path is not None and ("?" in path or "*" in path or "[" in path or "]" in path): # Globs behave very different from non-globs. :( # or "{" in path or "}" <-- is this a glob pattern? entries = safe_glob(path, self.root_folder) else: try: # enumerate list to check if the path exists entries = list( (_ for _ in os.listdir(path)) if all_files else (entry for entry in os.listdir(path) if not entry.startswith(".")) ) except (FileNotFoundError, NotADirectoryError): # if not, just tell the bot everything. tree_text = tree(Path(os.getcwd())) markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}" return markdown_content entries_info = [] for entry in entries: # is this None-safety here correct? full_path = entry if path is None else os.path.join(path, entry) if not is_file_in_root_folder(full_path, self.root_folder): continue if os.path.isdir(full_path) and entry.endswith("__pycache__"): continue if long: stats = os.stat(full_path) # Always human readable, too many tokens for byte count. size = human_readable_size(stats.st_size) mod_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(stats.st_mtime)) entries_info.append(f"{size:} {mod_time} {entry}") else: entries_info.append(entry) if logger.level == logging.DEBUG: for line in entries_info: logger.debug(line) return entries_info
Methods
def ls(self, path: Optional[str] = None, all_files: bool = False, long: bool = False) ‑> Union[list[str], str]
-
List directory contents, with options to include all files and detailed view.
Args
path
:str
, optional- The directory path to list. Defaults to the current directory '.'.
all_files
:bool
- If True, include hidden files. Defaults to False.
long
:bool
- If True, include details like permissions, owner, size, and modification date. Defaults to False.
Returns
List[str]
- List of files and directories, optionally with details.
Expand source code
@log() def ls(self, path: Optional[str] = None, all_files: bool = False, long: bool = False) -> Union[list[str], str]: """ List directory contents, with options to include all files and detailed view. Args: path (str, optional): The directory path to list. Defaults to the current directory '.'. all_files (bool): If True, include hidden files. Defaults to False. long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False. Returns: List[str]: List of files and directories, optionally with details. """ logger.info(f"ls --path {path} --all_files {all_files} --long {long}") if path is None: path = "" if path is not None and ("?" in path or "*" in path or "[" in path or "]" in path): # Globs behave very different from non-globs. :( # or "{" in path or "}" <-- is this a glob pattern? entries = safe_glob(path, self.root_folder) else: try: # enumerate list to check if the path exists entries = list( (_ for _ in os.listdir(path)) if all_files else (entry for entry in os.listdir(path) if not entry.startswith(".")) ) except (FileNotFoundError, NotADirectoryError): # if not, just tell the bot everything. tree_text = tree(Path(os.getcwd())) markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}" return markdown_content entries_info = [] for entry in entries: # is this None-safety here correct? full_path = entry if path is None else os.path.join(path, entry) if not is_file_in_root_folder(full_path, self.root_folder): continue if os.path.isdir(full_path) and entry.endswith("__pycache__"): continue if long: stats = os.stat(full_path) # Always human readable, too many tokens for byte count. size = human_readable_size(stats.st_size) mod_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(stats.st_mtime)) entries_info.append(f"{size:} {mod_time} {entry}") else: entries_info.append(entry) if logger.level == logging.DEBUG: for line in entries_info: logger.debug(line) return entries_info
def ls_markdown(self, path: Optional[str] = '.', all_files: bool = False, long: bool = False) ‑> str
-
List directory contents, with options to include all files and detailed view.
Args
path
:str
, optional- The directory path to list. Defaults to the current directory '.'.
all_files
:bool
- If True, include hidden files. Defaults to False.
long
:bool
- If True, include details like permissions, owner, size, and modification date. Defaults to False.
Returns
str
- The markdown representation of the ls command output.
Expand source code
@log() def ls_markdown(self, path: Optional[str] = ".", all_files: bool = False, long: bool = False) -> str: """List directory contents, with options to include all files and detailed view. Args: path (str, optional): The directory path to list. Defaults to the current directory '.'. all_files (bool): If True, include hidden files. Defaults to False. long (bool): If True, include details like permissions, owner, size, and modification date. Defaults to False. Returns: str: The markdown representation of the ls command output. """ try: entries_info = self.ls(path, all_files, long) except (FileNotFoundError, NotADirectoryError): tree_text = tree(Path(os.getcwd())) markdown_content = f"# Bad `ls` command. Here are all the files you can see\n\n{tree_text}" return markdown_content output = StringIO() is_first = True for line in entries_info: if not is_first: output.write("\n") is_first = False output.write(line) output.seek(0) return output.read()
class PatchTool (root_folder: str, config: Config)
-
Edit a file by applying a git patch.
Initialize the PatchTool with a root folder.
Args
root_folder
:str
- The root folder for valid patchable files.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class PatchTool: """Edit a file by applying a git patch.""" def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the PatchTool with a root folder. Args: root_folder (str): The root folder for valid patchable files. config (Config): The developer input that bot shouldn't set. """ self.root_folder: str = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) @log() def apply_git_patch(self, patch_content: str) -> str: """ Apply a git patch to the files in the root folder. Args: patch_content (str): The content of the git patch. Returns: str: A message indicating successful patch application. Raises: RuntimeError: If the patch application fails. """ # Create a temporary file to store the patch content with tempfile.NamedTemporaryFile(suffix=".patch", delete=False) as tmp_patch: tmp_patch_name = tmp_patch.name tmp_patch.write(patch_content.encode("utf-8")) tmp_patch.flush() _patch = PatchSet.from_filename(tmp_patch_name, encoding="utf-8") cmd = ["git", "apply", tmp_patch_name, "--reject", "--verbose"] # Execute the command and capture stdout and stderr try: result = subprocess.run(cmd, capture_output=True, text=True, check=True, shell=False) # nosec # Log stdout and stderr logger.info("STDOUT:\n%s", result.stdout.replace(" ", ".").replace("\n", "\\n")) logger.info("STDERR:\n%s", result.stderr.replace(" ", ".").replace("\n", "\\n")) # Check for errors and return the result if result.returncode != 0: raise RuntimeError(f"Failed to apply patch: {result.stderr}") except subprocess.CalledProcessError as cpe: print(cpe) print(cpe.stdout) print(cpe.stderr) raise return "Patch applied without exception, please verify by other means to see if it was successful." def _extract_files_from_patch(self, patch_content: str) -> set[str]: """ Extract file names from the patch content. Args: patch_content (str): The content of the git patch. Returns: set[str]: A set of file names extracted from the patch. """ file_names = set() lines = patch_content.split("\n") for line in lines: if line.startswith("--- a/") or line.startswith("+++ b/"): # Extract the file name and add it to the set parts = line.split() if len(parts) > 1: file_name = parts[1] if file_name.startswith("a/") or file_name.startswith("b/"): file_name = file_name[2:] file_names.add(file_name) return file_names
Methods
def apply_git_patch(self, patch_content: str) ‑> str
-
Apply a git patch to the files in the root folder.
Args
patch_content
:str
- The content of the git patch.
Returns
str
- A message indicating successful patch application.
Raises
RuntimeError
- If the patch application fails.
Expand source code
@log() def apply_git_patch(self, patch_content: str) -> str: """ Apply a git patch to the files in the root folder. Args: patch_content (str): The content of the git patch. Returns: str: A message indicating successful patch application. Raises: RuntimeError: If the patch application fails. """ # Create a temporary file to store the patch content with tempfile.NamedTemporaryFile(suffix=".patch", delete=False) as tmp_patch: tmp_patch_name = tmp_patch.name tmp_patch.write(patch_content.encode("utf-8")) tmp_patch.flush() _patch = PatchSet.from_filename(tmp_patch_name, encoding="utf-8") cmd = ["git", "apply", tmp_patch_name, "--reject", "--verbose"] # Execute the command and capture stdout and stderr try: result = subprocess.run(cmd, capture_output=True, text=True, check=True, shell=False) # nosec # Log stdout and stderr logger.info("STDOUT:\n%s", result.stdout.replace(" ", ".").replace("\n", "\\n")) logger.info("STDERR:\n%s", result.stderr.replace(" ", ".").replace("\n", "\\n")) # Check for errors and return the result if result.returncode != 0: raise RuntimeError(f"Failed to apply patch: {result.stderr}") except subprocess.CalledProcessError as cpe: print(cpe) print(cpe.stdout) print(cpe.stderr) raise return "Patch applied without exception, please verify by other means to see if it was successful."
class PyCatTool (root_folder: str, config: Config)
-
Initialize the PyCatTool with a root folder.
Args
root_folder
:str
- The root folder path to start the file traversal from.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class PyCatTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the PyCatTool with a root folder. Args: root_folder (str): The root folder path to start the file traversal from. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def format_code_as_markdown( self, base_path: str, header: str, no_docs: bool = False, no_comments: bool = False, ) -> str: """ Combine all Python files in a directory into a single Markdown file. This method traverses the directory starting from base_path, and for each Python file found, its contents are formatted and appended to the Markdown file specified by output_file. Args: base_path (str): The base path of the directory to start traversing. header (str): A header string to be included at the beginning of the Markdown file. no_docs (bool): Whether to exclude docstrings from the output. Defaults to False. no_comments (bool): Whether to exclude comments from the output. Defaults to False. Returns: str: The Markdown file contents. """ output_file = StringIO() if header == "tree": tree_text = tree(Path(base_path)) markdown_content = f"# Source Code Filesystem Tree\n\n{tree_text}" output_file.write(markdown_content) markdown_content = f"# {header} Source Code\n\n" for root, _dirs, files in os.walk(base_path): for file in files: if not is_file_in_root_folder(file, self.root_folder): continue if is_python_file(file): full_path = os.path.join(root, file) relative_path = os.path.relpath(full_path, base_path) markdown_content += format_path_as_header(relative_path) markdown_content += "```python\n" with open(full_path, encoding="utf-8", errors=self.utf8_errors) as handle: text = handle.read() markdown_content += text markdown_content += "\n```\n\n" output_file.write(markdown_content) return output_file.getvalue()
Methods
def format_code_as_markdown(self, base_path: str, header: str, no_docs: bool = False, no_comments: bool = False) ‑> str
-
Combine all Python files in a directory into a single Markdown file.
This method traverses the directory starting from base_path, and for each Python file found, its contents are formatted and appended to the Markdown file specified by output_file.
Args
base_path
:str
- The base path of the directory to start traversing.
header
:str
- A header string to be included at the beginning of the Markdown file.
no_docs
:bool
- Whether to exclude docstrings from the output. Defaults to False.
no_comments
:bool
- Whether to exclude comments from the output. Defaults to False.
Returns
str
- The Markdown file contents.
Expand source code
@log() def format_code_as_markdown( self, base_path: str, header: str, no_docs: bool = False, no_comments: bool = False, ) -> str: """ Combine all Python files in a directory into a single Markdown file. This method traverses the directory starting from base_path, and for each Python file found, its contents are formatted and appended to the Markdown file specified by output_file. Args: base_path (str): The base path of the directory to start traversing. header (str): A header string to be included at the beginning of the Markdown file. no_docs (bool): Whether to exclude docstrings from the output. Defaults to False. no_comments (bool): Whether to exclude comments from the output. Defaults to False. Returns: str: The Markdown file contents. """ output_file = StringIO() if header == "tree": tree_text = tree(Path(base_path)) markdown_content = f"# Source Code Filesystem Tree\n\n{tree_text}" output_file.write(markdown_content) markdown_content = f"# {header} Source Code\n\n" for root, _dirs, files in os.walk(base_path): for file in files: if not is_file_in_root_folder(file, self.root_folder): continue if is_python_file(file): full_path = os.path.join(root, file) relative_path = os.path.relpath(full_path, base_path) markdown_content += format_path_as_header(relative_path) markdown_content += "```python\n" with open(full_path, encoding="utf-8", errors=self.utf8_errors) as handle: text = handle.read() markdown_content += text markdown_content += "\n```\n\n" output_file.write(markdown_content) return output_file.getvalue()
class PytestTool (root_folder: str, config: Config)
-
Optimized for AI version of pytest.
Initialize the PytestTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class PytestTool: """Optimized for AI version of pytest.""" def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the PytestTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.module = config.get_value("pytest_module") self.tests_folder = config.get_value("pytest_folder") self.min_coverage = float(config.get_value("pytest_min_coverage") or 0.0) @log() def pytest( self, ) -> str: """ Runs pytest on tests in tests folder.. Returns: str: Output from pytest. """ # Host script must set env vars, temp folder location and pwd! # with change_directory(self.root_folder): # What is -rA if not self.module or not self.tests_folder or self.min_coverage: raise FatalConfigurationError("Please set in ai_config module, test_folder and min_coverage") _passed_tests, _failed_tests, _coverage, command_result = count_pytest_results( self.module, self.tests_folder, self.min_coverage ) markdown_output = f"""## Pytest Output ### Standard Output {command_result.stdout} ### Standard Error {command_result.stderr} ### Return Code `{command_result.return_code}`""" return markdown_output
Methods
def pytest(self) ‑> str
-
Runs pytest on tests in tests folder..
Returns
str
- Output from pytest.
Expand source code
@log() def pytest( self, ) -> str: """ Runs pytest on tests in tests folder.. Returns: str: Output from pytest. """ # Host script must set env vars, temp folder location and pwd! # with change_directory(self.root_folder): # What is -rA if not self.module or not self.tests_folder or self.min_coverage: raise FatalConfigurationError("Please set in ai_config module, test_folder and min_coverage") _passed_tests, _failed_tests, _coverage, command_result = count_pytest_results( self.module, self.tests_folder, self.min_coverage ) markdown_output = f"""## Pytest Output ### Standard Output {command_result.stdout} ### Standard Error {command_result.stderr} ### Return Code `{command_result.return_code}`""" return markdown_output
class ReplaceTool (root_folder: str, config: Config)
-
Initialize the SedTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class ReplaceTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the SedTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.python_module = config.get_value("python_module") self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def replace_line_by_line( self, file_path: str, old_text: str, new_text: str, line_start: int = 0, line_end: int = -1 ) -> str: """Replaces occurrences of a specified text with new text in a range of lines in a file. Opens the file and replaces occurrences of 'old_text' with 'new_text' within the specified line range. If 'line_end' is -1, it defaults to the end of the file. Returns a message indicating whether changes were successfully applied or not. Args: file_path (str): The path to the file. old_text (str): The text to be replaced. new_text (str): The new text to replace the old text. line_start (int, optional): The starting line number (0-indexed) for the replacement. Defaults to 0. line_end (int, optional): The ending line number (0-indexed) for the replacement. If -1, it goes to the end of the file. Defaults to -1. Returns: str: A message indicating the success of the operation. Raises: TypeError: If file_path or old_text is None, or if no lines are left after replacement. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not old_text: raise TypeError("No old_text, please context so I can find the text to replace.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() lines = [] input_lines = input_text.splitlines() if line_end == -1: line_end = len(input_lines) for line_no, line in enumerate(input_lines): if line_start <= line_no < line_end and old_text in line: line = line.replace(old_text, new_text) lines.append(line) if not lines: raise TypeError("Nothing left after replace, something went wrong, cancelling.") final = "\n".join(lines) return self._save_if_changed(file_path, final, input_text) @log() def replace_all(self, file_path: str, old_text: str, new_text: str) -> str: """Replaces all occurrences of a specified text with new text in a file. Opens the file and replaces all occurrences of 'old_text' with 'new_text'. Returns a message indicating whether changes were successfully applied or not. Args: file_path (str): The path to the file. old_text (str): The text to be replaced. new_text (str): The new text to replace the old text. Returns: str: A message indicating the success of the operation. Raises: TypeError: If file_path or old_text is None. """ if new_text is None: new_text = "" if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not old_text: raise TypeError("No old_text, please context so I can find the text to replace.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() final = input_text.replace(old_text, new_text) return self._save_if_changed(file_path, final, input_text) @log() def replace_with_regex(self, file_path: str, regex_match_expression: str, replacement: str) -> str: """Replaces text in a file based on a regular expression match. Opens the file and replaces text that matches the regular expression 'regex_match_expression' with the 'replacement' text. Returns a message indicating whether changes were successfully applied or not. Args: file_path (str): The path to the file. regex_match_expression (str): The regular expression pattern to match. replacement (str): The text to replace the matched pattern. Returns: str: A message indicating the success of the operation. Raises: TypeError: If file_path or regex_match_expression is None. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not regex_match_expression: raise TypeError("No regex_match_expression, please context so I can find the text to replace.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() final = re.sub(regex_match_expression, replacement, input_text) return self._save_if_changed(file_path, final, input_text) def _save_if_changed(self, file_path: str, final: str, input_text: str) -> str: """Saves the modified text to the file if changes have been made. Compares the original text with the modified text and writes the modified text to the file if there are changes. Returns a message indicating whether any changes were made. Args: file_path (str): The path to the file. final (str): The modified text. input_text (str): The original text. Returns: str: A message indicating whether changes were made or not. Raises: TypeError: If file_path is None. """ if not final: raise TypeError("Something went wrong in replace and all text disappeared. Cancelling.") if input_text != final: BackupRestore.backup_file(file_path) with open(file_path, "w", encoding="utf-8", errors=self.utf8_errors) as output_file: output_file.write(final) validation = self._validate_code(file_path) if validation: BackupRestore.revert_to_latest_backup(file_path) return f"File not written because of problems.\n{validation.message}" if self.auto_cat: feedback = "Changes applied without exception, please verify by other means.\n" contents = CatTool(self.root_folder, self.config).cat_markdown([file_path]) return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}" return "Changes applied without exception, please verify by other means." return ( "No changes made, this means the old file contents are the same as the new. This has nothing " "to do with file permissions. Try again with a different match pattern." ) def _validate_code(self, full_path: str) -> Optional[ValidationMessageForBot]: """ Validate python Args: full_path (str): The path to the file to validate. Returns: Optional[ValidationMessageForBot]: A validation message if the file is invalid, otherwise None. """ if not is_python_file(full_path): return None if not self.python_module: logger.warning("No python module set, skipping validation.") return None validator = ValidateModule(self.python_module) results = validator.validate() explanation = validator.explain_to_bot(results) if explanation.is_valid: return None return explanation
Methods
def replace_all(self, file_path: str, old_text: str, new_text: str) ‑> str
-
Replaces all occurrences of a specified text with new text in a file.
Opens the file and replaces all occurrences of 'old_text' with 'new_text'. Returns a message indicating whether changes were successfully applied or not.
Args
file_path
:str
- The path to the file.
old_text
:str
- The text to be replaced.
new_text
:str
- The new text to replace the old text.
Returns
str
- A message indicating the success of the operation.
Raises
TypeError
- If file_path or old_text is None.
Expand source code
@log() def replace_all(self, file_path: str, old_text: str, new_text: str) -> str: """Replaces all occurrences of a specified text with new text in a file. Opens the file and replaces all occurrences of 'old_text' with 'new_text'. Returns a message indicating whether changes were successfully applied or not. Args: file_path (str): The path to the file. old_text (str): The text to be replaced. new_text (str): The new text to replace the old text. Returns: str: A message indicating the success of the operation. Raises: TypeError: If file_path or old_text is None. """ if new_text is None: new_text = "" if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not old_text: raise TypeError("No old_text, please context so I can find the text to replace.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() final = input_text.replace(old_text, new_text) return self._save_if_changed(file_path, final, input_text)
def replace_line_by_line(self, file_path: str, old_text: str, new_text: str, line_start: int = 0, line_end: int = -1) ‑> str
-
Replaces occurrences of a specified text with new text in a range of lines in a file.
Opens the file and replaces occurrences of 'old_text' with 'new_text' within the specified line range. If 'line_end' is -1, it defaults to the end of the file. Returns a message indicating whether changes were successfully applied or not.
Args
file_path
:str
- The path to the file.
old_text
:str
- The text to be replaced.
new_text
:str
- The new text to replace the old text.
line_start
:int
, optional- The starting line number (0-indexed) for the replacement. Defaults to 0.
line_end
:int
, optional- The ending line number (0-indexed) for the replacement. If -1, it goes to the end of the file. Defaults to -1.
Returns
str
- A message indicating the success of the operation.
Raises
TypeError
- If file_path or old_text is None, or if no lines are left after replacement.
Expand source code
@log() def replace_line_by_line( self, file_path: str, old_text: str, new_text: str, line_start: int = 0, line_end: int = -1 ) -> str: """Replaces occurrences of a specified text with new text in a range of lines in a file. Opens the file and replaces occurrences of 'old_text' with 'new_text' within the specified line range. If 'line_end' is -1, it defaults to the end of the file. Returns a message indicating whether changes were successfully applied or not. Args: file_path (str): The path to the file. old_text (str): The text to be replaced. new_text (str): The new text to replace the old text. line_start (int, optional): The starting line number (0-indexed) for the replacement. Defaults to 0. line_end (int, optional): The ending line number (0-indexed) for the replacement. If -1, it goes to the end of the file. Defaults to -1. Returns: str: A message indicating the success of the operation. Raises: TypeError: If file_path or old_text is None, or if no lines are left after replacement. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not old_text: raise TypeError("No old_text, please context so I can find the text to replace.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() lines = [] input_lines = input_text.splitlines() if line_end == -1: line_end = len(input_lines) for line_no, line in enumerate(input_lines): if line_start <= line_no < line_end and old_text in line: line = line.replace(old_text, new_text) lines.append(line) if not lines: raise TypeError("Nothing left after replace, something went wrong, cancelling.") final = "\n".join(lines) return self._save_if_changed(file_path, final, input_text)
def replace_with_regex(self, file_path: str, regex_match_expression: str, replacement: str) ‑> str
-
Replaces text in a file based on a regular expression match.
Opens the file and replaces text that matches the regular expression 'regex_match_expression' with the 'replacement' text. Returns a message indicating whether changes were successfully applied or not.
Args
file_path
:str
- The path to the file.
regex_match_expression
:str
- The regular expression pattern to match.
replacement
:str
- The text to replace the matched pattern.
Returns
str
- A message indicating the success of the operation.
Raises
TypeError
- If file_path or regex_match_expression is None.
Expand source code
@log() def replace_with_regex(self, file_path: str, regex_match_expression: str, replacement: str) -> str: """Replaces text in a file based on a regular expression match. Opens the file and replaces text that matches the regular expression 'regex_match_expression' with the 'replacement' text. Returns a message indicating whether changes were successfully applied or not. Args: file_path (str): The path to the file. regex_match_expression (str): The regular expression pattern to match. replacement (str): The text to replace the matched pattern. Returns: str: A message indicating the success of the operation. Raises: TypeError: If file_path or regex_match_expression is None. """ if not file_path: raise TypeError("No file_path, please provide file_path for each request.") if not regex_match_expression: raise TypeError("No regex_match_expression, please context so I can find the text to replace.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() final = re.sub(regex_match_expression, replacement, input_text) return self._save_if_changed(file_path, final, input_text)
class RewriteTool (root_folder: str, config: Config)
-
Initialize the RewriteTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class RewriteTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the RewriteTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.python_module = config.get_value("python_module") self.only_add_text = config.get_flag("only_add_text", False) @log() def write_new_file(self, file_path: str, text: str) -> str: """ Write a new file at file_path within the root_folder. Args: file_path (str): The relative path to the file to be written. text (str): The content to write into the file. Returns: str: A success message with the file path. Raises: ValueError: If the file already exists or if the file_path is outside the root_folder. """ file_path = sanitize_path(file_path) # Don't prepend root folder, we will have already cd'd to it. full_path = file_path if not is_file_in_root_folder(full_path, self.root_folder): raise ValueError("File path must be within the root folder.") try: if os.path.exists(full_path): raise FileExistsError("File already exists.") with open(full_path, "w", encoding="utf-8") as file: file.write(text) validation = self._validate_code(full_path) if validation: os.remove(full_path) return f"File not written because of problems.\n{validation.message}" return f"File written to {full_path}" except FileExistsError as e: tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {full_path} already exists. Here are all the files you can see\n\n{tree_text}" raise ValueError( str(e) + f" {markdown_content}\n Consider using rewrite_file method if you want to overwrite." ) from e @log() def rewrite_file(self, file_path: str, text: str) -> str: """ Backup and rewrite an existing file at file_path within the root_folder. This will completely replace the contents of the file with the new text. Args: file_path (str): The relative path to the file to be rewritten. text (str): The new content to write into the file. Returns: str: A success message with the file path. Raises: ValueError: If the file does not exist or if the file_path is outside the root_folder. """ if not text: raise TypeError("This would delete everything in the file. This is probably not what you want.") file_path = sanitize_path(file_path) # Don't prepend root folder, we will have already cd'd to it. full_path = file_path if not is_file_in_root_folder(full_path, self.root_folder): raise ValueError("File path must be within the root folder.") # not sure this is working right. _unchanged_proportion, initial, unchanged, added, removed = file_similarity(full_path, text.split("\n")) if self.only_add_text and removed > 0: raise TypeError("This would delete lines. Only add lines, do not remove them.") if self.only_add_text and len(text.split("\n")) < initial: raise TypeError("Line count decreased. Only add text, do not remove it.") # if 5 < initial <= removed: # # concern is taking a large file, and deleting everything (ie. confusing full rewrite for an insert or edit) # raise TypeError( # "Removed lines is equal initial number of lines. " # "When rewriting files, you have to re-write the previous lines, too." # ) # if unchanged > 0 and initial > 0 and added == 0 and removed == 0: # raise TypeError( # "Nothing changed, nothing was added or removed. " # "When rewriting files, you have to re-write the whole file " # "with lines changed, added or removed." # ) try: if not os.path.exists(full_path): raise FileNotFoundError("File does not exist, use ls tool to see what files there are.") BackupRestore.backup_file(full_path) with open(full_path, "w", encoding="utf-8") as file: file.write(text) validation = self._validate_code(full_path) if validation: BackupRestore.revert_to_latest_backup(full_path) return f"File not rewritten because of problems.\n{validation.message}" feedback = f"File rewritten to {full_path}" if self.auto_cat: feedback = "Changes made without exception, please verify by other means.\n" contents = CatTool(self.root_folder, self.config).cat_markdown([file_path]) return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}" return feedback + ", please view to verify contents." except FileNotFoundError as e: raise FileNotFoundError( str(e) + " Consider using write_new_file method if you want to create a new file." ) from e def _validate_code(self, full_path: str) -> Optional[ValidationMessageForBot]: """ Validate python Args: full_path (str): The path to the file to validate. Returns: Optional[ValidationMessageForBot]: A validation message if the file is invalid, otherwise None. """ if not is_python_file(full_path): return None if not self.python_module: logger.warning("No python module set, skipping validation.") return None validator = ValidateModule(self.python_module) results = validator.validate() explanation = validator.explain_to_bot(results) if explanation.is_valid: return None return explanation
Methods
def rewrite_file(self, file_path: str, text: str) ‑> str
-
Backup and rewrite an existing file at file_path within the root_folder. This will completely replace the contents of the file with the new text.
Args
file_path
:str
- The relative path to the file to be rewritten.
text
:str
- The new content to write into the file.
Returns
str
- A success message with the file path.
Raises
ValueError
- If the file does not exist or if the file_path is outside the root_folder.
Expand source code
@log() def rewrite_file(self, file_path: str, text: str) -> str: """ Backup and rewrite an existing file at file_path within the root_folder. This will completely replace the contents of the file with the new text. Args: file_path (str): The relative path to the file to be rewritten. text (str): The new content to write into the file. Returns: str: A success message with the file path. Raises: ValueError: If the file does not exist or if the file_path is outside the root_folder. """ if not text: raise TypeError("This would delete everything in the file. This is probably not what you want.") file_path = sanitize_path(file_path) # Don't prepend root folder, we will have already cd'd to it. full_path = file_path if not is_file_in_root_folder(full_path, self.root_folder): raise ValueError("File path must be within the root folder.") # not sure this is working right. _unchanged_proportion, initial, unchanged, added, removed = file_similarity(full_path, text.split("\n")) if self.only_add_text and removed > 0: raise TypeError("This would delete lines. Only add lines, do not remove them.") if self.only_add_text and len(text.split("\n")) < initial: raise TypeError("Line count decreased. Only add text, do not remove it.") # if 5 < initial <= removed: # # concern is taking a large file, and deleting everything (ie. confusing full rewrite for an insert or edit) # raise TypeError( # "Removed lines is equal initial number of lines. " # "When rewriting files, you have to re-write the previous lines, too." # ) # if unchanged > 0 and initial > 0 and added == 0 and removed == 0: # raise TypeError( # "Nothing changed, nothing was added or removed. " # "When rewriting files, you have to re-write the whole file " # "with lines changed, added or removed." # ) try: if not os.path.exists(full_path): raise FileNotFoundError("File does not exist, use ls tool to see what files there are.") BackupRestore.backup_file(full_path) with open(full_path, "w", encoding="utf-8") as file: file.write(text) validation = self._validate_code(full_path) if validation: BackupRestore.revert_to_latest_backup(full_path) return f"File not rewritten because of problems.\n{validation.message}" feedback = f"File rewritten to {full_path}" if self.auto_cat: feedback = "Changes made without exception, please verify by other means.\n" contents = CatTool(self.root_folder, self.config).cat_markdown([file_path]) return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}" return feedback + ", please view to verify contents." except FileNotFoundError as e: raise FileNotFoundError( str(e) + " Consider using write_new_file method if you want to create a new file." ) from e
def write_new_file(self, file_path: str, text: str) ‑> str
-
Write a new file at file_path within the root_folder.
Args
file_path
:str
- The relative path to the file to be written.
text
:str
- The content to write into the file.
Returns
str
- A success message with the file path.
Raises
ValueError
- If the file already exists or if the file_path is outside the root_folder.
Expand source code
@log() def write_new_file(self, file_path: str, text: str) -> str: """ Write a new file at file_path within the root_folder. Args: file_path (str): The relative path to the file to be written. text (str): The content to write into the file. Returns: str: A success message with the file path. Raises: ValueError: If the file already exists or if the file_path is outside the root_folder. """ file_path = sanitize_path(file_path) # Don't prepend root folder, we will have already cd'd to it. full_path = file_path if not is_file_in_root_folder(full_path, self.root_folder): raise ValueError("File path must be within the root folder.") try: if os.path.exists(full_path): raise FileExistsError("File already exists.") with open(full_path, "w", encoding="utf-8") as file: file.write(text) validation = self._validate_code(full_path) if validation: os.remove(full_path) return f"File not written because of problems.\n{validation.message}" return f"File written to {full_path}" except FileExistsError as e: tree_text = tree(Path(os.getcwd())) markdown_content = f"# File {full_path} already exists. Here are all the files you can see\n\n{tree_text}" raise ValueError( str(e) + f" {markdown_content}\n Consider using rewrite_file method if you want to overwrite." ) from e
class SedTool (root_folder: str, config: Config)
-
Initialize the SedTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class SedTool: def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the SedTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config self.auto_cat = config.get_flag("auto_cat", True) self.utf8_errors = config.get_value("utf8_errors", "surrogateescape") @log() def sed(self, file_path: str, commands: list[str]) -> str: r""" Transform the contents of a file located at file_path as per the provided sed-like commands. Args: file_path (str): The path of the file to be transformed. commands (list[str]): A list of sed-like commands for text transformation. Returns: str: The transformed text from the file. Supported command syntax: - s/regex/replacement/flags: Regex substitution. - p: Print the current line. - a\text: Append text after the current line. - i\text: Insert text before the current line. - [number]c\text: Change the text of a specific line number. - [number]d: Delete a specific line number. Note: This function reads from a file and returns the transformed text. It does not modify the file in-place. """ if not is_file_in_root_folder(file_path, self.root_folder): raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() output_text = SedTool._process_sed(input_text, commands) if is_python_file(file_path): is_valid, error = is_valid_python_source(output_text) if not is_valid and error is not None: return f"Invalid Python source code. No changes made. {error.lineno} {error.msg} {error.text}" if input_text != output_text: with open(file_path, "w", encoding="utf-8") as output_file: output_file.write(output_text) if self.auto_cat: feedback = "Changes without exception, please verify by other means.\n" contents = CatTool(self.root_folder, self.config).cat_markdown([file_path]) return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}" return "Changes without exception, please verify by other means." return "No changes made." @classmethod def _process_sed(cls, input_text: str, commands: list[str]) -> str: r""" Transform input_text as per the provided sed-like commands. Args: input_text (str): The input text to be transformed. commands (list[str]): A list of sed-like commands for text transformation. Returns: str: The transformed text. Supported command syntax: - s/regex/replacement/flags: Regex substitution. - a\text: Append text after the current line. - i\text: Insert text before the current line. - [number]c\text: Change the text of a specific line number. - [number]d: Delete a specific line number. Example: >>> SedTool._process_sed("Hello World\\nThis is a test", ["s/World/Universe/", "a\\Appended text"]) 'Hello Universe\\nThis is a test\nAppended text' >>> SedTool._process_sed("First Line\\nSecond Line", ["2d", "i\\Inserted at Start"]) 'Inserted at Start\nFirst Line\\nSecond Line' """ if isinstance(commands, str): commands = [commands] # don't know how to fix the covariant/invariant typing issue here lines: list[str] = input_text.split("\n") for i in range(len(lines)): for command in commands: if command.startswith("s/") and re.match(r"s/.+/.*/", command): # Regex substitution: s/regex/replacement/flags parts = command[2:].rsplit("/", 2) regex, replacement, flags = parts[0], parts[1], parts[2] if len(parts) > 2 else "" count = 1 if "g" not in flags else 0 # replace all if 'g' is present lines[i] = re.sub(regex, replacement, lines[i], count=count) elif command.startswith("a\\"): # Append: a\text append_text = command[2:] lines[i] += "\n" + append_text elif re.match(r"\d+a\\", command): # insert after the specified line. a for after. target_line, change_text = command.split("a\\") if i + 1 == int(target_line): lines[i] = change_text elif command.startswith("i\\") and i == 0: # Insert: i\text (only at the beginning of the text) insert_text = command[2:] lines[i] = insert_text + "\n" + lines[i] elif re.match(r"\d+c\\", command): # Change specific line: [number]c\text target_line, change_text = command.split("c\\") if i + 1 == int(target_line): lines[i] = change_text elif re.match(r"\d+d", command): # Delete specific line: [number]d delete_line = int(command[:-1]) if i + 1 == delete_line: # None was a better deletion marker, but messes with mypy. lines[i] = "None # Mark for deletion" elif command == "p": # print? No action? pass else: raise TypeError( "Unknown command, expected prefix of s/ or a\\ or digit + c or digit + d for replace, append, change, or delete respectively" ) # Rebuild the output from modified lines, excluding deleted ones output = [line for line in lines if line is not None] return "\n".join(output) # # Rerun the regex substitution test with the corrected function # test_regex_substitution_corrected = lambda: simulate_sed_corrected(input_text, commands) == expected_output # test_regex_substitution_corrected()
Methods
def sed(self, file_path: str, commands: list[str]) ‑> str
-
Transform the contents of a file located at file_path as per the provided sed-like commands.
Args
file_path
:str
- The path of the file to be transformed.
commands
:list[str]
- A list of sed-like commands for text transformation.
Returns
str
- The transformed text from the file.
Supported command syntax: - s/regex/replacement/flags: Regex substitution. - p: Print the current line. - a\text: Append text after the current line. - i\text: Insert text before the current line. - [number]c\text: Change the text of a specific line number. - [number]d: Delete a specific line number.
Note: This function reads from a file and returns the transformed text. It does not modify the file in-place.
Expand source code
@log() def sed(self, file_path: str, commands: list[str]) -> str: r""" Transform the contents of a file located at file_path as per the provided sed-like commands. Args: file_path (str): The path of the file to be transformed. commands (list[str]): A list of sed-like commands for text transformation. Returns: str: The transformed text from the file. Supported command syntax: - s/regex/replacement/flags: Regex substitution. - p: Print the current line. - a\text: Append text after the current line. - i\text: Insert text before the current line. - [number]c\text: Change the text of a specific line number. - [number]d: Delete a specific line number. Note: This function reads from a file and returns the transformed text. It does not modify the file in-place. """ if not is_file_in_root_folder(file_path, self.root_folder): raise ValueError(f"File {file_path} is not in root folder {self.root_folder}.") with open(file_path, encoding="utf-8", errors=self.utf8_errors) as file: input_text = file.read() output_text = SedTool._process_sed(input_text, commands) if is_python_file(file_path): is_valid, error = is_valid_python_source(output_text) if not is_valid and error is not None: return f"Invalid Python source code. No changes made. {error.lineno} {error.msg} {error.text}" if input_text != output_text: with open(file_path, "w", encoding="utf-8") as output_file: output_file.write(output_text) if self.auto_cat: feedback = "Changes without exception, please verify by other means.\n" contents = CatTool(self.root_folder, self.config).cat_markdown([file_path]) return f"Tool feedback: {feedback}\n\nCurrent file contents:\n\n{contents}" return "Changes without exception, please verify by other means." return "No changes made."
class TaskBot (config: Config, name: str, bot_instructions: str, model: str, dialog_logger_md: DialogLoggerWithMarkdown, persist_bots: bool = False, persist_threads: bool = False, maximum_loops: int = 10)
-
Minimal bot management code.
Expand source code
class TaskBot: """Minimal bot management code.""" def __init__( self, config: Config, name: str, bot_instructions: str, model: str, dialog_logger_md: DialogLoggerWithMarkdown, persist_bots: bool = False, persist_threads: bool = False, maximum_loops: int = 10, ): self.model = model """Model, name and instructions uniquely identify a bot.""" self.name = name """Model, name and instructions uniquely identify a bot.""" self.bot_instructions = bot_instructions """Model, name and instructions uniquely identify a bot.""" self.client: openai.AsyncOpenAI = openai.AsyncOpenAI() self.thread: Optional[Thread] = None self.assistant: Optional[Assistant] = None self.dialog_logger_md = dialog_logger_md """Conversation style logger""" self.persist_bots = persist_bots """Keep bots or attempt to delete them at end of session""" self.persist_threads = persist_threads """Keep thread or attempt to delete them""" self.config = config """Stores bot, thread config and other global config.""" self.maximum_loops = maximum_loops """Prevent infinite loops and money wastage.""" self.toolkit: Optional[ToolKit] = None """Reference to toolkit so that goal checkers can check if any tools were used.""" self.allow_self_certification = False """Do you want to trust the bot when it says it has achieved the goal?""" self.conversation_over_marker = "DONE" """Goal function checker returns this when done.""" async def initialize(self) -> None: """Get or create a bot and store it in the config.""" bot = await self.get_create_bot() logger.debug(f"Assistant id: {bot.id}") self.assistant = bot self.dialog_logger_md.write_header(bot_name=self.name, model=self.model, bot_instructions=self.bot_instructions) async def get_create_bot(self) -> Assistant: """Get or create a bot and store it in the config.""" current_bot = self.config.get_bot(self.name) if not current_bot: await self.create_bot() else: try: self.assistant = await self.client.beta.assistants.retrieve(current_bot.assistant_id) logger.debug(f"Assistant retrieved: {self.assistant.id}") except openai.NotFoundError: await self.create_bot() if not self.assistant: raise TypeError("Assistant not found or created.") logger.debug(f"Assistant id: {self.assistant.id}") return self.assistant async def create_bot(self): """Create a bot and store it in the config.""" self.assistant = await self.client.beta.assistants.create( name=self.name, instructions=self.bot_instructions, model=self.model, ) self.config.add_bot(self.assistant.id, self.name) logger.debug(f"Assistant created: {self.assistant.id}") def toolkit_factory( self, root_folder: str, model: str, tool_names: list[str] ) -> tuple[ToolKit, list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction]]: self.toolkit = ToolKit(root_folder, model, 500, permitted_tools=tool_names, config=self.config) # sync COM self.toolkit.conversation_over_marker = self.conversation_over_marker initialize_all_tools(keeps=tool_names) tools_schema: list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction] = [ ToolAssistantToolsFunction(**{"function": cast(FunctionDefinition, schema), "type": "function"}) for schema in ALL_TOOLS ] if not tools_schema: raise Exception("Not enough tools!") return self.toolkit, tools_schema async def one_shot_ask(self, the_ask: str) -> Any: """Free-form request, structured response. Args: the_ask (str): The request. Returns: Any: The response. """ if not self.toolkit: raise TypeError("Missing toolkit before one_shot_ask") if not self.assistant: raise TypeError("Missing assistant before one_shot_ask") try: _, tool_schemas = self.toolkit_factory( ".", self.model, [ # "report_bool", "report_dict", "report_float", "report_int", # "report_json", "report_list", # "report_set", # "report_text", Why? Just do an unstructured query. # "report_toml", # "report_tuple", # "report_xml", ], ) thread = await self.client.beta.threads.create() logger.info(the_ask) _message = await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=the_ask, ) # pydantic_tools =[run_create_params.Tool(_) for _ in tool_schemas] run = await self.client.beta.threads.runs.create( thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas ) tool_use_count = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md) if tool_use_count == 0: logger.warning("No tool usage, something went wrong.") messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc") # logger.info(messages) last_words = parse_message(messages) self.dialog_logger_md.add_bot(last_words) except Exception as exception: self.dialog_logger_md.add_error(exception) raise finally: # clean up thread if self.thread and not self.persist_threads: await self.client.beta.threads.delete(self.thread.id) if self.assistant and not self.persist_bots: # clean up assistant await self.client.beta.assistants.delete(self.assistant.id) async def basic_tool_loop( self, the_ask: str, root_folder: str, tool_names: list[Any], keep_going_prompt: Callable[[ToolKit], Awaitable[str]], stop_on_no_tool_use: bool, ) -> int: """ Loop through tool requests. Args: the_ask (str): The initial request. root_folder (str): The root folder for file operations. tool_names (list[Any]): The tools to use. keep_going_prompt (str): The prompt to use to keep going. stop_on_no_tool_use (bool): Stop if no tools are used. Returns: None """ if not self.assistant: raise TypeError("Missing assistant before basic_tool_loop") if self.dialog_logger_md: self.dialog_logger_md.add_user(the_ask) self.dialog_logger_md.add_toolkit(tool_names) tool_loops = 0 total_tool_use_count = 0 try: if self.allow_self_certification: tool_names.append("report_text") tool_names = list(set(tool_names)) _, tool_schemas = self.toolkit_factory(root_folder, self.model, tool_names) if not self.toolkit: raise TypeError("Missing toolkit before basic_tool_loop") thread = await self.client.beta.threads.create() logger.info(the_ask) _message = await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=the_ask, ) run = await self.client.beta.threads.runs.create( thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas ) tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md) tool_loops += 1 if tool_loops > self.maximum_loops: raise TypeError("Too many tool loops") total_tool_use_count += tools_used_this_round messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc") initial_bot_response = parse_message(messages) self.dialog_logger_md.add_bot(initial_bot_response) # Did you use any tools? (maybe move this to goal function) # if tools_used_this_round == 0: # initial_user_response = ( # "I see you didn't use any tools. " # "Please list what tools you have available, and if there are some available, " # "why they were not useful." # ) # else: if not self.toolkit: raise TypeError("Missing toolkit before keep_going_prompt") initial_user_response = await keep_going_prompt(self.toolkit) # TODO: make into method. await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=initial_user_response, ) run = await self.client.beta.threads.runs.create(thread_id=thread.id, assistant_id=self.assistant.id) self.dialog_logger_md.add_user(initial_user_response) # "keep going/done" loop done = "NOPE" tools_used_this_round = -1 # TODO: initialize this in constructor if ( self.allow_self_certification and hasattr(self.toolkit, "tool_answer_collector") and self.toolkit.tool_answer_collector ): final_report = self.toolkit.tool_answer_collector.text_answer final_comment = self.toolkit.tool_answer_collector.comment self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}") return total_tool_use_count # Bot has at least 3 ways to stop # - return message of DONE # - use answer tool to submit DONE, or IMPOSSIBLE # - stop using tools if tools_used_this_round == 0 and stop_on_no_tool_use: logger.info("No tools used this round, conversation will end.") while done != self.conversation_over_marker or (tools_used_this_round == 0 and stop_on_no_tool_use): tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md) # Did we use any tools total_tool_use_count += tools_used_this_round # infinite loop protection tool_loops += 1 if tool_loops > self.maximum_loops: raise TypeError("Too many tool loops") messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc") successive_response = parse_message(messages) self.dialog_logger_md.add_bot(successive_response) if self.allow_self_certification: # TODO: move this to goal checker? done = capture_done_message(messages, self.conversation_over_marker) if done == self.conversation_over_marker: break # Did bot use tool to submit final report. Wow. Can't trust all bots. if ( self.allow_self_certification and hasattr(self.toolkit, "tool_answer_collector") and self.toolkit.tool_answer_collector ): final_report = str(self.toolkit.tool_answer_collector.text_answer).upper().strip() final_comment = self.toolkit.tool_answer_collector.comment self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}") break if done != self.conversation_over_marker: # Replace with 2nd bot? keep_going_text = await keep_going_prompt(self.toolkit) # This is *not* self certification if keep_going_text == self.conversation_over_marker: # The bot did a good job and we can certify that. break self.dialog_logger_md.add_user(keep_going_text) logger.info(keep_going_text) await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=keep_going_text, ) run = await self.client.beta.threads.runs.create( thread_id=thread.id, assistant_id=self.assistant.id ) except Exception as exception: self.dialog_logger_md.add_error(exception) raise finally: # clean up thread if self.thread and not self.persist_threads: await self.client.beta.threads.delete(self.thread.id) if self.assistant and not self.persist_bots: # clean up assistant await self.client.beta.assistants.delete(self.assistant.id) return total_tool_use_count
Instance variables
var allow_self_certification
-
Do you want to trust the bot when it says it has achieved the goal?
var bot_instructions
-
Model, name and instructions uniquely identify a bot.
var config
-
Stores bot, thread config and other global config.
var conversation_over_marker
-
Goal function checker returns this when done.
var dialog_logger_md
-
Conversation style logger
var maximum_loops
-
Prevent infinite loops and money wastage.
var model
-
Model, name and instructions uniquely identify a bot.
var name
-
Model, name and instructions uniquely identify a bot.
var persist_bots
-
Keep bots or attempt to delete them at end of session
var persist_threads
-
Keep thread or attempt to delete them
var toolkit
-
Reference to toolkit so that goal checkers can check if any tools were used.
Methods
async def basic_tool_loop(self, the_ask: str, root_folder: str, tool_names: list[typing.Any], keep_going_prompt: collections.abc.Callable[[ToolKit], collections.abc.Awaitable[str]], stop_on_no_tool_use: bool) ‑> int
-
Loop through tool requests.
Args
the_ask
:str
- The initial request.
root_folder
:str
- The root folder for file operations.
tool_names
:list[Any]
- The tools to use.
keep_going_prompt
:str
- The prompt to use to keep going.
stop_on_no_tool_use
:bool
- Stop if no tools are used.
Returns
None
Expand source code
async def basic_tool_loop( self, the_ask: str, root_folder: str, tool_names: list[Any], keep_going_prompt: Callable[[ToolKit], Awaitable[str]], stop_on_no_tool_use: bool, ) -> int: """ Loop through tool requests. Args: the_ask (str): The initial request. root_folder (str): The root folder for file operations. tool_names (list[Any]): The tools to use. keep_going_prompt (str): The prompt to use to keep going. stop_on_no_tool_use (bool): Stop if no tools are used. Returns: None """ if not self.assistant: raise TypeError("Missing assistant before basic_tool_loop") if self.dialog_logger_md: self.dialog_logger_md.add_user(the_ask) self.dialog_logger_md.add_toolkit(tool_names) tool_loops = 0 total_tool_use_count = 0 try: if self.allow_self_certification: tool_names.append("report_text") tool_names = list(set(tool_names)) _, tool_schemas = self.toolkit_factory(root_folder, self.model, tool_names) if not self.toolkit: raise TypeError("Missing toolkit before basic_tool_loop") thread = await self.client.beta.threads.create() logger.info(the_ask) _message = await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=the_ask, ) run = await self.client.beta.threads.runs.create( thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas ) tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md) tool_loops += 1 if tool_loops > self.maximum_loops: raise TypeError("Too many tool loops") total_tool_use_count += tools_used_this_round messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc") initial_bot_response = parse_message(messages) self.dialog_logger_md.add_bot(initial_bot_response) # Did you use any tools? (maybe move this to goal function) # if tools_used_this_round == 0: # initial_user_response = ( # "I see you didn't use any tools. " # "Please list what tools you have available, and if there are some available, " # "why they were not useful." # ) # else: if not self.toolkit: raise TypeError("Missing toolkit before keep_going_prompt") initial_user_response = await keep_going_prompt(self.toolkit) # TODO: make into method. await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=initial_user_response, ) run = await self.client.beta.threads.runs.create(thread_id=thread.id, assistant_id=self.assistant.id) self.dialog_logger_md.add_user(initial_user_response) # "keep going/done" loop done = "NOPE" tools_used_this_round = -1 # TODO: initialize this in constructor if ( self.allow_self_certification and hasattr(self.toolkit, "tool_answer_collector") and self.toolkit.tool_answer_collector ): final_report = self.toolkit.tool_answer_collector.text_answer final_comment = self.toolkit.tool_answer_collector.comment self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}") return total_tool_use_count # Bot has at least 3 ways to stop # - return message of DONE # - use answer tool to submit DONE, or IMPOSSIBLE # - stop using tools if tools_used_this_round == 0 and stop_on_no_tool_use: logger.info("No tools used this round, conversation will end.") while done != self.conversation_over_marker or (tools_used_this_round == 0 and stop_on_no_tool_use): tools_used_this_round = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md) # Did we use any tools total_tool_use_count += tools_used_this_round # infinite loop protection tool_loops += 1 if tool_loops > self.maximum_loops: raise TypeError("Too many tool loops") messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc") successive_response = parse_message(messages) self.dialog_logger_md.add_bot(successive_response) if self.allow_self_certification: # TODO: move this to goal checker? done = capture_done_message(messages, self.conversation_over_marker) if done == self.conversation_over_marker: break # Did bot use tool to submit final report. Wow. Can't trust all bots. if ( self.allow_self_certification and hasattr(self.toolkit, "tool_answer_collector") and self.toolkit.tool_answer_collector ): final_report = str(self.toolkit.tool_answer_collector.text_answer).upper().strip() final_comment = self.toolkit.tool_answer_collector.comment self.dialog_logger_md.add_bot(f"Final word: {final_report}, {final_comment}") break if done != self.conversation_over_marker: # Replace with 2nd bot? keep_going_text = await keep_going_prompt(self.toolkit) # This is *not* self certification if keep_going_text == self.conversation_over_marker: # The bot did a good job and we can certify that. break self.dialog_logger_md.add_user(keep_going_text) logger.info(keep_going_text) await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=keep_going_text, ) run = await self.client.beta.threads.runs.create( thread_id=thread.id, assistant_id=self.assistant.id ) except Exception as exception: self.dialog_logger_md.add_error(exception) raise finally: # clean up thread if self.thread and not self.persist_threads: await self.client.beta.threads.delete(self.thread.id) if self.assistant and not self.persist_bots: # clean up assistant await self.client.beta.assistants.delete(self.assistant.id) return total_tool_use_count
async def create_bot(self)
-
Create a bot and store it in the config.
Expand source code
async def create_bot(self): """Create a bot and store it in the config.""" self.assistant = await self.client.beta.assistants.create( name=self.name, instructions=self.bot_instructions, model=self.model, ) self.config.add_bot(self.assistant.id, self.name) logger.debug(f"Assistant created: {self.assistant.id}")
async def get_create_bot(self) ‑> openai.types.beta.assistant.Assistant
-
Get or create a bot and store it in the config.
Expand source code
async def get_create_bot(self) -> Assistant: """Get or create a bot and store it in the config.""" current_bot = self.config.get_bot(self.name) if not current_bot: await self.create_bot() else: try: self.assistant = await self.client.beta.assistants.retrieve(current_bot.assistant_id) logger.debug(f"Assistant retrieved: {self.assistant.id}") except openai.NotFoundError: await self.create_bot() if not self.assistant: raise TypeError("Assistant not found or created.") logger.debug(f"Assistant id: {self.assistant.id}") return self.assistant
async def initialize(self) ‑> None
-
Get or create a bot and store it in the config.
Expand source code
async def initialize(self) -> None: """Get or create a bot and store it in the config.""" bot = await self.get_create_bot() logger.debug(f"Assistant id: {bot.id}") self.assistant = bot self.dialog_logger_md.write_header(bot_name=self.name, model=self.model, bot_instructions=self.bot_instructions)
async def one_shot_ask(self, the_ask: str) ‑> Any
-
Free-form request, structured response.
Args
the_ask
:str
- The request.
Returns
Any
- The response.
Expand source code
async def one_shot_ask(self, the_ask: str) -> Any: """Free-form request, structured response. Args: the_ask (str): The request. Returns: Any: The response. """ if not self.toolkit: raise TypeError("Missing toolkit before one_shot_ask") if not self.assistant: raise TypeError("Missing assistant before one_shot_ask") try: _, tool_schemas = self.toolkit_factory( ".", self.model, [ # "report_bool", "report_dict", "report_float", "report_int", # "report_json", "report_list", # "report_set", # "report_text", Why? Just do an unstructured query. # "report_toml", # "report_tuple", # "report_xml", ], ) thread = await self.client.beta.threads.create() logger.info(the_ask) _message = await self.client.beta.threads.messages.create( thread_id=thread.id, role="user", content=the_ask, ) # pydantic_tools =[run_create_params.Tool(_) for _ in tool_schemas] run = await self.client.beta.threads.runs.create( thread_id=thread.id, assistant_id=self.assistant.id, tools=tool_schemas ) tool_use_count = await loop_tools(self.client, self.toolkit, run, thread, self.dialog_logger_md) if tool_use_count == 0: logger.warning("No tool usage, something went wrong.") messages = await self.client.beta.threads.messages.list(thread_id=thread.id, order="desc") # logger.info(messages) last_words = parse_message(messages) self.dialog_logger_md.add_bot(last_words) except Exception as exception: self.dialog_logger_md.add_error(exception) raise finally: # clean up thread if self.thread and not self.persist_threads: await self.client.beta.threads.delete(self.thread.id) if self.assistant and not self.persist_bots: # clean up assistant await self.client.beta.assistants.delete(self.assistant.id)
def toolkit_factory(self, root_folder: str, model: str, tool_names: list[str]) ‑> tuple[ToolKit, list[openai.types.beta.threads.run_create_params.ToolAssistantToolsCode | openai.types.beta.threads.run_create_params.ToolAssistantToolsRetrieval | openai.types.beta.threads.run_create_params.ToolAssistantToolsFunction]]
-
Expand source code
def toolkit_factory( self, root_folder: str, model: str, tool_names: list[str] ) -> tuple[ToolKit, list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction]]: self.toolkit = ToolKit(root_folder, model, 500, permitted_tools=tool_names, config=self.config) # sync COM self.toolkit.conversation_over_marker = self.conversation_over_marker initialize_all_tools(keeps=tool_names) tools_schema: list[ToolAssistantToolsCode | ToolAssistantToolsRetrieval | ToolAssistantToolsFunction] = [ ToolAssistantToolsFunction(**{"function": cast(FunctionDefinition, schema), "type": "function"}) for schema in ALL_TOOLS ] if not tools_schema: raise Exception("Not enough tools!") return self.toolkit, tools_schema
class TodoTool (root_folder: str, config: Config)
-
Keep track of tasks.
Initialize the TodoTool with a root folder.
Args
root_folder
:str
- The root folder for valid files.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class TodoTool: """Keep track of tasks.""" def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the TodoTool with a root folder. Args: root_folder (str): The root folder for valid files. config (Config): The developer input that bot shouldn't set. """ self.root_folder: str = root_folder self.config = config self.roles = config.get_list("todo_roles") self.task_manager = ai_todo.TaskManager(self.root_folder, self.roles) @log() def add_todo( self, title: str, description: str, category: str, source_code_ref: str, assignee: Optional[str] = None ) -> str: """ Adds a new task to the task manager. Args: title (str): The title of the task. description (str): A description of the task. category (str): The category of the task (e.g., 'bug', 'feature'). source_code_ref (str): Reference to the source code related to the task. assignee (str, optional): The name of the assignee. Defaults to None. Returns: str: A confirmation message indicating successful addition of the task. """ self.task_manager.add_task(title, description, category, source_code_ref, assignee) summary = self.task_manager.get_stats() return f"Successful added task {title}\n{summary}" @log() def remove_todo(self, title: str) -> str: """ Marks a task as finished based on its title. Args: title (str): The title of the task to be marked as finished. Returns: str: A confirmation message indicating the task was successfully marked as finished. """ self.task_manager.finish_task(title) summary = self.task_manager.get_stats() return f"Successful removed task {title}\n{summary}" @log() def query_todos_by_regex(self, regex_pattern: str = r"[\s\S]+") -> str: r""" Queries tasks by a keyword in their title, using a regular expression pattern. Args: regex_pattern (str, optional): The regular expression pattern to match in task titles. Defaults to "[\s\S]+", which matches any title. Returns: str: The rendered Markdown string of tasks matching the given pattern. """ return self.task_manager.query_by_title_keyword(regex_pattern) @log() def query_todos_by_assignee(self, assignee_name: str) -> str: """ Queries tasks assigned to a specific assignee. Currently, the assignee is hard-coded as 'Developer'. Args: assignee_name (str): The name of the assignee to query tasks for. Returns: str: The rendered Markdown string of tasks assigned to the specified assignee. """ return self.task_manager.query_by_assignee(assignee_name) @log() def list_valid_assignees( self, ) -> list[str]: """ Lists the valid assignees for tasks. Returns: list[str]: The rendered Markdown string of valid assignees. """ return self.task_manager.valid_assignees
Methods
def add_todo(self, title: str, description: str, category: str, source_code_ref: str, assignee: Optional[str] = None) ‑> str
-
Adds a new task to the task manager.
Args
title
:str
- The title of the task.
description
:str
- A description of the task.
category
:str
- The category of the task (e.g., 'bug', 'feature').
source_code_ref
:str
- Reference to the source code related to the task.
assignee
:str
, optional- The name of the assignee. Defaults to None.
Returns
str
- A confirmation message indicating successful addition of the task.
Expand source code
@log() def add_todo( self, title: str, description: str, category: str, source_code_ref: str, assignee: Optional[str] = None ) -> str: """ Adds a new task to the task manager. Args: title (str): The title of the task. description (str): A description of the task. category (str): The category of the task (e.g., 'bug', 'feature'). source_code_ref (str): Reference to the source code related to the task. assignee (str, optional): The name of the assignee. Defaults to None. Returns: str: A confirmation message indicating successful addition of the task. """ self.task_manager.add_task(title, description, category, source_code_ref, assignee) summary = self.task_manager.get_stats() return f"Successful added task {title}\n{summary}"
def list_valid_assignees(self) ‑> list[str]
-
Lists the valid assignees for tasks.
Returns
list[str]
- The rendered Markdown string of valid assignees.
Expand source code
@log() def list_valid_assignees( self, ) -> list[str]: """ Lists the valid assignees for tasks. Returns: list[str]: The rendered Markdown string of valid assignees. """ return self.task_manager.valid_assignees
def query_todos_by_assignee(self, assignee_name: str) ‑> str
-
Queries tasks assigned to a specific assignee. Currently, the assignee is hard-coded as 'Developer'.
Args
assignee_name
:str
- The name of the assignee to query tasks for.
Returns
str
- The rendered Markdown string of tasks assigned to the specified assignee.
Expand source code
@log() def query_todos_by_assignee(self, assignee_name: str) -> str: """ Queries tasks assigned to a specific assignee. Currently, the assignee is hard-coded as 'Developer'. Args: assignee_name (str): The name of the assignee to query tasks for. Returns: str: The rendered Markdown string of tasks assigned to the specified assignee. """ return self.task_manager.query_by_assignee(assignee_name)
def query_todos_by_regex(self, regex_pattern: str = '[\\s\\S]+') ‑> str
-
Queries tasks by a keyword in their title, using a regular expression pattern.
Args
regex_pattern
:str
, optional- The regular expression pattern to match in task titles. Defaults to "[\s\S]+", which matches any title.
Returns
str
- The rendered Markdown string of tasks matching the given pattern.
Expand source code
@log() def query_todos_by_regex(self, regex_pattern: str = r"[\s\S]+") -> str: r""" Queries tasks by a keyword in their title, using a regular expression pattern. Args: regex_pattern (str, optional): The regular expression pattern to match in task titles. Defaults to "[\s\S]+", which matches any title. Returns: str: The rendered Markdown string of tasks matching the given pattern. """ return self.task_manager.query_by_title_keyword(regex_pattern)
def remove_todo(self, title: str) ‑> str
-
Marks a task as finished based on its title.
Args
title
:str
- The title of the task to be marked as finished.
Returns
str
- A confirmation message indicating the task was successfully marked as finished.
Expand source code
@log() def remove_todo(self, title: str) -> str: """ Marks a task as finished based on its title. Args: title (str): The title of the task to be marked as finished. Returns: str: A confirmation message indicating the task was successfully marked as finished. """ self.task_manager.finish_task(title) summary = self.task_manager.get_stats() return f"Successful removed task {title}\n{summary}"
class TokenCounterTool (root_folder: str, config: Config)
-
Count the number of tokens in a string.
Initialize the FindTool class.
Args
root_folder
:str
- The root folder path for file operations.
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class TokenCounterTool: """Count the number of tokens in a string.""" def __init__(self, root_folder: str, config: Config) -> None: """ Initialize the FindTool class. Args: root_folder (str): The root folder path for file operations. config (Config): The developer input that bot shouldn't set. """ self.root_folder = root_folder self.config = config model = config.get_value("token_model") if not model: raise ValueError("token_model must be set in the config") self.token_model = model def count_tokens(self, text: str) -> int: """Count the number of tokens in a string. Args: text (str): The text to count the tokens in. Returns: int: The number of tokens. """ if not text: return 0 # gpt3 turbo - cl100k_base # gpt2 (or r50k_base) Most GPT-3 models # p50k_base Code models, text-davinci-002, text-davinci-003 # cl100k_base text-embedding-ada-002 # enc = tiktoken.get_encoding("cl100k_base") encoding = tiktoken.encoding_for_model(self.token_model) tokens = encoding.encode(text) token_count = len(tokens) return token_count
Methods
def count_tokens(self, text: str) ‑> int
-
Count the number of tokens in a string.
Args
text
:str
- The text to count the tokens in.
Returns
int
- The number of tokens.
Expand source code
def count_tokens(self, text: str) -> int: """Count the number of tokens in a string. Args: text (str): The text to count the tokens in. Returns: int: The number of tokens. """ if not text: return 0 # gpt3 turbo - cl100k_base # gpt2 (or r50k_base) Most GPT-3 models # p50k_base Code models, text-davinci-002, text-davinci-003 # cl100k_base text-embedding-ada-002 # enc = tiktoken.get_encoding("cl100k_base") encoding = tiktoken.encoding_for_model(self.token_model) tokens = encoding.encode(text) token_count = len(tokens) return token_count
class ToolKit (root_folder: str, token_model: str, global_max_lines: int, permitted_tools: list[str], config: Config)
-
AI Shell Toolkit
Initialize the ToolKitBase class.
Args
root_folder
:str
- The root folder path for file operations.
token_model
:str
- The token model to use for the toolkit
global_max_lines
:int
- The global max lines to use for the toolkit
permitted_tools
:list[str]
- The permitted tools for the toolkit
config
:Config
- The developer input that bot shouldn't set.
Expand source code
class ToolKit(ToolKitBase): """AI Shell Toolkit""" def __init__( self, root_folder: str, token_model: str, global_max_lines: int, permitted_tools: list[str], config: Config ) -> None: super().__init__(root_folder, token_model, global_max_lines, permitted_tools, config) self._lookup: dict[str, Callable[[dict[str, Any]], Any]] = { "report_bool": self.report_bool, "report_dict": self.report_dict, "report_float": self.report_float, "report_int": self.report_int, "report_json": self.report_json, "report_list": self.report_list, "report_set": self.report_set, "report_text": self.report_text, "report_toml": self.report_toml, "report_tuple": self.report_tuple, "report_xml": self.report_xml, "cat": self.cat, "cat_markdown": self.cat_markdown, "cut_characters": self.cut_characters, "cut_fields": self.cut_fields, "cut_fields_by_name": self.cut_fields_by_name, "ed": self.ed, "edlin": self.edlin, "find_files": self.find_files, "find_files_markdown": self.find_files_markdown, "get_current_branch": self.get_current_branch, "get_recent_commits": self.get_recent_commits, "git_diff": self.git_diff, "git_diff_commit": self.git_diff_commit, "git_log_file": self.git_log_file, "git_log_search": self.git_log_search, "git_show": self.git_show, "git_status": self.git_status, "is_ignored_by_gitignore": self.is_ignored_by_gitignore, "grep": self.grep, "grep_markdown": self.grep_markdown, "head": self.head, "head_markdown": self.head_markdown, "head_tail": self.head_tail, "tail": self.tail, "tail_markdown": self.tail_markdown, "insert_text_after_context": self.insert_text_after_context, "insert_text_after_multiline_context": self.insert_text_after_multiline_context, "insert_text_at_start_or_end": self.insert_text_at_start_or_end, "ls": self.ls, "ls_markdown": self.ls_markdown, "apply_git_patch": self.apply_git_patch, "format_code_as_markdown": self.format_code_as_markdown, "pytest": self.pytest, "replace_all": self.replace_all, "replace_line_by_line": self.replace_line_by_line, "replace_with_regex": self.replace_with_regex, "rewrite_file": self.rewrite_file, "write_new_file": self.write_new_file, "sed": self.sed, "add_todo": self.add_todo, "list_valid_assignees": self.list_valid_assignees, "query_todos_by_assignee": self.query_todos_by_assignee, "query_todos_by_regex": self.query_todos_by_regex, "remove_todo": self.remove_todo, "count_tokens": self.count_tokens, } # Stateful tool support. Useless assignment to make mypy happy self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) def report_bool(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( bool, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_bool(answer=answer, comment=comment) def report_dict(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( Any, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_dict(answer=answer, comment=comment) def report_float(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( float, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_float(answer=answer, comment=comment) def report_int(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( int, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_int(answer=answer, comment=comment) def report_json(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_json(answer=answer, comment=comment) def report_list(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_list(answer=answer, comment=comment) def report_set(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( list[Any], arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_set(answer=answer, comment=comment) def report_text(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_text(answer=answer, comment=comment) def report_toml(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_toml(answer=answer, comment=comment) def report_tuple(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( list[Any], arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_tuple(answer=answer, comment=comment) def report_xml(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_xml(answer=answer, comment=comment) def cat(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CatTool(self.root_folder, self.config) file_paths = cast( str, arguments.get( "file_paths", ), ) number_lines = cast(bool, arguments.get("number_lines", True)) squeeze_blank = cast(bool, arguments.get("squeeze_blank", False)) return tool.cat(file_paths=file_paths, number_lines=number_lines, squeeze_blank=squeeze_blank) def cat_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CatTool(self.root_folder, self.config) file_paths = cast( str, arguments.get( "file_paths", ), ) number_lines = cast(bool, arguments.get("number_lines", True)) squeeze_blank = cast(bool, arguments.get("squeeze_blank", False)) return tool.cat_markdown(file_paths=file_paths, number_lines=number_lines, squeeze_blank=squeeze_blank) def cut_characters(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CutTool(self.root_folder, self.config) character_ranges = cast( str, arguments.get( "character_ranges", ), ) file_path = cast( str, arguments.get( "file_path", ), ) return tool.cut_characters(character_ranges=character_ranges, file_path=file_path) def cut_fields(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CutTool(self.root_folder, self.config) delimiter = cast(str, arguments.get("delimiter", ",")) field_ranges = cast( str, arguments.get( "field_ranges", ), ) filename = cast( str, arguments.get( "filename", ), ) return tool.cut_fields(delimiter=delimiter, field_ranges=field_ranges, filename=filename) def cut_fields_by_name(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CutTool(self.root_folder, self.config) delimiter = cast(str, arguments.get("delimiter", ",")) field_names = cast( str, arguments.get( "field_names", ), ) filename = cast( str, arguments.get( "filename", ), ) return tool.cut_fields_by_name(delimiter=delimiter, field_names=field_names, filename=filename) def ed(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = EdTool(self.root_folder, self.config) file_name = cast( str, arguments.get( "file_name", ), ) script = cast( str, arguments.get( "script", ), ) return tool.ed(file_name=file_name, script=script) def edlin(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = EdlinTool(self.root_folder, self.config) file_name = cast( str, arguments.get( "file_name", ), ) script = cast( str, arguments.get( "script", ), ) return tool.edlin(file_name=file_name, script=script) def find_files(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = FindTool(self.root_folder, self.config) file_type = cast( Optional[str], arguments.get( "file_type", ), ) name = cast( Optional[str], arguments.get( "name", ), ) regex = cast( Optional[str], arguments.get( "regex", ), ) size = cast( Optional[str], arguments.get( "size", ), ) return tool.find_files(file_type=file_type, name=name, regex=regex, size=size) def find_files_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = FindTool(self.root_folder, self.config) file_type = cast( Optional[str], arguments.get( "file_type", ), ) name = cast( Optional[str], arguments.get( "name", ), ) regex = cast( Optional[str], arguments.get( "regex", ), ) size = cast( Optional[str], arguments.get( "size", ), ) return tool.find_files_markdown(file_type=file_type, name=name, regex=regex, size=size) def get_current_branch(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.get_current_branch() def get_recent_commits(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) n = cast(int, arguments.get("n", 10)) short_hash = cast(bool, arguments.get("short_hash", False)) return tool.get_recent_commits(n=n, short_hash=short_hash) def git_diff(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.git_diff() def git_diff_commit(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) commit1 = cast( str, arguments.get( "commit1", ), ) commit2 = cast( str, arguments.get( "commit2", ), ) return tool.git_diff_commit(commit1=commit1, commit2=commit2) def git_log_file(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) filename = cast( str, arguments.get( "filename", ), ) return tool.git_log_file(filename=filename) def git_log_search(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) search_string = cast( str, arguments.get( "search_string", ), ) return tool.git_log_search(search_string=search_string) def git_show(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.git_show() def git_status(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.git_status() def is_ignored_by_gitignore(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) gitignore_path = cast(str, arguments.get("gitignore_path", ".gitignore")) return tool.is_ignored_by_gitignore(file_path=file_path, gitignore_path=gitignore_path) def grep(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GrepTool(self.root_folder, self.config) glob_pattern = cast( str, arguments.get( "glob_pattern", ), ) maximum_matches_per_file = cast(int, arguments.get("maximum_matches_per_file", -1)) maximum_matches_total = cast(int, arguments.get("maximum_matches_total", -1)) regex = cast( str, arguments.get( "regex", ), ) skip_first_matches = cast(int, arguments.get("skip_first_matches", -1)) return tool.grep( glob_pattern=glob_pattern, maximum_matches_per_file=maximum_matches_per_file, maximum_matches_total=maximum_matches_total, regex=regex, skip_first_matches=skip_first_matches, ) def grep_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GrepTool(self.root_folder, self.config) glob_pattern = cast( str, arguments.get( "glob_pattern", ), ) maximum_matches = cast(int, arguments.get("maximum_matches", -1)) regex = cast( str, arguments.get( "regex", ), ) skip_first_matches = cast(int, arguments.get("skip_first_matches", -1)) return tool.grep_markdown( glob_pattern=glob_pattern, maximum_matches=maximum_matches, regex=regex, skip_first_matches=skip_first_matches, ) def head(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) byte_count = cast( Optional[int], arguments.get( "byte_count", ), ) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.head(byte_count=byte_count, file_path=file_path, lines=lines) def head_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.head_markdown(file_path=file_path, lines=lines) def head_tail(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) byte_count = cast( Optional[int], arguments.get( "byte_count", ), ) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) mode = cast(str, arguments.get("mode", "head")) return tool.head_tail(byte_count=byte_count, file_path=file_path, lines=lines, mode=mode) def tail(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) byte_count = cast( Optional[int], arguments.get( "byte_count", ), ) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.tail(byte_count=byte_count, file_path=file_path, lines=lines) def tail_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.tail_markdown(file_path=file_path, lines=lines) def insert_text_after_context(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = InsertTool(self.root_folder, self.config) context = cast( str, arguments.get( "context", ), ) file_path = cast( str, arguments.get( "file_path", ), ) text_to_insert = cast( str, arguments.get( "text_to_insert", ), ) return tool.insert_text_after_context(context=context, file_path=file_path, text_to_insert=text_to_insert) def insert_text_after_multiline_context(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = InsertTool(self.root_folder, self.config) context_lines = cast( str, arguments.get( "context_lines", ), ) file_path = cast( str, arguments.get( "file_path", ), ) text_to_insert = cast( str, arguments.get( "text_to_insert", ), ) return tool.insert_text_after_multiline_context( context_lines=context_lines, file_path=file_path, text_to_insert=text_to_insert ) def insert_text_at_start_or_end(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = InsertTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) position = cast(str, arguments.get("position", "end")) text_to_insert = cast( str, arguments.get( "text_to_insert", ), ) return tool.insert_text_at_start_or_end(file_path=file_path, position=position, text_to_insert=text_to_insert) def ls(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = LsTool(self.root_folder, self.config) all_files = cast(bool, arguments.get("all_files", False)) long = cast(bool, arguments.get("long", False)) path = cast( Optional[str], arguments.get( "path", ), ) return tool.ls(all_files=all_files, long=long, path=path) def ls_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = LsTool(self.root_folder, self.config) all_files = cast(bool, arguments.get("all_files", False)) long = cast(bool, arguments.get("long", False)) path = cast(Optional[str], arguments.get("path", ".")) return tool.ls_markdown(all_files=all_files, long=long, path=path) def apply_git_patch(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = PatchTool(self.root_folder, self.config) patch_content = cast( str, arguments.get( "patch_content", ), ) return tool.apply_git_patch(patch_content=patch_content) def format_code_as_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = PyCatTool(self.root_folder, self.config) base_path = cast( str, arguments.get( "base_path", ), ) header = cast( str, arguments.get( "header", ), ) no_comments = cast(bool, arguments.get("no_comments", False)) no_docs = cast(bool, arguments.get("no_docs", False)) return tool.format_code_as_markdown( base_path=base_path, header=header, no_comments=no_comments, no_docs=no_docs ) def pytest(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = PytestTool(self.root_folder, self.config) return tool.pytest() def replace_all(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = ReplaceTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) new_text = cast( str, arguments.get( "new_text", ), ) old_text = cast( str, arguments.get( "old_text", ), ) return tool.replace_all(file_path=file_path, new_text=new_text, old_text=old_text) def replace_line_by_line(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = ReplaceTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) line_end = cast(int, arguments.get("line_end", -1)) line_start = cast(int, arguments.get("line_start", 0)) new_text = cast( str, arguments.get( "new_text", ), ) old_text = cast( str, arguments.get( "old_text", ), ) return tool.replace_line_by_line( file_path=file_path, line_end=line_end, line_start=line_start, new_text=new_text, old_text=old_text ) def replace_with_regex(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = ReplaceTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) regex_match_expression = cast( str, arguments.get( "regex_match_expression", ), ) replacement = cast( str, arguments.get( "replacement", ), ) return tool.replace_with_regex( file_path=file_path, regex_match_expression=regex_match_expression, replacement=replacement ) def rewrite_file(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = RewriteTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) text = cast( str, arguments.get( "text", ), ) return tool.rewrite_file(file_path=file_path, text=text) def write_new_file(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = RewriteTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) text = cast( str, arguments.get( "text", ), ) return tool.write_new_file(file_path=file_path, text=text) def sed(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = SedTool(self.root_folder, self.config) commands = cast( str, arguments.get( "commands", ), ) file_path = cast( str, arguments.get( "file_path", ), ) return tool.sed(commands=commands, file_path=file_path) def add_todo(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) assignee = cast( Optional[str], arguments.get( "assignee", ), ) category = cast( str, arguments.get( "category", ), ) description = cast( str, arguments.get( "description", ), ) source_code_ref = cast( str, arguments.get( "source_code_ref", ), ) title = cast( str, arguments.get( "title", ), ) return tool.add_todo( assignee=assignee, category=category, description=description, source_code_ref=source_code_ref, title=title ) def list_valid_assignees(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) return tool.list_valid_assignees() def query_todos_by_assignee(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) assignee_name = cast( str, arguments.get( "assignee_name", ), ) return tool.query_todos_by_assignee(assignee_name=assignee_name) def query_todos_by_regex(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) regex_pattern = cast(str, arguments.get("regex_pattern", r"[\s\S]+")) return tool.query_todos_by_regex(regex_pattern=regex_pattern) def remove_todo(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) title = cast( str, arguments.get( "title", ), ) return tool.remove_todo(title=title) def count_tokens(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TokenCounterTool(self.root_folder, self.config) text = cast( str, arguments.get( "text", ), ) return tool.count_tokens(text=text)
Ancestors
Methods
def add_todo(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def add_todo(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) assignee = cast( Optional[str], arguments.get( "assignee", ), ) category = cast( str, arguments.get( "category", ), ) description = cast( str, arguments.get( "description", ), ) source_code_ref = cast( str, arguments.get( "source_code_ref", ), ) title = cast( str, arguments.get( "title", ), ) return tool.add_todo( assignee=assignee, category=category, description=description, source_code_ref=source_code_ref, title=title )
def apply_git_patch(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def apply_git_patch(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = PatchTool(self.root_folder, self.config) patch_content = cast( str, arguments.get( "patch_content", ), ) return tool.apply_git_patch(patch_content=patch_content)
def cat(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def cat(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CatTool(self.root_folder, self.config) file_paths = cast( str, arguments.get( "file_paths", ), ) number_lines = cast(bool, arguments.get("number_lines", True)) squeeze_blank = cast(bool, arguments.get("squeeze_blank", False)) return tool.cat(file_paths=file_paths, number_lines=number_lines, squeeze_blank=squeeze_blank)
def cat_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def cat_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CatTool(self.root_folder, self.config) file_paths = cast( str, arguments.get( "file_paths", ), ) number_lines = cast(bool, arguments.get("number_lines", True)) squeeze_blank = cast(bool, arguments.get("squeeze_blank", False)) return tool.cat_markdown(file_paths=file_paths, number_lines=number_lines, squeeze_blank=squeeze_blank)
def count_tokens(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def count_tokens(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TokenCounterTool(self.root_folder, self.config) text = cast( str, arguments.get( "text", ), ) return tool.count_tokens(text=text)
def cut_characters(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def cut_characters(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CutTool(self.root_folder, self.config) character_ranges = cast( str, arguments.get( "character_ranges", ), ) file_path = cast( str, arguments.get( "file_path", ), ) return tool.cut_characters(character_ranges=character_ranges, file_path=file_path)
def cut_fields(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def cut_fields(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CutTool(self.root_folder, self.config) delimiter = cast(str, arguments.get("delimiter", ",")) field_ranges = cast( str, arguments.get( "field_ranges", ), ) filename = cast( str, arguments.get( "filename", ), ) return tool.cut_fields(delimiter=delimiter, field_ranges=field_ranges, filename=filename)
def cut_fields_by_name(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def cut_fields_by_name(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = CutTool(self.root_folder, self.config) delimiter = cast(str, arguments.get("delimiter", ",")) field_names = cast( str, arguments.get( "field_names", ), ) filename = cast( str, arguments.get( "filename", ), ) return tool.cut_fields_by_name(delimiter=delimiter, field_names=field_names, filename=filename)
def ed(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def ed(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = EdTool(self.root_folder, self.config) file_name = cast( str, arguments.get( "file_name", ), ) script = cast( str, arguments.get( "script", ), ) return tool.ed(file_name=file_name, script=script)
def edlin(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def edlin(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = EdlinTool(self.root_folder, self.config) file_name = cast( str, arguments.get( "file_name", ), ) script = cast( str, arguments.get( "script", ), ) return tool.edlin(file_name=file_name, script=script)
def find_files(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def find_files(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = FindTool(self.root_folder, self.config) file_type = cast( Optional[str], arguments.get( "file_type", ), ) name = cast( Optional[str], arguments.get( "name", ), ) regex = cast( Optional[str], arguments.get( "regex", ), ) size = cast( Optional[str], arguments.get( "size", ), ) return tool.find_files(file_type=file_type, name=name, regex=regex, size=size)
def find_files_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def find_files_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = FindTool(self.root_folder, self.config) file_type = cast( Optional[str], arguments.get( "file_type", ), ) name = cast( Optional[str], arguments.get( "name", ), ) regex = cast( Optional[str], arguments.get( "regex", ), ) size = cast( Optional[str], arguments.get( "size", ), ) return tool.find_files_markdown(file_type=file_type, name=name, regex=regex, size=size)
def format_code_as_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def format_code_as_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = PyCatTool(self.root_folder, self.config) base_path = cast( str, arguments.get( "base_path", ), ) header = cast( str, arguments.get( "header", ), ) no_comments = cast(bool, arguments.get("no_comments", False)) no_docs = cast(bool, arguments.get("no_docs", False)) return tool.format_code_as_markdown( base_path=base_path, header=header, no_comments=no_comments, no_docs=no_docs )
def get_current_branch(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def get_current_branch(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.get_current_branch()
def get_recent_commits(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def get_recent_commits(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) n = cast(int, arguments.get("n", 10)) short_hash = cast(bool, arguments.get("short_hash", False)) return tool.get_recent_commits(n=n, short_hash=short_hash)
def git_diff(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def git_diff(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.git_diff()
def git_diff_commit(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def git_diff_commit(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) commit1 = cast( str, arguments.get( "commit1", ), ) commit2 = cast( str, arguments.get( "commit2", ), ) return tool.git_diff_commit(commit1=commit1, commit2=commit2)
def git_log_file(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def git_log_file(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) filename = cast( str, arguments.get( "filename", ), ) return tool.git_log_file(filename=filename)
def git_log_search(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def git_log_search(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) search_string = cast( str, arguments.get( "search_string", ), ) return tool.git_log_search(search_string=search_string)
def git_show(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def git_show(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.git_show()
def git_status(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def git_status(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) return tool.git_status()
def grep(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def grep(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GrepTool(self.root_folder, self.config) glob_pattern = cast( str, arguments.get( "glob_pattern", ), ) maximum_matches_per_file = cast(int, arguments.get("maximum_matches_per_file", -1)) maximum_matches_total = cast(int, arguments.get("maximum_matches_total", -1)) regex = cast( str, arguments.get( "regex", ), ) skip_first_matches = cast(int, arguments.get("skip_first_matches", -1)) return tool.grep( glob_pattern=glob_pattern, maximum_matches_per_file=maximum_matches_per_file, maximum_matches_total=maximum_matches_total, regex=regex, skip_first_matches=skip_first_matches, )
def grep_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def grep_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GrepTool(self.root_folder, self.config) glob_pattern = cast( str, arguments.get( "glob_pattern", ), ) maximum_matches = cast(int, arguments.get("maximum_matches", -1)) regex = cast( str, arguments.get( "regex", ), ) skip_first_matches = cast(int, arguments.get("skip_first_matches", -1)) return tool.grep_markdown( glob_pattern=glob_pattern, maximum_matches=maximum_matches, regex=regex, skip_first_matches=skip_first_matches, )
def head(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def head(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) byte_count = cast( Optional[int], arguments.get( "byte_count", ), ) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.head(byte_count=byte_count, file_path=file_path, lines=lines)
def head_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def head_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.head_markdown(file_path=file_path, lines=lines)
def head_tail(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def head_tail(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) byte_count = cast( Optional[int], arguments.get( "byte_count", ), ) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) mode = cast(str, arguments.get("mode", "head")) return tool.head_tail(byte_count=byte_count, file_path=file_path, lines=lines, mode=mode)
def insert_text_after_context(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def insert_text_after_context(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = InsertTool(self.root_folder, self.config) context = cast( str, arguments.get( "context", ), ) file_path = cast( str, arguments.get( "file_path", ), ) text_to_insert = cast( str, arguments.get( "text_to_insert", ), ) return tool.insert_text_after_context(context=context, file_path=file_path, text_to_insert=text_to_insert)
def insert_text_after_multiline_context(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def insert_text_after_multiline_context(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = InsertTool(self.root_folder, self.config) context_lines = cast( str, arguments.get( "context_lines", ), ) file_path = cast( str, arguments.get( "file_path", ), ) text_to_insert = cast( str, arguments.get( "text_to_insert", ), ) return tool.insert_text_after_multiline_context( context_lines=context_lines, file_path=file_path, text_to_insert=text_to_insert )
def insert_text_at_start_or_end(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def insert_text_at_start_or_end(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = InsertTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) position = cast(str, arguments.get("position", "end")) text_to_insert = cast( str, arguments.get( "text_to_insert", ), ) return tool.insert_text_at_start_or_end(file_path=file_path, position=position, text_to_insert=text_to_insert)
def is_ignored_by_gitignore(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def is_ignored_by_gitignore(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = GitTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) gitignore_path = cast(str, arguments.get("gitignore_path", ".gitignore")) return tool.is_ignored_by_gitignore(file_path=file_path, gitignore_path=gitignore_path)
def list_valid_assignees(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def list_valid_assignees(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) return tool.list_valid_assignees()
def ls(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def ls(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = LsTool(self.root_folder, self.config) all_files = cast(bool, arguments.get("all_files", False)) long = cast(bool, arguments.get("long", False)) path = cast( Optional[str], arguments.get( "path", ), ) return tool.ls(all_files=all_files, long=long, path=path)
def ls_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def ls_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = LsTool(self.root_folder, self.config) all_files = cast(bool, arguments.get("all_files", False)) long = cast(bool, arguments.get("long", False)) path = cast(Optional[str], arguments.get("path", ".")) return tool.ls_markdown(all_files=all_files, long=long, path=path)
def pytest(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def pytest(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = PytestTool(self.root_folder, self.config) return tool.pytest()
def query_todos_by_assignee(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def query_todos_by_assignee(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) assignee_name = cast( str, arguments.get( "assignee_name", ), ) return tool.query_todos_by_assignee(assignee_name=assignee_name)
def query_todos_by_regex(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def query_todos_by_regex(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) regex_pattern = cast(str, arguments.get("regex_pattern", r"[\s\S]+")) return tool.query_todos_by_regex(regex_pattern=regex_pattern)
def remove_todo(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def remove_todo(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = TodoTool(self.root_folder, self.config) title = cast( str, arguments.get( "title", ), ) return tool.remove_todo(title=title)
def replace_all(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def replace_all(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = ReplaceTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) new_text = cast( str, arguments.get( "new_text", ), ) old_text = cast( str, arguments.get( "old_text", ), ) return tool.replace_all(file_path=file_path, new_text=new_text, old_text=old_text)
def replace_line_by_line(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def replace_line_by_line(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = ReplaceTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) line_end = cast(int, arguments.get("line_end", -1)) line_start = cast(int, arguments.get("line_start", 0)) new_text = cast( str, arguments.get( "new_text", ), ) old_text = cast( str, arguments.get( "old_text", ), ) return tool.replace_line_by_line( file_path=file_path, line_end=line_end, line_start=line_start, new_text=new_text, old_text=old_text )
def replace_with_regex(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def replace_with_regex(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = ReplaceTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) regex_match_expression = cast( str, arguments.get( "regex_match_expression", ), ) replacement = cast( str, arguments.get( "replacement", ), ) return tool.replace_with_regex( file_path=file_path, regex_match_expression=regex_match_expression, replacement=replacement )
def report_bool(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_bool(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( bool, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_bool(answer=answer, comment=comment)
def report_dict(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_dict(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( Any, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_dict(answer=answer, comment=comment)
def report_float(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_float(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( float, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_float(answer=answer, comment=comment)
def report_int(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_int(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( int, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_int(answer=answer, comment=comment)
def report_json(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_json(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_json(answer=answer, comment=comment)
def report_list(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_list(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_list(answer=answer, comment=comment)
def report_set(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_set(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( list[Any], arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_set(answer=answer, comment=comment)
def report_text(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_text(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_text(answer=answer, comment=comment)
def report_toml(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_toml(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_toml(answer=answer, comment=comment)
def report_tuple(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_tuple(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( list[Any], arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_tuple(answer=answer, comment=comment)
def report_xml(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def report_xml(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" self.tool_answer_collector = AnswerCollectorTool(self.root_folder, self.config) answer = cast( str, arguments.get( "answer", ), ) comment = cast( str, arguments.get( "comment", ), ) return self.tool_answer_collector.report_xml(answer=answer, comment=comment)
def rewrite_file(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def rewrite_file(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = RewriteTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) text = cast( str, arguments.get( "text", ), ) return tool.rewrite_file(file_path=file_path, text=text)
def sed(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def sed(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = SedTool(self.root_folder, self.config) commands = cast( str, arguments.get( "commands", ), ) file_path = cast( str, arguments.get( "file_path", ), ) return tool.sed(commands=commands, file_path=file_path)
def tail(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def tail(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) byte_count = cast( Optional[int], arguments.get( "byte_count", ), ) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.tail(byte_count=byte_count, file_path=file_path, lines=lines)
def tail_markdown(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def tail_markdown(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = HeadTailTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) lines = cast(int, arguments.get("lines", 10)) return tool.tail_markdown(file_path=file_path, lines=lines)
def write_new_file(self, arguments: dict[str, typing.Any]) ‑> Any
-
Generated Do Not Edit
Expand source code
def write_new_file(self, arguments: dict[str, Any]) -> Any: """Generated Do Not Edit""" tool = RewriteTool(self.root_folder, self.config) file_path = cast( str, arguments.get( "file_path", ), ) text = cast( str, arguments.get( "text", ), ) return tool.write_new_file(file_path=file_path, text=text)
Inherited members