-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexample.py
More file actions
86 lines (69 loc) · 2.86 KB
/
example.py
File metadata and controls
86 lines (69 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
"""SubprocessExecutor example - Jupyter kernel-based isolated execution.
This example demonstrates using SubprocessExecutor which runs code in an
IPython kernel within a subprocess. It provides process isolation without
requiring Docker.
Capabilities:
- TIMEOUT: Yes (via message wait timeout)
- PROCESS_ISOLATION: Yes (code runs in subprocess)
- RESET: Yes (kernel restart clears state)
- NETWORK_ISOLATION: No
- FILESYSTEM_ISOLATION: No
Use SubprocessExecutor when:
- You need process isolation but Docker is unavailable or too heavy
- Development/testing where fast iteration matters
- CI environments without Docker access
- You need kernel restart capability to reset state
Use ContainerExecutor instead when:
- You need filesystem or network isolation
- Running untrusted code in production
- You need reproducible environments across machines
"""
import asyncio
from pathlib import Path
from py_code_mode import FileStorage, Session
from py_code_mode.execution import SubprocessConfig, SubprocessExecutor
# Shared tools and workflows directory
HERE = Path(__file__).parent
SHARED = HERE.parent / "shared"
async def main() -> None:
# Storage for workflows and artifacts only
storage = FileStorage(base_path=SHARED)
# Configure subprocess executor with tools from config
config = SubprocessConfig(
# Tools owned by executor
tools_path=SHARED / "tools",
# Python version for venv (defaults to current if not specified)
python_version="3.11",
# Execution timeout in seconds
default_timeout=60.0,
# Kernel startup timeout
startup_timeout=30.0,
# Delete temp venv on close (set False to reuse across runs)
cleanup_venv_on_close=True,
)
executor = SubprocessExecutor(config=config)
async with Session(storage=storage, executor=executor) as session:
# Basic execution
print("Running basic code...")
result = await session.run("1 + 1")
print(f" Result: {result.value}")
# Variables persist across calls within the same session
print("\nVariable persistence...")
await session.run("x = 42")
result = await session.run("x * 2")
print(f" x * 2 = {result.value}")
# Using tools (if available in shared directory)
print("\nSearching for tools...")
result = await session.run("tools.list()")
print(f" Available tools: {result.value}")
# Searching for workflows
print("\nSearching for workflows...")
result = await session.run('workflows.search("fetch")')
print(f" Found workflows: {result.value}")
# Demonstrate stdout capture
print("\nStdout capture...")
result = await session.run('print("Hello from subprocess!")')
print(f" Captured stdout: {result.stdout!r}")
if __name__ == "__main__":
asyncio.run(main())