【斯坦福 AI 编程课 08】多 Agent 协作——从”单打独斗”到”团队作战”

一个 Agent 再强大,也干不过一个团队。就像软件开发中,前端、后端、测试、运维各司其职,多 Agent 协作能让复杂任务分解成专业化的子任务,每个 Agent 只做自己最擅长的事。

为什么需要多 Agent 协作?

单 Agent 的局限性

问题1:能力冲突

  • 写代码的 Agent 不懂测试
  • 分析数据的 Agent 不会写文档
  • 做客服的 Agent 不懂数据库

问题2:注意力分散

  • 一个 Agent 处理多个任务容易"精神分裂"
  • 上下文切换成本高
  • 容易遗忘之前的工作

问题3:无法并行

  • 单 Agent 只能串行处理
  • 无法同时进行多项工作
  • 效率瓶颈

多 Agent 的优势

专业化分工

  • Planner Agent:制定计划
  • Coder Agent:写代码
  • Reviewer Agent:代码审查
  • Tester Agent:测试验证
  • Doc Writer:写文档

并行处理

时间轴:
0-5分钟:  Planner 制定计划
5-10分钟: Coder 写代码 || Tester 准备测试用例
10-15分钟:Reviewer 审查代码 || Tester 执行测试
15-20分钟:Doc Writer 写文档 || 所有 Agent 协作修复问题

核心架构

1. 中心化架构(Hub-and-Spoke)

一个"领导"协调多个"员工":

class OrchestratorAgent:
    def __init__(self):
        self.specialists = {
            'coder': CoderAgent(),
            'reviewer': ReviewerAgent(),
            'tester': TesterAgent(),
            'docwriter': DocWriterAgent()
        }

    def delegate(self, task):
        """分配任务"""
        # 1. 分析任务类型
        task_type = self.classify_task(task)

        # 2. 选择合适的专家
        specialist = self.specialists[task_type]

        # 3. 委托执行
        result = specialist.execute(task)

        # 4. 收集结果
        return result

    def coordinate(self, complex_task):
        """协调复杂任务"""
        # 1. 分解任务
        subtasks = self.decompose(complex_task)

        # 2. 分配给不同专家
        results = []
        for subtask in subtasks:
            result = self.delegate(subtask)
            results.append(result)

        # 3. 整合结果
        final_result = self.integrate(results)

        return final_result

优点:结构清晰,易于管理
缺点:中心节点成为瓶颈

2. 对等架构(Peer-to-Peer)

所有 Agent 平等协作:

class PeerAgent:
    def __init__(self, name, specialty):
        self.name = name
        self.specialty = specialty
        self.peers = []  # 其他 Agent

    def broadcast(self, message):
        """广播消息给所有同伴"""
        for peer in self.peers:
            peer.receive(message, sender=self)

    def receive(self, message, sender):
        """接收消息"""
        if self.can_help(message):
            # 能帮忙就处理
            result = self.process(message)
            sender.receive(result, sender=self)
        else:
            # 不能帮忙就转发
            self.forward(message, exclude=sender)

    def collaborate(self, task):
        """协作完成任务"""
        # 1. 先自己尝试
        if self.specialty == task.type:
            return self.execute(task)

        # 2. 不擅长就求助
        self.broadcast({
            'type': 'help_request',
            'task': task,
            'sender': self.name
        })

优点:无单点故障,弹性好
缺点:协调复杂,容易混乱

3. 黑板架构(Blackboard)

共享工作空间,所有 Agent 都能看到:

class Blackboard:
    def __init__(self):
        self.data = {}
        self.agents = []

    def write(self, key, value, agent):
        """写入黑板"""
        self.data[key] = {
            'value': value,
            'author': agent.name,
            'timestamp': datetime.now()
        }

        # 通知所有 Agent
        self.notify_all(key, value)

    def read(self, key):
        """读取黑板"""
        return self.data.get(key)

    def subscribe(self, agent):
        """订阅黑板更新"""
        self.agents.append(agent)

    def notify_all(self, key, value):
        """通知所有订阅者"""
        for agent in self.agents:
            agent.on_blackboard_update(key, value)

# 使用示例
blackboard = Blackboard()

# Coder 写入代码
blackboard.write('code', 'def hello(): ...', coder_agent)

# Reviewer 自动收到通知并审查
# Tester 自动收到通知并准备测试

优点:解耦,灵活
缺点:一致性管理困难

实战案例:3个 Agent 协作开发功能

让我们实现一个完整的开发流程:Planner → Coder → Tester

步骤1:定义 Agent 角色

class PlannerAgent:
    """规划专家"""

    def plan(self, requirement):
        """制定开发计划"""
        prompt = f"""
        分析需求,制定开发计划:

        需求:{requirement}

        输出格式:
        1. 功能拆解(不超过5个子任务)
        2. 技术选型
        3. 预计时间
        4. 依赖关系
        """

        response = call_llm(prompt)
        return self.parse_plan(response)

class CoderAgent:
    """编码专家"""

    def code(self, plan):
        """根据计划写代码"""
        code_parts = []

        for task in plan.subtasks:
            prompt = f"""
            实现以下功能:
            {task.description}

            技术栈:{plan.tech_stack}

            要求:
            - 代码简洁清晰
            - 添加必要注释
            - 遵循最佳实践
            """

            code = call_llm(prompt)
            code_parts.append({
                'task': task.name,
                'code': code
            })

        return code_parts

class TesterAgent:
    """测试专家"""

    def test(self, code_parts):
        """测试代码"""
        test_results = []

        for part in code_parts:
            # 1. 生成测试用例
            test_cases = self.generate_tests(part)

            # 2. 执行测试(模拟)
            result = self.run_tests(part['code'], test_cases)

            test_results.append({
                'task': part['task'],
                'passed': result.passed,
                'failed': result.failed,
                'coverage': result.coverage
            })

        return test_results

步骤2:协调器实现

class DevelopmentOrchestrator:
    """开发协调器"""

    def __init__(self):
        self.planner = PlannerAgent()
        self.coder = CoderAgent()
        self.tester = TesterAgent()

    def develop(self, requirement):
        """完整开发流程"""
        print("📝 阶段1:规划")
        plan = self.planner.plan(requirement)
        print(f"   - 拆分为 {len(plan.subtasks)} 个子任务")

        print("💻 阶段2:编码")
        code = self.coder.code(plan)
        print(f"   - 生成了 {len(code)} 个代码块")

        print("🧪 阶段3:测试")
        test_results = self.tester.test(code)
        print(f"   - 通过 {sum(r['passed'] for r in test_results)} 个测试")

        # 如果有失败,循环修复
        iteration = 1
        while any(r['failed'] > 0 for r in test_results):
            print(f"🔧 修复迭代 {iteration}")

            # 找到失败的代码
            failed_parts = [
                code[i] for i, r in enumerate(test_results) 
                if r['failed'] > 0
            ]

            # 重新编码
            fixed_code = self.coder.fix(failed_parts, test_results)

            # 重新测试
            test_results = self.tester.test(fixed_code)
            iteration += 1

            if iteration > 3:  # 最多修复3次
                print("⚠️  达到最大修复次数,需要人工介入")
                break

        print("✅ 开发完成")
        return {
            'plan': plan,
            'code': code,
            'test_results': test_results
        }

步骤3:实际运行

# 使用示例
orchestrator = DevelopmentOrchestrator()

result = orchestrator.develop(
    requirement="实现一个用户注册功能,包括邮箱验证和密码加密"
)

print("\n📊 最终结果:")
print(f"代码行数:{sum(len(c['code'].split('\\n')) for c in result['code'])}")
print(f"测试覆盖率:{sum(r['coverage'] for r in result['test_results'])/len(result['test_results']):.1%}")

输出示例

📝 阶段1:规划
   - 拆分为 3 个子任务
💻 阶段2:编码
   - 生成了 3 个代码块
🧪 阶段3:测试
   - 通过 8 个测试
🔧 修复迭代 1
✅ 开发完成

📊 最终结果:
代码行数:85
测试覆盖率:92.3%

通信协议

1. 直接消息传递

class Message:
    def __init__(self, sender, receiver, content, msg_type='task'):
        self.sender = sender
        self.receiver = receiver
        self.content = content
        self.type = msg_type  # task, result, feedback, query
        self.timestamp = datetime.now()

# Agent 发送消息
coder.send_message(
    receiver='reviewer',
    content={'code': '...'},
    msg_type='task'
)

2. 发布-订阅模式

class EventBus:
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type, agent):
        self.subscribers[event_type].append(agent)

    def publish(self, event_type, data):
        for agent in self.subscribers[event_type]:
            agent.handle_event(event_type, data)

# 使用示例
bus = EventBus()
bus.subscribe('code_committed', reviewer_agent)
bus.subscribe('code_committed', tester_agent)

# Coder 提交代码后发布事件
bus.publish('code_committed', {'code': '...'})

3. 共享状态

class SharedState:
    def __init__(self):
        self.state = {}
        self.lock = threading.Lock()

    def update(self, key, value):
        with self.lock:
            self.state[key] = value

    def get(self, key):
        with self.lock:
            return self.state.get(key)

# 所有 Agent 共享状态
state = SharedState()
state.update('current_task', 'implement_login')
state.update('progress', 0.5)

冲突解决

多 Agent 协作不可避免会产生冲突:

1. 任务冲突

场景:两个 Agent 都想修改同一个文件

解决方案:锁机制

class FileLock:
    def __init__(self):
        self.locks = {}

    def acquire(self, file_path, agent):
        if file_path not in self.locks:
            self.locks[file_path] = agent
            return True
        return False

    def release(self, file_path, agent):
        if self.locks.get(file_path) == agent:
            del self.locks[file_path]

2. 结果冲突

场景:Coder 和 Reviewer 对代码质量有分歧

解决方案:投票机制

def resolve_conflict(opinions):
    """投票解决冲突"""
    # 收集所有意见
    votes = defaultdict(int)
    for agent, opinion in opinions.items():
        votes[opinion] += agent.authority_weight

    # 选择权重最高的意见
    return max(votes.items(), key=lambda x: x[1])[0]

3. 优先级冲突

场景:多个任务同时到达,资源不足

解决方案:优先级队列

import heapq

class PriorityQueue:
    def __init__(self):
        self.queue = []

    def add_task(self, task, priority):
        heapq.heappush(self.queue, (priority, task))

    def get_next_task(self):
        return heapq.heappop(self.queue)[1] if self.queue else None

最佳实践

1. 清晰的角色定义

Agent 角色 职责 输入 输出
Planner 制定计划 需求文档 任务列表
Coder 写代码 任务描述 代码文件
Reviewer 审查代码 代码文件 审查报告
Tester 测试验证 代码 + 测试用例 测试报告

2. 明确的通信协议

# 标准消息格式
{
    "msg_id": "uuid",
    "sender": "agent_name",
    "receiver": "agent_name",
    "type": "task|result|query|feedback",
    "content": {...},
    "timestamp": "2024-01-01T00:00:00Z",
    "priority": "high|medium|low"
}

3. 监控和日志

class AgentMonitor:
    def __init__(self):
        self.logs = []

    def log_interaction(self, sender, receiver, message):
        self.logs.append({
            'time': datetime.now(),
            'from': sender,
            'to': receiver,
            'message': message
        })

    def visualize(self):
        """可视化 Agent 交互"""
        # 生成 Mermaid 图表
        diagram = "graph TD\n"
        for log in self.logs:
            diagram += f"{log['from']} --> {log['to']}: {log['message']['type']}\n"
        return diagram

总结

多 Agent 协作是从"个人英雄主义"到"团队作战"的转变:

关键收获

  1. 专业化分工,各司其职
  2. 并行处理,提升效率
  3. 互相监督,提高质量
  4. 弹性扩展,按需增减

架构选择

  • 中心化:适合流程固定的任务
  • 对等式:适合创新性探索
  • 黑板式:适合知识共享场景

最佳实践

  1. 清晰的角色定义
  2. 标准化的通信协议
  3. 完善的冲突解决机制
  4. 全面的监控和日志

下一步

  • 实现一个简单的双 Agent 协作系统
  • 尝试黑板架构处理复杂任务
  • 监控并优化你的多 Agent 系统

记住:好的团队不是"人越多越好",而是"每个人都在最合适的位置"。


系列导航

Views: 0