claude-code架构-引擎室

claude-code架构-引擎室
寒霜Architecture: The Engine Room
架构:引擎室
graph TB
subgraph "核心:tt控制循环"
Start([用户输入]) --> Init[初始化回合]
Init --> Compact{需要压缩?}
Compact -->|是| CompactLLM[LLM总结]
CompactLLM --> Assembly
Compact -->|否| Assembly[组装上下文]
Assembly --> Stream[流式传输到LLM]
Stream --> Process[处理事件]
Process --> Tools{工具请求?}
Tools -->|是| Execute[执行工具]
Execute --> Recurse[递归tt]
Recurse --> Init
Tools -->|否| End([完成])
end
style Init fill:#e1f5fe
style Stream fill:#fff3e0
style Execute fill:#e8f5e9
style Recurse fill:#fce4ec
tt控制循环:跳动的心脏
整个Claude Code系统围绕一个名为tt的异步生成器函数运转。这个函数协调每一次交互,从用户输入到LLM通信再到工具执行。让我们剖析这个卓越的工程奇迹:
// 来自代码库的实际tt函数签名
async function* tt(
currentMessages: CliMessage[], // 完整的对话历史
baseSystemPromptString: string, // 静态系统指令
currentGitContext: GitContext, // 实时git状态
currentClaudeMdContents: ClaudeMdContent[], // 项目上下文
permissionGranterFn: PermissionGranter, // 权限回调
toolUseContext: ToolUseContext, // 共享执行上下文
activeStreamingToolUse?: ToolUseBlock, // 恢复流式状态
loopState: {
turnId: string, // 本回合的UUID
turnCounter: number, // 递归深度跟踪器
compacted?: boolean, // 历史压缩标志
isResuming?: boolean // 从保存状态恢复
}
): AsyncGenerator<CliMessage, void, void>
这个签名揭示了在起作用的复杂状态管理。该函数产生CliMessage对象,这些对象驱动UI更新,同时保持对话流程。让我们检查每个阶段:
阶段1:回合初始化和上下文准备
{
// 通知UI处理已经开始
yield {
type: "ui_state_update",
uuid: `uistate-${loopState.turnId}-${Date.now()}`,
timestamp: new Date().toISOString(),
data: { status: "thinking", turnId: loopState.turnId }
};
// 检查上下文窗口压力
let messagesForLlm = currentMessages;
let wasCompactedThisIteration = false;
if (await shouldAutoCompact(currentMessages)) {
yield {
type: "ui_notification",
data: { message: "上下文较大,尝试压缩..." }
};
try {
const compactionResult = await compactAndStoreConversation(
currentMessages,
toolUseContext,
true
);
messagesForLlm = compactionResult.messagesAfterCompacting;
wasCompactedThisIteration = true;
loopState.compacted = true;
yield createSystemNotificationMessage(
`对话历史已自动压缩。摘要:${
compactionResult.summaryMessage.message.content[0].text
}`
);
} catch (compactionError) {
yield createSystemErrorMessage(
`压缩对话失败:${compactionError.message}`
);
}
}
}
阶段1性能概况:
| 操作 | 典型持续时间 | 复杂度 |
|---|---|---|
| Token计数 | 10-50ms | O(n) 消息 |
| 压缩决策 | <1ms | O(1) |
| LLM总结 | 2000-3000ms | 一次LLM调用 |
| 消息重构 | 5-10ms | O(n) 消息 |
阶段2:动态系统提示组装
系统提示不是静态的——它是为每个回合新组装的:
{
// 并行获取所有上下文源
const [toolSpecs, dirStructure] = await Promise.all([
// 将工具定义转换为LLM兼容的规格
Promise.all(
toolUseContext.options.tools
.filter(tool => tool.isEnabled ? tool.isEnabled() : true)
.map(async (tool) => convertToolDefinitionToToolSpecification(tool, toolUseContext))
),
// 获取当前目录结构
getDirectoryStructureSnapshot(toolUseContext)
]);
// 组装完整的系统提示
const systemPromptForLlm = assembleSystemPrompt(
baseSystemPromptString, // 核心指令
currentClaudeMdContents, // 项目特定上下文
currentGitContext, // Git状态/分支/提交
dirStructure, // 文件树
toolSpecs // 可用工具
);
// 准备带缓存控制的消息
const apiMessages = prepareMessagesForApi(
messagesForLlm,
true // applyEphemeralCacheControl
);
}
组装过程遵循严格的优先级顺序:
优先级1:基础指令 (~2KB)
↓
优先级2:模型特定适配 (~500B)
↓
优先级3:CLAUDE.md内容 (可变,通常5-50KB)
↓
优先级4:Git上下文 (~1-5KB)
↓
优先级5:目录结构 (截断以适应)
↓
优先级6:工具规格 (~10-20KB)
阶段3:LLM流初始化
{
// 初始化流式调用
const llmStream = callLlmStreamApi(
apiMessages,
systemPromptForLlm,
toolSpecificationsForLlm,
toolUseContext.options.mainLoopModel,
toolUseContext.abortController.signal
);
// 为流式响应初始化累加器
let accumulatedAssistantMessage: Partial<CliMessage> & {
message: Partial<ApiMessage> & { content: ContentBlock[] }
} = {
type: "assistant",
uuid: `assistant-${loopState.turnId}-${loopState.turnCounter}-${Date.now()}`,
timestamp: new Date().toISOString(),
message: { role: "assistant", content: [] }
};
let currentToolUsesFromLlm: ToolUseBlock[] = [];
let currentThinkingContent: string = "";
let currentToolInputJsonBuffer: string = "";
}
阶段4:流事件处理状态机
这就是魔法发生的地方——实时处理流事件:
{
for await (const streamEvent of llmStream) {
// 中止检查
if (toolUseContext.abortController.signal.aborted) {
yield createSystemNotificationMessage("LLM流处理被用户中止。");
return;
}
switch (streamEvent.type) {
case "message_start":
accumulatedAssistantMessage.message.id = streamEvent.message.id;
accumulatedAssistantMessage.message.model = streamEvent.message.model;
accumulatedAssistantMessage.message.usage = streamEvent.message.usage;
yield {
type: "ui_state_update",
data: {
status: "assistant_responding",
model: streamEvent.message.model
}
};
break;
case "content_block_start":
const newBlockPlaceholder = { ...streamEvent.content_block };
// 根据块类型初始化空内容
if (streamEvent.content_block.type === "thinking") {
currentThinkingContent = "";
newBlockPlaceholder.thinking = "";
} else if (streamEvent.content_block.type === "tool_use") {
currentToolInputJsonBuffer = "";
newBlockPlaceholder.input = "";
} else if (streamEvent.content_block.type === "text") {
newBlockPlaceholder.text = "";
}
accumulatedAssistantMessage.message.content.push(newBlockPlaceholder);
break;
case "content_block_delta":
const lastBlockIndex = accumulatedAssistantMessage.message.content.length - 1;
if (lastBlockIndex < 0) continue;
const currentBlock = accumulatedAssistantMessage.message.content[lastBlockIndex];
if (streamEvent.delta.type === "text_delta" && currentBlock.type === "text") {
currentBlock.text += streamEvent.delta.text;
yield {
type: "ui_text_delta",
data: {
textDelta: streamEvent.delta.text,
blockIndex: lastBlockIndex
}
};
} else if (streamEvent.delta.type === "input_json_delta" && currentBlock.type === "tool_use") {
currentToolInputJsonBuffer += streamEvent.delta.partial_json;
currentBlock.input = currentToolInputJsonBuffer;
// 尝试解析不完整的JSON进行预览
const parseAttempt = tryParsePartialJson(currentToolInputJsonBuffer);
if (parseAttempt.complete) {
yield {
type: "ui_tool_preview",
data: {
toolId: currentBlock.id,
preview: parseAttempt.value
}
};
}
}
break;
case "content_block_stop":
const completedBlock = accumulatedAssistantMessage.message.content[streamEvent.index];
if (completedBlock.type === "tool_use") {
try {
const parsedInput = JSON.parse(currentToolInputJsonBuffer);
completedBlock.input = parsedInput;
currentToolUsesFromLlm.push({
type: "tool_use",
id: completedBlock.id,
name: completedBlock.name,
input: parsedInput
});
} catch (e) {
// 处理来自LLM的格式错误的JSON
completedBlock.input = {
error: "来自LLM的无效JSON输入",
raw_json_string: currentToolInputJsonBuffer,
parse_error: e.message
};
}
currentToolInputJsonBuffer = "";
}
yield {
type: "ui_content_block_complete",
data: { block: completedBlock, blockIndex: streamEvent.index }
};
break;
case "message_stop":
// LLM生成完成
const finalAssistantMessage = finalizeCliMessage(
accumulatedAssistantMessage,
loopState.turnId,
loopState.turnCounter
);
yield finalAssistantMessage;
// 移动到阶段5或6...
break;
}
}
}
流处理性能:
- 首个token延迟:300-800ms(因模型而异)
- Token吞吐量:50-100 tokens/秒
- UI更新频率:文本每个token更新,工具输入批量更新
- 内存使用:无论响应长度如何都保持恒定
阶段5:工具执行编排
当LLM请求使用工具时,架构转换到执行模式:
{
if (finalAssistantMessage.message.stop_reason === "tool_use" &&
currentToolUsesFromLlm.length > 0) {
yield { type: "ui_state_update", data: { status: "executing_tools" } };
let toolResultMessages: CliMessage[] = [];
// 智能批量处理工具
for await (const toolOutcomeMessage of processToolCallsInParallelBatches(
currentToolUsesFromLlm,
finalAssistantMessage,
permissionGranterFn,
toolUseContext
)) {
yield toolOutcomeMessage;
if (toolOutcomeMessage.type === 'user' && toolOutcomeMessage.isMeta) {
toolResultMessages.push(toolOutcomeMessage);
}
}
// 检查工具执行期间的中止
if (toolUseContext.abortController.signal.aborted) {
yield createSystemNotificationMessage("工具执行被用户中止。");
return;
}
// 排序结果以匹配LLM的请求顺序
const sortedToolResultMessages = sortToolResultsByOriginalRequestOrder(
toolResultMessages,
currentToolUsesFromLlm
);
// 阶段6:递归处理结果
yield* tt(
[...messagesForLlm, finalAssistantMessage, ...sortedToolResultMessages],
baseSystemPromptString,
currentGitContext,
currentClaudeMdContents,
permissionGranterFn,
toolUseContext,
undefined,
{ ...loopState, turnCounter: loopState.turnCounter + 1 }
);
return;
}
}
阶段6:递归控制
tt函数是尾递归的,允许无限的对话深度(由安全措施限制):
// 递归安全措施
if (loopState.turnCounter >= 10) {
yield createSystemMessage(
"已达到最大对话深度。请开始新的查询。"
);
return;
}
// 递归前的内存压力检查
const estimatedMemory = estimateConversationMemory(messagesForLlm);
if (estimatedMemory > MEMORY_THRESHOLD) {
// 继续前强制压缩
const compacted = await forceCompaction(messagesForLlm);
messagesForLlm = compacted;
}
分层架构
Claude Code实现了一个清晰的分层架构,每一层都有明确的职责:
graph TD
subgraph "第1层:用户界面"
React[React组件]
Ink[Ink渲染器]
Yoga[Yoga布局引擎]
React --> Ink
Ink --> Yoga
end
subgraph "第2层:代理核心"
TT[tt控制循环]
Context[上下文组装]
Permission[权限系统]
State[会话状态]
TT --> Context
TT --> Permission
TT --> State
end
subgraph "第3层:LLM交互"
Stream[流处理器]
Retry[重试逻辑]
Token[Token计数器]
Stream --> Retry
Stream --> Token
end
subgraph "第4层:工具系统"
Executor[工具执行器]
Validator[输入验证器]
Sandbox[沙箱管理器]
Executor --> Validator
Executor --> Sandbox
end
subgraph "第5层:基础设施"
FS[文件系统]
Process[进程管理器]
Network[网络客户端]
Telemetry[遥测]
end
React -.-> TT
TT -.-> Stream
TT -.-> Executor
Executor -.-> FS
Executor -.-> Process
Stream -.-> Network
TT -.-> Telemetry
层通信模式
层之间的通信遵循严格的模式:
- 向下通信:直接函数调用
- 向上通信:事件和回调
- 跨层通信:共享上下文对象
// 示例:UI到代理核心的通信
class UIToAgentBridge {
async handleUserInput(input: string) {
// 向下:直接调用
const action = await pd(input, this.context);
// 根据操作类型处理
switch (action.type) {
case 'normal_prompt':
// 开始新的tt循环迭代
for await (const message of tt(...)) {
// 向上:产生事件
this.uiRenderer.handleMessage(message);
}
break;
}
}
}
// 示例:工具通过进度与UI通信
class ToolToUIBridge {
async *executeWithProgress(tool: ToolDefinition, input: any) {
// 工具产生进度
for await (const event of tool.call(input, this.context)) {
if (event.type === 'progress') {
// 转换为UI事件
yield {
type: 'ui_progress',
toolName: tool.name,
progress: event.data
};
}
}
}
}
事件驱动和流式架构
整个系统构建在流式原语之上:
流背压管理
class StreamBackpressureController {
private queue: StreamEvent[] = [];
private processing = false;
private pressure = {
high: 1000, // 开始丢弃非关键事件
critical: 5000 // 除错误外全部丢弃
};
async handleEvent(event: StreamEvent) {
this.queue.push(event);
// 应用背压策略
if (this.queue.length > this.pressure.critical) {
// 只保留关键事件
this.queue = this.queue.filter(e =>
e.type === 'error' ||
e.type === 'message_stop'
);
} else if (this.queue.length > this.pressure.high) {
// 丢弃文本增量,保留结构
this.queue = this.queue.filter(e =>
e.type !== 'content_block_delta' ||
e.delta.type !== 'text_delta'
);
}
if (!this.processing) {
await this.processQueue();
}
}
private async processQueue() {
this.processing = true;
while (this.queue.length > 0) {
const batch = this.queue.splice(0, 100); // 批量处理
await this.processBatch(batch);
// 让出给事件循环
await new Promise(resolve => setImmediate(resolve));
}
this.processing = false;
}
}
进度事件聚合
多个并发操作需要协调的进度报告:
class ProgressAggregator {
private progressStreams = new Map<string, AsyncIterator<ProgressEvent>>();
async *aggregateProgress(
operations: Array<{ id: string, operation: AsyncIterator<any> }>
): AsyncGenerator<AggregatedProgress> {
// 启动所有操作
for (const { id, operation } of operations) {
this.progressStreams.set(id, operation);
}
// 轮询所有流
while (this.progressStreams.size > 0) {
const promises = Array.from(this.progressStreams.entries()).map(
async ([id, stream]) => {
const { value, done } = await stream.next();
return { id, value, done };
}
);
// 竞争获取下一个事件
const result = await Promise.race(promises);
if (result.done) {
this.progressStreams.delete(result.id);
} else if (result.value.type === 'progress') {
yield {
type: 'aggregated_progress',
source: result.id,
progress: result.value
};
}
}
}
}
状态管理架构
Claude Code使用务实的方法进行状态管理:
全局会话状态
// 带直接突变的单例会话状态
class SessionState {
private static instance: SessionState;
// 核心状态
sessionId: string = crypto.randomUUID();
cwd: string = process.cwd();
totalCostUSD: number = 0;
totalAPIDuration: number = 0;
// 模型使用跟踪
modelTokens: Record<string, {
inputTokens: number;
outputTokens: number;
cacheReadInputTokens: number;
cacheCreationInputTokens: number;
}> = {};
// 直接突变方法
incrementCost(amount: number) {
this.totalCostUSD += amount;
this.persistToDisk(); // 异步,非阻塞
}
updateTokenUsage(model: string, usage: TokenUsage) {
if (!this.modelTokens[model]) {
this.modelTokens[model] = {
inputTokens: 0,
outputTokens: 0,
cacheReadInputTokens: 0,
cacheCreationInputTokens: 0
};
}
const tokens = this.modelTokens[model];
tokens.inputTokens += usage.input_tokens || 0;
tokens.outputTokens += usage.output_tokens || 0;
tokens.cacheReadInputTokens += usage.cache_read_input_tokens || 0;
tokens.cacheCreationInputTokens += usage.cache_creation_input_tokens || 0;
}
private async persistToDisk() {
// 防抖写入以避免过度I/O
clearTimeout(this.persistTimer);
this.persistTimer = setTimeout(async () => {
await fs.writeFile(
'.claude/session.json',
JSON.stringify(this, null, 2)
);
}, 1000);
}
}
带弱引用的文件状态
class ReadFileState {
private cache = new Map<string, WeakRef<FileContent>>();
private registry = new FinalizationRegistry((path: string) => {
// FileContent被垃圾回收时清理
this.cache.delete(path);
});
set(path: string, content: FileContent) {
const ref = new WeakRef(content);
this.cache.set(path, ref);
this.registry.register(content, path);
}
get(path: string): FileContent | undefined {
const ref = this.cache.get(path);
if (ref) {
const content = ref.deref();
if (!content) {
// 内容已被垃圾回收
this.cache.delete(path);
return undefined;
}
return content;
}
}
checkFreshness(path: string): 'fresh' | 'stale' | 'unknown' {
const cached = this.get(path);
if (!cached) return 'unknown';
const stats = fs.statSync(path);
if (stats.mtimeMs !== cached.timestamp) {
return 'stale';
}
return 'fresh';
}
}
安全架构
安全通过多个独立层实现:
第1层:权限系统
class PermissionEvaluator {
private ruleCache = new Map<string, CompiledRule>();
async evaluate(
tool: ToolDefinition,
input: any,
context: ToolPermissionContext
): Promise<PermissionDecision> {
// 优先级顺序评估
const scopes: PermissionRuleScope[] = [
'cliArg', // 最高优先级:命令行
'localSettings', // 项目特定覆盖
'projectSettings',// 共享项目规则
'policySettings', // 组织策略
'userSettings' // 最低优先级:用户偏好
];
for (const scope of scopes) {
const decision = await this.evaluateScope(
tool, input, context, scope
);
if (decision.behavior !== 'continue') {
return decision;
}
}
// 没有匹配规则 - 询问用户
return {
behavior: 'ask',
suggestions: this.generateSuggestions(tool, input)
};
}
private compileRule(rule: string): CompiledRule {
if (this.ruleCache.has(rule)) {
return this.ruleCache.get(rule)!;
}
// 解析规则语法:ToolName(glob/pattern)
const match = rule.match(/^(\\w+)(?:\\((.+)\\))?$/);
if (!match) throw new Error(`无效规则:${rule}`);
const [, toolPattern, pathPattern] = match;
const compiled = {
toolMatcher: new RegExp(
`^${toolPattern.replace('*', '.*')}$`
),
pathMatcher: pathPattern
? picomatch(pathPattern)
: null
};
this.ruleCache.set(rule, compiled);
return compiled;
}
}
第2层:沙箱架构
// macOS沙箱实现
class MacOSSandboxManager {
generateProfile(
command: string,
restrictions: SandboxRestrictions
): string {
const profile = `
(version 1)
(deny default)
; 基础权限
(allow process-exec (literal "/bin/bash"))
(allow process-exec (literal "/usr/bin/env"))
; 文件系统访问
${restrictions.allowRead ? '(allow file-read*)' : '(deny file-read*)'}
${restrictions.allowWrite ? '(allow file-write*)' : '(deny file-write*)'}
; 网络访问
${restrictions.allowNetwork ? `
(allow network-outbound)
(allow network-inbound)
` : `
(deny network*)
`}
; 系统操作
(allow sysctl-read)
(allow mach-lookup)
; 临时文件
(allow file-write* (subpath "/tmp"))
(allow file-write* (subpath "/var/tmp"))
`.trim();
return profile;
}
async executeSandboxed(
command: string,
profile: string
): Promise<ExecutionResult> {
// 将配置文件写入临时文件
const profilePath = await this.writeTemporaryProfile(profile);
try {
// 使用sandbox-exec执行
const result = await exec(
`sandbox-exec -p '${profilePath}' ${command}`
);
return result;
} finally {
// 清理
await fs.unlink(profilePath);
}
}
}
第3层:路径验证
class PathValidator {
private boundaries: Set<string>;
private deniedPatterns: RegExp[];
constructor(context: SecurityContext) {
this.boundaries = new Set([
context.projectRoot,
...context.additionalWorkingDirectories
]);
this.deniedPatterns = [
/\\/\\.(ssh|gnupg)\\//, // SSH/GPG密钥
/\\/(etc|sys|proc)\\//, // 系统目录
/\\.pem$|\\.key$/, // 私钥
/\\.(env|envrc)$/ // 环境文件
];
}
validate(requestedPath: string): ValidationResult {
const absolute = path.resolve(requestedPath);
// 检查边界
const inBoundary = Array.from(this.boundaries).some(
boundary => absolute.startsWith(boundary)
);
if (!inBoundary) {
return {
allowed: false,
reason: '路径在允许目录之外'
};
}
// 检查被拒绝的模式
for (const pattern of this.deniedPatterns) {
if (pattern.test(absolute)) {
return {
allowed: false,
reason: `路径匹配被拒绝的模式:${pattern}`
};
}
}
return { allowed: true };
}
}
性能架构
ANR(应用程序无响应)检测
ANR系统使用工作线程监控主事件循环:
// 工作线程脚本(嵌入为base64)
const anrWorkerScript = `
const { parentPort } = require('worker_threads');
let config = { anrThreshold: 5000, captureStackTrace: false };
let lastPing = Date.now();
let anrTimer = null;
function checkANR() {
const elapsed = Date.now() - lastPing;
if (elapsed > config.anrThreshold) {
// 主线程没有响应
parentPort.postMessage({
type: 'anr',
payload: {
elapsed,
stackTrace: config.captureStackTrace
? captureMainThreadStack()
: null
}
});
}
// 安排下次检查
anrTimer = setTimeout(checkANR, 100);
}
async function captureMainThreadStack() {
// 如果可用,使用检查器协议
try {
const { Session } = require('inspector');
const session = new Session();
session.connect();
const { result } = await session.post('Debugger.enable');
const stack = await session.post('Debugger.getStackTrace');
session.disconnect();
return stack;
} catch (e) {
return null;
}
}
parentPort.on('message', (msg) => {
if (msg.type === 'config') {
config = msg.payload;
lastPing = Date.now();
checkANR(); // 开始监控
} else if (msg.type === 'ping') {
lastPing = Date.now();
}
});
`;
// 主线程ANR集成
class ANRMonitor {
private worker: Worker;
private pingInterval: NodeJS.Timer;
constructor(options: ANROptions = {}) {
// 从嵌入脚本创建工作线程
this.worker = new Worker(anrWorkerScript, { eval: true });
// 配置工作线程
this.worker.postMessage({
type: 'config',
payload: {
anrThreshold: options.threshold || 5000,
captureStackTrace: options.captureStackTrace !== false
}
});
// 开始心跳
this.pingInterval = setInterval(() => {
this.worker.postMessage({ type: 'ping' });
}, options.pollInterval || 50);
// 处理ANR检测
this.worker.on('message', (msg) => {
if (msg.type === 'anr') {
this.handleANR(msg.payload);
}
});
}
private handleANR(data: ANRData) {
// 记录到遥测
Sentry.captureException(new Error(
`应用程序无响应${data.elapsed}毫秒`
), {
extra: {
stackTrace: data.stackTrace,
eventLoopDelay: this.getEventLoopDelay()
}
});
}
}
战略性缓存层
class CacheArchitecture {
// L1:内存缓存
private schemaCache = new LRUCache<string, JSONSchema>(100);
private patternCache = new LRUCache<string, CompiledPattern>(500);
private gitContextCache = new TTLCache<string, GitContext>(30_000); // 30秒TTL
// L2:弱引用缓存
private fileContentCache = new WeakRefCache<FileContent>();
// L3:磁盘缓存
private diskCache = new DiskCache('.claude/cache');
async get<T>(
key: string,
generator: () => Promise<T>,
options: CacheOptions = {}
): Promise<T> {
// 检查L1
if (this.schemaCache.has(key)) {
return this.schemaCache.get(key) as T;
}
// 检查L2
const weakRef = this.fileContentCache.get(key);
if (weakRef) {
return weakRef as T;
}
// 检查L3
if (options.persistent) {
const diskValue = await this.diskCache.get(key);
if (diskValue) {
return diskValue;
}
}
// 生成并缓存
const value = await generator();
// 存储在适当的缓存中
if (options.weak) {
this.fileContentCache.set(key, value);
} else if (options.persistent) {
await this.diskCache.set(key, value, options.ttl);
} else {
this.schemaCache.set(key, value as any);
}
return value;
}
}
遥测和可观测性设计
三支柱方法提供全面的可见性:
支柱1:错误跟踪(Sentry)
class ErrorBoundary {
static wrap<T extends (...args: any[]) => any>(
fn: T,
context: ErrorContext
): T {
return (async (...args: Parameters<T>) => {
const span = Sentry.startTransaction({
name: context.operation,
op: context.category
});
try {
const result = await fn(...args);
span.setStatus('ok');
return result;
} catch (error) {
span.setStatus('internal_error');
Sentry.captureException(error, {
contexts: {
operation: context,
state: this.captureState()
},
fingerprint: this.generateFingerprint(error, context)
});
throw error;
} finally {
span.finish();
}
}) as T;
}
private static captureState() {
return {
sessionId: SessionState.instance.sessionId,
conversationDepth: /* 当前深度 */,
activeTools: /* 当前执行中 */,
memoryUsage: process.memoryUsage(),
eventLoopDelay: this.getEventLoopDelay()
};
}
}
支柱2:指标(OpenTelemetry)
class MetricsCollector {
private meters = {
api: meter.createCounter('api_calls_total'),
tokens: meter.createHistogram('token_usage'),
tools: meter.createHistogram('tool_execution_duration'),
streaming: meter.createHistogram('streaming_latency')
};
recordApiCall(result: ApiCallResult) {
this.meters.api.add(1, {
model: result.model,
status: result.status,
provider: result.provider
});
this.meters.tokens.record(result.totalTokens, {
model: result.model,
type: 'total'
});
}
recordToolExecution(tool: string, duration: number, success: boolean) {
this.meters.tools.record(duration, {
tool,
success: String(success),
concurrent: /* 是否并行? */
});
}
}
支柱3:功能标志(Statsig)
class FeatureManager {
async checkGate(
gate: string,
context?: FeatureContext
): Promise<boolean> {
return statsig.checkGate(gate, {
userID: SessionState.instance.sessionId,
custom: {
model: context?.model,
toolsEnabled: context?.tools,
platform: process.platform
}
});
}
async getConfig<T>(
config: string,
defaultValue: T
): Promise<T> {
const dynamicConfig = statsig.getConfig(config);
return dynamicConfig.get('value', defaultValue);
}
}
资源管理
进程生命周期管理
class ProcessManager {
private processes = new Map<string, ChildProcess>();
private limits = {
maxProcesses: 10,
maxMemoryPerProcess: 512 * 1024 * 1024, // 512MB
maxTotalMemory: 2 * 1024 * 1024 * 1024 // 2GB
};
async spawn(
id: string,
command: string,
options: SpawnOptions
): Promise<ManagedProcess> {
// 检查限制
if (this.processes.size >= this.limits.maxProcesses) {
await this.killOldestProcess();
}
const child = spawn('bash', ['-c', command], {
...options,
// 资源限制
env: {
...options.env,
NODE_OPTIONS: `--max-old-space-size=${this.limits.maxMemoryPerProcess / 1024 / 1024}`
}
});
// 监控资源
const monitor = setInterval(() => {
this.checkProcessHealth(id, child);
}, 1000);
this.processes.set(id, child);
return new ManagedProcess(child, monitor);
}
private async checkProcessHealth(id: string, proc: ChildProcess) {
try {
const usage = await pidusage(proc.pid);
if (usage.memory > this.limits.maxMemoryPerProcess) {
console.warn(`进程${id}超出内存限制`);
proc.kill('SIGTERM');
}
} catch (e) {
// 进程可能已退出
this.processes.delete(id);
}
}
}
网络连接池
class NetworkPool {
private pools = new Map<string, ConnectionPool>();
getPool(provider: string): ConnectionPool {
if (!this.pools.has(provider)) {
this.pools.set(provider, new ConnectionPool({
maxConnections: provider === 'anthropic' ? 10 : 5,
maxIdleTime: 30_000,
keepAlive: true
}));
}
return this.pools.get(provider)!;
}
async request(
provider: string,
options: RequestOptions
): Promise<Response> {
const pool = this.getPool(provider);
const connection = await pool.acquire();
try {
return await connection.request(options);
} finally {
pool.release(connection);
}
}
}
本架构分析基于逆向工程和反编译。实际实现可能有所不同。所呈现的模式代表了基于可观察行为和高性能Node.js应用程序常见实践推断出的架构决策。
文件总结
概述
本文档深入分析了Claude Code的核心架构,揭示了其高性能响应背后的精密设计。通过反编译和逆向工程分析,文档详细展示了Claude Code如何通过分层架构、流式处理、智能状态管理等技术实现卓越的用户体验。
核心架构特点
1. tt控制循环:系统的心脏
- 六阶段处理流程:
- 回合初始化和上下文准备(10-50ms token计数,2000-3000ms LLM压缩)
- 动态系统提示组装(并行获取上下文源)
- LLM流初始化(异步生成器设置)
- 流事件处理状态机(实时事件驱动处理)
- 工具执行编排(智能批量处理)
- 递归控制(尾递归,深度限制10轮)
- 性能特征:首个token延迟300-800ms,token吞吐量50-100/秒,恒定内存使用
2. 分层架构设计
- 第1层:用户界面:React组件 + Ink渲染器 + Yoga布局引擎
- 第2层:代理核心:tt控制循环 + 上下文组装 + 权限系统 + 会话状态
- 第3层:LLM交互:流处理器 + 重试逻辑 + Token计数器
- 第4层:工具系统:工具执行器 + 输入验证器 + 沙箱管理器
- 第5层:基础设施:文件系统 + 进程管理器 + 网络客户端 + 遥测
3. 通信模式
- 向下通信:直接函数调用
- 向上通信:事件和回调
- 跨层通信:共享上下文对象
事件驱动和流式架构
流背压管理
- 压力阈值:高压力1000事件(丢弃非关键),临界压力5000事件(仅保留错误)
- 批处理策略:100个事件为一批,让出事件循环
- 智能过滤:文本增量可过滤,结构事件保留
进度事件聚合
- 多操作协调:并发进度报告的统一管理
- 竞态机制:Promise.race获取下一个事件
- 超时保护:100ms超时防止挂起
状态管理架构
全局会话状态
- 单例模式:直接突变方法,异步磁盘持久化
- 令牌跟踪:详细的模型使用统计
- 防抖写入:1秒延迟避免过度I/O
文件状态管理
- 弱引用缓存:WeakRef + FinalizationRegistry自动清理
- 新鲜度检查:基于修改时间的缓存失效
- 内存安全:防止悬挂引用和内存泄漏
安全架构
三层防护体系
权限系统:
- 五级优先级评估(CLI参数 > 本地设置 > 项目设置 > 策略设置 > 用户设置)
- 规则编译缓存,JIT优化
- 智能建议生成
沙箱架构:
- macOS专用sandbox-exec集成
- 动态配置文件生成
- 文件系统、网络、系统操作精细控制
路径验证:
- 边界检查(项目根目录 + 额外工作目录)
- 敏感模式拒绝(SSH/GPG密钥、系统目录、私钥文件)
- 实时验证结果
性能架构
ANR检测系统
- 工作线程监控:独立线程监控主事件循环
- 5秒阈值检测:超时自动触发堆栈捕获
- 检查器协议集成:使用inspector模块获取调用栈
- 遥测报告:Sentry集成自动上报
三级缓存架构
- L1缓存:内存缓存(LRU 100个schema,TTL 30秒git上下文)
- L2缓存:弱引用缓存(文件内容自动垃圾回收)
- L3缓存:磁盘缓存(’.claude/cache’持久化存储)
遥测和可观测性
三支柱方法
错误跟踪(Sentry):
- 事务边界包装
- 状态捕获(会话ID、对话深度、内存使用)
- 指纹生成和错误分类
指标收集(OpenTelemetry):
- API调用计数器
- Token使用直方图
- 工具执行持续时间
- 流延迟监控
功能标志(Statsig):
- 动态特性开关
- 配置管理
- A/B测试支持
资源管理
进程生命周期管理
- 资源限制:最大10个进程,每进程512MB,总计2GB
- 健康监控:1秒间隔检查进程内存使用
- 自动清理:超限进程自动终止
- NODE_OPTIONS控制:动态设置内存限制
网络连接池
- 提供商隔离:Anthropic 10连接,其他5连接
- 连接复用:30秒空闲时间,keep-alive支持
- 获取释放:自动连接管理
技术创新点
架构创新
- 异步生成器管道:tt函数的流式处理架构
- 分层权限系统:多维度安全控制机制
- 事件驱动设计:响应式状态更新
- 智能缓存策略:多层次缓存优化
性能创新
- ANR检测机制:工作线程监控主线程
- 背压控制系统:防止内存溢出的流控制
- 弱引用缓存:自动内存管理
- 连接池优化:网络资源高效利用
工程实践
- 模块化设计:清晰的职责分离
- 错误边界处理:全面的异常捕获和报告
- 资源监控:实时性能和资源使用追踪
- 可观测性:三支柱监控体系
结论
Claude Code的架构体现了现代软件工程的最佳实践,通过精密的分层设计和创新的技术实现,构建了高性能、高可靠性的AI开发工具。其核心价值在于:
- 卓越性能:多层次的优化策略确保快速响应
- 可靠安全:三层防护体系保障系统安全
- 可扩展性:模块化架构支持功能扩展
- 可观测性:全面的监控体系确保运维效率
这种架构设计为现代AI工具的开发提供了优秀的参考模板,特别是在需要处理复杂流式数据、保证系统响应性和维护高度安全性的场景中。文档的深入分析为理解高性能AI系统的架构设计提供了宝贵的技术参考。