构建一个类似 Linux Kernel 的 Agent 运行时内核,提供:
- 统一的资源管理和调度
- 完整的隔离和安全机制
- 标准化的 Agent 生命周期管理
- 透明的上下文(内存)管理
- 可观测和可审计的执行环境
┌─────────────────────────────────────────────────────────┐
│ Agent Applications │
│ (用户定义的具体 Agent: 代码助手/研究助手等) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Agent OS Kernel (AOS) │
│ ┌───────────┬───────────┬───────────┬────────────────┐ │
│ │ Context │ Process │ I/O │ Security │ │
│ │ Manager │ Scheduler │ Manager │ Subsystem │ │
│ └───────────┴───────────┴───────────┴────────────────┘ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Storage Layer (Database) │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Hardware Resources │
│ LLM API | Vector DB | Message Queue │
└─────────────────────────────────────────────────────────┘
核心职责: 像虚拟内存一样管理 LLM 的上下文窗口
class ContextHierarchy:
"""
模拟计算机存储层次:
L1 Cache → System Prompt (< 1K tokens, 始终在 context)
L2 Cache → Working Memory (10-20K tokens, 当前任务相关)
RAM → Session Context (50-100K tokens, 本次会话)
Disk → Long-term Memory (数据库, 无限容量)
"""
def __init__(self):
self.l1_system = SystemPrompt() # < 1K
self.l2_working = WorkingMemory() # 10-20K
self.ram_session = SessionContext() # 50-100K
self.disk_longterm = Database() # Unlimitedclass ContextPagingManager:
"""
实现类似操作系统的页面置换
"""
def __init__(self):
self.page_table = {} # 虚拟地址 → 物理地址映射
self.lru_cache = LRUCache()
self.access_count = Counter()
def page_fault_handler(self, context_id: str):
"""
当访问的上下文不在当前窗口时,从数据库加载
类似操作系统的缺页中断处理
"""
# 1. 检查是否需要换出页面
if self.is_context_full():
victim = self.select_victim_page() # LRU 或其他算法
self.swap_out(victim)
# 2. 从数据库加载
context_data = self.disk_longterm.load(context_id)
# 3. 加载到当前上下文窗口
self.swap_in(context_data)
def select_victim_page(self):
"""
选择要换出的页面
策略:LRU + 访问频率 + 语义重要性
"""
candidates = self.ram_session.get_all_pages()
# 多因素评分
scores = []
for page in candidates:
score = (
self.lru_cache.get_score(page) * 0.4 + # 最近使用时间
self.access_count[page] * 0.3 + # 访问频率
self.semantic_importance(page) * 0.3 # 语义重要性
)
scores.append((page, score))
# 选择得分最低的(最不重要的)
return min(scores, key=lambda x: x[1])[0]
def semantic_importance(self, page):
"""
使用向量相似度计算当前任务的语义重要性
"""
task_embedding = self.get_current_task_embedding()
page_embedding = self.get_page_embedding(page)
return cosine_similarity(task_embedding, page_embedding)class KVCacheOptimizer:
"""
最大化 KV-Cache 命中率
Manus 的核心经验:缓存命中率是最重要的性能指标
"""
def __init__(self):
self.cache_segments = []
self.hit_rate_tracker = {}
def organize_for_cache_hit(self, context: Context):
"""
组织上下文以最大化缓存命中
"""
# 1. 将固定不变的部分放在最前面(系统提示、工具定义)
static_part = [
context.system_prompt,
context.tool_definitions,
context.project_context
]
# 2. 将可能变化的部分放在后面
dynamic_part = [
context.conversation_history,
context.current_task
]
# 3. 在动态部分内部,按变化频率排序
dynamic_part.sort(key=lambda x: x.change_frequency)
return static_part + dynamic_part
def estimate_cache_hit_rate(self, new_context: Context) -> float:
"""
预估缓存命中率,用于决策是否需要重组上下文
"""
prev_context = self.get_previous_context()
# 计算相同的 token 数量
common_tokens = set(prev_context.tokens) & set(new_context.tokens)
hit_rate = len(common_tokens) / len(new_context.tokens)
return hit_rate核心职责: 管理多个 Agent 的并发执行和资源分配
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import uuid
class AgentState(Enum):
READY = "ready" # 就绪,等待执行
RUNNING = "running" # 正在执行
WAITING = "waiting" # 等待资源(如 API 限流)
SUSPENDED = "suspended" # 被挂起(主动暂停)
TERMINATED = "terminated" # 已终止
@dataclass
class AgentProcess:
"""Agent 进程控制块 (PCB)"""
pid: str # 进程ID
name: str # Agent 名称
state: AgentState # 当前状态
priority: int # 优先级 (0-100)
# 资源使用
token_usage: int # 已使用的 token 数
api_calls: int # API 调用次数
execution_time: float # 执行时间(秒)
# 上下文
context_snapshot: dict # 上下文快照
checkpoint: Optional[str] # 检查点 ID
# 调度信息
created_at: float
last_run: float
cpu_time: float # 实际 LLM 推理时间
def __init__(self, name: str, priority: int = 50):
self.pid = str(uuid.uuid4())
self.name = name
self.state = AgentState.READY
self.priority = priority
self.token_usage = 0
self.api_calls = 0
self.execution_time = 0
self.context_snapshot = {}
self.checkpoint = Nonefrom queue import PriorityQueue
import time
class AgentScheduler:
"""
Agent 调度器
支持多种调度策略:优先级、公平调度、抢占式调度
"""
def __init__(self):
self.ready_queue = PriorityQueue()
self.running: Optional[AgentProcess] = None
self.waiting_queue = []
self.process_table = {} # pid → AgentProcess
# 资源配额
self.token_quota = 100000 # 每小时 token 配额
self.api_quota = 1000 # 每小时 API 调用配额
def schedule(self) -> Optional[AgentProcess]:
"""
调度下一个要执行的 Agent
"""
# 1. 检查当前运行的进程是否需要抢占
if self.running and self.should_preempt(self.running):
self.suspend(self.running)
self.running = None
# 2. 从就绪队列选择下一个进程
if not self.ready_queue.empty():
priority, timestamp, agent = self.ready_queue.get()
self.running = agent
agent.state = AgentState.RUNNING
agent.last_run = time.time()
return agent
return None
def should_preempt(self, agent: AgentProcess) -> bool:
"""
决定是否抢占当前进程
"""
# 1. 检查是否有更高优先级的进程
if not self.ready_queue.empty():
top_priority = self.ready_queue.queue[0][0]
if top_priority < -agent.priority: # 优先级更高(数字更小)
return True
# 2. 检查时间片是否用完(公平调度)
time_slice = 60 # 60秒时间片
if time.time() - agent.last_run > time_slice:
return True
# 3. 检查资源使用是否超额
if agent.token_usage > self.token_quota * 0.3: # 单个进程不能用超过30%配额
return True
return False
def suspend(self, agent: AgentProcess):
"""
挂起 Agent(保存检查点)
"""
# 1. 创建检查点
checkpoint_id = self.save_checkpoint(agent)
agent.checkpoint = checkpoint_id
# 2. 修改状态
agent.state = AgentState.SUSPENDED
# 3. 重新加入就绪队列
self.add_to_ready_queue(agent)
def save_checkpoint(self, agent: AgentProcess) -> str:
"""
保存 Agent 的执行状态到数据库
类似操作系统的 swap out
"""
checkpoint = {
'pid': agent.pid,
'context': agent.context_snapshot,
'state': agent.state.value,
'timestamp': time.time()
}
# 存入数据库
checkpoint_id = str(uuid.uuid4())
db.save(f"checkpoint:{checkpoint_id}", checkpoint)
return checkpoint_id
def restore_checkpoint(self, checkpoint_id: str) -> AgentProcess:
"""
从检查点恢复 Agent
"""
checkpoint = db.load(f"checkpoint:{checkpoint_id}")
agent = AgentProcess(
name=checkpoint['name'],
priority=checkpoint['priority']
)
agent.context_snapshot = checkpoint['context']
return agentclass ResourceQuotaManager:
"""
管理 API 调用、Token 使用等资源配额
"""
def __init__(self):
self.hourly_limits = {
'tokens': 100000,
'api_calls': 1000,
}
self.current_usage = {
'tokens': 0,
'api_calls': 0,
}
self.per_agent_limits = {}
def request_quota(self, agent_id: str, resource: str, amount: int) -> bool:
"""
请求资源配额
返回是否批准
"""
# 1. 检查全局配额
if self.current_usage[resource] + amount > self.hourly_limits[resource]:
return False
# 2. 检查单个 Agent 配额
agent_usage = self.per_agent_limits.get(agent_id, {}).get(resource, 0)
max_per_agent = self.hourly_limits[resource] * 0.3 # 单个 Agent 最多 30%
if agent_usage + amount > max_per_agent:
return False
# 3. 批准并记录
self.current_usage[resource] += amount
if agent_id not in self.per_agent_limits:
self.per_agent_limits[agent_id] = {}
self.per_agent_limits[agent_id][resource] = agent_usage + amount
return True核心职责: 提供统一的持久化存储接口
-- Agent 进程表
CREATE TABLE agent_processes (
pid UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
state VARCHAR(50) NOT NULL,
priority INTEGER DEFAULT 50,
token_usage BIGINT DEFAULT 0,
api_calls INTEGER DEFAULT 0,
context_snapshot JSONB,
checkpoint_id UUID,
created_at TIMESTAMPTZ DEFAULT NOW(),
last_run TIMESTAMPTZ,
CONSTRAINT valid_state CHECK (state IN ('ready', 'running', 'waiting', 'suspended', 'terminated'))
);
-- 上下文存储表(使用 JSONB + 向量)
CREATE TABLE context_storage (
context_id UUID PRIMARY KEY,
agent_pid UUID REFERENCES agent_processes(pid),
content TEXT NOT NULL,
embedding vector(1536), -- pgvector
metadata JSONB,
importance_score FLOAT, -- 语义重要性
access_count INTEGER DEFAULT 0,
last_accessed TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 为向量检索创建索引
CREATE INDEX ON context_storage USING ivfflat (embedding vector_cosine_ops);
-- 检查点表
CREATE TABLE checkpoints (
checkpoint_id UUID PRIMARY KEY,
agent_pid UUID REFERENCES agent_processes(pid),
full_state JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 审计日志表
CREATE TABLE audit_logs (
log_id UUID PRIMARY KEY,
agent_pid UUID REFERENCES agent_processes(pid),
action_type VARCHAR(100) NOT NULL,
input_data JSONB,
output_data JSONB,
reasoning TEXT, -- Agent 的推理过程
timestamp TIMESTAMPTZ DEFAULT NOW()
);
-- 创建索引
CREATE INDEX idx_audit_agent ON audit_logs(agent_pid, timestamp DESC);
CREATE INDEX idx_context_agent ON context_storage(agent_pid);import psycopg2
from pgvector.psycopg2 import register_vector
import json
class StorageManager:
"""
统一的存储管理接口
"""
def __init__(self, db_url: str):
self.conn = psycopg2.connect(db_url)
register_vector(self.conn)
self.cur = self.conn.cursor()
def save_context(self, agent_pid: str, content: str, embedding: list, metadata: dict):
"""
保存上下文到数据库
"""
context_id = str(uuid.uuid4())
self.cur.execute("""
INSERT INTO context_storage
(context_id, agent_pid, content, embedding, metadata, importance_score)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
context_id,
agent_pid,
content,
embedding,
json.dumps(metadata),
metadata.get('importance', 0.5)
))
self.conn.commit()
return context_id
def semantic_search(self, agent_pid: str, query_embedding: list, limit: int = 10):
"""
语义检索相关上下文
"""
self.cur.execute("""
SELECT context_id, content, metadata, importance_score,
1 - (embedding <=> %s::vector) AS similarity
FROM context_storage
WHERE agent_pid = %s
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (query_embedding, agent_pid, query_embedding, limit))
return self.cur.fetchall()
def save_checkpoint(self, agent: AgentProcess):
"""
保存检查点
"""
checkpoint_id = str(uuid.uuid4())
self.cur.execute("""
INSERT INTO checkpoints (checkpoint_id, agent_pid, full_state)
VALUES (%s, %s, %s)
""", (
checkpoint_id,
agent.pid,
json.dumps(agent.__dict__, default=str)
))
self.conn.commit()
return checkpoint_id
def log_action(self, agent_pid: str, action_type: str,
input_data: dict, output_data: dict, reasoning: str):
"""
记录审计日志
"""
self.cur.execute("""
INSERT INTO audit_logs
(log_id, agent_pid, action_type, input_data, output_data, reasoning)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
str(uuid.uuid4()),
agent_pid,
action_type,
json.dumps(input_data),
json.dumps(output_data),
reasoning
))
self.conn.commit()核心职责: 标准化的工具调用接口
from abc import ABC, abstractmethod
from typing import Any, Dict
import subprocess
import json
class Tool(ABC):
"""
标准化工具接口
"""
@abstractmethod
def name(self) -> str:
"""工具名称"""
pass
@abstractmethod
def description(self) -> str:
"""工具描述(给 LLM 看)"""
pass
@abstractmethod
def parameters(self) -> dict:
"""参数 Schema"""
pass
@abstractmethod
def execute(self, **kwargs) -> Dict[str, Any]:
"""
执行工具
返回格式:
{
"success": bool,
"data": any,
"error": str | None,
"metadata": dict
}
"""
pass
class CLITool(Tool):
"""
CLI 工具包装器
将任意命令行工具包装成标准接口
"""
def __init__(self, command: str, tool_name: str, description: str):
self.command = command
self._name = tool_name
self._description = description
def name(self) -> str:
return self._name
def description(self) -> str:
return self._description
def execute(self, **kwargs) -> Dict[str, Any]:
"""
执行 CLI 命令
强制 JSON 输出(如果工具支持)
"""
# 构建命令
cmd_parts = [self.command]
# 添加参数
for key, value in kwargs.items():
cmd_parts.append(f"--{key}")
cmd_parts.append(str(value))
# 强制 JSON 输出
if '--json' not in cmd_parts:
cmd_parts.append('--json')
try:
result = subprocess.run(
cmd_parts,
capture_output=True,
text=True,
timeout=30
)
# 解析 JSON 输出
if result.returncode == 0:
try:
data = json.loads(result.stdout)
return {
"success": True,
"data": data,
"error": None,
"metadata": {
"exit_code": 0,
"stderr": result.stderr
}
}
except json.JSONDecodeError:
# 如果不是 JSON,返回原始文本
return {
"success": True,
"data": result.stdout,
"error": None,
"metadata": {"format": "text"}
}
else:
return {
"success": False,
"data": None,
"error": result.stderr,
"metadata": {"exit_code": result.returncode}
}
except subprocess.TimeoutExpired:
return {
"success": False,
"data": None,
"error": "Command timeout after 30 seconds",
"metadata": {"timeout": True}
}
except Exception as e:
return {
"success": False,
"data": None,
"error": str(e),
"metadata": {"exception": type(e).__name__}
}
class ToolRegistry:
"""
工具注册表
"""
def __init__(self):
self.tools = {}
def register(self, tool: Tool):
"""注册工具"""
self.tools[tool.name()] = tool
def get(self, name: str) -> Tool:
"""获取工具"""
return self.tools.get(name)
def list_tools(self) -> list:
"""列出所有工具"""
return [
{
"name": tool.name(),
"description": tool.description(),
"parameters": tool.parameters()
}
for tool in self.tools.values()
]
def auto_discover_cli_tools(self):
"""
自动发现系统 CLI 工具
"""
common_tools = [
('grep', 'Search text using patterns'),
('find', 'Find files and directories'),
('jq', 'Process JSON data'),
('curl', 'Transfer data from URLs'),
('psql', 'PostgreSQL interactive terminal'),
]
for cmd, desc in common_tools:
# 检查工具是否可用
try:
subprocess.run([cmd, '--version'],
capture_output=True,
timeout=1)
self.register(CLITool(cmd, cmd, desc))
except (FileNotFoundError, subprocess.TimeoutExpired):
pass核心职责: 隔离、审计、可观测性
import docker
from typing import Optional
class SandboxManager:
"""
沙箱管理器
使用 Docker 容器隔离 Agent 执行
"""
def __init__(self):
self.client = docker.from_env()
self.containers = {}
def create_sandbox(self, agent_pid: str) -> str:
"""
为 Agent 创建隔离的沙箱
"""
container = self.client.containers.run(
image="agent-sandbox:latest", # 预构建的沙箱镜像
detach=True,
network_mode="bridge",
mem_limit="512m", # 限制内存
cpu_quota=50000, # 限制 CPU (50%)
name=f"agent-{agent_pid}",
environment={
"AGENT_PID": agent_pid
},
volumes={
f"/tmp/agent-{agent_pid}": {
"bind": "/workspace",
"mode": "rw"
}
}
)
self.containers[agent_pid] = container
return container.id
def execute_in_sandbox(self, agent_pid: str, command: str) -> dict:
"""
在沙箱中执行命令
"""
container = self.containers.get(agent_pid)
if not container:
raise ValueError(f"No sandbox found for agent {agent_pid}")
try:
result = container.exec_run(command, demux=True)
stdout, stderr = result.output
return {
"success": result.exit_code == 0,
"stdout": stdout.decode() if stdout else "",
"stderr": stderr.decode() if stderr else "",
"exit_code": result.exit_code
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def destroy_sandbox(self, agent_pid: str):
"""
销毁沙箱
"""
container = self.containers.get(agent_pid)
if container:
container.stop()
container.remove()
del self.containers[agent_pid]from datetime import datetime
from typing import List
class ObservabilitySystem:
"""
可观测性系统
记录 Agent 的完整决策链路
"""
def __init__(self, storage: StorageManager):
self.storage = storage
def trace_decision(self, agent_pid: str,
input_context: str,
reasoning: str,
action: dict,
result: dict):
"""
追踪一次完整的决策过程
"""
trace = {
"agent_pid": agent_pid,
"timestamp": datetime.now().isoformat(),
"input": input_context,
"reasoning": reasoning,
"action": action,
"result": result
}
# 保存到审计日志
self.storage.log_action(
agent_pid=agent_pid,
action_type="decision",
input_data={"context": input_context},
output_data=result,
reasoning=reasoning
)
def get_agent_trace(self, agent_pid: str, limit: int = 100) -> List[dict]:
"""
获取 Agent 的执行轨迹
"""
self.storage.cur.execute("""
SELECT action_type, input_data, output_data, reasoning, timestamp
FROM audit_logs
WHERE agent_pid = %s
ORDER BY timestamp DESC
LIMIT %s
""", (agent_pid, limit))
return [
{
"action": row[0],
"input": row[1],
"output": row[2],
"reasoning": row[3],
"timestamp": row[4]
}
for row in self.storage.cur.fetchall()
]
def replay_execution(self, agent_pid: str, from_checkpoint: str):
"""
回放 Agent 的执行过程
用于调试和审计
"""
# 1. 加载检查点
checkpoint = self.storage.load_checkpoint(from_checkpoint)
# 2. 获取之后的所有操作
traces = self.get_agent_trace(agent_pid)
# 3. 逐步重放
for trace in reversed(traces):
if trace['timestamp'] <= checkpoint['created_at']:
break
print(f"[{trace['timestamp']}] {trace['action']}")
print(f" Input: {trace['input']}")
print(f" Reasoning: {trace['reasoning']}")
print(f" Output: {trace['output']}")
print()class AgentOS:
"""
Agent OS Kernel 主类
"""
def __init__(self):
self.context_manager = ContextManager()
self.scheduler = AgentScheduler()
self.storage = StorageManager("postgresql://localhost/agent_os")
self.io_manager = ToolRegistry()
self.sandbox = SandboxManager()
self.observability = ObservabilitySystem(self.storage)
def spawn_agent(self, name: str, task: str, priority: int = 50) -> str:
"""
启动一个新 Agent
"""
# 1. 创建进程
agent = AgentProcess(name=name, priority=priority)
# 2. 初始化上下文
initial_context = self.context_manager.create_context(
agent_id=agent.pid,
task=task
)
agent.context_snapshot = initial_context
# 3. 创建沙箱
sandbox_id = self.sandbox.create_sandbox(agent.pid)
# 4. 保存到数据库
self.storage.save_process(agent)
# 5. 加入调度队列
self.scheduler.add_to_ready_queue(agent)
print(f"✓ Agent {name} spawned with PID {agent.pid}")
return agent.pid
def run(self):
"""
主循环:调度和执行 Agent
"""
while True:
# 1. 调度下一个 Agent
agent = self.scheduler.schedule()
if agent is None:
time.sleep(0.1)
continue
# 2. 加载上下文
context = self.context_manager.load_context(agent.pid)
# 3. 执行 Agent 的推理循环
try:
result = self.execute_agent_step(agent, context)
# 4. 保存结果和状态
self.context_manager.save_context(agent.pid, result)
self.storage.save_process(agent)
# 5. 检查是否完成
if result.get('done'):
agent.state = AgentState.TERMINATED
self.scheduler.remove_process(agent.pid)
except Exception as e:
# 错误处理
print(f"Error in agent {agent.pid}: {e}")
self.observability.trace_decision(
agent.pid,
context,
f"Error: {e}",
{},
{"error": str(e)}
)
def execute_agent_step(self, agent: AgentProcess, context: dict) -> dict:
"""
执行 Agent 的一步推理
"""
# 1. 准备 LLM 输入
llm_input = self.context_manager.prepare_llm_input(context)
# 2. 调用 LLM
response = self.call_llm(llm_input)
# 3. 解析响应
reasoning = response.get('reasoning', '')
action = response.get('action', {})
# 4. 执行工具调用
if action:
tool_result = self.io_manager.get(action['tool']).execute(
**action['parameters']
)
else:
tool_result = None
# 5. 记录到审计日志
self.observability.trace_decision(
agent.pid,
llm_input,
reasoning,
action,
tool_result
)
# 6. 返回结果
return {
'reasoning': reasoning,
'action': action,
'result': tool_result,
'done': response.get('done', False)
}- 统一数据平面:关系数据 + 向量 + JSON + 全文搜索
- ACID 保证:检查点、审计日志需要事务一致性
- 丰富的索引:B-tree、IVFFLAT、GIN 满足不同查询需求
- 成熟的生态:连接池、备份、监控工具完善
- LLM 友好:模型训练数据中有大量 PostgreSQL 文档
- Token 成本优化:缓存命中率直接影响成本(10x 差异)
- 语义重要性评估:如何判断哪些上下文更重要?
- 实时性要求:页面置换不能阻塞推理过程
- 多 Agent 共享:如何在多个 Agent 间共享公共知识?
- 抢占 vs 协作:LLM 调用不可中断,只能在步骤间抢占
- 公平性 vs 优先级:如何平衡高优先级 Agent 和公平调度?
- 资源配额:API 限流、Token 配额如何分配?
- Context Manager 基础实现
- 简单的 Round-Robin 调度器
- PostgreSQL 存储层
- 基本的工具调用接口
- LRU + 语义重要性的页面置换
- 优先级调度和抢占
- Docker 沙箱集成
- 完整的审计日志
- 分布式调度(多节点)
- 监控和可观测性 Dashboard
- 性能优化(缓存、批处理)
- 安全加固和权限管理
-
操作系统经典:
- 《Operating System Concepts》(恐龙书)
- 《Modern Operating Systems》(Tanenbaum)
-
数据库系统:
- 《Designing Data-Intensive Applications》
- PostgreSQL 官方文档
-
Agent 实践:
- Manus Context Engineering 博客
- DeepSeek Engram 论文
- Anthropic MCP 文档
这个架构的核心思想是:不要重新发明轮子,而是站在 50 年操作系统演化的肩膀上。
内存管理、进程调度、I/O 抽象、安全隔离——这些问题操作系统已经用几十年时间打磨出了优雅的解决方案。我们要做的是把这些经验应用到 Agent 领域,而不是从零开始摸索。
最关键的创新点在于:
- Context = Memory: 把上下文当作虚拟内存来管理
- PostgreSQL = 统一存储层: 一个数据库解决所有持久化需求
- Agent-Native CLI: 不发明新协议,让现有工具更规范
- 可观测性优先: 信任建立在透明度之上
这不是一个玩具项目——这是未来 Agent 基础设施的雏形。