首页 运维干货多 Agent 协作系统开发:AutoGen、CrewAI 与自定义编排框架

多 Agent 协作系统开发:AutoGen、CrewAI 与自定义编排框架

运维派隶属马哥教育旗下专业运维社区,是国内成立最早的IT运维技术社区,欢迎关注公众号:yunweipai
领取学习更多免费Linux云计算、Python、Docker、K8s教程关注公众号:马哥linux运维

适用读者:正在搭建多 Agent 系统、踩过 Agent 互相踢皮球的坑、被 token 成本烧穿过的工程师。本文不讲”什么是 Agent”,讲:怎么设计 Agent 角色、通信协议、状态机、工具注册、失败兜底,以及怎么选 AutoGen / CrewAI / 自研编排。


引子:那个让 AI 团队”踢皮球”到天亮的 bug

去年 Q4,我们做了一个研究助手 Agent,意图是替代初级分析师做竞品调研:用户问”帮我调研一下国内 LLM 推理框架的现状”,期望输出结构化的对比报告。

最初的实现很简单——三个 Agent:

  • Researcher:联网搜资料,产出信息碎片
  • Writer:把碎片整理成段落
  • Reviewer:审稿、纠错、给修改意见

我们用了 AutoGen 的 GroupChat,三个 Agent 轮流发言。看上去很合理。

实际跑了三天,问题一个接一个:

  1. 无限循环:Reviewer 说”补充 2025 年数据”,Researcher 补完,Writer 又说”再补充 2026 年数据”,Reviewer 又要补……一个简单任务跑了 47 轮还没结束,token 烧了 $14。
  2. 幻觉传染:Researcher 引用了一篇不存在的论文,Writer 把它当真,Reviewer 居然没看出来(因为 Reviewer 也信任前文)。
  3. 工具调用失败无人兜底:Researcher 调搜索 API 超时,整个流程就卡死,没有重试、没有降级。
  4. 上下文爆炸:每轮都把所有历史塞进 prompt,第 30 轮时单次调用 tokens_in 就 8000+。
  5. 没有人工介入机制:用户问”对了,帮我加上 GPU 厂商对比”,Agent 完全没接到。

复盘后我们重写了整个系统,引入了显式状态机 + 工具注册表 + 消息总线 + 成本熔断。这篇文章把那套架构讲透。


一、多 Agent 系统到底解决什么问题

多 Agent 系统解决的是 “单个 LLM 调用搞不定、需要多个角色分工、需要工具调用、需要迭代协作” 的复杂任务。它的本质不是”用 LLM 调 LLM”,而是分布式协作系统设计

1.1 什么时候必须上多 Agent

场景是否需要多 Agent单 Agent 方案
单一意图、单一回答(如摘要、翻译)❌ 不需要直接 prompt
多步但流程固定(如 SQL 生成 + 执行)⚠️ 链式调用即可LangChain LCEL / LlamaIndex QueryPipeline
多角色分工、需要工具、需要迭代(如研究报告)✅ 必须单 Agent prompt 太长、效果差
需要人机协同(中间需要审核)✅ 必须单 Agent 没法暂停等人
任务可并行(如多源数据汇总)✅ 强烈推荐串行调用太慢
失败需要降级(如搜索 API 挂了)✅ 必须单 Agent 没兜底

1.2 单 Agent vs 多 Agent 的真实差距

一个具体例子:用户问”对比一下 Qwen 和 Llama 3 的中文能力”。

单 Agent prompt(不行)

你是一个 AI 研究员,请联网搜索 Qwen 和 Llama 3 的最新评测,
然后对比它们的中文能力,输出 Markdown 格式的报告。

问题:

  • 模型知识截止 + 联网工具一次只能搜一次
  • 报告质量完全依赖模型本身,没有分工
  • 无法并行(必须等搜索完成才能写)

多 Agent(合理)

  • Planner:拆解任务 → “搜索 Qwen 最新进展” + “搜索 Llama 3 最新进展” + “找评测榜单”
  • Researcher × 2(并行):分别执行搜索任务
  • Writer:合并结果,撰写对比章节
  • Reviewer:核对事实,补充数据
  • 最终合并输出

效果差距是明显的,但前提是编排设计得当——否则就像我们开头那个”踢皮球 47 轮”的惨案。


二、适合什么场景 / 不适合什么场景

场景适合度推荐方案
调研报告 / 综述类任务⭐⭐⭐⭐⭐CrewAI 或自研编排(AutoGen 默认配置容易循环)
代码生成 + 自动测试 + 自动部署⭐⭐⭐⭐⭐AutoGen(工具调用强)
多轮客服(FAQ + 工单 + 升级人工)⭐⭐⭐⭐自研编排(状态机清晰)
SQL 生成(自然语言 → SQL → 执行)⭐⭐⭐单 Agent + 工具调用就够了
数据分析(自动 pandas / 自动 BI)⭐⭐⭐⭐CrewAI
简单 RAG 问答❌ 不要单 Agent + LlamaIndex
1-2 步任务❌ 不要多 Agent 是过度工程

💡 一句话记忆3 步以上 + 至少 1 个工具调用 + 需要迭代的任务才值得上多 Agent。其它都用单 Agent。


三、整体架构:五层编排模型

我们后来沉淀出的多 Agent 架构分五层:

┌────────────────────────────────────────────────────────────────┐
│ ① 用户接口层 (Interface Layer)                                   │
│    Web UI / API / Slack / CLI                                    │
│    负责:发起任务、显示进度、收集反馈、人工介入                      │
└────────────────────────────────────────────────────────────────┘
                                 ↓ ↑ (双向)
┌────────────────────────────────────────────────────────────────┐
│ ② 编排层 (Orchestration Layer)                                   │
│    Orchestrator: 状态机 / 任务队列 / 进度跟踪                      │
│    负责:决定下一步谁做、是否终止、是否需要人介入                      │
└────────────────────────────────────────────────────────────────┘
                                 ↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ③ 消息总线 (Message Bus)                                          │
│    任务消息 / 工具调用消息 / 状态变更事件                            │
│    实现:Redis Streams / Kafka / 内存 queue(按规模选)             │
└────────────────────────────────────────────────────────────────┘
                                 ↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ④ Agent 集群 (Agent Pool)                                        │
│    ┌──────────────┐ ┌──────────────┐ ┌──────────────┐            │
│    │  Researcher  │ │   Writer     │ │  Reviewer    │  ...        │
│    │  + 搜索工具   │ │  + 写作模板   │ │  + 评分工具   │            │
│    └──────────────┘ └──────────────┘ └──────────────┘            │
│    每个 Agent: LLM + System Prompt + Tools + Memory               │
└────────────────────────────────────────────────────────────────┘
                                 ↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ⑤ 工具与数据层 (Tool & Data Layer)                                │
│    搜索 API / 数据库 / 代码执行 / 文件系统 / 内部 API                 │
│    注册到 ToolRegistry,支持超时、重试、限流、降级                      │
└────────────────────────────────────────────────────────────────┘

数据流(一次”竞品调研”任务的完整链路):

[User] "调研国内 LLM 推理框架"
  ↓
[Orchestrator] 创建 Task, 状态=initialized
  ↓ 状态=running
[Orchestrator] 调用 Planner Agent 拆解任务
  → 输出: [task_1: 搜索 vLLM, task_2: 搜索 TGI, task_3: 搜索 TensorRT-LLM]
  ↓
[Orchestrator] 把 task_1/2/3 放入消息总线(并行)
  ↓
[Researcher #1] 消费 task_1 → 调用搜索工具 → 产出 vllm_research.md
[Researcher #2] 消费 task_2 → 调用搜索工具 → 产出 tgi_research.md
[Researcher #3] 消费 task_3 → 调用搜索工具 → 产出 trtllm_research.md
  ↓ (等待三个都完成)
[Orchestrator] 触发 Writer Agent
  → 输入:三个研究结果
  → 输出:对比报告 draft_v1.md
  ↓
[Orchestrator] 触发 Reviewer Agent
  → 输入:draft_v1.md
  → 输出:审稿意见(如果有问题,回 Writer 修订,最多 2 轮)
  ↓
[Orchestrator] 状态=completed, 返回结果给用户

四、核心流程:从一次任务看完整链路

4.1 任务定义:结构化任务描述

# tasks/research_task.py
from pydantic import BaseModel, Field

class ResearchTask(BaseModel):
    task_id: str
    user_query: str
    task_type: Literal["competitor_research", "code_review", "data_analysis"]
    subtasks: list["SubTask"] = Field(default_factory=list)
    state: Literal["initialized", "running", "waiting_human", "completed", "failed"] = "initialized"
    budget_usd: float = 1.0# 成本熔断
    spent_usd: float = 0.0
    human_intervention_required: bool = False
    artifacts: dict[str, str] = Field(default_factory=dict)  # 中间产物
    final_output: str | None = None
    error: str | None = None

class SubTask(BaseModel):
    sub_id: str
    role: Literal["researcher", "writer", "reviewer"]
    input: dict
    output: str | None = None
    state: Literal["pending", "running", "completed", "failed"] = "pending"
    retries: int = 0
    max_retries: int = 2

4.2 编排器:状态机驱动

# orchestrator/state_machine.py
from transitions import Machine

class OrchestratorStateMachine:
    states = ["initialized", "planning", "researching", "writing", "reviewing",
              "waiting_human", "completed", "failed"]

    def __init__(self, task: ResearchTask):
        self.task = task
        self.machine = Machine(
            model=self,
            states=OrchestratorStateMachine.states,
            initial="initialized",
        )
        # 状态转移规则
        self.machine.add_transition("start_planning", "initialized", "planning")
        self.machine.add_transition("plan_done", "planning", "researching")
        self.machine.add_transition("research_done", "researching", "writing")
        self.machine.add_transition("review_done", "reviewing", "completed")
        self.machine.add_transition("needs_revision", "reviewing", "writing")
        self.machine.add_transition("request_human", "*", "waiting_human")
        self.machine.add_transition("human_responded", "waiting_human", "researching")
        self.machine.add_transition("fail", "*", "failed")
        # 成本熔断:任何状态都可触发
        self.machine.add_transition("budget_exceeded", "*", "failed")

4.3 Agent 实现:统一接口

# agents/base.py
from abc import ABC, abstractmethod
from typing import Callable
import time

class BaseAgent(ABC):
    def __init__(self, name: str, role: str, llm: LLMClient, tools: list["Tool"], prompt: str):
        self.name = name
        self.role = role
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.system_prompt = prompt

    @abstractmethod
    def run(self, subtask: SubTask, context: dict) -> str:
        """执行子任务,返回产物。"""
        raise NotImplementedError

    def call_tool(self, tool_name: str, **kwargs) -> str:
        tool = self.tools[tool_name]
        return tool.run(**kwargs)

class ResearcherAgent(BaseAgent):
    def run(self, subtask: SubTask, context: dict) -> str:
        # 1. 用 LLM 决定调用哪个搜索工具、搜什么关键词
        plan = self.llm.complete(
            system=self.system_prompt,
            user=f"任务:{subtask.input['query']}\n可用工具:{list(self.tools.keys())}",
            response_format={"search_query": str, "tool": str}
        )
        # 2. 调用工具(带超时重试)
        results = self.call_tool(plan["tool"], query=plan["search_query"])
        # 3. 整理结果
        return self.llm.complete(
            system="你是一名研究员,请把搜索结果整理成结构化笔记",
            user=results
        )

class WriterAgent(BaseAgent):
    def run(self, subtask: SubTask, context: dict) -> str:
        research_results = "\n\n".join(context["research_results"])
        return self.llm.complete(
            system=self.system_prompt,
            user=f"研究材料:\n{research_results}\n\n请撰写报告:{context['user_query']}",
            max_tokens=3000
        )

class ReviewerAgent(BaseAgent):
    def run(self, subtask: SubTask, context: dict) -> str:
        draft = context["draft"]
        review = self.llm.complete(
            system=self.system_prompt + "\n请用 JSON 输出:{passed: bool, issues: [...]}", 
            user=draft,
            response_format={"passed": bool, "issues": list}
        )
        if not review["passed"]:
            context["revision_count"] = context.get("revision_count", 0) + 1
            if context["revision_count"] >= 2:
                raise MaxRevisionExceededError()
        return review

4.4 工具注册表:超时、重试、降级

# tools/registry.py
import functools
import time
from typing import Callable

class Tool:
    def __init__(self, name: str, func: Callable, timeout_s: float = 30.0,
                 max_retries: int = 2, fallback: Callable = None):
        self.name = name
        self.func = func
        self.timeout_s = timeout_s
        self.max_retries = max_retries
        self.fallback = fallback

    def run(self, **kwargs) -> str:
        last_err = None
        for attempt in range(self.max_retries + 1):
            try:
                # 用线程池实现超时控制
                with ThreadPoolExecutor(max_workers=1) as ex:
                    fut = ex.submit(self.func, **kwargs)
                    return fut.result(timeout=self.timeout_s)
            except TimeoutError:
                last_err = f"tool {self.name} timeout after {self.timeout_s}s"
                logger.warning(f"[{self.name}] attempt {attempt+1} timeout")
            except Exception as e:
                last_err = f"tool {self.name} error: {e}"
                logger.warning(f"[{self.name}] attempt {attempt+1} error: {e}")
            time.sleep(2 ** attempt)  # 指数退避
        # 所有重试失败,走降级
        if self.fallback:
            logger.warning(f"[{self.name}] 降级到 fallback")
            return self.fallback(**kwargs)
        raise ToolExecutionError(last_err)

# 搜索工具示例
def web_search(query: str, num_results: int = 5) -> str:
    # 实际调用 Bing/Google API
    return serp_api.search(query, num_results=num_results)

def fallback_search(query: str, num_results: int = 5) -> str:
    """搜索 API 挂了时,用本地知识库兜底。"""
    return knowledge_base.search(query, top_k=num_results)

search_tool = Tool(
    name="web_search",
    func=web_search,
    timeout_s=10.0,
    max_retries=2,
    fallback=fallback_search,
)

4.5 编排器主循环:成本熔断 + 人机协同

# orchestrator/runner.py
class Orchestrator:
    def __init__(self, agents: dict, tool_registry: ToolRegistry, message_bus):
        self.agents = agents
        self.tools = tool_registry
        self.bus = message_bus
        self.cost_tracker = CostTracker()

    def run(self, task: ResearchTask):
        try:
            # Step 1: Planning
            self.sm.start_planning()
            subtasks = self.agents["planner"].run(
                SubTask(role="planner", input={"query": task.user_query}),
                context={}
            )
            task.subtasks = subtasks
            self.sm.plan_done()

            # Step 2: Researching (并行)
            self._run_parallel(task, role="researcher")

            # 成本检查
            self._check_budget(task)

            # Step 3: Writing
            self.sm.research_done()
            draft = self.agents["writer"].run(
                SubTask(role="writer"),
                context={"research_results": [s.output for s in task.subtasks if s.role=="researcher"]}
            )
            task.artifacts["draft_v1"] = draft

            # Step 4: Reviewing (最多 2 轮)
            for round_i in range(2):
                self.sm.to_reviewing() if round_i == 0 else None
                review = self.agents["reviewer"].run(
                    SubTask(role="reviewer"),
                    context={"draft": draft}
                )
                if review["passed"]:
                    task.final_output = draft
                    self.sm.review_done()
                    return
                # 修订
                draft = self.agents["writer"].run(
                    SubTask(role="writer"),
                    context={"research_results": [...], "feedback": review["issues"]}
                )

            # 超过最大修订轮次
            task.final_output = draft
            self.sm.review_done()

        except MaxRevisionExceededError:
            task.error = "Reviewer 多次不通过,使用最后一版"
            self.sm.fail()
        except BudgetExceededError:
            task.error = f"成本超预算 ${task.budget_usd},当前花费 ${task.spent_usd:.2f}"
            self.sm.fail()
        except Exception as e:
            task.error = str(e)
            self.sm.fail()
        finally:
            # 关键埋点
            logger.info(f"[task={task.task_id}] done, spent=${task.spent_usd:.2f}, "
                        f"rounds={self._count_rounds(task)}")

    def _check_budget(self, task: ResearchTask):
        if task.spent_usd > task.budget_usd:
            raise BudgetExceededError()

    def request_human(self, task: ResearchTask, question: str):
        """阻塞任务,向用户发起询问。"""
        self.sm.request_human()
        task.human_intervention_required = True
        # 发送通知(Slack/邮件/WebSocket)
        notifier.send(user_id=task.user_id, message=question)
        # 等待用户响应(可设超时)
        response = wait_for_response(task.task_id, timeout=300)
        self.sm.human_responded()
        return response

五、关键代码/配置

5.1 AutoGen 用法(适合代码生成类)

# autogen_code_review.py
import autogen

config_list = [{"model": "gpt-4o", "api_key": "..."}]

# 三个 Agent:Developer、Reviewer、UserProxy
developer = autogen.AssistantAgent(
    name="Developer",
    llm_config={"config_list": config_list},
    system_message="""你是 Python 开发者。接到需求后写代码、跑测试。"""
)

reviewer = autogen.AssistantAgent(
    name="Reviewer",
    llm_config={"config_list": config_list},
    system_message="""你是代码审查员。审 Developer 的代码,指出 bug、性能问题、安全问题。
    通过标准:所有测试通过 + 无明显安全问题 + 无 O(n³) 以上复杂度。"""
)

user_proxy = autogen.UserProxyAgent(
    name="UserProxy",
    human_input_mode="TERMINATE",  # 关键:每轮可让人介入
    code_execution_config={"work_dir": "code_workspace"},
    max_consecutive_auto_reply=5,  # 防止无限循环
)

# 限制最大轮次(解决踢皮球问题)
groupchat = autogen.GroupChat(
    agents=[user_proxy, developer, reviewer],
    messages=[],
    max_round=8,  # 硬性上限
    speaker_selection_method="round_robin",  # 不要 auto(auto 容易循环)
)

manager = autogen.GroupChatManager(groupchat=groupchat, llm_config={"config_list": config_list})

user_proxy.initiate_chat(
    manager,
    message="实现一个函数:输入 JSON,输出每个 key 的深度"
)

AutoGen 关键配置点

  • max_round 必须设,否则容易循环
  • speaker_selection_method="round_robin" 比 "auto" 更可控
  • max_consecutive_auto_reply 限制单 Agent 连续回复次数
  • human_input_mode="TERMINATE" 或 "ALWAYS" 让用户能介入

5.2 CrewAI 用法(适合报告类)

# crewai_research.py
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, FileReadTool

search_tool = SerperDevTool(api_key="...")
file_tool = FileReadTool()

researcher = Agent(
    role="高级研究员",
    goal="找到关于 {topic} 的最新、最准确的信息",
    backstory="你是一名资深技术分析师,擅长从海量资料中提炼关键信息。",
    tools=[search_tool, file_tool],
    verbose=True,
    allow_delegation=False,  # 防止 Agent 之间无限互相委托
)

writer = Agent(
    role="技术作家",
    goal="把研究成果写成清晰、结构化的 Markdown 报告",
    backstory="你是一名技术作家,擅长把复杂概念讲清楚。",
    tools=[],
    verbose=True,
)

reviewer = Agent(
    role="主编",
    goal="审稿、纠错、确保报告质量",
    backstory="你是一名严谨的主编,会从事实准确性、逻辑性、可读性三个维度审稿。",
    tools=[],
    verbose=True,
)

research_task = Task(
    description="调研 {topic} 的最新进展,输出结构化笔记",
    expected_output="Markdown 格式的研究笔记,包含 5 个关键发现",
    agent=researcher,
)

write_task = Task(
    description="基于研究笔记撰写完整报告",
    expected_output="3000 字以内的 Markdown 报告",
    agent=writer,
    context=[research_task],  # 依赖研究结果
)

review_task = Task(
    description="审稿,输出修改意见或 PASS",
    expected_output="JSON: {passed: bool, issues: [str]}",
    agent=reviewer,
    context=[write_task],
)

crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, write_task, review_task],
    process=Process.sequential,  # 不要 hierarchical(容易循环)
    max_iterations=10,  # 硬性上限
    verbose=2,
)

result = crew.kickoff(inputs={"topic": "国内 LLM 推理框架"})

CrewAI 关键配置点

  • process=Process.sequential 比 hierarchical 更可控
  • allow_delegation=False 防止 Agent 之间互相踢皮球
  • max_iterations 必须设
  • context=[prev_task] 显式声明依赖

5.3 自研编排 vs 框架选型

维度AutoGenCrewAI自研编排
上手速度⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐(要写不少代码)
流程可控性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
工具调用⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
状态管理⭐⭐(要靠 GroupChat 兜底)⭐⭐⭐⭐⭐⭐⭐⭐
成本控制⭐⭐(token 容易爆)⭐⭐⭐⭐⭐⭐⭐
人机协同⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
大规模并行⭐⭐⭐⭐⭐⭐⭐⭐⭐
维护成本⭐⭐⭐⭐⭐⭐⭐⭐(自己维护)

选型建议

  • 1-3 个 Agent、流程固定、不在乎 token → CrewAI
  • 需要强工具调用、代码执行、自动测试 → AutoGen
  • 大规模并行、复杂状态、需要严格成本控制、有人机协同 → 自研编排

六、上线后如何评估效果

6.1 效果指标

指标数据来源健康阈值
任务完成率Orchestrator 日志≥ 85%
平均轮次Orchestrator 日志≤ 8 轮
人工介入率用户行为日志≤ 20%
终稿人工评分Reviewer + 业务方≥ 4.0/5.0
幻觉率(事实性错误)抽检 + LLM-as-judge≤ 5%

6.2 成本指标

指标数据来源健康阈值
单任务平均成本LLM API 用量≤ $0.50
月度总成本PromptLayer / 自研埋点预算内
Token 利用率(输出/input)同上≥ 0.3
缓存命中率Redis≥ 30%

6.3 稳定性指标

指标数据来源健康阈值
任务平均完成时间Orchestrator≤ 60s
P95 延迟同上≤ 120s
工具调用失败率ToolRegistry≤ 2%
循环/超时率(max_round 触发)Orchestrator≤ 1%
状态机非法转移Orchestrator= 0

6.4 上线 checklist

  • ⬜ 状态机所有转移有单元测试
  • ⬜ 每个工具都有超时 + 重试 + 降级
  • ⬜ 成本熔断在 Orchestrator 主循环必经路径
  • ⬜ max_round / max_iterations / max_consecutive_auto_reply `max_round` / `max_iterations` / `max_consecutive_auto_reply` 都设了
  • ⬜ 人机协同入口测试过(Slack/WebSocket/邮件都能通知)
  • ⬜ Golden task 跑通:5 个典型任务 + 5 个对抗任务
  • ⬜ 监控告警:成本异常、循环超时、工具失败率
  • ⬜ 回滚方案:可以一键切回上一个 prompt 版本

七、常见坑和优化方向

坑 1:Agent 互相踢皮球,无限循环

症状:47 轮还没结束,token 烧穿。

根因:AutoGen speaker_selection_method="auto" 让 LLM 决定下一个发言者,结果 Agent 之间礼貌地”你来补充一下”、”我觉得还需要”。

解决

  1. 强制设 max_round / max_iterations
  2. 用 round_robin 或写自定义 selection 函数
  3. Orchestrator 加显式状态机,超出 N 轮强制 failed

坑 2:上下文爆炸,token 暴涨

症状:第 30 轮单次调用 8000+ tokens。

根因:每轮把完整历史塞进 prompt,没有摘要、没有截断。

解决

# 用滑动窗口 + 摘要
class ContextManager:
    def __init__(self, max_recent_turns=5, summary_max_tokens=500):
        self.max_recent = max_recent_turns
        self.summary_tokens = summary_max_tokens
        self.history = []
        self.summary = ""

    def add(self, role: str, content: str):
        self.history.append({"role": role, "content": content})
        if len(self.history) > self.max_recent * 2:
            # 太老的对话摘要
            old = self.history[:len(self.history) - self.max_recent * 2]
            self.summary = self._summarize(old + [{"summary": self.summary}])
            self.history = self.history[-self.max_recent * 2:]

    def get_context(self) -> str:
        return f"对话摘要:{self.summary}\n\n最近对话:\n" + \
               "\n".join(f"{m['role']}: {m['content']}" for m in self.history)

坑 3:工具调用幻觉 + 失败无人兜底

症状:Agent 调用 search_tool(query="vllm github") 拿不到结果,默默编了一段假数据继续。

解决

class Tool:
    def run(self, **kwargs):
        result = self._safe_call(**kwargs)
        if not result or "no results" in result.lower():
            # 强制返回空,让 Agent 知道没搜到
            return "[搜索无结果] 请换个关键词或承认信息缺失。"
        return result

并且在 Agent prompt 里强调:”如果工具返回空,请承认不知道,不要编造”。

坑 4:Reviewer 总是通过,无审稿效果

症状:Reviewer Agent 几乎从不 fail,所有稿件都过。

根因:LLM 默认倾向”赞同”,需要明确标准。

解决

reviewer_system_prompt = """你是主编。审稿必须严格遵守以下标准:
1. 所有事实必须有 source 引用,无 source 视为不通过
2. 数据点必须有具体数字,不能用"很多"、"大部分"
3. 逻辑推理链完整,不能有跳跃

输出 JSON:
{
  "passed": false,
  "issues": [
    {"type": "no_source", "location": "第2段第3句"},
    {"type": "vague_number", "location": "第3段"}
  ]
}

如果有任何问题,必须 passed=false,不允许放过。"""

坑 5:token 成本失控

症状:月度账单从 $500 涨到 $8000。

根因

  1. Researcher 反复搜索
  2. Writer 反复重写
  3. Reviewer 反复审稿
  4. 上下文一直累积

解决

class CostCircuitBreaker:
    def __init__(self, budget_usd: float):
        self.budget = budget_usd
        self.spent = 0.0

    def check(self, estimated_cost: float):
        if self.spent + estimated_cost > self.budget:
            raise BudgetExceededError(
                f"将超出预算 ${self.budget:.2f},当前已花 ${self.spent:.2f}"
            )

    def record(self, actual_cost: float):
        self.spent += actual_cost
        # 80% 警告
        if self.spent > self.budget * 0.8:
            logger.warning(f"⚠️ 成本已达预算 {self.spent/self.budget:.0%}")

优化方向

当前中期长期
串行 Agent显式并行(消息总线 + Worker Pool)DAG 编排(DAG 描述依赖,自动调度)
手动状态机可视化状态机(Stateful 平台)自我修复(失败自动重路由)
人工抽检评测LLM-as-judge 自动评测 + 人工 spot check全自动 A/B + 自动 prompt 调优
固定 promptprompt 版本化(接上篇文章 08)prompt + Agent 配置自动寻优
单模型多模型路由(简单任务用小模型)Agent 蒸馏(小 Agent 模仿大 Agent)

结语:一页速查表

┌──────────────────────────────────────────────────────────┐
│  多 Agent 系统速查                                         │
├──────────────────────────────────────────────────────────┤
│  选型      3 Agent 报告类 → CrewAI                          │
│            强工具代码类 → AutoGen                            │
│            大规模/严格控成本 → 自研编排                       │
│  上限      max_round / max_iterations 必须设                │
│  工具      Tool: 超时 + 重试 + 降级                          │
│  上下文    滑动窗口 + 摘要,避免 token 爆炸                  │
│  状态机    transitions 库 / 自研                              │
│  成本      CostCircuitBreaker + 单任务预算                   │
│  评测      完成率 + 轮次 + 人工评分 + LLM-as-judge            │
│  监控      循环率、token 单价、工具失败率、幻觉率              │
└──────────────────────────────────────────────────────────┘

最后一句话:多 Agent 系统的复杂度不在 LLM 调用,而在编排。把状态机、消息总线、工具注册、成本熔断这几块做扎实,Agent 数量从 3 个扩到 30 个也只是工作量问题。否则,3 个 Agent 也能写崩。

多 Agent 协作系统开发:AutoGen、CrewAI 与自定义编排框架插图

本文链接:https://www.yunweipai.com/49297.html

网友评论comments

发表回复

您的电子邮箱地址不会被公开。

暂无评论

Copyright © 2012-2022 YUNWEIPAI.COM - 运维派 京ICP备16064699号-6
扫二维码
扫二维码
返回顶部