# [page-extraction residue, not part of the module] Spaces: Paused | File size: 17,148 Bytes | revision 2f28b62
# [page-extraction residue] the viewer's line-number gutter (1 through 433) followed here.
"""File and directory manipulation tool with sandbox support."""
import shlex
from collections import defaultdict
from pathlib import Path
from typing import Any, DefaultDict, List, Literal, Optional, get_args

from app.config import config
from app.exceptions import ToolError
from app.tool import BaseTool
from app.tool.base import CLIResult, ToolResult
from app.tool.file_operators import (
    FileOperator,
    LocalFileOperator,
    PathLike,
    SandboxFileOperator,
)
# Closed set of sub-command names the editor tool accepts.
Command = Literal["view", "create", "str_replace", "insert", "undo_edit"]
# Constants
# Number of context lines shown before and after an edit in result snippets.
SNIPPET_LINES: int = 4
# Character count above which `maybe_truncate` clips its input.
MAX_RESPONSE_LEN: int = 16000
# Notice appended to clipped output.
TRUNCATED_MESSAGE: str = (
    "<response clipped>"
    "<NOTE>"
    "To save on context only part of this file has been shown to you. "
    "You should retry this tool after you have searched inside the file "
    "with `grep -n` in order to find the line numbers of what you are looking for."
    "</NOTE>"
)
# Tool description
# Text assigned to `StrReplaceEditor.description`. It is runtime data, so its
# wording must stay exactly as written.
_STR_REPLACE_EDITOR_DESCRIPTION = """Custom editing tool for viewing, creating and editing files
* State is persistent across command calls and discussions with the user
* If `path` is a file, `view` displays the result of applying `cat -n`. If `path` is a directory, `view` lists non-hidden files and directories up to 2 levels deep
* The `create` command cannot be used if the specified `path` already exists as a file
* If a `command` generates a long output, it will be truncated and marked with `<response clipped>`
* The `undo_edit` command will revert the last edit made to the file at `path`
Notes for using the `str_replace` command:
* The `old_str` parameter should match EXACTLY one or more consecutive lines from the original file. Be mindful of whitespaces!
* If the `old_str` parameter is not unique in the file, the replacement will not be performed. Make sure to include enough context in `old_str` to make it unique
* The `new_str` parameter should contain the edited lines that should replace the `old_str`
"""
def maybe_truncate(
    content: str, truncate_after: Optional[int] = MAX_RESPONSE_LEN
) -> str:
    """Clip `content` to `truncate_after` characters and append the clipped notice.

    A falsy `truncate_after` (``None`` or ``0``) disables clipping, and content
    that already fits within the limit is returned unchanged.
    """
    needs_clipping = bool(truncate_after) and len(content) > truncate_after
    if needs_clipping:
        return content[:truncate_after] + TRUNCATED_MESSAGE
    return content
class StrReplaceEditor(BaseTool):
    """A tool for viewing, creating, and editing files with sandbox support.

    `execute` validates the path, then dispatches to one of the command
    handlers (`view`, `create`, `str_replace`, `insert`, `undo_edit`). All file
    access goes through a `FileOperator`, selected from
    `config.sandbox.use_sandbox`.

    Before `str_replace` and `insert` modify a file, the content read from disk
    is pushed onto a per-path history so `undo_edit` can write it back. Both
    commands rewrite the file with tabs expanded to spaces.
    """

    name: str = "str_replace_editor"
    description: str = _STR_REPLACE_EDITOR_DESCRIPTION
    # JSON-schema style description of the keyword arguments of `execute`.
    parameters: dict = {
        "type": "object",
        "properties": {
            "command": {
                "description": "The commands to run. Allowed options are: `view`, `create`, `str_replace`, `insert`, `undo_edit`.",
                "enum": ["view", "create", "str_replace", "insert", "undo_edit"],
                "type": "string",
            },
            "path": {
                "description": "Absolute path to file or directory.",
                "type": "string",
            },
            "file_text": {
                "description": "Required parameter of `create` command, with the content of the file to be created.",
                "type": "string",
            },
            "old_str": {
                "description": "Required parameter of `str_replace` command containing the string in `path` to replace.",
                "type": "string",
            },
            "new_str": {
                "description": "Optional parameter of `str_replace` command containing the new string (if not given, no string will be added). Required parameter of `insert` command containing the string to insert.",
                "type": "string",
            },
            "insert_line": {
                "description": "Required parameter of `insert` command. The `new_str` will be inserted AFTER the line `insert_line` of `path`.",
                "type": "integer",
            },
            "view_range": {
                "description": "Optional parameter of `view` command when `path` points to a file. If none is given, the full file is shown. If provided, the file will be shown in the indicated line number range, e.g. [11, 12] will show lines 11 and 12. Indexing at 1 to start. Setting `[start_line, -1]` shows all lines from `start_line` to the end of the file.",
                "items": {"type": "integer"},
                "type": "array",
            },
        },
        "required": ["command", "path"],
    }
    # Per-path stack of earlier file contents; `undo_edit` pops from it.
    _file_history: DefaultDict[PathLike, List[str]] = defaultdict(list)
    _local_operator: LocalFileOperator = LocalFileOperator()
    _sandbox_operator: SandboxFileOperator = SandboxFileOperator()

    def _get_operator(self) -> FileOperator:
        """Get the appropriate file operator based on execution mode."""
        return (
            self._sandbox_operator
            if config.sandbox.use_sandbox
            else self._local_operator
        )

    def _resolve_operator(self, operator: Optional[FileOperator]) -> FileOperator:
        """Return `operator`, or the configured default operator when it is None."""
        return self._get_operator() if operator is None else operator

    async def execute(
        self,
        *,
        command: Command,
        path: str,
        file_text: str | None = None,
        view_range: list[int] | None = None,
        old_str: str | None = None,
        new_str: str | None = None,
        insert_line: int | None = None,
        **kwargs: Any,
    ) -> str:
        """Execute a file operation command.

        Args:
            command: One of the `Command` names.
            path: Absolute path of the file or directory to operate on.
            file_text: Content for `create` (required for that command).
            view_range: Optional `[start, end]` line range for `view`.
            old_str: Text to replace for `str_replace` (required there).
            new_str: Replacement text for `str_replace`; text to insert for
                `insert` (required there).
            insert_line: Line after which `insert` places `new_str`.
            **kwargs: Accepted and ignored.

        Returns:
            The string form of the command's result.

        Raises:
            ToolError: If the path/command combination is invalid, a required
                parameter is missing, or the command is not recognized.
        """
        # Get the appropriate file operator
        operator = self._get_operator()

        # Validate path and command combination
        await self.validate_path(command, Path(path), operator)

        # Execute the appropriate command
        if command == "view":
            result = await self.view(path, view_range, operator)
        elif command == "create":
            if file_text is None:
                raise ToolError("Parameter `file_text` is required for command: create")
            await operator.write_file(path, file_text)
            self._file_history[path].append(file_text)
            result = ToolResult(output=f"File created successfully at: {path}")
        elif command == "str_replace":
            if old_str is None:
                raise ToolError(
                    "Parameter `old_str` is required for command: str_replace"
                )
            result = await self.str_replace(path, old_str, new_str, operator)
        elif command == "insert":
            if insert_line is None:
                raise ToolError(
                    "Parameter `insert_line` is required for command: insert"
                )
            if new_str is None:
                raise ToolError("Parameter `new_str` is required for command: insert")
            result = await self.insert(path, insert_line, new_str, operator)
        elif command == "undo_edit":
            result = await self.undo_edit(path, operator)
        else:
            # This should be caught by type checking, but we include it for safety
            raise ToolError(
                f'Unrecognized command {command}. The allowed commands for the {self.name} tool are: {", ".join(get_args(Command))}'
            )

        return str(result)

    async def validate_path(
        self, command: str, path: Path, operator: FileOperator
    ) -> None:
        """Validate path and command combination based on execution environment.

        Raises:
            ToolError: If `path` is relative; if it does not exist for any
                command other than `create`; if it is a directory and the
                command is not `view`; or if it already exists for `create`.
        """
        # Check if path is absolute
        if not path.is_absolute():
            raise ToolError(f"The path {path} is not an absolute path")

        if command != "create":
            # Every command except `create` needs an existing target
            if not await operator.exists(path):
                raise ToolError(
                    f"The path {path} does not exist. Please provide a valid path."
                )

            # Directories can only be viewed
            is_dir = await operator.is_directory(path)
            if is_dir and command != "view":
                raise ToolError(
                    f"The path {path} is a directory and only the `view` command can be used on directories"
                )
        else:
            # `create` must never overwrite an existing path
            exists = await operator.exists(path)
            if exists:
                raise ToolError(
                    f"File already exists at: {path}. Cannot overwrite files using command `create`."
                )

    async def view(
        self,
        path: PathLike,
        view_range: Optional[List[int]] = None,
        operator: Optional[FileOperator] = None,
    ) -> CLIResult:
        """Display file or directory content.

        Args:
            path: File or directory to show.
            view_range: Optional `[start, end]` line range; files only.
            operator: File operator to use; defaults to the configured one.

        Raises:
            ToolError: If `view_range` is given for a directory, or is invalid
                for the file.
        """
        operator = self._resolve_operator(operator)

        # Determine if path is a directory
        is_dir = await operator.is_directory(path)

        if is_dir:
            # Directory handling
            if view_range:
                raise ToolError(
                    "The `view_range` parameter is not allowed when `path` points to a directory."
                )
            return await self._view_directory(path, operator)

        # File handling
        return await self._view_file(path, operator, view_range)

    @staticmethod
    async def _view_directory(path: PathLike, operator: FileOperator) -> CLIResult:
        """Display directory contents.

        Runs `find` through `operator.run_command`, limited to depth 2 and
        excluding hidden entries. The heading is added only when the command
        produced no stderr output.
        """
        # Quote the path: the command is passed as a single string, so spaces
        # or shell metacharacters in `path` must not be interpreted.
        find_cmd = f"find {shlex.quote(str(path))} -maxdepth 2 -not -path '*/\\.*'"

        # Execute command using the operator
        returncode, stdout, stderr = await operator.run_command(find_cmd)

        if not stderr:
            stdout = (
                f"Here's the files and directories up to 2 levels deep in {path}, "
                f"excluding hidden items:\n{stdout}\n"
            )

        return CLIResult(output=stdout, error=stderr)

    async def _view_file(
        self,
        path: PathLike,
        operator: FileOperator,
        view_range: Optional[List[int]] = None,
    ) -> CLIResult:
        """Display file content, optionally within a specified line range.

        `view_range` is `[start, end]`, 1-indexed and inclusive; an `end` of
        `-1` means "through the last line".

        Raises:
            ToolError: If `view_range` is not two integers or falls outside
                the file.
        """
        # Read file content
        file_content = await operator.read_file(path)
        init_line = 1

        # Apply view range if specified
        if view_range:
            if len(view_range) != 2 or not all(isinstance(i, int) for i in view_range):
                raise ToolError(
                    "Invalid `view_range`. It should be a list of two integers."
                )

            file_lines = file_content.split("\n")
            n_lines_file = len(file_lines)
            init_line, final_line = view_range

            # Validate view range
            if init_line < 1 or init_line > n_lines_file:
                raise ToolError(
                    f"Invalid `view_range`: {view_range}. Its first element `{init_line}` should be "
                    f"within the range of lines of the file: {[1, n_lines_file]}"
                )
            if final_line > n_lines_file:
                raise ToolError(
                    f"Invalid `view_range`: {view_range}. Its second element `{final_line}` should be "
                    f"smaller than the number of lines in the file: `{n_lines_file}`"
                )
            if final_line != -1 and final_line < init_line:
                raise ToolError(
                    f"Invalid `view_range`: {view_range}. Its second element `{final_line}` should be "
                    f"larger or equal than its first `{init_line}`"
                )

            # Apply range
            if final_line == -1:
                file_content = "\n".join(file_lines[init_line - 1 :])
            else:
                file_content = "\n".join(file_lines[init_line - 1 : final_line])

        # Format and return result
        return CLIResult(
            output=self._make_output(file_content, str(path), init_line=init_line)
        )

    @staticmethod
    def _occurrence_lines(content: str, needle: str) -> List[int]:
        """Return the 1-based line numbers on which matches of `needle` start.

        Matches are found by offset, so a `needle` that spans several lines is
        reported at the line where it begins. Each line is listed once.
        """
        step = max(len(needle), 1)  # always advance, even for an empty needle
        found: List[int] = []
        line_no, scanned = 1, 0
        pos = content.find(needle)
        while pos != -1:
            # Count only the newlines passed since the previous match
            line_no += content.count("\n", scanned, pos)
            scanned = pos
            if not found or found[-1] != line_no:
                found.append(line_no)
            pos = content.find(needle, pos + step)
        return found

    async def str_replace(
        self,
        path: PathLike,
        old_str: str,
        new_str: Optional[str] = None,
        operator: Optional[FileOperator] = None,
    ) -> CLIResult:
        """Replace a unique string in a file with a new string.

        Tabs in the file content, `old_str` and `new_str` are expanded before
        matching, and the file is rewritten with the expanded content.

        Args:
            path: File to edit.
            old_str: Text that must occur exactly once in the file.
            new_str: Replacement text; `None` removes `old_str`.
            operator: File operator to use; defaults to the configured one.

        Returns:
            A result whose output contains a numbered snippet around the edit.

        Raises:
            ToolError: If `old_str` does not occur, or occurs more than once.
        """
        operator = self._resolve_operator(operator)

        # Read file content and expand tabs
        original_content = await operator.read_file(path)
        file_content = original_content.expandtabs()
        old_str = old_str.expandtabs()
        new_str = new_str.expandtabs() if new_str is not None else ""

        # Check if old_str is unique in the file
        occurrences = file_content.count(old_str)
        if occurrences == 0:
            raise ToolError(
                f"No replacement was performed, old_str `{old_str}` did not appear verbatim in {path}."
            )
        if occurrences > 1:
            # Find line numbers of occurrences (works for multi-line old_str too)
            lines = self._occurrence_lines(file_content, old_str)
            raise ToolError(
                f"No replacement was performed. Multiple occurrences of old_str `{old_str}` "
                f"in lines {lines}. Please ensure it is unique"
            )

        # Replace old_str with new_str
        new_file_content = file_content.replace(old_str, new_str)

        # Write the new content to the file
        await operator.write_file(path, new_file_content)

        # Save the content exactly as it was read, so undo restores it verbatim
        self._file_history[path].append(original_content)

        # Create a snippet of the edited section
        replacement_line = file_content.split(old_str)[0].count("\n")
        start_line = max(0, replacement_line - SNIPPET_LINES)
        end_line = replacement_line + SNIPPET_LINES + new_str.count("\n")
        snippet = "\n".join(new_file_content.split("\n")[start_line : end_line + 1])

        # Prepare the success message
        success_msg = f"The file {path} has been edited. "
        success_msg += self._make_output(
            snippet, f"a snippet of {path}", start_line + 1
        )
        success_msg += "Review the changes and make sure they are as expected. Edit the file again if necessary."

        return CLIResult(output=success_msg)

    async def insert(
        self,
        path: PathLike,
        insert_line: int,
        new_str: str,
        operator: Optional[FileOperator] = None,
    ) -> CLIResult:
        """Insert text at a specific line in a file.

        `new_str` is placed after line `insert_line` (so `0` inserts at the
        top). Tabs in the file content and in `new_str` are expanded, and the
        file is rewritten with the expanded content.

        Args:
            path: File to edit.
            insert_line: Line number after which to insert; from 0 to the
                number of lines in the file.
            new_str: Text to insert.
            operator: File operator to use; defaults to the configured one.

        Returns:
            A result whose output contains a numbered snippet around the edit.

        Raises:
            ToolError: If `insert_line` is outside the allowed range.
        """
        operator = self._resolve_operator(operator)

        # Read and prepare content
        original_text = await operator.read_file(path)
        file_text = original_text.expandtabs()
        new_str = new_str.expandtabs()
        file_text_lines = file_text.split("\n")
        n_lines_file = len(file_text_lines)

        # Validate insert_line
        if insert_line < 0 or insert_line > n_lines_file:
            raise ToolError(
                f"Invalid `insert_line` parameter: {insert_line}. It should be within "
                f"the range of lines of the file: {[0, n_lines_file]}"
            )

        # Perform insertion
        new_str_lines = new_str.split("\n")
        new_file_text_lines = (
            file_text_lines[:insert_line]
            + new_str_lines
            + file_text_lines[insert_line:]
        )

        # Create a snippet for preview
        snippet_lines = (
            file_text_lines[max(0, insert_line - SNIPPET_LINES) : insert_line]
            + new_str_lines
            + file_text_lines[insert_line : insert_line + SNIPPET_LINES]
        )

        # Join lines and write to file
        new_file_text = "\n".join(new_file_text_lines)
        snippet = "\n".join(snippet_lines)

        await operator.write_file(path, new_file_text)

        # Save the content exactly as it was read, so undo restores it verbatim
        self._file_history[path].append(original_text)

        # Prepare success message
        success_msg = f"The file {path} has been edited. "
        success_msg += self._make_output(
            snippet,
            "a snippet of the edited file",
            max(1, insert_line - SNIPPET_LINES + 1),
        )
        success_msg += "Review the changes and make sure they are as expected (correct indentation, no duplicate lines, etc). Edit the file again if necessary."

        return CLIResult(output=success_msg)

    async def undo_edit(
        self, path: PathLike, operator: Optional[FileOperator] = None
    ) -> CLIResult:
        """Revert the last edit made to a file.

        Pops the most recent history entry for `path` and writes it back.

        Raises:
            ToolError: If no history is recorded for `path`.
        """
        operator = self._resolve_operator(operator)

        # `.get` avoids creating an empty history entry for unknown paths
        history = self._file_history.get(path)
        if not history:
            raise ToolError(f"No edit history found for {path}.")

        old_text = history.pop()
        await operator.write_file(path, old_text)

        return CLIResult(
            output=f"Last edit to {path} undone successfully. {self._make_output(old_text, str(path))}"
        )

    def _make_output(
        self,
        file_content: str,
        file_descriptor: str,
        init_line: int = 1,
        expand_tabs: bool = True,
    ) -> str:
        """Format file content for display with line numbers.

        Args:
            file_content: Text to show; clipped by `maybe_truncate` first.
            file_descriptor: Label inserted into the heading line.
            init_line: Number assigned to the first displayed line.
            expand_tabs: Whether to expand tabs before numbering.

        Returns:
            A heading followed by the numbered lines and a trailing newline.
        """
        file_content = maybe_truncate(file_content)
        if expand_tabs:
            file_content = file_content.expandtabs()

        # Add line numbers to each line
        file_content = "\n".join(
            [
                f"{i + init_line:6}\t{line}"
                for i, line in enumerate(file_content.split("\n"))
            ]
        )

        return (
            f"Here's the result of running `cat -n` on {file_descriptor}:\n"
            + file_content
            + "\n"
        )
|