Programmatic Tools

After generating programmatic tools from MCP Servers and Skills, you can execute them using the CodeModeExecutor.

Overview

The CodeModeExecutor runs Python code that composes your generated tools. Instead of making individual tool calls through LLM inference, agents write Python code that:

  • Calls multiple tools in sequence or parallel
  • Uses loops and conditionals for complex logic
  • Handles errors with try/except
  • Stores intermediate results in variables

Important Architecture Note: The tools you call in your code (e.g., from generated.servers.example_mcp import tool) are actual MCP tools and communicate through the MCP protocol. The CodemodeToolset itself is a Pydantic AI toolset (not an MCP server), but it enables you to write Python code that orchestrates MCP tool calls. This is by design: MCP servers remain MCP servers, and codemode provides a code-first interface for composing them programmatically.

Basic Execution

from agent_codemode import CodeModeExecutor, ToolRegistry

# Set up registry (see Tool Discovery)
registry = ToolRegistry()
# ... add servers ...
await registry.discover_all()

# Create and set up executor
executor = CodeModeExecutor(registry)
await executor.setup()

# Execute code
result = await executor.execute("""
from generated.servers.filesystem import read_file

content = await read_file({"path": "/tmp/data.txt"})
print(f"Read {len(content)} characters")
""")

print(result.stdout) # "Read 42 characters"
print(result.stderr) # ""
print(result.text) # Main result (if any)
print(result.success) # True

Execution Result

The execute() method returns an Execution object:

result = await executor.execute(code)

# Check success
if result.success:
    print("Code executed successfully")
else:
    print(f"Error: {result.error}")

# Get output (stdout/stderr from the code)
print(result.stdout)
print(result.stderr)

# Get main result text
print(result.text)

Execution Options

Timeout

Set a maximum execution time:

result = await executor.execute(
    code,
    timeout=30.0,  # 30 seconds
)
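
How a timeout surfaces (an exception versus an unsuccessful Execution) is library-specific; check the Execution result in your version. The underlying semantics can be illustrated with plain asyncio.wait_for, independent of the executor — this is a generic sketch, not the executor's implementation:

```python
import asyncio

async def slow_task() -> str:
    # Stand-in for a long-running tool call
    await asyncio.sleep(2.0)
    return "done"

async def main() -> str:
    try:
        # Enforce a 0.1 s budget, analogous to execute(code, timeout=...)
        return await asyncio.wait_for(slow_task(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```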

Working Directory

Specify where the code runs:

result = await executor.execute(
    code,
    working_dir="/workspace/project"
)

Environment Variables

Pass environment variables:

result = await executor.execute(
    code,
    env={"API_KEY": "secret", "DEBUG": "true"}
)

Tool Composition Patterns

Sequential Execution

Call tools one after another:

result = await executor.execute("""
from generated.servers.filesystem import read_file, write_file

# Read, transform, write
content = await read_file({"path": "/input.txt"})
transformed = content.upper()
await write_file({"path": "/output.txt", "content": transformed})
""")

Parallel Execution

Use asyncio.gather for concurrent tool calls:

result = await executor.execute("""
import asyncio
from generated.servers.filesystem import read_file

files = ["a.txt", "b.txt", "c.txt", "d.txt"]

# Read all files in parallel
contents = await asyncio.gather(*[
    read_file({"path": f"/data/{f}"}) for f in files
])

total = sum(len(c) for c in contents)
print(f"Total: {total} characters from {len(files)} files")
""")

Conditional Logic

Branch on intermediate results, with fallbacks for failed calls:

result = await executor.execute("""
from generated.servers.filesystem import read_file, write_file
from generated.servers.web import fetch_url

# Try local file first, fall back to web
try:
    content = await read_file({"path": "/cache/data.json"})
    print("Using cached data")
except Exception:
    content = await fetch_url({"url": "https://api.example.com/data"})
    await write_file({"path": "/cache/data.json", "content": content})
    print("Fetched fresh data")
""")

Loops

Process collections of items:

result = await executor.execute("""
from generated.servers.filesystem import list_directory, read_file

entries = await list_directory({"path": "/documents"})

for entry in entries["entries"]:
    if entry.endswith(".md"):
        content = await read_file({"path": f"/documents/{entry}"})
        word_count = len(content.split())
        print(f"{entry}: {word_count} words")
""")

Error Handling

Use try/except for robust execution:

result = await executor.execute("""
from generated.servers.filesystem import read_file, write_file

files = ["important.txt", "optional.txt", "extra.txt"]
results = []

for f in files:
    try:
        content = await read_file({"path": f"/data/{f}"})
        results.append({"file": f, "status": "success", "size": len(content)})
    except Exception as e:
        results.append({"file": f, "status": "failed", "error": str(e)})

successful = [r for r in results if r["status"] == "success"]
print(f"Processed {len(successful)}/{len(files)} files")
""")

Sandbox Environment

Code executes in an isolated sandbox for safety.

Tool Call Architecture: When your code calls await generate_random_text(...) (a tool from generated.servers.example_mcp), it:

  1. Executes in the sandbox environment
  2. Makes an actual MCP protocol request to the MCP server
  3. Returns the result to your code

This means you get the safety and isolation of MCP servers while writing natural Python code. The sandbox handles the MCP protocol communication transparently.
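
Conceptually, each generated binding is a thin async proxy: it takes the argument dict, issues a tools/call request over the MCP transport, and returns the decoded result. A simplified, hypothetical sketch of that shape (not the actual generated code; call_mcp_tool stands in for the real transport layer):

```python
import asyncio

async def call_mcp_tool(server: str, tool: str, arguments: dict) -> str:
    # Hypothetical transport layer; the real one speaks the MCP protocol
    await asyncio.sleep(0)  # placeholder for the network round-trip
    return f"<result of {server}.{tool}({arguments})>"

# Roughly what a generated binding looks like
async def generate_random_text(args: dict) -> str:
    return await call_mcp_tool("example_mcp", "generate_random_text", args)

print(asyncio.run(generate_random_text({"length": 5})))
```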

Sandbox Configuration

from agent_codemode import CodeModeConfig, CodeModeExecutor

config = CodeModeConfig(
    sandbox_variant="local",          # "local" or "datalayer-runtime"
    workspace_path="/workspace",      # Working directory
    generated_path="/tmp/generated",  # Where to put generated bindings
    default_timeout=30.0,             # Default execution timeout
)

executor = CodeModeExecutor(registry, config=config)

Sandbox Variants

  • local: Runs in a subprocess on the same machine
  • datalayer-runtime: Runs in a cloud-based Datalayer Runtime

Available in Sandbox

The sandbox has access to:

  • Standard Python libraries
  • asyncio for async operations
  • Generated tool bindings (from generated.servers.X import Y)
  • Skills (from skills.name import func)

Tool Call Tracking

Track which tools were called during execution:

result = await executor.execute(code)

# Get tool call history
for call in result.tool_calls:
    print(f"Called: {call.tool_name}")
    print(f"  Args: {call.arguments}")
    print(f"  Result: {call.result}")
    print(f"  Duration: {call.duration_ms}ms")
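
The call history also enables quick profiling. Assuming each record exposes tool_name and duration_ms as above, per-tool totals fall out of a Counter; sketched here with stand-in records rather than a live result:

```python
from collections import Counter
from dataclasses import dataclass

@dataclass
class ToolCall:
    # Stand-in for the record type on result.tool_calls
    tool_name: str
    duration_ms: float

calls = [
    ToolCall("read_file", 12.0),
    ToolCall("read_file", 8.0),
    ToolCall("write_file", 5.0),
]

totals = Counter()
for call in calls:
    totals[call.tool_name] += call.duration_ms

print(totals["read_file"])   # 20.0
print(totals["write_file"])  # 5.0
```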

State Persistence

Variables persist across executions within the same executor:

# First execution
await executor.execute("""
data = {"count": 0}
""")

# Second execution - uses state from first
await executor.execute("""
data["count"] += 1
print(f"Count is now {data['count']}") # Prints: Count is now 1
""")

Clearing State

# Clear all state
executor.clear_state()

# Or create a fresh executor
executor = CodeModeExecutor(registry)
await executor.setup()

Best Practices

1. Keep Code Focused

Write small, focused code blocks rather than one large script:

# Good: Focused on one task
result = await executor.execute("""
from generated.servers.filesystem import read_file

content = await read_file({"path": "/data.txt"})
lines = content.strip().split("\\n")
print(f"File has {len(lines)} lines")
""")

# Avoid: Too much in one execution

2. Use Parallel Execution for I/O

MCP tools are I/O-bound. Use asyncio.gather for parallelism:

# Good: Parallel reads
contents = await asyncio.gather(*[read_file({"path": f}) for f in files])

# Slow: Sequential reads
contents = []
for f in files:
    contents.append(await read_file({"path": f}))
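
An unbounded gather over a large file list can overwhelm the server. An asyncio.Semaphore caps the number of in-flight calls while keeping the rest of the pattern unchanged; sketched with a stubbed read_file:

```python
import asyncio

async def read_file(args: dict) -> str:
    # Stub for the generated binding
    await asyncio.sleep(0.01)
    return "data:" + args["path"]

async def main() -> int:
    files = [f"/data/{i}.txt" for i in range(20)]
    sem = asyncio.Semaphore(5)  # at most 5 concurrent calls

    async def bounded_read(path: str) -> str:
        async with sem:
            return await read_file({"path": path})

    contents = await asyncio.gather(*[bounded_read(p) for p in files])
    return len(contents)

print(asyncio.run(main()))  # 20
```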

3. Handle Errors Gracefully

Always wrap tool calls that might fail:

try:
    result = await risky_tool({"param": value})
except Exception as e:
    print(f"Tool failed: {e}")
    result = default_value

4. Log Progress

Use print statements for visibility:

print(f"Processing {len(items)} items...")
for i, item in enumerate(items):
    if i % 10 == 0:
        print(f"Progress: {i}/{len(items)}")
    await process(item)
print("Done!")