目录:
当你构建一个能够自主编写和执行代码的 AI 代理时,安全性和隔离性成为了首要考虑的问题。如何让 AI 安全地运行用户或自身生成的代码,而不会影响主系统?E2B(Execute to Build)正是为解决这个问题而生的云沙箱平台。本文将深入探讨 E2B 在 AI 基础设施中的最佳实践。
什么是 E2B?
E2B 是一个专为 AI 代理设计的安全代码执行平台,它提供了隔离的云沙箱环境,让 AI 能够安全地执行代码、操作文件系统、安装依赖包,而不会对主系统造成任何影响。
核心特性
- 安全隔离:每个沙箱都是完全隔离的虚拟环境
- 快速启动:沙箱启动时间通常在 1-3 秒内
- 多语言支持:支持 Python、JavaScript、Bash 等多种编程语言
- 文件系统操作:完整的文件读写能力
- 网络访问:可以访问外部 API 和服务
- 可定制化:支持自定义 Docker 镜像
E2B 的典型应用场景
1. AI 代码助手
最常见的应用是构建能够编写和执行代码的 AI 助手,例如:
from e2b_code_interpreter import CodeInterpreter
def run_ai_generated_code(code: str):
"""
Execute AI-generated code in a secure sandbox.
Args:
code: Python code string generated by AI
Returns:
Execution results including stdout, stderr, and any errors
"""
with CodeInterpreter() as sandbox:
# Execute the code in isolated environment
execution = sandbox.notebook.exec_cell(code)
return {
"success": not execution.error,
"results": execution.results,
"logs": execution.logs,
"error": execution.error
}
# Example: AI generates data analysis code
ai_code = """
import pandas as pd
import matplotlib.pyplot as plt
# Generate sample data
data = pd.DataFrame({
'month': ['Jan', 'Feb', 'Mar', 'Apr'],
'revenue': [10000, 12000, 11500, 13000]
})
# Calculate statistics
print(f"Total Revenue: ${data['revenue'].sum()}")
print(f"Average Revenue: ${data['revenue'].mean():.2f}")
"""
result = run_ai_generated_code(ai_code)
print(result)
2. 数据分析和可视化
E2B 特别适合需要动态生成数据分析代码的场景:
from e2b_code_interpreter import CodeInterpreter
import base64
def analyze_data_with_visualization(data_path: str, analysis_prompt: str):
"""
Perform data analysis and generate visualizations in sandbox.
Args:
data_path: Path to data file
analysis_prompt: User's analysis request
Returns:
Analysis results and generated charts
"""
with CodeInterpreter() as sandbox:
# Upload data file to sandbox
with open(data_path, 'rb') as f:
sandbox.filesystem.write('/tmp/data.csv', f.read())
# Generate and execute analysis code
code = f"""
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# Load data
df = pd.read_csv('/tmp/data.csv')
# Perform analysis based on prompt: {analysis_prompt}
summary = df.describe()
print(summary)
# Create visualization
plt.figure(figsize=(10, 6))
sns.heatmap(df.corr(), annot=True, cmap='coolwarm')
plt.savefig('/tmp/correlation.png')
"""
execution = sandbox.notebook.exec_cell(code)
# Retrieve generated visualization
chart_data = sandbox.filesystem.read('/tmp/correlation.png')
return {
"analysis": execution.logs.stdout,
"chart": base64.b64encode(chart_data).decode()
}
3. 自动化测试和验证
AI 可以生成测试代码并在沙箱中执行:
def validate_function_with_tests(function_code: str, test_cases: list):
"""
Validate AI-generated function with automated tests.
Args:
function_code: The function implementation
test_cases: List of test cases to run
Returns:
Test results and coverage information
"""
with CodeInterpreter() as sandbox:
# Install testing framework
sandbox.process.start("pip install pytest pytest-cov")
# Write function and tests
test_code = f"""
{function_code}
import pytest
{chr(10).join(test_cases)}
# Run tests with coverage
if __name__ == '__main__':
pytest.main(['-v', '--cov'])
"""
sandbox.filesystem.write('/tmp/test_module.py', test_code)
# Execute tests
process = sandbox.process.start("python /tmp/test_module.py")
process.wait()
return {
"passed": process.exit_code == 0,
"output": process.output.stdout,
"errors": process.output.stderr
}
E2B 最佳实践
1. 资源管理与生命周期控制
合理管理沙箱生命周期是保证性能和成本的关键:
from e2b_code_interpreter import CodeInterpreter
from contextlib import contextmanager
import time
class SandboxPool:
"""
Manage a pool of E2B sandboxes for better resource utilization.
"""
def __init__(self, pool_size: int = 3, timeout: int = 300):
self.pool_size = pool_size
self.timeout = timeout
self.sandboxes = []
self.last_used = {}
@contextmanager
def get_sandbox(self):
"""Get an available sandbox from pool or create new one."""
sandbox = None
# Try to reuse existing sandbox
for sb in self.sandboxes:
if time.time() - self.last_used.get(sb.id, 0) > 60:
sandbox = sb
break
# Create new sandbox if pool not full
if not sandbox and len(self.sandboxes) < self.pool_size:
sandbox = CodeInterpreter()
self.sandboxes.append(sandbox)
# Wait for available sandbox
if not sandbox:
time.sleep(1)
return self.get_sandbox()
try:
self.last_used[sandbox.id] = time.time()
yield sandbox
finally:
# Cleanup if timeout exceeded
if time.time() - self.last_used[sandbox.id] > self.timeout:
sandbox.close()
self.sandboxes.remove(sandbox)
# Usage
pool = SandboxPool(pool_size=5)
def execute_multiple_tasks(tasks: list):
"""Execute multiple tasks using sandbox pool."""
results = []
for task in tasks:
with pool.get_sandbox() as sandbox:
result = sandbox.notebook.exec_cell(task)
results.append(result)
return results
2. 错误处理和重试机制
构建健壮的错误处理策略:
from tenacity import retry, stop_after_attempt, wait_exponential
from e2b_code_interpreter import CodeInterpreter
import logging
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def execute_with_retry(code: str, timeout: int = 30):
"""
Execute code with automatic retry on transient failures.
Args:
code: Code to execute
timeout: Execution timeout in seconds
Returns:
Execution results
Raises:
Exception: If all retry attempts fail
"""
try:
with CodeInterpreter(timeout=timeout) as sandbox:
execution = sandbox.notebook.exec_cell(code)
if execution.error:
logger.error(f"Execution error: {execution.error}")
# Distinguish between recoverable and non-recoverable errors
if "timeout" in str(execution.error).lower():
raise Exception("Timeout - retryable")
elif "memory" in str(execution.error).lower():
raise Exception("Memory error - retryable")
else:
# Non-retryable error (syntax, logic)
return {
"success": False,
"error": execution.error,
"retryable": False
}
return {
"success": True,
"results": execution.results,
"logs": execution.logs
}
except Exception as e:
logger.warning(f"Sandbox error, will retry: {e}")
raise
3. 安全性强化
虽然 E2B 提供了隔离环境,但仍需要额外的安全措施:
import re
from typing import List, Optional
class CodeSecurityValidator:
"""
Validate code before execution for additional security.
"""
DANGEROUS_PATTERNS = [
r'import\s+os\s*;\s*os\.system',
r'__import__\s*\(\s*["\']os["\']\s*\)',
r'eval\s*\(',
r'exec\s*\(',
r'compile\s*\(',
r'__builtins__',
r'subprocess\.call',
r'open\s*\([^)]*,\s*["\']w', # Write mode file operations
]
ALLOWED_IMPORTS = {
'pandas', 'numpy', 'matplotlib', 'seaborn',
'sklearn', 'scipy', 'requests', 'json'
}
@staticmethod
def validate_code(code: str) -> tuple[bool, Optional[str]]:
"""
Validate code for security risks.
Returns:
(is_valid, error_message)
"""
# Check for dangerous patterns
for pattern in CodeSecurityValidator.DANGEROUS_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
return False, f"Potentially dangerous code pattern detected: {pattern}"
# Validate imports
import_pattern = r'^\s*(?:from|import)\s+(\w+)'
imports = re.findall(import_pattern, code, re.MULTILINE)
for imp in imports:
if imp not in CodeSecurityValidator.ALLOWED_IMPORTS:
return False, f"Unauthorized import: {imp}"
return True, None
@staticmethod
def sanitize_output(output: str, max_length: int = 10000) -> str:
"""Sanitize and limit output length."""
if len(output) > max_length:
return output[:max_length] + f"\n... (truncated {len(output) - max_length} characters)"
return output
def safe_execute(code: str):
"""Execute code with security validation."""
# Validate code first
is_valid, error = CodeSecurityValidator.validate_code(code)
if not is_valid:
return {"success": False, "error": f"Security validation failed: {error}"}
# Execute in sandbox
with CodeInterpreter() as sandbox:
execution = sandbox.notebook.exec_cell(code)
# Sanitize output
if execution.logs.stdout:
execution.logs.stdout = CodeSecurityValidator.sanitize_output(
execution.logs.stdout
)
return {
"success": not execution.error,
"results": execution.results,
"logs": execution.logs
}
4. 性能优化
优化沙箱性能和响应时间:
from e2b_code_interpreter import CodeInterpreter
import asyncio
from typing import List, Dict
class AsyncSandboxExecutor:
"""
Asynchronous executor for parallel code execution.
"""
def __init__(self, max_concurrent: int = 5):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute_async(self, code: str, sandbox_id: str = None) -> Dict:
"""
Execute code asynchronously with concurrency control.
Args:
code: Code to execute
sandbox_id: Optional existing sandbox ID to reuse
Returns:
Execution results
"""
async with self.semaphore:
# Run synchronous E2B code in thread pool
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
self._sync_execute,
code,
sandbox_id
)
return result
def _sync_execute(self, code: str, sandbox_id: str = None) -> Dict:
"""Synchronous execution wrapper."""
with CodeInterpreter(sandbox_id=sandbox_id) as sandbox:
execution = sandbox.notebook.exec_cell(code)
return {
"sandbox_id": sandbox.id,
"success": not execution.error,
"results": execution.results,
"error": execution.error
}
async def execute_batch(self, code_list: List[str]) -> List[Dict]:
"""
Execute multiple code snippets in parallel.
Args:
code_list: List of code snippets to execute
Returns:
List of execution results
"""
tasks = [self.execute_async(code) for code in code_list]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# Usage example
async def main():
executor = AsyncSandboxExecutor(max_concurrent=5)
code_snippets = [
"print(sum(range(100)))",
"import pandas as pd; print(pd.__version__)",
"print([x**2 for x in range(10)])",
]
results = await executor.execute_batch(code_snippets)
for i, result in enumerate(results):
print(f"Task {i}: {result}")
# Run async execution
# asyncio.run(main())
5. 监控和日志记录
实现全面的监控和日志系统:
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json
@dataclass
class ExecutionMetrics:
"""Track execution metrics for monitoring."""
sandbox_id: str
start_time: float
end_time: Optional[float] = None
code_length: int = 0
success: bool = False
error_type: Optional[str] = None
execution_time: Optional[float] = None
def to_dict(self) -> dict:
return {
"sandbox_id": self.sandbox_id,
"timestamp": datetime.fromtimestamp(self.start_time).isoformat(),
"execution_time_ms": self.execution_time * 1000 if self.execution_time else None,
"code_length": self.code_length,
"success": self.success,
"error_type": self.error_type
}
class MonitoredSandbox:
"""Sandbox wrapper with monitoring and logging."""
def __init__(self, log_path: str = "/tmp/e2b_executions.log"):
self.logger = self._setup_logger(log_path)
self.metrics: List[ExecutionMetrics] = []
def _setup_logger(self, log_path: str) -> logging.Logger:
"""Setup structured logging."""
logger = logging.getLogger("e2b_monitor")
logger.setLevel(logging.INFO)
handler = logging.FileHandler(log_path)
handler.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
logger.addHandler(handler)
return logger
def execute_with_monitoring(self, code: str) -> Dict:
"""Execute code with comprehensive monitoring."""
metrics = ExecutionMetrics(
sandbox_id="",
start_time=time.time(),
code_length=len(code)
)
try:
self.logger.info(f"Starting execution: {code[:100]}...")
with CodeInterpreter() as sandbox:
metrics.sandbox_id = sandbox.id
execution = sandbox.notebook.exec_cell(code)
metrics.end_time = time.time()
metrics.execution_time = metrics.end_time - metrics.start_time
metrics.success = not execution.error
if execution.error:
metrics.error_type = type(execution.error).__name__
self.logger.error(
f"Execution failed: {execution.error}",
extra=metrics.to_dict()
)
else:
self.logger.info(
"Execution succeeded",
extra=metrics.to_dict()
)
self.metrics.append(metrics)
return {
"success": metrics.success,
"results": execution.results,
"metrics": metrics.to_dict()
}
except Exception as e:
metrics.end_time = time.time()
metrics.execution_time = metrics.end_time - metrics.start_time
metrics.error_type = type(e).__name__
self.logger.exception("Unexpected error during execution")
self.metrics.append(metrics)
return {
"success": False,
"error": str(e),
"metrics": metrics.to_dict()
}
def get_statistics(self) -> Dict:
"""Get execution statistics."""
if not self.metrics:
return {"total_executions": 0}
total = len(self.metrics)
successful = sum(1 for m in self.metrics if m.success)
avg_time = sum(m.execution_time for m in self.metrics if m.execution_time) / total
return {
"total_executions": total,
"successful": successful,
"failed": total - successful,
"success_rate": successful / total * 100,
"average_execution_time_ms": avg_time * 1000
}
与 AI 框架集成
与 LangChain 集成
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from e2b_code_interpreter import CodeInterpreter
def create_e2b_tool():
"""Create LangChain tool for E2B code execution."""
def execute_python(code: str) -> str:
"""Execute Python code in E2B sandbox."""
with CodeInterpreter() as sandbox:
execution = sandbox.notebook.exec_cell(code)
if execution.error:
return f"Error: {execution.error}"
output = execution.logs.stdout or ""
if execution.results:
output += "\n" + str(execution.results)
return output
return Tool(
name="Python Executor",
func=execute_python,
description="Execute Python code in a secure sandbox. Use this to run data analysis, calculations, or generate visualizations."
)
# Create agent with E2B tool
tools = [create_e2b_tool()]
llm = OpenAI(temperature=0)
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# Use the agent
response = agent.run(
"Calculate the sum of squares for numbers 1 to 100 and show me the result"
)
与 AutoGen 集成
from autogen import AssistantAgent, UserProxyAgent
from e2b_code_interpreter import CodeInterpreter
def e2b_code_executor(code: str) -> tuple[int, str]:
"""
Code executor function for AutoGen using E2B.
Returns:
(exit_code, output)
"""
try:
with CodeInterpreter() as sandbox:
execution = sandbox.notebook.exec_cell(code)
if execution.error:
return (1, f"Error: {execution.error}")
output = execution.logs.stdout or ""
if execution.results:
output += "\n" + str(execution.results)
return (0, output)
except Exception as e:
return (1, f"Sandbox error: {str(e)}")
# Configure AutoGen agents
assistant = AssistantAgent(
name="assistant",
llm_config={"model": "gpt-4"}
)
user_proxy = UserProxyAgent(
name="user_proxy",
code_execution_config={
"executor": e2b_code_executor,
"last_n_messages": 3
}
)
# Start conversation
user_proxy.initiate_chat(
assistant,
message="Analyze the correlation between two random datasets"
)
成本优化策略
1. 沙箱复用
from datetime import datetime, timedelta
from typing import Dict, Optional
class SandboxCache:
"""Cache and reuse sandboxes to reduce startup costs."""
def __init__(self, ttl_minutes: int = 10):
self.cache: Dict[str, tuple[CodeInterpreter, datetime]] = {}
self.ttl = timedelta(minutes=ttl_minutes)
def get_or_create(self, cache_key: str = "default") -> CodeInterpreter:
"""Get cached sandbox or create new one."""
if cache_key in self.cache:
sandbox, created_at = self.cache[cache_key]
# Check if sandbox is still valid
if datetime.now() - created_at < self.ttl:
return sandbox
else:
# Expired, close and remove
sandbox.close()
del self.cache[cache_key]
# Create new sandbox
sandbox = CodeInterpreter()
self.cache[cache_key] = (sandbox, datetime.now())
return sandbox
def cleanup(self):
"""Clean up expired sandboxes."""
expired_keys = []
now = datetime.now()
for key, (sandbox, created_at) in self.cache.items():
if now - created_at >= self.ttl:
sandbox.close()
expired_keys.append(key)
for key in expired_keys:
del self.cache[key]
2. 批量执行
将多个小任务合并为单次执行,减少沙箱启动开销:
def batch_execute(tasks: List[Dict]) -> List[Dict]:
"""
Execute multiple tasks in a single sandbox session.
Args:
tasks: List of tasks, each containing 'code' and 'id'
Returns:
List of results with corresponding task IDs
"""
results = []
with CodeInterpreter() as sandbox:
for task in tasks:
execution = sandbox.notebook.exec_cell(task['code'])
results.append({
'task_id': task['id'],
'success': not execution.error,
'output': execution.logs.stdout,
'error': execution.error
})
return results
故障排查指南
常见问题和解决方案
沙箱启动超时
- 原因:网络问题或 E2B 服务负载高
- 解决:实现重试机制,增加超时时间
内存不足错误
- 原因:代码处理大型数据集
- 解决:分块处理数据,使用流式处理
执行超时
- 原因:代码包含无限循环或长时间计算
- 解决:设置合理的超时时间,添加代码复杂度检查
依赖安装失败
- 原因:PyPI 网络问题或包不兼容
- 解决:使用自定义 Docker 镜像预装依赖
# Troubleshooting helper
def diagnose_execution_failure(error: Exception) -> str:
"""Provide diagnostic information for execution failures."""
error_str = str(error).lower()
if "timeout" in error_str:
return "Execution timeout. Consider: 1) Optimizing code complexity, 2) Increasing timeout limit, 3) Breaking into smaller tasks"
elif "memory" in error_str:
return "Memory limit exceeded. Consider: 1) Processing data in chunks, 2) Using more memory-efficient algorithms, 3) Reducing data size"
elif "connection" in error_str or "network" in error_str:
return "Network error. Consider: 1) Implementing retry logic, 2) Checking E2B service status, 3) Verifying API credentials"
elif "import" in error_str or "module" in error_str:
return "Import error. Consider: 1) Installing missing dependencies, 2) Using custom Docker image with pre-installed packages"
else:
return f"Unknown error. Full error message: {error}"
总结
E2B 为 AI 代理提供了强大的代码执行能力,但要充分发挥其价值,需要注意以下关键点:
- 安全第一:即使在隔离环境中,也要实施代码验证和输出清理
- 资源优化:通过沙箱复用和批量执行降低成本
- 错误处理:实现健壮的重试和降级机制
- 监控告警:建立完善的日志和指标收集系统
- 性能调优:使用异步执行和并发控制提升吞吐量
通过遵循这些最佳实践,你可以构建一个安全、高效、可靠的 AI 代理执行环境,为用户提供卓越的代码生成和执行体验。
E2B 正在成为 AI 基础设施的重要组成部分,随着 AI 代理能力的不断增强,安全可靠的代码执行环境将变得越来越关键。希望本文的最佳实践能够帮助你更好地使用 E2B,构建下一代智能应用。