目录

Claude Agent SDK 完整教程:工具系统、代理循环与可控执行实战

Claude Agent SDK 的目标是把"模型推理"升级为"可控执行"。它不仅能生成文本,更能在可审计、可约束的工具体系内完成任务,从而让 Agent 真正参与工程流程。本文将带你从环境搭建到生产实战,全面掌握 Claude Agent SDK。

什么是 Claude Agent SDK?

Claude Agent SDK 是 Anthropic 推出的企业级 Agent 开发框架,它将 Claude 的语言能力与工具执行系统深度结合,让 AI 能够:

  • 自主调用文件系统、数据库、API 等外部工具
  • 在多轮对话中保持上下文并规划复杂任务
  • 通过检查点机制实现长流程任务的断点续传
  • 在严格的权限边界内执行操作,确保安全可控

与直接调用 Claude API 的区别:

特性Claude APIClaude Agent SDK
执行模式单轮问答多轮代理循环
工具调用手动解析 function calling自动化工具编排
上下文管理需手动拼接内置上下文管理器
权限控制工具策略 + 路径白名单
可审计性依赖应用层内置工具调用日志
适用场景聊天、内容生成代码重构、数据处理、自动化运维

核心概念速览

代理循环 (Agent Loop)

SDK 内置的多轮执行引擎,负责"理解目标 → 规划 → 调用工具 → 验证结果 → 继续迭代"的闭环。

用户输入
  ↓
Claude 分析任务并规划步骤
  ↓
调用工具 (读文件/执行命令/搜索)
  ↓
获取工具输出并更新上下文
  ↓
判断:任务完成?
  └─ 否 → 继续下一轮循环
  └─ 是 → 返回最终结果

工具 (Tool)

Agent 可调用的操作能力,通常包括文件系统、命令执行、检索、网络请求等。每个工具需要清晰的输入 schema 与权限边界。

标准工具示例

{
  name: 'read_file',
  description: '读取文本文件内容',
  inputSchema: {
    type: 'object',
    properties: {
      path: { type: 'string', description: '文件路径' }
    },
    required: ['path']
  },
  execute: async ({ path }) => {
    return { content: fs.readFileSync(path, 'utf-8') };
  }
}

工具策略 (Tool Policy)

限制 Agent 能用哪些工具、能访问哪些路径、能执行哪些命令。是控制风险的第一道闸。

子代理 (Sub-Agent)

面向复杂任务的并行/隔离机制。主代理可委派任务给子代理,减少上下文干扰并提升吞吐。

检查点 (Checkpoint)

长流程任务的断点与回滚机制,便于恢复与重放。

环境准备与快速开始

前置要求

  • Node.js 18+Bun 运行时
  • Git(用于工作区管理与可追溯变更)
  • Anthropic API Key获取地址

安装 SDK

# 使用 npm
npm install @anthropic-ai/sdk

# 或使用 bun(更快)
bun add @anthropic-ai/sdk

# 验证安装
node -e "console.log(require('@anthropic-ai/sdk').VERSION)"

配置 API Key

# 创建 .env 文件
echo "ANTHROPIC_API_KEY=your_api_key_here" > .env

# 或设置环境变量
export ANTHROPIC_API_KEY="your_api_key_here"

完整实战案例 1:文件摘要 Agent

让我们从一个最小可运行示例开始,逐步理解 Agent 的工作原理。

场景需求

创建一个 Agent,能够读取项目中的多个 Markdown 文件并生成内容摘要。

步骤 1:定义工具

首先定义 Agent 需要的工具:

// tools.ts
import fs from 'fs';
import path from 'path';

export const readFileTool = {
  name: 'read_file',
  description: '读取文本文件内容。只能读取 .md, .txt, .json 格式的文件。',
  inputSchema: {
    type: 'object',
    properties: {
      path: { 
        type: 'string', 
        description: '相对于项目根目录的文件路径' 
      }
    },
    required: ['path']
  },
  execute: async ({ path: filePath }: { path: string }) => {
    const allowedExtensions = ['.md', '.txt', '.json'];
    const ext = path.extname(filePath);
    
    if (!allowedExtensions.includes(ext)) {
      throw new Error(`不支持的文件类型: ${ext}`);
    }
    
    const fullPath = path.resolve(process.cwd(), filePath);
    const content = fs.readFileSync(fullPath, 'utf-8');
    
    return { 
      content, 
      size: content.length,
      lines: content.split('\n').length
    };
  }
};

export const listFilesTool = {
  name: 'list_files',
  description: '列出指定目录下的所有文件',
  inputSchema: {
    type: 'object',
    properties: {
      directory: { 
        type: 'string', 
        description: '目录路径' 
      },
      extension: {
        type: 'string',
        description: '文件扩展名筛选(如 .md)'
      }
    },
    required: ['directory']
  },
  execute: async ({ directory, extension }: { directory: string; extension?: string }) => {
    const dirPath = path.resolve(process.cwd(), directory);
    const files = fs.readdirSync(dirPath);
    
    const filtered = extension
      ? files.filter(f => f.endsWith(extension))
      : files;
    
    return { files: filtered, count: filtered.length };
  }
};

步骤 2:创建 Agent

// agent.ts
import { Anthropic } from '@anthropic-ai/sdk';
import { readFileTool, listFilesTool } from './tools';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY
});

async function summarizeMarkdownFiles(directory: string) {
  const messages = [
    {
      role: 'user' as const,
      content: `请帮我分析 ${directory} 目录下的所有 Markdown 文件,并生成一份摘要报告。报告应包含:
      1. 文件列表
      2. 每个文件的核心内容(1-2 句话)
      3. 整体主题分析`
    }
  ];

  let continueLoop = true;
  const tools = [readFileTool, listFilesTool];
  
  while (continueLoop) {
    const response = await client.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 4096,
      tools,
      messages
    });

    console.log('Claude 回复:', response.content);

    // 检查是否有工具调用
    const toolUseBlock = response.content.find(
      block => block.type === 'tool_use'
    );

    if (toolUseBlock && toolUseBlock.type === 'tool_use') {
      // 执行工具
      const tool = tools.find(t => t.name === toolUseBlock.name);
      if (!tool) {
        throw new Error(`未找到工具: ${toolUseBlock.name}`);
      }

      console.log(`\n执行工具: ${tool.name}`, toolUseBlock.input);
      
      const result = await tool.execute(toolUseBlock.input as any);
      
      console.log('工具输出:', JSON.stringify(result, null, 2));

      // 将工具结果添加到对话
      messages.push({
        role: 'assistant',
        content: response.content
      });
      
      messages.push({
        role: 'user' as const,
        content: [
          {
            type: 'tool_result',
            tool_use_id: toolUseBlock.id,
            content: JSON.stringify(result)
          }
        ]
      });
    } else {
      // 没有工具调用,任务完成
      continueLoop = false;
      
      const textBlock = response.content.find(block => block.type === 'text');
      if (textBlock && textBlock.type === 'text') {
        console.log('\n\n=== 最终摘要 ===\n');
        console.log(textBlock.text);
      }
    }
  }
}

// 运行
summarizeMarkdownFiles('./docs')
  .then(() => console.log('\n任务完成'))
  .catch(err => console.error('错误:', err));

步骤 3:运行与测试

# 创建测试文档
mkdir docs
echo "# 文档1\n这是测试内容。" > docs/file1.md
echo "# 文档2\n另一个测试。" > docs/file2.md

# 运行 Agent
npx ts-node agent.ts

预期输出

执行工具: list_files { directory: './docs', extension: '.md' }
工具输出: { files: ['file1.md', 'file2.md'], count: 2 }

执行工具: read_file { path: 'docs/file1.md' }
工具输出: { content: '# 文档1\n这是测试内容。', size: 20, lines: 2 }

执行工具: read_file { path: 'docs/file2.md' }
工具输出: { content: '# 文档2\n另一个测试。', size: 18, lines: 2 }

=== 最终摘要 ===

## 文件摘要报告

### 文件列表
1. file1.md (20 字符, 2 行)
2. file2.md (18 字符, 2 行)

### 内容概要
- **file1.md**: 包含基础测试内容的标题文档
- **file2.md**: 简短的测试文档

### 整体分析
这些文档似乎是用于测试目的的简单示例文件。

任务完成

完整实战案例 2:代码审查 Agent

场景需求

构建一个自动代码审查工具,检查 Python 代码的:

  • 代码风格问题
  • 潜在 bug
  • 性能优化建议
  • 安全漏洞

实现代码

// code-review-agent.ts
import { Anthropic } from '@anthropic-ai/sdk';
import fs from 'fs';
import { exec } from 'child_process';
import { promisify } from 'util';

const execAsync = promisify(exec);

const tools = [
  {
    name: 'read_python_file',
    description: '读取 Python 文件内容进行分析',
    inputSchema: {
      type: 'object',
      properties: {
        path: { type: 'string' }
      },
      required: ['path']
    },
    execute: async ({ path }: { path: string }) => {
      const content = fs.readFileSync(path, 'utf-8');
      const lines = content.split('\n').length;
      return { content, lines };
    }
  },
  {
    name: 'run_pylint',
    description: '运行 pylint 静态代码检查工具',
    inputSchema: {
      type: 'object',
      properties: {
        path: { type: 'string' }
      },
      required: ['path']
    },
    execute: async ({ path }: { path: string }) => {
      try {
        const { stdout } = await execAsync(`pylint ${path} --output-format=json`);
        return { result: JSON.parse(stdout) };
      } catch (error: any) {
        // Pylint 返回非零退出码时
        return { result: JSON.parse(error.stdout || '[]') };
      }
    }
  }
];

async function reviewPythonCode(filePath: string) {
  const client = new Anthropic({
    apiKey: process.env.ANTHROPIC_API_KEY!
  });

  const messages = [
    {
      role: 'user' as const,
      content: `请审查 Python 文件 ${filePath},重点关注:
      1. 代码风格和可读性
      2. 潜在的 bug 和逻辑错误
      3. 性能优化建议
      4. 安全漏洞(SQL 注入、XSS 等)
      
      请先运行 pylint 工具,然后结合工具输出和代码内容进行人工审查。
      最后生成一份结构化的审查报告。`
    }
  ];

  let continueLoop = true;

  while (continueLoop) {
    const response = await client.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 8192,
      tools,
      messages
    });

    const toolUseBlocks = response.content.filter(block => block.type === 'tool_use');

    if (toolUseBlocks.length > 0) {
      messages.push({ role: 'assistant', content: response.content });

      const toolResults = [];
      for (const toolUse of toolUseBlocks) {
        if (toolUse.type !== 'tool_use') continue;

        const tool = tools.find(t => t.name === toolUse.name);
        const result = await tool!.execute(toolUse.input as any);
        
        toolResults.push({
          type: 'tool_result' as const,
          tool_use_id: toolUse.id,
          content: JSON.stringify(result)
        });
      }

      messages.push({ role: 'user' as const, content: toolResults });
    } else {
      continueLoop = false;
      const textBlock = response.content.find(block => block.type === 'text');
      if (textBlock && textBlock.type === 'text') {
        console.log(textBlock.text);
      }
    }
  }
}

// 使用示例
reviewPythonCode('./example.py');

工具策略与安全配置

定义工具策略

interface ToolPolicy {
  allowedTools: string[];        // 允许的工具名称白名单
  allowedPaths: string[];        // 允许访问的路径白名单
  allowedCommands: string[];     // 允许执行的命令白名单
  confirmRequired: string[];     // 需要用户确认的工具
  maxFileSize: number;           // 文件大小限制(字节)
  timeout: number;               // 工具执行超时(毫秒)
}

const productionPolicy: ToolPolicy = {
  allowedTools: ['read_file', 'list_files', 'grep'],
  allowedPaths: ['./src', './docs', './tests'],
  allowedCommands: ['git', 'npm', 'node'],
  confirmRequired: ['write_file', 'delete_file', 'exec'],
  maxFileSize: 1024 * 1024 * 5,  // 5MB
  timeout: 30000  // 30秒
};

应用策略

function applyToolPolicy(tool: any, policy: ToolPolicy) {
  return {
    ...tool,
    execute: async (input: any) => {
      // 1. 检查工具是否在白名单
      if (!policy.allowedTools.includes(tool.name)) {
        throw new Error(`工具 ${tool.name} 未在白名单中`);
      }

      // 2. 路径检查
      if (input.path) {
        const isAllowed = policy.allowedPaths.some(
          allowed => input.path.startsWith(allowed)
        );
        if (!isAllowed) {
          throw new Error(`路径 ${input.path} 不在允许范围内`);
        }
      }

      // 3. 文件大小检查
      if (input.path && fs.existsSync(input.path)) {
        const stats = fs.statSync(input.path);
        if (stats.size > policy.maxFileSize) {
          throw new Error(`文件过大: ${stats.size} 字节`);
        }
      }

      // 4. 超时控制
      const timeoutPromise = new Promise((_, reject) =>
        setTimeout(() => reject(new Error('工具执行超时')), policy.timeout)
      );

      return Promise.race([
        tool.execute(input),
        timeoutPromise
      ]);
    }
  };
}

// 使用
const safeTools = tools.map(t => applyToolPolicy(t, productionPolicy));

故障排查与常见问题

问题 1:Agent 不调用工具

症状: Agent 直接返回文本,而不是调用工具。

可能原因:

  1. 工具描述不清晰
  2. inputSchema 过于复杂
  3. 任务描述与工具能力不匹配

解决方案:

// ❌ 糟糕的工具描述
{
  name: 'file_tool',
  description: '文件操作'  // 太模糊
}

// ✅ 优秀的工具描述
{
  name: 'read_file',
  description: '读取文本文件的完整内容。支持 .txt, .md, .json, .yaml 格式。返回文件内容字符串及元信息(行数、大小)。'
}

调试技巧:

// 在提示词中明确要求使用工具
const messages = [{
  role: 'user',
  content: `请使用 read_file 工具读取 ${filePath},不要猜测内容。`
}];

问题 2:长任务中途失败无法恢复

症状: 处理大量文件时,某个环节出错导致前功尽弃。

解决方案: 实现检查点机制

interface Checkpoint {
  step: number;
  processedFiles: string[];
  results: any[];
  timestamp: number;
}

async function processWithCheckpoint(files: string[]) {
  const checkpointFile = './checkpoint.json';
  
  // 恢复检查点
  let checkpoint: Checkpoint = fs.existsSync(checkpointFile)
    ? JSON.parse(fs.readFileSync(checkpointFile, 'utf-8'))
    : { step: 0, processedFiles: [], results: [], timestamp: Date.now() };

  for (let i = checkpoint.step; i < files.length; i++) {
    try {
      const result = await processFile(files[i]);
      checkpoint.processedFiles.push(files[i]);
      checkpoint.results.push(result);
      checkpoint.step = i + 1;
      
      // 每处理 5 个文件保存一次
      if (i % 5 === 0) {
        fs.writeFileSync(checkpointFile, JSON.stringify(checkpoint));
      }
    } catch (error) {
      console.error(`处理 ${files[i]} 失败:`, error);
      fs.writeFileSync(checkpointFile, JSON.stringify(checkpoint));
      throw error;
    }
  }

  // 完成后删除检查点
  fs.unlinkSync(checkpointFile);
  return checkpoint.results;
}

问题 3:Agent 输出质量不稳定

症状: 同样的任务,多次运行结果差异很大。

解决方案: 增加验证步骤

async function runWithVerification(task: string) {
  const messages = [
    { role: 'user' as const, content: task }
  ];

  let result;
  let verified = false;
  let attempts = 0;

  while (!verified && attempts < 3) {
    result = await runAgent(messages);
    
    // 验证输出
    if (result.includes('TODO') || result.length < 100) {
      messages.push({
        role: 'user' as const,
        content: '输出不完整,请提供更详细的分析。'
      });
      attempts++;
    } else {
      verified = true;
    }
  }

  return result;
}

问题 4:API 调用成本过高

症状: Token 消耗远超预期。

优化建议:

// 1. 限制上下文长度
function trimContext(messages: any[], maxTokens = 8000) {
  // 保留最近 N 轮对话
  return messages.slice(-10);
}

// 2. 压缩工具输出
function compressToolResult(result: any) {
  if (typeof result === 'string' && result.length > 2000) {
    return result.slice(0, 1000) + '\n...[省略 ' + (result.length - 1000) + ' 字符]...';
  }
  return result;
}

// 3. 使用更小的模型处理简单任务
const model = task.complexity === 'high' 
  ? 'claude-3-5-sonnet-20241022'
  : 'claude-3-haiku-20240307';

常见问题 (FAQ)

Q1: Agent SDK 与 LangChain/AutoGPT 的区别?

A: 主要区别在于设计哲学和适用场景:

特性Claude Agent SDKLangChainAutoGPT
设计目标可控、可审计的企业级执行通用 LLM 应用开发框架完全自主的 AGI 探索
控制粒度细粒度工具策略中等粗粒度
适用场景代码重构、数据处理、运维聊天机器人、知识检索开放式任务探索
学习曲线中等陡峭简单

Q2: 如何处理需要用户交互的场景?

A: 实现双向通信机制:

async function interactiveAgent() {
  const readline = require('readline').createInterface({
    input: process.stdin,
    output: process.stdout
  });

  const askUser = (question: string): Promise<string> => {
    return new Promise(resolve => {
      readline.question(question, resolve);
    });
  };

  // 在工具中调用
  const confirmTool = {
    name: 'confirm_action',
    description: '请求用户确认敏感操作',
    inputSchema: {
      type: 'object',
      properties: {
        action: { type: 'string' },
        details: { type: 'string' }
      }
    },
    execute: async ({ action, details }: any) => {
      const answer = await askUser(
        `\n确认执行: ${action}?\n详情: ${details}\n(y/n): `
      );
      return { confirmed: answer.toLowerCase() === 'y' };
    }
  };
}

Q3: 如何监控 Agent 的执行过程?

A: 实现结构化日志:

class AgentLogger {
  private logs: any[] = [];

  logToolCall(toolName: string, input: any, output: any, duration: number) {
    this.logs.push({
      type: 'tool_call',
      toolName,
      input,
      output,
      duration,
      timestamp: Date.now()
    });
  }

  logError(error: Error, context: any) {
    this.logs.push({
      type: 'error',
      message: error.message,
      stack: error.stack,
      context,
      timestamp: Date.now()
    });
  }

  exportMetrics() {
    return {
      totalToolCalls: this.logs.filter(l => l.type === 'tool_call').length,
      averageDuration: this.logs
        .filter(l => l.type === 'tool_call')
        .reduce((sum, l) => sum + l.duration, 0) / this.logs.length,
      errors: this.logs.filter(l => l.type === 'error').length
    };
  }
}

Q4: 支持哪些编程语言?

A: SDK 本身是 TypeScript/JavaScript,但 Agent 可以处理任何语言的代码:

  • 直接支持: TypeScript, JavaScript, Python, Go, Rust
  • 通过工具支持: 任何有 CLI 工具的语言(Java/javac, C++/gcc)

Q5: 如何实现并发任务处理?

A: 使用子代理模式:

async function parallelProcessing(tasks: string[]) {
  const subAgents = tasks.map(task => 
    createSubAgent({ 
      task, 
      maxTokens: 2000,
      isolatedContext: true 
    })
  );

  const results = await Promise.all(
    subAgents.map(agent => agent.run())
  );

  return aggregateResults(results);
}

相关资源与延伸阅读

Claude 生态系统

工具与集成

官方资源

总结

Claude Agent SDK 的价值不在"写得更像人",而在"执行得更像工程"。从最小可运行开始,逐步增加工具、约束与验证机制,你可以构建可审计、可控、可扩展的智能体系统。

关键要点回顾

  • ✅ 工具描述要清晰具体,包含输入输出示例
  • ✅ 使用工具策略控制权限边界(路径、命令、大小)
  • ✅ 实现检查点机制处理长流程任务
  • ✅ 添加验证步骤确保输出质量
  • ✅ 结构化日志便于监控和调试
  • ✅ 先用只读工具验证,再逐步开放写入工具

记住:Agent 不是银弹,它是放大器。好的工具设计 + 清晰的任务边界 + 适当的人工监督 = 可靠的自动化系统。

立即开始构建你的第一个 Agent 吧!