# sandbox_executor.py
import logging
import os
import resource
import shlex
import signal
import subprocess
from pathlib import Path
from typing import Optional
# Module-level logger named after this module; none of the code visible in
# this file currently emits through it.
logger = logging.getLogger(__name__)
class SandboxConfig:
    """Settings that bound what a sandboxed command is allowed to do.

    Attributes:
        workspace_root: Absolute, resolved directory commands are confined to.
        max_timeout: Wall-clock limit for one command, in seconds.
        max_memory_mb: Memory ceiling for one command, in mebibytes.
        allowed_commands: Executable names that may be invoked.
        allow_network: Whether network-related settings are left in place.
    """

    def __init__(
        self,
        workspace_root: str,
        max_timeout: int = 60,
        max_memory_mb: int = 512,
        allowed_commands: Optional[list[str]] = None,
        allow_network: bool = False,
    ):
        """Store the limits, resolving the workspace path once up front.

        Args:
            workspace_root: Directory that acts as the sandbox root; it is
                resolved to an absolute path.
            max_timeout: Seconds before a command is considered timed out.
            max_memory_mb: Memory limit in mebibytes.
            allowed_commands: Whitelisted executables. ``None`` or an empty
                list selects the built-in default whitelist; a non-empty
                list is stored as-is (not copied).
            allow_network: Flag consulted when the environment is prepared.
        """
        root = Path(workspace_root)
        if allowed_commands:
            whitelist = allowed_commands
        else:
            # Falsy input (None or []) falls back to the default toolchain.
            whitelist = ["python", "node", "npm", "cargo"]

        self.workspace_root = root.resolve()
        self.max_timeout = max_timeout
        self.max_memory_mb = max_memory_mb
        self.allowed_commands = whitelist
        self.allow_network = allow_network
class SecurityError(Exception):
    """Raised when a command or working directory violates sandbox policy."""


class SandboxExecutor:
    """Run whitelisted commands inside a restricted workspace.

    The executor tokenises the command, rejects anything matching the
    blacklist or missing from the whitelist, confines the working directory
    to the configured workspace, sanitises the environment, and runs the
    program directly (no shell) with a timeout and an address-space limit
    applied to the child process only.

    Note: this is a best-effort guard, not an OS-level sandbox. In particular
    the "network" switch only strips proxy variables from the environment; it
    does not prevent the child from opening sockets.
    """

    # Substrings that cause a command to be rejected outright.
    DANGEROUS_PATTERNS = [
        "rm -rf /",
        "rm -rf ~",
        "dd if=",
        "mkfs",
        ":(){:|:&};:",
        "fork()",
        "curl | sh",
        "wget | sh",
        "> /dev/sda",
        "chmod 777 /",
        "chown -R",
    ]

    def __init__(self, config: "SandboxConfig"):
        """Bind the executor to *config*, which supplies every limit used."""
        self.config = config

    def execute(self, command: str, cwd: Optional[str] = None) -> dict:
        """Validate and run *command*, returning its captured result.

        Args:
            command: Command line to run. It is split with POSIX shell
                quoting rules but executed WITHOUT a shell, so pipes,
                redirects, ``;`` and ``&&`` are passed as literal arguments.
            cwd: Working directory relative to the workspace root; ``None``
                means the workspace root itself.

        Returns:
            A dict with keys ``success`` (bool), ``stdout`` (str),
            ``stderr`` (str) and ``exit_code`` (int). A timeout yields
            ``exit_code == -1``; a program or directory that cannot be
            launched yields 127 (not found) or 126 (other OS error).

        Raises:
            SecurityError: If the command is empty, malformed, blacklisted,
                not whitelisted, or *cwd* escapes the workspace.
        """
        # 1. Policy checks (blacklist + whitelist).
        self._check_command(command)
        # 2. Confine the working directory to the workspace.
        work_dir = self._resolve_workdir(cwd)
        # 3. Sanitised environment for the child.
        env = self._clean_env()
        # 4. Run the program directly. Avoiding the shell is what makes the
        #    first-token whitelist meaningful: "python x.py; rm y" can no
        #    longer smuggle a second command past the check.
        argv = self._split_command(command)
        try:
            result = subprocess.run(
                argv,
                shell=False,
                cwd=str(work_dir),
                env=env,
                capture_output=True,
                timeout=self.config.max_timeout,
                text=True,
                # Apply rlimits in the child between fork and exec so the
                # calling process keeps its own limits. (preexec_fn is
                # documented as unsafe if the parent is multi-threaded.)
                preexec_fn=self._set_limits,
            )
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": "Execution timeout",
                "exit_code": -1,
            }
        except OSError as exc:
            # Without a shell, a missing executable or working directory
            # surfaces as an exception; report it using shell-style codes.
            return {
                "success": False,
                "stdout": "",
                "stderr": str(exc),
                "exit_code": 127 if isinstance(exc, FileNotFoundError) else 126,
            }
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "exit_code": result.returncode,
        }

    @staticmethod
    def _split_command(command: str) -> list[str]:
        """Tokenise *command*; reject empty or unparseable input."""
        try:
            argv = shlex.split(command)
        except ValueError as exc:
            # e.g. an unterminated quote.
            raise SecurityError(f"Malformed command: {exc}") from exc
        if not argv:
            raise SecurityError("Empty command")
        return argv

    def _check_command(self, command: str):
        """Raise SecurityError unless *command* passes both policy lists.

        The blacklist is a plain substring match on the raw command text;
        the whitelist compares the first parsed token against
        ``config.allowed_commands`` (skipped when that list is empty).
        """
        for pattern in self.DANGEROUS_PATTERNS:
            if pattern in command:
                raise SecurityError(f"Dangerous command detected: {pattern}")

        # Parse with the same tokeniser execute() uses so the name that is
        # checked is exactly the program that would be launched.
        cmd_name = self._split_command(command)[0]
        allowed = self.config.allowed_commands
        if allowed and cmd_name not in allowed:
            raise SecurityError(f"Command not allowed: {cmd_name}")

    def _resolve_workdir(self, cwd: Optional[str]) -> Path:
        """Return the absolute working directory, confined to the workspace.

        Raises:
            SecurityError: If the resolved path is not inside
                ``config.workspace_root`` (e.g. ``..`` segments or an
                absolute path pointing elsewhere).
        """
        root = self.config.workspace_root
        work_dir = (root / cwd).resolve() if cwd else root
        try:
            work_dir.relative_to(root)
        except ValueError as exc:
            raise SecurityError(f"Path outside workspace: {cwd}") from exc
        return work_dir

    def _set_limits(self):
        """Cap the address space of the *current* process (best effort).

        Intended to run in the child via ``preexec_fn``; calling it directly
        would restrict the calling process itself. Failures are ignored
        because some platforms (notably macOS) do not fully support
        ``RLIMIT_AS``.
        """
        try:
            _soft, hard = resource.getrlimit(resource.RLIMIT_AS)
            limit = self.config.max_memory_mb * 1024 * 1024
            if hard != resource.RLIM_INFINITY:
                # A soft limit above the hard limit is rejected by the OS.
                limit = min(limit, hard)
            resource.setrlimit(resource.RLIMIT_AS, (limit, hard))
        except (AttributeError, ValueError, OSError):
            pass  # Limit unsupported here; run without a memory cap.

    def _clean_env(self) -> dict:
        """Return a copy of the environment with risky variables removed.

        Loader/interpreter injection variables are dropped, ``PATH`` is
        replaced with a fixed system path, and, when networking is not
        allowed, the HTTP(S) proxy variables are removed as well.
        """
        env = os.environ.copy()
        for var in ("PATH", "LD_PRELOAD", "LD_LIBRARY_PATH", "PYTHONPATH"):
            env.pop(var, None)
        # Fixed search path so the caller's PATH cannot redirect lookups.
        env["PATH"] = "/usr/local/bin:/usr/bin:/bin"

        if not self.config.allow_network:
            for var in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
                env.pop(var, None)
        return env
# Usage example
if __name__ == "__main__":
    # "/path/to/workspace" is a placeholder; point it at a real directory.
    demo_settings = {
        "workspace_root": "/path/to/workspace",
        "max_timeout": 30,
        "max_memory_mb": 256,
        "allow_network": False,
    }
    runner = SandboxExecutor(SandboxConfig(**demo_settings))

    try:
        outcome = runner.execute("python test.py")
    except SecurityError as e:
        # The command or its working directory was rejected by policy.
        print(f"Blocked: {e}")
    else:
        print(f"Success: {outcome['success']}")
        print(f"Output: {outcome['stdout']}")