diff --git a/docs/sdk/agent_tools.mdx b/docs/sdk/agent_tools.mdx
index 859670a9..4ec7af9b 100644
--- a/docs/sdk/agent_tools.mdx
+++ b/docs/sdk/agent_tools.mdx
@@ -545,8 +545,13 @@ async def python(code: str, *, timeout: int = 120) -> str:
-Filesystem
-----------
+FilesystemBase
+--------------
+
+Base class for filesystem operations with common interface.
+
+This abstract base class defines the standard interface for filesystem operations
+and provides common utilities like path resolution and validation.
### fs\_options
@@ -556,6 +561,14 @@ fs_options: AnyDict | None = Config(default=None)
Extra options for the universal filesystem.
+### max\_concurrent\_reads
+
+```python
+max_concurrent_reads: int = Config(default=25)
+```
+
+Maximum number of concurrent file reads for grep operations.
+
### multi\_modal
```python
@@ -574,77 +587,6 @@ path: str | Path | UPath = Config(
Base path to work from.
-### cp
-
-```python
-cp(
- src: Annotated[str, "Source file"],
- dest: Annotated[str, "Destination path"],
-) -> FilesystemItem
-```
-
-Copy a file to a new location.
-
-
-```python
-@tool_method(variants=["write"], catch=True)
-def cp(
- self,
- src: t.Annotated[str, "Source file"],
- dest: t.Annotated[str, "Destination path"],
-) -> FilesystemItem:
- """Copy a file to a new location."""
- src_path = self._resolve(src)
- dest_path = self._resolve(dest)
-
- if not src_path.exists():
- raise ValueError(f"'{src}' not found")
-
- if not src_path.is_file():
- raise ValueError(f"'{src}' is not a file")
-
- dest_path.parent.mkdir(parents=True, exist_ok=True)
-
- with src_path.open("rb") as src_file, dest_path.open("wb") as dest_file:
- dest_file.write(src_file.read())
-
- return FilesystemItem.from_path(dest_path, self._upath)
-```
-
-
-
-
-### delete
-
-```python
-delete(path: Annotated[str, File or directory]) -> bool
-```
-
-Delete a file or directory.
-
-
-```python
-@tool_method(variants=["write"], catch=True)
-def delete(
- self,
- path: t.Annotated[str, "File or directory"],
-) -> bool:
- """Delete a file or directory."""
- _path = self._resolve(path)
- if not _path.exists():
- raise ValueError(f"'{path}' not found")
-
- if _path.is_dir():
- _path.rmdir()
- else:
- _path.unlink()
-
- return True
-```
-
-
-
-
### glob
```python
@@ -661,7 +603,7 @@ include \*\* for recursive matching, such as '/path/\**/dir/*.py'.
```python
@tool_method(catch=True)
-def glob(
+async def glob(
self,
pattern: t.Annotated[str, "Glob pattern for file matching"],
) -> list[FilesystemItem]:
@@ -669,7 +611,7 @@ def glob(
Returns a list of paths matching a valid glob pattern. The pattern can
include ** for recursive matching, such as '/path/**/dir/*.py'.
"""
- matches = list(self._upath.glob(pattern))
+ matches = await asyncio.to_thread(lambda: list(self._upath.glob(pattern)))
# Check to make sure all matches are within the base path
for match in matches:
@@ -699,7 +641,7 @@ grep(
recursive: Annotated[
bool, "Search recursively in directories"
] = False,
-) -> list[GrepMatch]
+) -> list[GrepMatch | str]
```
Search for pattern in files and return matches with line numbers and context.
@@ -709,14 +651,14 @@ For directories, all text files will be searched.
```python
@tool_method(variants=["read", "write"], catch=True)
-def grep(
+async def grep(
self,
pattern: t.Annotated[str, "Regular expression pattern to search for"],
path: t.Annotated[str, "File or directory path to search in"],
*,
max_results: t.Annotated[int, "Maximum number of results to return"] = 100,
recursive: t.Annotated[bool, "Search recursively in directories"] = False,
-) -> list[GrepMatch]:
+) -> list[GrepMatch | str]:
"""
Search for pattern in files and return matches with line numbers and context.
@@ -734,25 +676,31 @@ def grep(
files_to_search.append(target_path)
elif target_path.is_dir():
files_to_search.extend(
- list(target_path.rglob("*") if recursive else target_path.glob("*")),
+ await asyncio.to_thread(
+ lambda: list(target_path.rglob("*") if recursive else target_path.glob("*"))
+ ),
)
- matches: list[GrepMatch] = []
- for file_path in [f for f in files_to_search if f.is_file()]:
- if len(matches) >= max_results:
- break
+ # Filter to files only and check size
+ files_to_search = [
+ f for f in files_to_search if f.is_file() and f.stat().st_size <= MAX_GREP_FILE_SIZE
+ ]
- if file_path.stat().st_size > MAX_GREP_FILE_SIZE:
- continue
+ async def search_file(file_path: UPath) -> list[GrepMatch | str]:
+ """Search a single file for matches."""
+ file_matches: list[GrepMatch | str] = []
+ try:
+ # Use the subclass's read_file method
+ content = await self.read_file(self._relative(file_path))
+ if isinstance(content, bytes):
+ content = content.decode("utf-8")
+ elif isinstance(content, rg.ContentImageUrl):
+ # Can't grep images
+ return []
- with contextlib.suppress(Exception):
- with file_path.open("r") as f:
- lines = f.readlines()
+ lines = content.splitlines(keepends=True)
for i, line in enumerate(lines):
- if len(matches) >= max_results:
- break
-
if regex.search(line):
line_num = i + 1
context_start = max(0, i - 1)
@@ -765,7 +713,7 @@ def grep(
context.append(f"{prefix} {j + 1}: {shorten_string(line_text, 80)}")
rel_path = self._relative(file_path)
- matches.append(
+ file_matches.append(
GrepMatch(
path=rel_path,
line_number=line_num,
@@ -773,8 +721,38 @@ def grep(
context=context,
),
)
+ except (
+ FileNotFoundError,
+ PermissionError,
+ IsADirectoryError,
+ UnicodeDecodeError,
+ OSError,
+ ValueError,
+ ) as e:
+ file_matches.append(f"Error occurred while searching file {file_path}: {e}")
+
+ return file_matches
+
+ # Search files in parallel with concurrency limit
+ semaphore = asyncio.Semaphore(self.max_concurrent_reads)
+
+ async def search_file_limited(file_path: UPath) -> list[GrepMatch | str]:
+ """Search a single file with semaphore to limit concurrency."""
+ async with semaphore:
+ return await search_file(file_path)
+
+ all_matches: list[GrepMatch | str] = []
+ results = await asyncio.gather(
+ *[search_file_limited(file_path) for file_path in files_to_search]
+ )
+
+ # Flatten results and respect max_results
+ for file_matches in results:
+ all_matches.extend(file_matches)
+ if len(all_matches) >= max_results:
+ break
- return matches
+ return all_matches[:max_results]
```
@@ -793,7 +771,7 @@ List the contents of a directory.
```python
@tool_method(variants=["read", "write"], catch=True)
-def ls(
+async def ls(
self,
path: t.Annotated[str, "Directory path to list"] = "",
) -> list[FilesystemItem]:
@@ -806,7 +784,7 @@ def ls(
if not _path.is_dir():
raise ValueError(f"'{path}' is not a directory.")
- items = list(_path.iterdir())
+ items = await asyncio.to_thread(lambda: list(_path.iterdir()))
return [FilesystemItem.from_path(item, self._upath) for item in items]
```
@@ -826,18 +804,218 @@ Create a directory and any necessary parent directories.
```python
@tool_method(variants=["write"], catch=True)
-def mkdir(
+async def mkdir(
self,
path: t.Annotated[str, "Directory path to create"],
) -> FilesystemItem:
"""Create a directory and any necessary parent directories."""
dir_path = self._resolve(path)
- dir_path.mkdir(parents=True, exist_ok=True)
+ await asyncio.to_thread(lambda: dir_path.mkdir(parents=True, exist_ok=True))
return FilesystemItem.from_path(dir_path, self._upath)
```
+
+
+### read\_file
+
+```python
+read_file(
+ path: Annotated[str, "Path to the file to read"],
+) -> rg.ContentImageUrl | str
+```
+
+Must be implemented in subclasses
+
+
+```python
+async def read_file(
+ self, path: t.Annotated[str, "Path to the file to read"]
+) -> rg.ContentImageUrl | str:
+ """Must be implemented in subclasses"""
+ raise NotImplementedError("Subclasses must implement")
+```
+
+
+
+
+FilesystemItem
+--------------
+
+```python
+FilesystemItem(
+ type: Literal["file", "dir"],
+ name: str,
+ size: int | None = None,
+ modified: str | None = None,
+)
+```
+
+Item in the filesystem
+
+### from\_path
+
+```python
+from_path(
+ path: UPath, relative_base: UPath
+) -> FilesystemItem
+```
+
+Create an Item from a UPath.
+
+**Parameters:**
+
+* **`path`**
+ (`UPath`)
+ –The UPath to create an item from
+* **`relative_base`**
+ (`UPath`)
+ –The base path to calculate relative paths from
+
+**Returns:**
+
+* `FilesystemItem`
+ –FilesystemItem representing the path
+
+**Raises:**
+
+* `ValueError`
+ –If the path is neither a file nor a directory
+
+
+```python
+@classmethod
+def from_path(cls, path: "UPath", relative_base: "UPath") -> "FilesystemItem":
+ """Create an Item from a UPath.
+
+ Args:
+ path: The UPath to create an item from
+ relative_base: The base path to calculate relative paths from
+
+ Returns:
+ FilesystemItem representing the path
+
+ Raises:
+ ValueError: If the path is neither a file nor a directory
+ """
+ base_path: str = str(relative_base.resolve())
+ full_path: str = str(path.resolve())
+ relative: str = full_path[len(base_path) :]
+
+ if path.is_dir():
+ return cls(type="dir", name=relative, size=None, modified=None)
+
+ if path.is_file():
+ return cls(
+ type="file",
+ name=relative,
+ size=path.stat().st_size,
+ modified=datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S",
+ ),
+ )
+
+ raise ValueError(f"'{relative}' is not a valid file or directory.")
+```
+
+
+
+
+GrepMatch
+---------
+
+```python
+GrepMatch(
+ path: str,
+ line_number: int,
+ line: str,
+ context: list[str],
+)
+```
+
+Individual search match
+
+LocalFilesystem
+---------------
+
+Local filesystem implementation using aiofiles.
+
+Supports operations on the local disk using async file I/O.
+
+### cp
+
+```python
+cp(
+ src: Annotated[str, "Source file"],
+ dest: Annotated[str, "Destination path"],
+) -> FilesystemItem
+```
+
+Copy a file to a new location.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def cp(
+ self,
+ src: t.Annotated[str, "Source file"],
+ dest: t.Annotated[str, "Destination path"],
+) -> FilesystemItem:
+ """Copy a file to a new location."""
+ src_path = self._resolve(src)
+ dest_path = self._resolve(dest)
+
+ if not src_path.exists():
+ raise ValueError(f"'{src}' not found")
+
+ if not src_path.is_file():
+ raise ValueError(f"'{src}' is not a file")
+
+ await asyncio.to_thread(lambda: dest_path.parent.mkdir(parents=True, exist_ok=True))
+
+ async with (
+ aiofiles.open(src_path, "rb") as src_file,
+ aiofiles.open(dest_path, "wb") as dest_file,
+ ):
+ content = await src_file.read()
+ await dest_file.write(content)
+
+ return FilesystemItem.from_path(dest_path, self._upath)
+```
+
+
+
+
+### delete
+
+```python
+delete(path: Annotated[str, File or directory]) -> bool
+```
+
+Delete a file or directory.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def delete(
+ self,
+ path: t.Annotated[str, "File or directory"],
+) -> bool:
+ """Delete a file or directory."""
+ _path = self._resolve(path)
+ if not _path.exists():
+ raise ValueError(f"'{path}' not found")
+
+ if _path.is_dir():
+ await asyncio.to_thread(_path.rmdir)
+ else:
+ await asyncio.to_thread(_path.unlink)
+
+ return True
+```
+
+
### mv
@@ -854,7 +1032,7 @@ Move a file or directory to a new location.
```python
@tool_method(variants=["write"], catch=True)
-def mv(
+async def mv(
self,
src: t.Annotated[str, "Source path"],
dest: t.Annotated[str, "Destination path"],
@@ -866,9 +1044,9 @@ def mv(
if not src_path.exists():
raise ValueError(f"'{src}' not found")
- dest_path.parent.mkdir(parents=True, exist_ok=True)
+ await asyncio.to_thread(lambda: dest_path.parent.mkdir(parents=True, exist_ok=True))
- src_path.rename(dest_path)
+ await asyncio.to_thread(lambda: src_path.rename(dest_path))
return FilesystemItem.from_path(dest_path, self._upath)
```
@@ -886,19 +1064,40 @@ read_file(
Read a file and return its contents.
+**Returns:**
+
+* `ContentImageUrl | str`
+ –+ str: The file contents decoded as UTF-8 if possible.
+* `ContentImageUrl | str`
+ –+ rg.ContentImageUrl: If the file is non-text and multi\_modal is True.
+
+
+Callers should be prepared to handle raw bytes if the file is not valid UTF-8 and multi\_modal is False.
+
+
```python
@tool_method(variants=["read", "write"], catch=True)
-def read_file(
+async def read_file(
self,
path: t.Annotated[str, "Path to the file to read"],
) -> rg.ContentImageUrl | str:
- """Read a file and return its contents."""
+ """
+ Read a file and return its contents.
+
+ Returns:
+ - str: The file contents decoded as UTF-8 if possible.
+ - rg.ContentImageUrl: If the file is non-text and multi_modal is True.
+
+ Note:
+ Callers should be prepared to handle raw bytes if the file is not valid UTF-8 and multi_modal is False.
+ """
_path = self._resolve(path)
- content = _path.read_bytes()
+ async with aiofiles.open(_path, "rb") as f:
+ content = await f.read()
try:
- return content.decode("utf-8")
+ return str(content.decode("utf-8"))
except UnicodeDecodeError as e:
if self.multi_modal:
return rg.ContentImageUrl.from_file(path)
@@ -926,7 +1125,7 @@ Negative line numbers count from the end.
```python
@tool_method(variants=["read", "write"], catch=True)
-def read_lines(
+async def read_lines(
self,
path: t.Annotated[str, "Path to the file to read"],
start_line: t.Annotated[int, "Start line number (0-indexed)"] = 0,
@@ -944,8 +1143,8 @@ def read_lines(
if not _path.is_file():
raise ValueError(f"'{path}' is not a file.")
- with _path.open("r") as f:
- lines = f.readlines()
+ async with aiofiles.open(_path) as f:
+ lines = await f.readlines()
if start_line < 0:
start_line = len(lines) + start_line
@@ -978,15 +1177,15 @@ Create or overwrite a file with the given contents.
```python
@tool_method(variants=["write"], catch=True)
-def write_file(
+async def write_file(
self,
path: t.Annotated[str, "Path to write the file to"],
contents: t.Annotated[str, "Content to write to the file"],
) -> FilesystemItem:
"""Create or overwrite a file with the given contents."""
- _path = self._safe_create_file(path)
- with _path.open("w") as f:
- f.write(contents)
+ _path = await self._safe_create_file(path)
+ async with aiofiles.open(_path, "w") as f:
+ await f.write(contents)
return FilesystemItem.from_path(_path, self._upath)
```
@@ -994,12 +1193,44 @@ def write_file(
-### write\_lines
+### write\_file\_bytes
```python
-write_lines(
- path: Annotated[str, "Path to write to"],
- contents: Annotated[str, "Content to write"],
+write_file_bytes(
+ path: Annotated[str, "Path to write the file to"],
+ byte_data: Annotated[
+ bytes, "Bytes to write to the file"
+ ],
+) -> FilesystemItem
+```
+
+Create or overwrite a file with the given bytes.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def write_file_bytes(
+ self,
+ path: t.Annotated[str, "Path to write the file to"],
+ byte_data: t.Annotated[bytes, "Bytes to write to the file"],
+) -> FilesystemItem:
+ """Create or overwrite a file with the given bytes."""
+ _path = await self._safe_create_file(path)
+ async with aiofiles.open(_path, "wb") as f:
+ await f.write(byte_data)
+
+ return FilesystemItem.from_path(_path, self._upath)
+```
+
+
+
+
+### write\_lines
+
+```python
+write_lines(
+ path: Annotated[str, "Path to write to"],
+ contents: Annotated[str, "Content to write"],
insert_line: Annotated[
int,
"Line number to insert at (negative counts from end)",
@@ -1016,7 +1247,7 @@ Mode can be 'insert' to add lines or 'overwrite' to replace lines.
```python
@tool_method(variants=["write"], catch=True)
-def write_lines(
+async def write_lines(
self,
path: t.Annotated[str, "Path to write to"],
contents: t.Annotated[str, "Content to write"],
@@ -1030,11 +1261,11 @@ def write_lines(
if mode not in ["insert", "overwrite"]:
raise ValueError("Invalid mode. Use 'insert' or 'overwrite'")
- _path = self._safe_create_file(path)
+ _path = await self._safe_create_file(path)
lines: list[str] = []
- with _path.open("r") as f:
- lines = f.readlines()
+ async with aiofiles.open(_path) as f:
+ lines = await f.readlines()
# Normalize line endings in content
content_lines = [
@@ -1054,8 +1285,8 @@ def write_lines(
elif mode == "overwrite":
lines[insert_line : insert_line + len(content_lines)] = content_lines
- with _path.open("w") as f:
- f.writelines(lines)
+ async with aiofiles.open(_path, "w") as f:
+ await f.writelines(lines)
return FilesystemItem.from_path(_path, self._upath)
```
@@ -1063,72 +1294,534 @@ def write_lines(
-FilesystemItem
---------------
+S3Filesystem
+------------
+
+S3 filesystem implementation using aioboto3.
+
+Supports operations on AWS S3 buckets with async I/O.
+Requires aioboto3 and properly configured AWS credentials.
+
+### cp
```python
-FilesystemItem(
- type: Literal["file", "dir"],
- name: str,
- size: int | None = None,
- modified: str | None = None,
-)
+cp(
+ src: Annotated[str, "Source file"],
+ dest: Annotated[str, "Destination path"],
+) -> FilesystemItem
```
-Item in the filesystem
+Copy a file to a new location within S3.
-### from\_path
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def cp(
+ self,
+ src: t.Annotated[str, "Source file"],
+ dest: t.Annotated[str, "Destination path"],
+) -> FilesystemItem:
+ """Copy a file to a new location within S3."""
+ src_path = self._resolve(src)
+ dest_path = self._resolve(dest)
+
+ if not src_path.exists():
+ raise ValueError(f"'{src}' not found")
+
+ if not src_path.is_file():
+ raise ValueError(f"'{src}' is not a file")
+
+ src_bucket, src_key = self._get_s3_parts(src_path)
+ dest_bucket, dest_key = self._get_s3_parts(dest_path)
+
+ session = self._get_session()
+ async with session.client("s3") as s3_client:
+ # Use S3 copy_object for efficient server-side copy
+ copy_source = {"Bucket": src_bucket, "Key": src_key}
+ await s3_client.copy_object(CopySource=copy_source, Bucket=dest_bucket, Key=dest_key)
+
+ # Return FilesystemItem without calling stat
+ relative = self._relative(dest_path)
+ return FilesystemItem(type="file", name=relative, size=None, modified=None)
+```
+
+
+
+
+### delete
```python
-from_path(
- path: UPath, relative_base: UPath
+delete(path: Annotated[str, File or directory]) -> bool
+```
+
+Delete a file from S3.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def delete(
+ self,
+ path: t.Annotated[str, "File or directory"],
+) -> bool:
+ """Delete a file from S3."""
+ _path = self._resolve(path)
+
+ if not _path.exists():
+ raise ValueError(f"'{path}' not found")
+
+ bucket, key = self._get_s3_parts(_path)
+
+ session = self._get_session()
+ async with session.client("s3") as s3_client:
+ await s3_client.delete_object(Bucket=bucket, Key=key)
+
+ return True
+```
+
+
+
+
+### mkdir
+
+```python
+mkdir(
+ path: Annotated[str, "Directory path to create"],
) -> FilesystemItem
```
-Create an Item from a UPath
+Create a directory marker in S3.
+
+Note: S3 doesn't have true directories. This creates an empty object
+with a trailing slash to simulate a directory for compatibility.
```python
-@classmethod
-def from_path(cls, path: "UPath", relative_base: "UPath") -> "FilesystemItem":
- """Create an Item from a UPath"""
+@tool_method(variants=["write"], catch=True)
+async def mkdir(
+ self,
+ path: t.Annotated[str, "Directory path to create"],
+) -> FilesystemItem:
+ """
+ Create a directory marker in S3.
- base_path = str(relative_base.resolve())
- full_path = str(path.resolve())
- relative = full_path[len(base_path) :]
+ Note: S3 doesn't have true directories. This creates an empty object
+ with a trailing slash to simulate a directory for compatibility.
+ """
+ _path = self._resolve(path)
+ bucket, key = self._get_s3_parts(_path)
- if path.is_dir():
- return cls(type="dir", name=relative, size=None, modified=None)
+ # Ensure key ends with slash for directory marker
+ if not key.endswith("/"):
+ key += "/"
- if path.is_file():
- return cls(
- type="file",
- name=relative,
- size=path.stat().st_size,
- modified=datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc).strftime(
- "%Y-%m-%d %H:%M:%S",
- ),
- )
+ session = self._get_session()
+ async with session.client("s3") as s3_client:
+ # Create empty object with trailing slash
+ await s3_client.put_object(Bucket=bucket, Key=key, Body=b"")
- raise ValueError(f"'{relative}' is not a valid file or directory.")
+ relative = self._relative(_path)
+ return FilesystemItem(type="dir", name=relative, size=None, modified=None)
```
-GrepMatch
----------
+### mv
```python
-GrepMatch(
- path: str,
- line_number: int,
- line: str,
- context: list[str],
-)
+mv(
+ src: Annotated[str, "Source path"],
+ dest: Annotated[str, "Destination path"],
+) -> FilesystemItem
```
-Individual search match
+Move a file to a new location within S3 (copy then delete).
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def mv(
+ self,
+ src: t.Annotated[str, "Source path"],
+ dest: t.Annotated[str, "Destination path"],
+) -> FilesystemItem:
+ """Move a file to a new location within S3 (copy then delete)."""
+ # Copy to destination
+ result = await self.cp(src, dest)
+
+ # Delete source
+ await self.delete(src)
+
+ return result
+```
+
+
+
+
+### read\_file
+
+```python
+read_file(
+ path: Annotated[str, "Path to the file to read"],
+) -> str
+```
+
+Read a file from S3 and return its contents.
+
+**Returns:**
+
+* `str`
+ –+ str: The file contents decoded as UTF-8 if possible.
+
+
+multi\_modal support for S3 is limited as we can't easily determine
+image types without downloading. Returns bytes for non-UTF-8 content.
+
+
+
+```python
+@tool_method(variants=["read", "write"], catch=True)
+async def read_file(
+ self,
+ path: t.Annotated[str, "Path to the file to read"],
+) -> str:
+ """
+ Read a file from S3 and return its contents.
+
+ Returns:
+ - str: The file contents decoded as UTF-8 if possible.
+
+ Note:
+ multi_modal support for S3 is limited as we can't easily determine
+ image types without downloading. Returns bytes for non-UTF-8 content.
+ """
+ _path = self._resolve(path)
+ bucket, key = self._get_s3_parts(_path)
+
+ session = self._get_session()
+ async with session.client("s3") as s3_client:
+ response = await s3_client.get_object(Bucket=bucket, Key=key)
+ content = await response["Body"].read()
+
+ try:
+ return str(content.decode("utf-8"))
+ except UnicodeDecodeError as e:
+ raise ValueError("File is not a valid text file.") from e
+```
+
+
+
+
+### read\_lines
+
+```python
+read_lines(
+ path: Annotated[str, "Path to the file to read"],
+ start_line: Annotated[
+ int, "Start line number (0-indexed)"
+ ] = 0,
+ end_line: Annotated[int, "End line number"] = -1,
+) -> str
+```
+
+Read a partial file from S3 and return the contents.
+Negative line numbers count from the end.
+
+
+```python
+@tool_method(variants=["read", "write"], catch=True)
+async def read_lines(
+ self,
+ path: t.Annotated[str, "Path to the file to read"],
+ start_line: t.Annotated[int, "Start line number (0-indexed)"] = 0,
+ end_line: t.Annotated[int, "End line number"] = -1,
+) -> str:
+ """
+ Read a partial file from S3 and return the contents.
+ Negative line numbers count from the end.
+ """
+ content = await self.read_file(path)
+ if isinstance(content, bytes):
+ content = content.decode("utf-8")
+ elif isinstance(content, rg.ContentImageUrl):
+ raise TypeError("Cannot read lines from non-text content")
+
+ lines = content.splitlines(keepends=True)
+
+ if start_line < 0:
+ start_line = len(lines) + start_line
+
+ if end_line < 0:
+ end_line = len(lines) + end_line + 1
+
+ start_line = max(0, min(start_line, len(lines)))
+ end_line = max(start_line, min(end_line, len(lines)))
+
+ return "".join(lines[start_line:end_line])
+```
+
+
+
+
+### write\_file
+
+```python
+write_file(
+ path: Annotated[str, "Path to write the file to"],
+ contents: Annotated[
+ str, "Content to write to the file"
+ ],
+) -> FilesystemItem
+```
+
+Create or overwrite a file in S3 with the given contents.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def write_file(
+ self,
+ path: t.Annotated[str, "Path to write the file to"],
+ contents: t.Annotated[str, "Content to write to the file"],
+) -> FilesystemItem:
+ """Create or overwrite a file in S3 with the given contents."""
+ _path = self._resolve(path)
+ bucket, key = self._get_s3_parts(_path)
+
+ session = self._get_session()
+ async with session.client("s3") as s3_client:
+ await s3_client.put_object(Bucket=bucket, Key=key, Body=contents.encode("utf-8"))
+
+ # Return FilesystemItem without calling stat (S3 put is async)
+ relative = self._relative(_path)
+ return FilesystemItem(
+ type="file",
+ name=relative,
+ size=len(contents.encode("utf-8")),
+ modified=None,
+ )
+```
+
+
+
+
+### write\_file\_bytes
+
+```python
+write_file_bytes(
+ path: Annotated[str, "Path to write the file to"],
+ byte_data: Annotated[
+ bytes, "Bytes to write to the file"
+ ],
+) -> FilesystemItem
+```
+
+Create or overwrite a file in S3 with the given bytes.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def write_file_bytes(
+ self,
+ path: t.Annotated[str, "Path to write the file to"],
+ byte_data: t.Annotated[bytes, "Bytes to write to the file"],
+) -> FilesystemItem:
+ """Create or overwrite a file in S3 with the given bytes."""
+ _path = self._resolve(path)
+ bucket, key = self._get_s3_parts(_path)
+
+ session = self._get_session()
+ async with session.client("s3") as s3_client:
+ await s3_client.put_object(Bucket=bucket, Key=key, Body=byte_data)
+
+ # Return FilesystemItem without calling stat (S3 put is async)
+ relative = self._relative(_path)
+ return FilesystemItem(type="file", name=relative, size=len(byte_data), modified=None)
+```
+
+
+
+
+### write\_lines
+
+```python
+write_lines(
+ path: Annotated[str, "Path to write to"],
+ contents: Annotated[str, "Content to write"],
+ insert_line: Annotated[
+ int,
+ "Line number to insert at (negative counts from end)",
+ ] = -1,
+ mode: Annotated[
+ str, "insert" or "overwrite"
+ ] = "insert",
+) -> FilesystemItem | str
+```
+
+Write content to a specific line in an S3 file.
+Mode can be 'insert' to add lines or 'overwrite' to replace lines.
+
+
+```python
+@tool_method(variants=["write"], catch=True)
+async def write_lines(
+ self,
+ path: t.Annotated[str, "Path to write to"],
+ contents: t.Annotated[str, "Content to write"],
+ insert_line: t.Annotated[int, "Line number to insert at (negative counts from end)"] = -1,
+ mode: t.Annotated[str, "'insert' or 'overwrite'"] = "insert",
+) -> FilesystemItem | str:
+ """
+ Write content to a specific line in an S3 file.
+ Mode can be 'insert' to add lines or 'overwrite' to replace lines.
+ """
+ if mode not in ["insert", "overwrite"]:
+ raise TypeError("Invalid mode. Use 'insert' or 'overwrite'")
+
+ # Read existing content
+ try:
+ existing_content = await self.read_file(path)
+ if isinstance(existing_content, bytes):
+ existing_content = existing_content.decode("utf-8")
+ elif isinstance(existing_content, rg.ContentImageUrl):
+ logger.warning("Cannot write lines to non-text content")
+ lines = []
+ lines = existing_content.splitlines(keepends=True)
+ except FileNotFoundError:
+ # File doesn't exist, start with empty lines
+ lines = []
+ except (PermissionError, IsADirectoryError, ClientError, BotoCoreError, ValueError) as e:
+ # File doesn't exist or can't be read, start with empty lines
+ return f"Error occurred while trying to write to the supplied filepath {path}: {e}"
+
+ # Normalize line endings in content
+ content_lines = [
+ line + "\n" if not line.endswith("\n") else line
+ for line in contents.splitlines(keepends=False)
+ ]
+
+ # Calculate insert position and ensure it's within bounds
+ if insert_line < 0:
+ insert_line = len(lines) + insert_line + 1
+
+ insert_line = max(0, min(insert_line, len(lines)))
+
+ # Apply the update
+ if mode == "insert":
+ lines[insert_line:insert_line] = content_lines
+ elif mode == "overwrite":
+ lines[insert_line : insert_line + len(content_lines)] = content_lines
+
+ # Write back
+ new_content = "".join(lines)
+ return await self.write_file(path, new_content)
+```
+
+
+
+
+Filesystem
+----------
+
+```python
+Filesystem(
+ path: str | Path | UPath, **kwargs: Any
+) -> LocalFilesystem | S3Filesystem
+```
+
+Factory function to create the appropriate filesystem instance based on path.
+
+Automatically detects the filesystem type from the path protocol and returns
+the corresponding implementation (LocalFilesystem or S3Filesystem).
+
+**Parameters:**
+
+* **`path`**
+ (`str | Path | UPath`)
+ –Local path, S3 URL (s3://), or other supported protocol
+* **`**kwargs`**
+ (`Any`, default:
+ `{}`
+ )
+ –Additional arguments passed to the filesystem constructor
+
+**Returns:**
+
+* `LocalFilesystem | S3Filesystem`
+ –LocalFilesystem for local paths, S3Filesystem for S3 URLs
+
+**Examples:**
+
+```python
+>>> # Local filesystem
+>>> fs = Filesystem(path="/tmp/data")
+>>> isinstance(fs, LocalFilesystem)
+True
+```
+
+```python
+>>> # S3 filesystem
+>>> fs = Filesystem(path="s3://my-bucket/data")
+>>> isinstance(fs, S3Filesystem)
+True
+```
+
+
+```python
+def Filesystem( # noqa: N802
+ path: str | Path | UPath, **kwargs: t.Any
+) -> LocalFilesystem | S3Filesystem:
+ """
+ Factory function to create the appropriate filesystem instance based on path.
+
+ Automatically detects the filesystem type from the path protocol and returns
+ the corresponding implementation (LocalFilesystem or S3Filesystem).
+
+ Args:
+ path: Local path, S3 URL (s3://), or other supported protocol
+ **kwargs: Additional arguments passed to the filesystem constructor
+
+ Returns:
+ LocalFilesystem for local paths, S3Filesystem for S3 URLs
+
+ Examples:
+ >>> # Local filesystem
+ >>> fs = Filesystem(path="/tmp/data")
+ >>> isinstance(fs, LocalFilesystem)
+ True
+
+ >>> # S3 filesystem
+ >>> fs = Filesystem(path="s3://my-bucket/data")
+ >>> isinstance(fs, S3Filesystem)
+ True
+ """
+ # Check if it's a string starting with s3://
+ if isinstance(path, str) and path.startswith("s3://"):
+ return S3Filesystem(path=path, **kwargs)
+
+ # Check if it's a UPath with S3 protocol
+ if isinstance(path, UPath) and path.protocol in ["s3", "s3a"]:
+ return S3Filesystem(path=path, **kwargs)
+
+ # Try to create UPath and check protocol
+ try:
+ fs_options = kwargs.get("fs_options", {})
+ upath = UPath(str(path), **fs_options)
+ if upath.protocol in ["s3", "s3a"]:
+ return S3Filesystem(path=path, **kwargs)
+ except (TypeError, ValueError) as e:
+ # If UPath creation fails, fall through to local
+ logger.warning(
+ f"Upath initialization failed ({type(e).__name__}: {e}), defaulting to local path"
+ )
+ return LocalFilesystem(path=path, **kwargs)
+
+ # Default to local filesystem
+ return LocalFilesystem(path=path, **kwargs)
+```
+
+
+
Memory
------
diff --git a/docs/sdk/api.mdx b/docs/sdk/api.mdx
index e892a8d1..e50127db 100644
--- a/docs/sdk/api.mdx
+++ b/docs/sdk/api.mdx
@@ -117,7 +117,8 @@ def __init__(
```python
create_project(
- name: str | UUID | None = None,
+ name: str,
+ key: str,
workspace_id: UUID | None = None,
organization_id: UUID | None = None,
) -> Project
@@ -128,9 +129,7 @@ Creates a new project.
**Parameters:**
* **`name`**
- (`str | UUID | None`, default:
- `None`
- )
+ (`str`)
–The name of the project. If None, a default name will be used.
* **`workspace_id`**
(`UUID | None`, default:
@@ -152,7 +151,8 @@ Creates a new project.
```python
def create_project(
self,
- name: str | UUID | None = None,
+ name: str,
+ key: str,
workspace_id: UUID | None = None,
organization_id: UUID | None = None,
) -> Project:
@@ -167,8 +167,8 @@ def create_project(
Project: The created Project object.
"""
payload: dict[str, t.Any] = {}
- if name is not None:
- payload["name"] = name
+ payload["name"] = name
+ payload["key"] = key
if workspace_id is not None:
payload["workspace_id"] = str(workspace_id)
if organization_id is not None:
@@ -861,7 +861,7 @@ Retrieves details of a specific project.
* **`project_identifier`**
(`str | UUID`)
- –The project identifier. ID, name, or slug.
+ –The project identifier. ID or key.
**Returns:**
@@ -874,7 +874,7 @@ def get_project(self, project_identifier: str | UUID, workspace_id: UUID) -> Pro
"""Retrieves details of a specific project.
Args:
- project_identifier (str | UUID): The project identifier. ID, name, or slug.
+ project_identifier (str | UUID): The project identifier. ID or key.
Returns:
Project: The Project object.
diff --git a/dreadnode/api/client.py b/dreadnode/api/client.py
index f2ceacca..302dcdab 100644
--- a/dreadnode/api/client.py
+++ b/dreadnode/api/client.py
@@ -288,7 +288,7 @@ def get_project(self, project_identifier: str | UUID, workspace_id: UUID) -> Pro
"""Retrieves details of a specific project.
Args:
- project_identifier (str | UUID): The project identifier. ID, name, or slug.
+ project_identifier (str | UUID): The project identifier. ID or key.
Returns:
Project: The Project object.
@@ -302,7 +302,8 @@ def get_project(self, project_identifier: str | UUID, workspace_id: UUID) -> Pro
def create_project(
self,
- name: str | UUID | None = None,
+ name: str,
+ key: str,
workspace_id: UUID | None = None,
organization_id: UUID | None = None,
) -> Project:
@@ -317,8 +318,8 @@ def create_project(
Project: The created Project object.
"""
payload: dict[str, t.Any] = {}
- if name is not None:
- payload["name"] = name
+ payload["name"] = name
+ payload["key"] = key
if workspace_id is not None:
payload["workspace_id"] = str(workspace_id)
if organization_id is not None:
diff --git a/dreadnode/constants.py b/dreadnode/constants.py
index 6b0b7c10..f4b96d54 100644
--- a/dreadnode/constants.py
+++ b/dreadnode/constants.py
@@ -35,6 +35,8 @@
DEFAULT_WORKSPACE_NAME = "Personal Workspace"
# default project name
DEFAULT_PROJECT_NAME = "Default"
+# default project key
+DEFAULT_PROJECT_KEY = "default"
#
# Environment Variable Names
diff --git a/dreadnode/main.py b/dreadnode/main.py
index a3d094ac..8e4237a0 100644
--- a/dreadnode/main.py
+++ b/dreadnode/main.py
@@ -32,6 +32,7 @@
)
from dreadnode.constants import (
DEFAULT_LOCAL_STORAGE_DIR,
+ DEFAULT_PROJECT_KEY,
DEFAULT_PROJECT_NAME,
DEFAULT_SERVER_URL,
ENV_API_KEY,
@@ -348,8 +349,8 @@ def _resolve_project(self) -> None:
"""
Resolve the project to use based on configuration.
- If a project is specified by name and doesn't exist, it will be created.
- If no project is specified, it will use or create one named 'default'.
+ If a project is specified by key and doesn't exist, it will be created.
+ If no project is specified, it will use or create one with key 'default'.
Raises:
RuntimeError: If the API client is not initialized.
@@ -366,7 +367,7 @@ def _resolve_project(self) -> None:
found_project: Project | None = None
try:
found_project = self._api.get_project(
- project_identifier=self.project or DEFAULT_PROJECT_NAME,
+ project_identifier=self.project or DEFAULT_PROJECT_KEY,
workspace_id=self._workspace.id,
)
except RuntimeError as e:
@@ -378,7 +379,9 @@ def _resolve_project(self) -> None:
if not found_project:
# create it in the workspace
found_project = self._api.create_project(
- name=self.project or DEFAULT_PROJECT_NAME, workspace_id=self._workspace.id
+ name=self.project or DEFAULT_PROJECT_NAME,
+ key=self.project or DEFAULT_PROJECT_KEY,
+ workspace_id=self._workspace.id,
)
# This is what's used in all of the Traces/Spans/Runs
self._project = found_project