AI Agents (Part 7): Multi-Agent Architecture
Shenyiju (神译局) is 36Kr's in-house translation team, covering technology, business, careers, and lifestyle, with a focus on new technologies, new ideas, and new trends from abroad.
Editor's note: 2025 is the year of the AI agent. This series introduces the concepts, types, principles, architectures, and development of AI agents, offering a primer for readers who want to learn more. This is the seventh article in the series, presented in translation.
Introduction
Agents are systems that use a large language model (LLM) to control an application's flow. As these systems grow more complex, they become harder to manage and scale. Common problems include:
Too many tools, leading to poor decisions about which one to call
A single agent struggling to handle complex context
Work that requires several specialized roles (e.g., planner, researcher, math expert)
As agent frameworks have matured, companies have started building multi-agent systems in search of general-purpose solutions. Two years ago, researchers built ChatDev, a collaborative system: a virtual software company in which agents playing the CEO, product chief, designer, programmer, and other roles worked together and successfully developed a video game.
After those agents collaborated to build a game, many assumed that multi-role architectures could solve every software engineering problem. In practice, however, simpler architectures often prove more efficient and economical in certain scenarios.
1.1 Single-Agent vs. Multi-Agent Architectures
An initial single-agent design (one agent handling everything: browser navigation, file operations, and so on) may look viable, but as task complexity grows it runs into the following challenges:
Single-agent architecture
Typical symptoms include:
Tool overload: too many options to choose among well
Context bloat: more context than the agent can handle
More errors: overly broad responsibilities degrade output quality
For heterogeneous tasks such as data extraction and report generation, a multi-agent division of labor pays off. Each agent focuses on its own specialty, which raises solution quality and lowers development difficulty.
Multi-agent architecture
2. Multi-Agent Architecture
A single agent suits simple, well-defined tasks; multiple agents suit scenarios that require collaborating specialists, are dynamic and complex, and must scale.
2.1 Multi-Agent System Patterns
Agents can be connected in the following ways:
2.1.1 Parallel Pattern
Multiple agents process different parts of a task simultaneously
Example: three agents performing text summarization, translation, and sentiment analysis in parallel
Code:
from typing import Dict, Any, TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import re
import time
# Define the state
class AgentState(TypedDict):
text: str
summary: str
translation: str
sentiment: str
summary_time: float
translation_time: float
sentiment_time: float
# Summarization Agent
def summarize_agent(state: AgentState) -> Dict[str, Any]:
print("Summarization Agent: Running")
start_time = time.time()
try:
text = state["text"]
if not text.strip():
return {
"summary": "No text provided for summarization.",
"summary_time": 0.0
}
time.sleep(2)
sentences = re.split(r'(?<=[.!?]) +', text.strip())
scored_sentences = [(s, len(s.split())) for s in sentences if s]
top_sentences = [s for s, _ in sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:2]]
summary = " ".join(top_sentences) if top_sentences else "Text too short to summarize."
processing_time = time.time() - start_time
print(f"Summarization Agent: Completed in {processing_time:.2f} seconds")
return {
"summary": summary,
"summary_time": processing_time
}
except Exception as e:
return {
"summary": f"Error in summarization: {str(e)}",
"summary_time": 0.0
}
# Translation Agent
def translate_agent(state: AgentState) -> Dict[str, Any]:
print("Translation Agent: Running")
start_time = time.time()
try:
text = state["text"]
if not text.strip():
return {
"translation": "No text provided for translation.",
"translation_time": 0.0
}
time.sleep(3)
translation = (
"El nuevo parque en la ciudad es una maravillosa adición. "
"Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. "
"Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña."
)
processing_time = time.time() - start_time
print(f"Translation Agent: Completed in {processing_time:.2f} seconds")
return {
"translation": translation,
"translation_time": processing_time
}
except Exception as e:
return {
"translation": f"Error in translation: {str(e)}",
"translation_time": 0.0
}
# Sentiment Agent
def sentiment_agent(state: AgentState) -> Dict[str, Any]:
print("Sentiment Agent: Running")
start_time = time.time()
try:
text = state["text"]
if not text.strip():
return {
"sentiment": "No text provided for sentiment analysis.",
"sentiment_time": 0.0
}
time.sleep(1.5)
blob = TextBlob(text)
polarity = blob.sentiment.polarity
subjectivity = blob.sentiment.subjectivity
sentiment = "Positive" if polarity > 0 else "Negative" if polarity < 0 else "Neutral"
result = f"{sentiment} (Polarity: {polarity:.2f}, Subjectivity: {subjectivity:.2f})"
processing_time = time.time() - start_time
print(f"Sentiment Agent: Completed in {processing_time:.2f} seconds")
return {
"sentiment": result,
"sentiment_time": processing_time
}
except Exception as e:
return {
"sentiment": f"Error in sentiment analysis: {str(e)}",
"sentiment_time": 0.0
}
# Join Node
def join_parallel_results(state: AgentState) -> AgentState:
return state
# Build the Graph
def build_parallel_graph() -> StateGraph:
workflow = StateGraph(AgentState)
# Define parallel branches
parallel_branches = {
"summarize_node": summarize_agent,
"translate_node": translate_agent,
"sentiment_node": sentiment_agent
}
# Add parallel processing nodes
for name, agent in parallel_branches.items():
workflow.add_node(name, agent)
# Add branching and joining nodes
workflow.add_node("branch", lambda state: state) # Simplified branch function
workflow.add_node("join", join_parallel_results)
# Set entry point
workflow.set_entry_point("branch")
# Add edges for parallel execution
for name in parallel_branches:
workflow.add_edge("branch", name)
workflow.add_edge(name, "join")
workflow.add_edge("join", END)
return workflow.compile()
# Main function
def main():
text = (
"The new park in the city is a wonderful addition. Families are enjoying the open spaces, "
"and children love the playground. However, some people think the parking area is too small."
)
initial_state: AgentState = {
"text": text,
"summary": "",
"translation": "",
"sentiment": "",
"summary_time": 0.0,
"translation_time": 0.0,
"sentiment_time": 0.0
}
print("\nBuilding new graph...")
app = build_parallel_graph()
print("\nStarting parallel processing...")
start_time = time.time()
# LangGraph runs the fanned-out branch nodes concurrently within a superstep,
# so no special config flag is needed to get parallel execution.
result = app.invoke(initial_state)
total_time = time.time() - start_time
print("\n=== Parallel Task Results ===")
print(f"Input Text:\n{text}\n")
print(f"Summary:\n{result['summary']}\n")
print(f"Translation (Spanish):\n{result['translation']}\n")
print(f"Sentiment Analysis:\n{result['sentiment']}\n")
print("\n=== Processing Times ===")
processing_times = {
"summary": result["summary_time"],
"translation": result["translation_time"],
"sentiment": result["sentiment_time"]
}
for agent, time_taken in processing_times.items():
print(f"{agent.capitalize()}: {time_taken:.2f} seconds")
print(f"\nTotal Wall Clock Time: {total_time:.2f} seconds")
print(f"Sum of Individual Processing Times: {sum(processing_times.values()):.2f} seconds")
print(f"Time Saved by Parallel Processing: {sum(processing_times.values()) - total_time:.2f} seconds")
if __name__ == "__main__":
main()
Output:
Building new graph...
Starting parallel processing...
Sentiment Agent: Running
Summarization Agent: Running
Translation Agent: Running
Sentiment Agent: Completed in 1.50 seconds
Summarization Agent: Completed in 2.00 seconds
Translation Agent: Completed in 3.00 seconds
=== Parallel Task Results ===
Input Text:
The new park in the city is a wonderful addition. Families are enjoying the open spaces, and children love the playground. However, some people think the parking area is too small.
Summary:
Families are enjoying the open spaces, and children love the playground. The new park in the city is a wonderful addition.
Translation (Spanish):
El nuevo parque en la ciudad es una maravillosa adición. Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña.
Sentiment Analysis:
Positive (Polarity: 0.31, Subjectivity: 0.59)
=== Processing Times ===
Summary: 2.00 seconds
Translation: 3.00 seconds
Sentiment: 1.50 seconds
Total Wall Clock Time: 3.01 seconds
Sum of Individual Processing Times: 6.50 seconds
Time Saved by Parallel Processing: 3.50 seconds
Key takeaways:
◆ Parallelism: the three tasks (summarization, translation, and sentiment analysis) run concurrently, so the total time equals that of the longest subtask (3 seconds)
◆ Independence: each agent processes the original text on its own; no cross-agent communication happens during execution
◆ Coordination: each branch writes its own keys into the shared graph state, so results are collected safely and presented together at the join node
◆ Typical scenario: combining summarization, multilingual translation, and sentiment analysis is a common NLP workflow, well suited to parallel processing of large volumes of text
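Stripped of the framework, the fan-out/fan-in idea above reduces to running independent workers concurrently and merging their results under distinct keys. A minimal sketch using only Python's standard library (the worker functions `summarize`, `word_count`, and `shout` are toy stand-ins, not part of the example above):

```python
from concurrent.futures import ThreadPoolExecutor

def summarize(text: str) -> str:
    # Toy stand-in: take the first sentence as the "summary".
    return text.split(". ")[0] + "."

def word_count(text: str) -> int:
    return len(text.split())

def shout(text: str) -> str:
    return text.upper()

def fan_out_fan_in(text: str) -> dict:
    # Fan-out: run each independent task concurrently.
    # Fan-in: merge results under distinct keys, so no branch
    # can overwrite another branch's output.
    tasks = {"summary": summarize, "words": word_count, "shout": shout}
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(fn, text) for key, fn in tasks.items()}
        return {key: f.result() for key, f in futures.items()}

result = fan_out_fan_in("The park is lovely. Parking is scarce.")
```

Because each branch owns its own key, the merge step needs no locking — the same reason the LangGraph example needs no reducer for `summary`, `translation`, and `sentiment`.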
2.1.2 Sequential Pattern
Tasks are processed in order; one agent's output becomes the next agent's input.
Example: multi-step approval
Code:
from typing import Dict
from langgraph.graph import StateGraph, MessagesState, END
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import HumanMessage, AIMessage
import json
# Agent 1: Team Lead
def team_lead_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("Agent (Team Lead): Starting review")
messages = state["messages"]
proposal = json.loads(messages[0].content)
title = proposal.get("title", "")
amount = proposal.get("amount", 0.0)
if not title or amount <= 0:
status = "Rejected"
comment = "Team Lead: Proposal rejected due to missing title or invalid amount."
goto = END
else:
status = "Approved by Team Lead"
comment = "Team Lead: Proposal is complete and approved."
goto = "dept_manager"
print(f"Agent (Team Lead): Review complete - {status}")
messages.append(AIMessage(
content=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "team_lead", "goto": goto}
))
return {"messages": messages}
# Agent 2: Department Manager
def dept_manager_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("Agent (Department Manager): Starting review")
messages = state["messages"]
team_lead_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "team_lead"), None)
proposal = json.loads(messages[0].content)
amount = proposal.get("amount", 0.0)
if json.loads(team_lead_msg.content)["status"] != "Approved by Team Lead":
status = "Rejected"
comment = "Department Manager: Skipped due to Team Lead rejection."
goto = END
elif amount > 100000:
status = "Rejected"
comment = "Department Manager: Budget exceeds limit."
goto = END
else:
status = "Approved by Department Manager"
comment = "Department Manager: Budget is within limits."
goto = "finance_director"
print(f"Agent (Department Manager): Review complete - {status}")
messages.append(AIMessage(
content=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "dept_manager", "goto": goto}
))
return {"messages": messages}
# Agent 3: Finance Director
def finance_director_agent(state: MessagesState, config: RunnableConfig) -> Dict:
print("Agent (Finance Director): Starting review")
messages = state["messages"]
dept_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "dept_manager"), None)
proposal = json.loads(messages[0].content)
amount = proposal.get("amount", 0.0)
if json.loads(dept_msg.content)["status"] != "Approved by Department Manager":
status = "Rejected"
comment = "Finance Director: Skipped due to Dept Manager rejection."
elif amount > 50000:
status = "Rejected"
comment = "Finance Director: Insufficient budget."
else:
status = "Approved"
comment = "Finance Director: Approved and feasible."
print(f"Agent (Finance Director): Review complete - {status}")
messages.append(AIMessage(
content=json.dumps({"status": status, "comment": comment}),
additional_kwargs={"agent": "finance_director", "goto": END}
))
return {"messages": messages}
# Routing function
def route_step(state: MessagesState) -> str:
for msg in reversed(state["messages"]):
goto = msg.additional_kwargs.get("goto")
if goto:
print(f"Routing: Agent {msg.additional_kwargs.get('agent')} set goto to {goto}")
return goto
return END
# Build LangGraph
builder = StateGraph(MessagesState)
builder.add_node("team_lead", team_lead_agent)
builder.add_node("dept_manager", dept_manager_agent)
builder.add_node("finance_director", finance_director_agent)
builder.set_entry_point("team_lead")
builder.add_conditional_edges("team_lead", route_step, {
"dept_manager": "dept_manager",
END: END
})
builder.add_conditional_edges("dept_manager", route_step, {
"finance_director": "finance_director",
END: END
})
builder.add_conditional_edges("finance_director", route_step, {
END: END
})
workflow = builder.compile()
# Main runner
def main():
initial_state = {
"messages": [
HumanMessage(
content=json.dumps({
"title": "New Equipment Purchase",
"amount": 40000.0,
"department": "Engineering"
})
)
]
}
result = workflow.invoke(initial_state)
messages = result["messages"]
proposal = json.loads(messages[0].content)
print("\n=== Approval Results ===")
print(f"Proposal Title: {proposal['title']}")
final_status = "Unknown"
comments = []
for msg in messages[1:]:
if isinstance(msg, AIMessage):
try:
data = json.loads(msg.content)
if "status" in data:
final_status = data["status"]
if "comment" in data:
comments.append(data["comment"])
except Exception:
continue
print(f"Final Status: {final_status}")
print("Comments:")
for comment in comments:
print(f" - {comment}")
if __name__ == "__main__":
main()
Output (amount = $40,000):
Agent (Team Lead): Starting review
Agent (Team Lead): Review complete - Approved by Team Lead
Routing: Agent team_lead set goto to dept_manager
Agent (Department Manager): Starting review
Agent (Department Manager): Review complete - Approved by Department Manager
Routing: Agent dept_manager set goto to finance_director
Agent (Finance Director): Starting review
Agent (Finance Director): Review complete - Approved
Routing: Agent finance_director set goto to __end__
=== Approval Results ===
Proposal Title: New Equipment Purchase
Final Status: Approved
Comments:
- Team Lead: Proposal is complete and approved.
- Department Manager: Budget is within limits.
- Finance Director: Approved and feasible.
=== Sequential Execution Pattern ===
Execution flow:
Agents run in order: Team Lead → Department Manager → Finance Director
A rejection at any step terminates the flow, skipping the remaining reviewers
Each agent appends its status and comments to the shared message state
Coordination:
Results accumulate in the message list, and the proposal object carries state from agent to agent
Single-threaded sequential execution keeps the flow ordered
Key characteristics:
◆ Tiered approval: simulates a three-level corporate approval chain (team → department → finance)
◆ State accumulation: the proposal gathers review comments as it moves through the flow (e.g., a Team Lead note such as "needs a cost estimate" followed by a Department Manager addition such as "increase the marketing budget")
◆ Circuit breaking: a rejection at any node triggers early stopping (if the Department Manager rejects, the Finance Director never runs)
◆ Sequential dependency: every downstream agent can see all upstream changes (the Finance Director sees all comments from the Team Lead and the Department Manager)
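Outside any framework, the chain above is just a fold over an ordered list of reviewers that stops at the first rejection. A minimal sketch (the reviewer functions and thresholds mirror the example but are simplified, illustrative stand-ins):

```python
def team_lead(proposal):
    ok = bool(proposal["title"]) and proposal["amount"] > 0
    return ok, "Team Lead: complete" if ok else "Team Lead: rejected"

def dept_manager(proposal):
    ok = proposal["amount"] <= 100_000
    return ok, "Dept Manager: within limits" if ok else "Dept Manager: over budget"

def finance_director(proposal):
    ok = proposal["amount"] <= 50_000
    return ok, "Finance: feasible" if ok else "Finance: insufficient budget"

def run_chain(proposal, reviewers):
    comments = []
    for review in reviewers:
        ok, comment = review(proposal)
        comments.append(comment)     # downstream reviewers see all prior comments
        if not ok:                   # circuit breaker: skip remaining reviewers
            return "Rejected", comments
    return "Approved", comments

status, comments = run_chain(
    {"title": "New Equipment Purchase", "amount": 40_000.0},
    [team_lead, dept_manager, finance_director],
)
```

A $200,000 proposal would stop at the Department Manager, leaving only two comments — the same early-stopping behavior the conditional edges implement in the graph version.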
2.1.3 Loop Pattern
Agents run in an iterative loop, refining their output based on feedback from other agents
Example: evaluation use cases, such as writing and testing code
Code:
from typing import Dict, Any, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import textwrap
# State to track the workflow
class EvaluationState(Dict[str, Any]):
code: str = ""
feedback: str = ""
passed: bool = False
iteration: int = 0
max_iterations: int = 3
history: List[Dict] = []
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setdefault("code", "")
self.setdefault("feedback", "")
self.setdefault("passed", False)
self.setdefault("iteration", 0)
self.setdefault("max_iterations", 3)
self.setdefault("history", [])
# Agent 1: Code Writer
def code_writer_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:
print(f"Iteration {state['iteration'] + 1} - Code Writer: Generating code")
print(f"Iteration {state['iteration'] + 1} - Code Writer: Received feedback: {state['feedback']}")
iteration = state["iteration"] + 1
feedback = state["feedback"]
if iteration == 1:
# Initial attempt: Basic factorial with bugs (no handling for zero or negatives)
code = textwrap.dedent("""
def factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "Initial code generated."
elif "factorial(0)" in feedback.lower():
# Fix for zero case
code = textwrap.dedent("""
def factorial(n):
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "Fixed handling for n=0."
elif "factorial(-1)" in feedback.lower() or "negative" in feedback.lower():
# Fix for negative input
code = textwrap.dedent("""
def factorial(n):
if n < 0:
raise ValueError("Factorial not defined for negative numbers")
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
""")
writer_feedback = "Added error handling for negative inputs."
else:
code = state["code"]
writer_feedback = "No further improvements identified."
print(f"Iteration {iteration} - Code Writer: Code generated")
return {
"code": code,
"feedback": writer_feedback,
"iteration": iteration
}
# Agent 2: Code Tester
def code_tester_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:
print(f"Iteration {state['iteration']} - Code Tester: Testing code")
code = state["code"]
try:
# Define test cases
test_cases = [
(0, 1), # factorial(0) = 1
(1, 1), # factorial(1) = 1
(5, 120), # factorial(5) = 120
(-1, None), # Should raise ValueError
]
# Execute code in a safe namespace
namespace = {}
exec(code, namespace)
factorial = namespace.get('factorial')
if not callable(factorial):
return {"passed": False, "feedback": "No factorial function found."}
feedback_parts = []
passed = True
# Run all test cases and collect all failures
for input_val, expected in test_cases:
try:
result = factorial(input_val)
if expected is None: # Expecting an error
passed = False
feedback_parts.append(f"Test failed: factorial({input_val}) should raise an error.")
elif result != expected:
passed = False
feedback_parts.append(f"Test failed: factorial({input_val}) returned {result}, expected {expected}.")
except ValueError as ve:
if expected is not None:
passed = False
feedback_parts.append(f"Test failed: factorial({input_val}) raised ValueError unexpectedly: {str(ve)}")
except Exception as e:
passed = False
feedback_parts.append(f"Test failed: factorial({input_val}) caused error: {str(e)}")
feedback = "All tests passed!" if passed else "\n".join(feedback_parts)
print(f"Iteration {state['iteration']} - Code Tester: Testing complete - {'Passed' if passed else 'Failed'}")
# Log the attempt in history
history = state["history"]
history.append({
"iteration": state["iteration"],
"code": code,
"feedback": feedback,
"passed": passed
})
return {
"passed": passed,
"feedback": feedback,
"history": history
}
except Exception as e:
print(f"Iteration {state['iteration']} - Code Tester: Failed")
return {"passed": False, "feedback": f"Error in testing: {str(e)}"}
# Conditional edge to decide whether to loop or end
def should_continue(state: EvaluationState) -> str:
if state["passed"] or state["iteration"] >= state["max_iterations"]:
print(f"Iteration {state['iteration']} - {'Loop stops: Tests passed' if state['passed'] else 'Loop stops: Max iterations reached'}")
return "end"
print(f"Iteration {state['iteration']} - Loop continues: Tests failed")
return "code_writer"
# Build the LangGraph workflow
workflow = StateGraph(EvaluationState)
# Add nodes
workflow.add_node("code_writer", code_writer_agent)
workflow.add_node("code_tester", code_tester_agent)
# Add edges
workflow.set_entry_point("code_writer")
workflow.add_edge("code_writer", "code_tester")
workflow.add_conditional_edges(
"code_tester",
should_continue,
{
"code_writer": "code_writer",
"end": END
}
)
# Compile the graph
app = workflow.compile()
# Run the workflow
def main():
initial_state = EvaluationState()
result = app.invoke(initial_state)
# Display results
print("\n=== Evaluation Results ===")
print(f"Final Status: {'Passed' if result['passed'] else 'Failed'} after {result['iteration']} iteration(s)")
print(f"Final Code:\n{result['code']}")
print(f"Final Feedback:\n{result['feedback']}")
print("\nIteration History:")
for attempt in result["history"]:
print(f"Iteration {attempt['iteration']}:")
print(f" Code:\n{attempt['code']}")
print(f" Feedback: {attempt['feedback']}")
print(f" Passed: {attempt['passed']}\n")
if __name__ == "__main__":
main()
Output:
Iteration 1 - Code Writer: Generating code
Iteration 1 - Code Writer: Received feedback:
Iteration 1 - Code Writer: Code generated
Iteration 1 - Code Tester: Testing code
Iteration 1 - Code Tester: Testing complete - Failed
Iteration 1 - Loop continues: Tests failed
Iteration 2 - Code Writer: Generating code
Iteration 2 - Code Writer: Received feedback: Test failed: factorial(-1) should raise an error.
Iteration 2 - Code Writer: Code generated
Iteration 2 - Code Tester: Testing code
Iteration 2 - Code Tester: Testing complete - Passed
Iteration 2 - Loop stops: Tests passed
=== Evaluation Results ===
Final Status: Passed after 2 iteration(s)
Final Code:
def factorial(n):
if n < 0:
raise ValueError("Factorial not defined for negative numbers")
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
Final Feedback:
All tests passed!
Iteration History:
Iteration 1:
Code:
def factorial(n):
result = 1
for i in range(1, n + 1):
result *= i
return result
Feedback: Test failed: factorial(-1) should raise an error.
Passed: False
Iteration 2:
Code:
def factorial(n):
if n < 0:
raise ValueError("Factorial not defined for negative numbers")
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
Feedback: All tests passed!
Passed: True
Comprehensive feedback: the code tester reports every failing test case, giving the code writer the information it needs to fix problems step by step
Feedback handling: the code writer fixes issues by priority (the zero case first, then negative inputs), improving the code incrementally
Loop termination: the loop stops as soon as the tests pass, avoiding pointless extra iterations up to the three-iteration cap
2.1.4 Router Pattern
A central router decides which agent to invoke based on the task or input
Example: customer support ticket routing
Code:
from typing import Dict, Any, TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import re
import time
# Step 1: Define the State
# The state holds the ticket information and the processing results
class TicketState(TypedDict):
ticket_text: str # The content of the ticket
category: str # The determined category (Billing, Technical, General, or Unknown)
resolution: str # The resolution provided by the support team
processing_time: float # Time taken to process the ticket
# Step 2: Define the Router Agent
# This agent analyzes the ticket and determines its category
def router_agent(state: TicketState) -> Dict[str, Any]:
print("Router Agent: Analyzing ticket...")
start_time = time.time()
ticket_text = state["ticket_text"].lower()
# Simple keyword-based categorization (could be replaced with an LLM or ML model)
if any(keyword in ticket_text for keyword in ["billing", "payment", "invoice", "charge"]):
category = "Billing"
elif any(keyword in ticket_text for keyword in ["technical", "bug", "error", "crash"]):
category = "Technical"
elif any(keyword in ticket_text for keyword in ["general", "question", "inquiry", "info"]):
category = "General"
else:
category = "Unknown"
processing_time = time.time() - start_time
print(f"Router Agent: Categorized as '{category}' in {processing_time:.2f} seconds")
return {
"category": category,
"processing_time": processing_time
}
# Step 3: Define the Support Team Agents
# Each agent handles tickets for a specific category
# Billing Team Agent
def billing_team_agent(state: TicketState) -> Dict[str, Any]:
print("Billing Team Agent: Processing ticket...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"Billing Team: Reviewed ticket '{ticket_text}'. Please check your invoice details or contact our billing department for further assistance."
time.sleep(1) # Simulate processing time (included in the measurement)
processing_time = time.time() - start_time
print(f"Billing Team Agent: Completed in {processing_time:.2f} seconds")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# Technical Support Team Agent
def technical_team_agent(state: TicketState) -> Dict[str, Any]:
print("Technical Team Agent: Processing ticket...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"Technical Team: Reviewed ticket '{ticket_text}'. Please try restarting your device or submit a detailed error log for further investigation."
time.sleep(1.5) # Simulate processing time (included in the measurement)
processing_time = time.time() - start_time
print(f"Technical Team Agent: Completed in {processing_time:.2f} seconds")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# General Support Team Agent
def general_team_agent(state: TicketState) -> Dict[str, Any]:
print("General Team Agent: Processing ticket...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"General Team: Reviewed ticket '{ticket_text}'. For more information, please refer to our FAQ or contact us via email."
time.sleep(0.8) # Simulate processing time (included in the measurement)
processing_time = time.time() - start_time
print(f"General Team Agent: Completed in {processing_time:.2f} seconds")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# Manual Review Agent (for unknown categories)
def manual_review_agent(state: TicketState) -> Dict[str, Any]:
print("Manual Review Agent: Processing ticket...")
start_time = time.time()
ticket_text = state["ticket_text"]
resolution = f"Manual Review: Ticket '{ticket_text}' could not be categorized. Flagged for human review. Please assign to the appropriate team manually."
time.sleep(0.5) # Simulate processing time (included in the measurement)
processing_time = time.time() - start_time
print(f"Manual Review Agent: Completed in {processing_time:.2f} seconds")
return {
"resolution": resolution,
"processing_time": state["processing_time"] + processing_time
}
# Step 4: Define the Router Function
# This function determines the next node based on the ticket category
def route_ticket(state: TicketState) -> Literal["billing_team", "technical_team", "general_team", "manual_review"]:
category = state["category"]
print(f"Routing: Ticket category is '{category}'")
if category == "Billing":
return "billing_team"
elif category == "Technical":
return "technical_team"
elif category == "General":
return "general_team"
else:
return "manual_review"
# Step 5: Build the Graph with a Router Pattern
def build_router_graph() -> StateGraph:
workflow = StateGraph(TicketState)
# Add nodes
workflow.add_node("router", router_agent) # Entry point: Categorizes the ticket
workflow.add_node("billing_team", billing_team_agent) # Handles billing tickets
workflow.add_node("technical_team", technical_team_agent) # Handles technical tickets
workflow.add_node("general_team", general_team_agent) # Handles general inquiries
workflow.add_node("manual_review", manual_review_agent) # Handles uncategorized tickets
# Set the entry point
workflow.set_entry_point("router")
# Add conditional edges for routing
workflow.add_conditional_edges(
"router",
route_ticket, # Router function to determine the next node
{
"billing_team": "billing_team",
"technical_team": "technical_team",
"general_team": "general_team",
"manual_review": "manual_review"
}
)
# Add edges from each team to END
workflow.add_edge("billing_team", END)
workflow.add_edge("technical_team", END)
workflow.add_edge("general_team", END)
workflow.add_edge("manual_review", END)
return workflow.compile()
# Step 6: Run the Workflow
def main():
# Test cases for different ticket categories
test_tickets = [
"I have a billing issue with my last invoice. It seems I was overcharged.",
"My app keeps crashing with a technical error. Please help!",
"I have a general question about your services. Can you provide more info?",
"I need assistance with something unrelated to billing or technical issues."
]
for ticket_text in test_tickets:
# Initialize the state for each ticket
initial_state: TicketState = {
"ticket_text": ticket_text,
"category": "",
"resolution": "",
"processing_time": 0.0
}
print(f"\n=== Processing Ticket: '{ticket_text}' ===")
app = build_router_graph()
start_time = time.time()
result = app.invoke(initial_state, config=RunnableConfig())
total_time = time.time() - start_time
print("\n=== Ticket Results ===")
print(f"Category: {result['category']}")
print(f"Resolution: {result['resolution']}")
print(f"Total Processing Time: {result['processing_time']:.2f} seconds")
print(f"Total Wall Clock Time: {total_time:.2f} seconds")
print("-" * 50)
if __name__ == "__main__":
main()
Output:
=== Processing Ticket: 'I have a billing issue with my last invoice. It seems I was overcharged.' ===
Router Agent: Analyzing ticket...
Router Agent: Categorized as 'Billing' in 0.00 seconds
Routing: Ticket category is 'Billing'
Billing Team Agent: Processing ticket...
Billing Team Agent: Completed in 1.00 seconds
=== Ticket Results ===
Category: Billing
Resolution: Billing Team: Reviewed ticket 'I have a billing issue with my last invoice. It seems I was overcharged.'. Please check your invoice details or contact our billing department for further assistance.
Total Processing Time: 1.00 seconds
Total Wall Clock Time: 1.03 seconds
--------------------------------------------------
=== Processing Ticket: 'My app keeps crashing with a technical error. Please help!' ===
Router Agent: Analyzing ticket...
Router Agent: Categorized as 'Technical' in 0.00 seconds
Routing: Ticket category is 'Technical'
Technical Team Agent: Processing ticket...
Technical Team Agent: Completed in 1.50 seconds
=== Ticket Results ===
Category: Technical
Resolution: Technical Team: Reviewed ticket 'My app keeps crashing with a technical error. Please help!'. Please try restarting your device or submit a detailed error log for further investigation.
Total Processing Time: 1.50 seconds
Total Wall Clock Time: 1.51 seconds
--------------------------------------------------
=== Processing Ticket: 'I have a general question about your services. Can you provide more info?' ===
Router Agent: Analyzing ticket...
Router Agent: Categorized as 'General' in 0.00 seconds
Routing: Ticket category is 'General'
General Team Agent: Processing ticket...
General Team Agent: Completed in 0.80 seconds
=== Ticket Results ===
Category: General
Resolution: General Team: Reviewed ticket 'I have a general question about your services. Can you provide more info?'. For more information, please refer to our FAQ or contact us via email.
Total Processing Time: 0.80 seconds
Total Wall Clock Time: 0.81 seconds
--------------------------------------------------
=== Processing Ticket: 'Please help me reset my account password.' ===
Router Agent: Analyzing ticket...
Router Agent: Categorized as 'Unknown' in 0.00 seconds
Routing: Ticket category is 'Unknown'
Manual Review Agent: Processing ticket...
Manual Review Agent: Completed in 0.50 seconds
=== Ticket Results ===
Category: Unknown
Resolution: Manual Review: Ticket 'Please help me reset my account password.' could not be categorized. Flagged for human review. Please assign to the appropriate team manually.
Total Processing Time: 0.50 seconds
Total Wall Clock Time: 0.51 seconds
--------------------------------------------------
Dynamic routing: the router agent (router_agent) classifies the ticket, and the route_ticket function implements conditional branching through add_conditional_edges
Condition-driven flow:
◆ Unlike the parallel pattern (where multiple nodes run at once), the router pattern executes only the branch whose condition matches
◆ Tickets are directed precisely by category (technical issues go to the technical support team, billing issues to the billing team)
Extensibility:
◆ Horizontal scaling: new ticket types can be supported by adding handler nodes (for example, a security-incident node)
◆ Router upgrades: supporting a new category only requires updating the conditions in route_ticket
◆ Modular architecture: adding a new team module requires no changes to the core routing framework
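At its core, the router is a classification step followed by a dictionary dispatch. A minimal keyword-based sketch — the categories mirror the example above, while the keyword lists and handler strings are illustrative stand-ins:

```python
KEYWORDS = {
    "Billing": ["billing", "payment", "invoice", "charge"],
    "Technical": ["technical", "bug", "error", "crash"],
    "General": ["general", "question", "inquiry", "info"],
}

HANDLERS = {
    "Billing": lambda t: "billing team",
    "Technical": lambda t: "technical team",
    "General": lambda t: "general team",
    "Unknown": lambda t: "manual review",   # fallback for uncategorized tickets
}

def categorize(ticket: str) -> str:
    # First matching category wins; no match falls through to "Unknown".
    text = ticket.lower()
    for category, words in KEYWORDS.items():
        if any(word in text for word in words):
            return category
    return "Unknown"

def route(ticket: str) -> str:
    # Extending the router = adding an entry to each dict; the
    # dispatch logic itself never changes.
    return HANDLERS[categorize(ticket)](ticket)
```

This is the same extensibility argument made above: new categories touch only the two tables, not the routing mechanism.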
2.1.5 Aggregator (or Synthesizer) Pattern
An aggregator agent collects the outputs of the contributing agents and synthesizes them into a final result.
Example: social media sentiment aggregation
Code:
from typing import Dict, Any, TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import time
from typing_extensions import Annotated
from operator import add
# Step 1: Define the State
class SocialMediaState(TypedDict):
twitter_posts: List[str]
instagram_posts: List[str]
reddit_posts: List[str]
twitter_sentiment: Dict[str, float]
instagram_sentiment: Dict[str, float]
reddit_sentiment: Dict[str, float]
final_report: str
processing_time: Annotated[float, add]
# Step 2: Define the Post Collection Agents
def collect_twitter_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Twitter Agent: Collecting posts...")
start_time = time.time()
posts = [
"Loving the new product from this brand! Amazing quality.",
"Terrible customer service from this brand. Very disappointed."
]
time.sleep(1) # Simulate processing time
processing_time = time.time() - start_time # Include time.sleep in processing_time
print(f"Twitter Agent: Completed in {processing_time:.2f} seconds")
return {
"twitter_posts": posts,
"processing_time": processing_time
}
def collect_instagram_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Instagram Agent: Collecting posts...")
start_time = time.time()
posts = [
"Beautiful design by this brand! #loveit",
"Not impressed with the latest release. Expected better."
]
time.sleep(1.2) # Simulate processing time
processing_time = time.time() - start_time
print(f"Instagram Agent: Completed in {processing_time:.2f} seconds")
return {
"instagram_posts": posts,
"processing_time": processing_time
}
def collect_reddit_posts(state: SocialMediaState) -> Dict[str, Any]:
print("Reddit Agent: Collecting posts...")
start_time = time.time()
posts = [
"This brand is awesome! Great value for money.",
"Had a bad experience with their support team. Not happy."
]
time.sleep(0.8) # Simulate processing time
processing_time = time.time() - start_time
print(f"Reddit Agent: Completed in {processing_time:.2f} seconds")
return {
"reddit_posts": posts,
"processing_time": processing_time
}
# Step 3: Define the Sentiment Analysis Agents
def analyze_twitter_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Twitter Sentiment Agent: Analyzing sentiment...")
start_time = time.time()
posts = state["twitter_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else 0.0
time.sleep(0.5) # Simulate processing time
processing_time = time.time() - start_time
print(f"Twitter Sentiment Agent: Completed in {processing_time:.2f} seconds")
return {
"twitter_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
def analyze_instagram_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Instagram Sentiment Agent: Analyzing sentiment...")
start_time = time.time()
posts = state["instagram_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else 0.0
time.sleep(0.6) # Simulate processing time
processing_time = time.time() - start_time
print(f"Instagram Sentiment Agent: Completed in {processing_time:.2f} seconds")
return {
"instagram_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
def analyze_reddit_sentiment(state: SocialMediaState) -> Dict[str, Any]:
print("Reddit Sentiment Agent: Analyzing sentiment...")
start_time = time.time()
posts = state["reddit_posts"]
polarities = [TextBlob(post).sentiment.polarity for post in posts]
avg_polarity = sum(polarities) / len(polarities) if polarities else 0.0
time.sleep(0.4) # Simulate processing time
processing_time = time.time() - start_time
print(f"Reddit Sentiment Agent: Completed in {processing_time:.2f} seconds")
return {
"reddit_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},
"processing_time": processing_time
}
# Step 4: Define the Aggregator Agent
def aggregate_results(state: SocialMediaState) -> Dict[str, Any]:
print("Aggregator Agent: Generating final report...")
start_time = time.time()
twitter_sentiment = state["twitter_sentiment"]
instagram_sentiment = state["instagram_sentiment"]
reddit_sentiment = state["reddit_sentiment"]
total_posts = (twitter_sentiment["num_posts"] +
instagram_sentiment["num_posts"] +
reddit_sentiment["num_posts"])
weighted_polarity = (
twitter_sentiment["average_polarity"] * twitter_sentiment["num_posts"] +
instagram_sentiment["average_polarity"] * instagram_sentiment["num_posts"] +
reddit_sentiment["average_polarity"] * reddit_sentiment["num_posts"]
) / total_posts if total_posts > 0 else 0.0
overall_sentiment = ("Positive" if weighted_polarity > 0 else
"Negative" if weighted_polarity < 0 else "Neutral")
report = (
f"Overall Sentiment: {overall_sentiment} (Average Polarity: {weighted_polarity:.2f})\n"
f"Twitter Sentiment: {twitter_sentiment['average_polarity']:.2f} (Posts: {twitter_sentiment['num_posts']})\n"
f"Instagram Sentiment: {instagram_sentiment['average_polarity']:.2f} (Posts: {instagram_sentiment['num_posts']})\n"
f"Reddit Sentiment: {reddit_sentiment['average_polarity']:.2f} (Posts: {reddit_sentiment['num_posts']})"
)
time.sleep(0.3) # Simulate processing time
processing_time = time.time() - start_time
print(f"Aggregator Agent: Completed in {processing_time:.2f} seconds")
return {
"final_report": report,
"processing_time": processing_time
}
# Step 5: Build the Graph with an Aggregator Pattern
def build_aggregator_graph() -> StateGraph:
workflow = StateGraph(SocialMediaState)
# Add nodes for collecting posts
workflow.add_node("collect_twitter", collect_twitter_posts)
workflow.add_node("collect_instagram", collect_instagram_posts)
workflow.add_node("collect_reddit", collect_reddit_posts)
# Add nodes for sentiment analysis
workflow.add_node("analyze_twitter", analyze_twitter_sentiment)
workflow.add_node("analyze_instagram", analyze_instagram_sentiment)
workflow.add_node("analyze_reddit", analyze_reddit_sentiment)
# Add node for aggregation
workflow.add_node("aggregate", aggregate_results)
# Add a branching node to trigger all collection nodes in parallel
workflow.add_node("branch", lambda state: state)
# Set the entry point to the branch node
workflow.set_entry_point("branch")
# Add edges from branch to collection nodes (parallel execution)
workflow.add_edge("branch", "collect_twitter")
workflow.add_edge("branch", "collect_instagram")
workflow.add_edge("branch", "collect_reddit")
# Add edges from collection to sentiment analysis
workflow.add_edge("collect_twitter", "analyze_twitter")
workflow.add_edge("collect_instagram", "analyze_instagram")
workflow.add_edge("collect_reddit", "analyze_reddit")
# Add edges from sentiment analysis to aggregator
workflow.add_edge("analyze_twitter", "aggregate")
workflow.add_edge("analyze_instagram", "aggregate")
workflow.add_edge("analyze_reddit", "aggregate")
# Add edge from aggregator to END
workflow.add_edge("aggregate", END)
return workflow.compile()
# Step 6: Run the Workflow
def main():
initial_state: SocialMediaState = {
"twitter_posts": [],
"instagram_posts": [],
"reddit_posts": [],
"twitter_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"instagram_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"reddit_sentiment": {"average_polarity": 0.0, "num_posts": 0},
"final_report": "",
"processing_time": 0.0
}
print("\nStarting social media sentiment analysis...")
app = build_aggregator_graph()
start_time = time.time()
# 注:LangGraph会自动并行执行无相互依赖的分支节点,RunnableConfig并没有parallel这个参数
result = app.invoke(initial_state)
total_time = time.time() - start_time
print("\n=== Sentiment Analysis Results ===")
print(result["final_report"])
print(f"\nTotal Processing Time: {result['processing_time']:.2f} seconds")
print(f"Total Wall Clock Time: {total_time:.2f} seconds")
if __name__ == "__main__":
main()
输出:
Starting social media sentiment analysis...
Instagram Agent: Collecting posts...
Reddit Agent: Collecting posts...
Twitter Agent: Collecting posts...
Reddit Agent: Completed in 0.80 seconds
Twitter Agent: Completed in 1.00 seconds
Instagram Agent: Completed in 1.20 seconds
Instagram Sentiment Agent: Analyzing sentiment...
Reddit Sentiment Agent: Analyzing sentiment...
Twitter Sentiment Agent: Analyzing sentiment...
Reddit Sentiment Agent: Completed in 0.40 seconds
Twitter Sentiment Agent: Completed in 0.50 seconds
Instagram Sentiment Agent: Completed in 0.60 seconds
Aggregator Agent: Generating final report...
Aggregator Agent: Completed in 0.30 seconds
=== Sentiment Analysis Results ===
Overall Sentiment: Positive (Average Polarity: 0.15)
Twitter Sentiment: -0.27 (Posts: 2)
Instagram Sentiment: 0.55 (Posts: 2)
Reddit Sentiment: 0.18 (Posts: 2)
Total Processing Time: 4.80 seconds
Total Wall Clock Time: 2.13 seconds
并行执行:数据收集与情绪分析节点并行执行,实际总耗时(约2.1秒)显著低于各任务的累计处理耗时(4.8秒)
聚合:聚合节点将各平台的情绪分析结果合并为一份最终报告,计算整体的加权情绪极性,并给出各平台的明细。
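上面报告中的整体极性0.15可以用一小段独立代码复算(数值直接取自示例输出,按帖子数加权平均,与聚合节点的逻辑一致):

```python
# 各平台的平均情绪极性与帖子数(取自上面的示例输出)
platform_sentiments = {
    "twitter": {"average_polarity": -0.27, "num_posts": 2},
    "instagram": {"average_polarity": 0.55, "num_posts": 2},
    "reddit": {"average_polarity": 0.18, "num_posts": 2},
}

def weighted_polarity(sentiments: dict) -> float:
    """按帖子数加权计算整体极性,对应aggregate_results里的公式。"""
    total_posts = sum(s["num_posts"] for s in sentiments.values())
    if total_posts == 0:
        return 0.0
    weighted = sum(s["average_polarity"] * s["num_posts"] for s in sentiments.values())
    return weighted / total_posts

print(round(weighted_polarity(platform_sentiments), 2))  # 0.15
```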
2.1.6 网络(或横向)
智能体之间以多对多的方式相互直接沟通,形成去中心化网络。
这种架构适合智能体之间没有明显层次关系、也没有固定调用顺序的问题。
代码:
from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
def agent_1(state: MessagesState) -> Command[Literal["agent_2", "agent_3", END]]:
# you can pass relevant parts of the state to the LLM (e.g., state["messages"])
# to determine which agent to call next. a common pattern is to call the model
# with a structured output (e.g. force it to return an output with a "next_agent" field)
response = model.invoke(...)
# route to one of the agents or exit based on the LLM's decision
# if the LLM returns "__end__", the graph will finish execution
return Command(
goto=response["next_agent"],
update={"messages": [response["content"]]},
)
def agent_2(state: MessagesState) -> Command[Literal["agent_1", "agent_3", END]]:
response = model.invoke(...)
return Command(
goto=response["next_agent"],
update={"messages": [response["content"]]},
)
def agent_3(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
response = model.invoke(...)
return Command(
goto=response["next_agent"],
update={"messages": [response["content"]]},
)
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_node(agent_3)
builder.add_edge(START, "agent_1")
network = builder.compile()
好处:分布式协作,集体决策。即便部分智能体失效系统仍能工作。
坏处:管理智能体间的沟通很有挑战。更多沟通可能会导致低效,做重复工。
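脱离LangGraph,网络模式的核心调度循环可以用纯Python简单示意(以下智能体函数与写死的路由规则均为演示用的假设实现,实际系统中下一跳通常由LLM决定):

```python
# 一个去中心化网络的最小示意:每个智能体返回(下一个智能体, 消息列表),
# 调度循环按返回值路由,直到某个智能体返回 "__end__"。
END = "__end__"

def agent_1(messages):
    messages.append("agent_1: 收集需求")
    return "agent_2", messages

def agent_2(messages):
    messages.append("agent_2: 起草方案")
    return "agent_3", messages

def agent_3(messages):
    messages.append("agent_3: 审核通过")
    return END, messages

agents = {"agent_1": agent_1, "agent_2": agent_2, "agent_3": agent_3}

def run_network(entry: str):
    """从entry开始按各智能体的路由决定依次执行,返回共享消息列表。"""
    messages = []
    current = entry
    while current != END:
        current, messages = agents[current](messages)
    return messages

print(run_network("agent_1"))
```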
2.1.7 移交
在多智能体架构中,智能体可被建模为图结构中的节点。每个智能体节点执行自身任务后,将决策继续执行流程——选择终止任务或将控制流移交其他智能体(包括可能的路由回自身形成循环处理)。多智能体协作中的典型模式即工作移交机制,该机制允许指定:
◆ 目标节点:控制流转移的目标智能体(例如指定下一跳节点名称)
◆ 有效载荷:传递给目标智能体的上下文信息(如状态更新数据)
要想在LangGraph中实现移交,智能体节点可返回Command对象,从而将控制流和状态更新组合到一起:
def agent(state) -> Command[Literal["agent", "another_agent"]]:
# the condition for routing/halting can be anything, e.g. LLM tool call / structured output, etc.
goto = get_next_agent(...) # 'agent' / 'another_agent'
return Command(
# Specify which agent to call next
goto=goto,
# Update the graph state
update={"my_state_key": "my_state_value"}
)
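作为对照,上述Command的"控制流+状态更新"双重语义也可以用一个纯Python数据类来模拟(这里的Command类与路由条件均为示意性假设,并非LangGraph的实际实现):

```python
from dataclasses import dataclass, field

# 模拟LangGraph的Command:同时携带"去哪个节点"和"如何更新状态"
@dataclass
class Command:
    goto: str
    update: dict = field(default_factory=dict)

def agent(state: dict) -> Command:
    # 路由条件可以是任何逻辑(LLM工具调用、结构化输出等),这里用计数器演示循环移交
    if state.get("steps", 0) < 2:
        return Command(goto="agent", update={"steps": state.get("steps", 0) + 1})
    return Command(goto="another_agent", update={"done_by": "agent"})

def another_agent(state: dict) -> Command:
    return Command(goto="__end__", update={"result": "finished"})

nodes = {"agent": agent, "another_agent": another_agent}

def run(entry: str, state: dict) -> dict:
    """执行循环:应用每个Command的状态更新,并按goto移交控制流。"""
    current = entry
    while current != "__end__":
        cmd = nodes[current](state)
        state.update(cmd.update)  # 状态更新
        current = cmd.goto        # 控制流移交
    return state

print(run("agent", {}))  # {'steps': 2, 'done_by': 'agent', 'result': 'finished'}
```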
在更复杂的场景中,当每个智能体节点自身也是一个图结构(即子图)时,某个智能体子图中的节点可能需要导航至其他智能体。例如存在两个智能体Alice和Bob(作为父级图中的子图节点),当Alice需要导航至Bob时,可通过在指令对象中设置 graph=Command.PARENT 实现:
def some_node_inside_alice(state):
return Command(
goto="bob",
update={"my_state_key": "my_state_value"},
# specify which graph to navigate to (defaults to the current graph)
graph=Command.PARENT,
)
注:
若需让使用Command(graph=Command.PARENT)的子图间通信在图可视化中正确显示,不能直接把子图当节点添加:
builder.add_node(alice)
而是要用带Command返回类型注解的节点函数对其封装:
def call_alice(state) -> Command[Literal["bob"]]:
return alice.invoke(state)
builder.add_node("alice", call_alice)
移交作为工具使用
最常见的智能体类型之一当属ReAct型工具调用智能体。此类智能体其中一种常见模式是将移交打包进工具调用里,比如:
def transfer_to_bob(state):
"""Transfer to bob."""
return Command(
goto="bob",
update={"my_state_key": "my_state_value"},
graph=Command.PARENT,
)
特殊场景说明
这是工具调用同时更新状态和控制流的特殊实现方式,其核心机制包含双重操作:
◆ 状态更新:修改当前图结构的共享状态
◆ 流程控制:指定后续执行路径
重要实现指南
若需使用返回Command对象的工具,可选择两种实现方式:
1)采用预构建组件:使用 create_react_agent / ToolNode 等预制模块
2)自定义工具执行节点:实现工具调用逻辑并收集返回的Command对象,例如:
def call_tools(state):
...
# 执行所有工具调用并收集指令
commands = [tools_by_name[tool_call["name"]].invoke(tool_call)
for tool_call in tool_calls]
return commands # 返回指令集合
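上面的自定义工具执行节点,可以用一个简化的工具注册表来示意其分发逻辑(工具函数与tool_call的结构均为演示用的假设,这里用普通字典代替Command对象):

```python
# 用名字索引工具,按tool_call逐个执行并收集返回的指令
def transfer_to_bob(args: dict) -> dict:
    """示意工具:返回一个携带路由信息与状态更新的指令字典。"""
    return {"goto": "bob", "update": args}

def transfer_to_alice(args: dict) -> dict:
    return {"goto": "alice", "update": args}

tools_by_name = {
    "transfer_to_bob": transfer_to_bob,
    "transfer_to_alice": transfer_to_alice,
}

def call_tools(tool_calls: list) -> list:
    # 执行所有工具调用并收集指令,对应上文的call_tools节点
    return [tools_by_name[call["name"]](call.get("args", {})) for call in tool_calls]

commands = call_tools([
    {"name": "transfer_to_bob", "args": {"task": "review"}},
])
print(commands)  # [{'goto': 'bob', 'update': {'task': 'review'}}]
```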
接下来我们将深入解析各类多智能体架构的设计模式。
2.1.8 监督者架构
在这种架构下,我们通过以下方式实现智能体协作:
◆ 节点化智能体:将各功能模块抽象为图节点(如数据清洗节点、模型推理节点)
◆ 监督节点:部署大语言模型(LLM)作为中央决策单元,动态决定后续执行路径
◆ 指令路由机制:基于监督节点的决策,通过Command对象路由至目标节点
from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
def supervisor(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
# you can pass relevant parts of the state to the LLM (e.g., state["messages"])
# to determine which agent to call next. a common pattern is to call the model
# with a structured output (e.g. force it to return an output with a "next_agent" field)
response = model.invoke(...)
# route to one of the agents or exit based on the supervisor's decision
# if the supervisor returns "__end__", the graph will finish execution
return Command(goto=response["next_agent"])
def agent_1(state: MessagesState) -> Command[Literal["supervisor"]]:
# you can pass relevant parts of the state to the LLM (e.g., state["messages"])
# and add any additional logic (different models, custom prompts, structured output, etc.)
response = model.invoke(...)
return Command(
goto="supervisor",
update={"messages": [response]},
)
def agent_2(state: MessagesState) -> Command[Literal["supervisor"]]:
response = model.invoke(...)
return Command(
goto="supervisor",
update={"messages": [response]},
)
builder = StateGraph(MessagesState)
builder.add_node(supervisor)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_edge(START, "supervisor")
supervisor = builder.compile()
2.1.9 监督者(工具调用式架构)
作为监督者架构的变体实现,工作模式如下:
核心机制
◆ 智能体工具化:将独立智能体抽象为可调用工具(如数据清洗工具、API调用工具)
◆ 监督节点LLM:部署具备工具调用能力的LLM作为决策中枢
◆ 双节点架构:
LLM节点:执行逻辑推理,生成工具调用指令
工具调用节点:执行具体工具(即各智能体功能模块)
from typing import Annotated
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import InjectedState, create_react_agent
model = ChatOpenAI()
# this is the agent function that will be called as tool
# notice that you can pass the state to the tool via InjectedState annotation
def agent_1(state: Annotated[dict, InjectedState]):
# you can pass relevant parts of the state to the LLM (e.g., state["messages"])
# and add any additional logic (different models, custom prompts, structured output, etc.)
response = model.invoke(...)
# return the LLM response as a string (expected tool response format)
# this will be automatically turned to ToolMessage
# by the prebuilt create_react_agent (supervisor)
return response.content
def agent_2(state: Annotated[dict, InjectedState]):
response = model.invoke(...)
return response.content
tools = [agent_1, agent_2]
# the simplest way to build a supervisor w/ tool-calling is to use prebuilt ReAct agent graph
# that consists of a tool-calling LLM node (i.e. supervisor) and a tool-executing node
supervisor = create_react_agent(model, tools)
2.1.10 分层(垂直)架构
智能体以树状结构组织,高层级智能体(监督者)负责管理底层智能体。
当系统扩展时,单一监督者管理众多智能体会导致监督者难以有效选出下一执行节点,上下文可能会变得过于复杂,导致单个监督者无法追踪所有智能体的状态变化。换句话说,最终会遭遇最初促使采用多智能体架构的同类问题。
为此,你可以采用分层架构设计。比方说,可以建立独立、专业的智能体团队,分别由专属监督者管理,然后再设置全局监督者来协调各支团队。
from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()
# define team 1 (same as the single supervisor example above)
def team_1_supervisor(state: MessagesState) -> Command[Literal["team_1_agent_1", "team_1_agent_2", END]]:
response = model.invoke(...)
return Command(goto=response["next_agent"])
def team_1_agent_1(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:
response = model.invoke(...)
return Command(goto="team_1_supervisor", update={"messages": [response]})
def team_1_agent_2(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:
response = model.invoke(...)
return Command(goto="team_1_supervisor", update={"messages": [response]})
team_1_builder = StateGraph(MessagesState)
team_1_builder.add_node(team_1_supervisor)
team_1_builder.add_node(team_1_agent_1)
team_1_builder.add_node(team_1_agent_2)
team_1_builder.add_edge(START, "team_1_supervisor")
team_1_graph = team_1_builder.compile()
# define team 2 (same as the single supervisor example above)
class Team2State(MessagesState):
next: Literal["team_2_agent_1", "team_2_agent_2", "__end__"]
def team_2_supervisor(state: Team2State):
...
def team_2_agent_1(state: Team2State):
...
def team_2_agent_2(state: Team2State):
...
team_2_builder = StateGraph(Team2State)
...
team_2_graph = team_2_builder.compile()
# define top-level supervisor
builder = StateGraph(MessagesState)
def top_level_supervisor(state: MessagesState) -> Command[Literal["team_1_graph", "team_2_graph", END]]:
# you can pass relevant parts of the state to the LLM (e.g., state["messages"])
# to determine which team to call next. a common pattern is to call the model
# with a structured output (e.g. force it to return an output with a "next_team" field)
response = model.invoke(...)
# route to one of the teams or exit based on the supervisor's decision
# if the supervisor returns "__end__", the graph will finish execution
return Command(goto=response["next_team"])
builder = StateGraph(MessagesState)
builder.add_node(top_level_supervisor)
builder.add_node("team_1_graph", team_1_graph)
builder.add_node("team_2_graph", team_2_graph)
builder.add_edge(START, "top_level_supervisor")
builder.add_edge("team_1_graph", "top_level_supervisor")
builder.add_edge("team_2_graph", "top_level_supervisor")
graph = builder.compile()
好处:不同层级智能体间的角色与职责分工明确。沟通顺畅。适合有着结构化决策流的大型系统。
坏处:顶层失效会破坏整个系统。底层智能体的独立性有限。
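分层架构中"全局监督者→团队监督者→团队成员"的调用链,可以用纯Python简化示意(任务分派规则为演示用的假设逻辑,实际系统中由各级LLM决定):

```python
# 每个团队监督者只把任务派给本团队的专职成员;
# 全局监督者只在团队之间路由,不直接接触底层智能体。
team_1 = {
    "research": lambda task: f"research agent handled: {task}",
    "math": lambda task: f"math agent handled: {task}",
}
team_2 = {
    "writer": lambda task: f"writer agent handled: {task}",
}

def team_1_supervisor(task: str) -> str:
    # 团队内部的二级路由:含"计算"的任务交给数学智能体
    member = "math" if "计算" in task else "research"
    return team_1[member](task)

def team_2_supervisor(task: str) -> str:
    return team_2["writer"](task)

def top_level_supervisor(task: str) -> str:
    # 顶层路由:与写作相关的任务交给team_2,其余交给team_1
    if "撰写" in task:
        return team_2_supervisor(task)
    return team_1_supervisor(task)

print(top_level_supervisor("计算季度增长率"))
print(top_level_supervisor("撰写总结报告"))
```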
2.1.11 自定义多智能体工作流
这种架构具备以下核心特征:
选择性通信:每个智能体仅与特定子集智能体交互
混合控制流:部分流程采用确定性规则,部分流程由智能体自主决策
预定义与动态结合:提前定义基础执行顺序,同时允许特定节点动态调整后续路径
在LangGraph框架下,可通过两种方式构建工作流:
显式控制流(标准边连接):通过标准图边(normal edges)显式定义应用的控制流(如智能体间沟通的顺序),这种属于确定性最强的一种——任何时候都能提前知道下一个被调用的智能体是哪个。
动态控制流(指令驱动):通过Command对象实现LLM驱动的流程控制。典型应用如监督者工具调用架构,监督者LLM动态选择工具/智能体调用顺序。
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START
model = ChatOpenAI()
def agent_1(state: MessagesState):
response = model.invoke(...)
return {"messages": [response]}
def agent_2(state: MessagesState):
response = model.invoke(...)
return {"messages": [response]}
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
# define the flow explicitly
builder.add_edge(START, "agent_1")
builder.add_edge("agent_1", "agent_2")
graph = builder.compile()
3. 智能体间的通信
开发多智能体系统最重要的事情是设计有效的通信方式,需考虑以下关键因素:
◆ 通信载体选择:通过图状态(graph state)传递还是工具调用(tool calls)交互?
◆ 模式兼容问题:不同智能体的状态模式(state schemas)差异如何处理?
◆ 消息共享机制:如何通过共享消息列表实现协作?
3.1 图状态传递 vs 工具调用
智能体间传递的“载荷”是什么?在上述讨论过的大多数架构下,智能体间的通信都是通过图状态进行的。在带工具调用的监督者的情况下,载荷是工具调用参数。
图状态
要通过图状态进行通信,需将各个智能体定义为图节点。这些节点可以是函数或完整子图。在图执行的每个步骤中,智能体节点接收当前图状态,执行代码后将更新后的状态传递给后续节点。
比方说,在社交媒体聚合系统的代码里,我们是这样定义图状态的:
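这里重贴前文并行示例中的SocialMediaState定义以便对照:

```python
from typing import Annotated, Dict, List, TypedDict
from operator import add

class SocialMediaState(TypedDict):
    twitter_posts: List[str]
    instagram_posts: List[str]
    reddit_posts: List[str]
    twitter_sentiment: Dict[str, float]
    instagram_sentiment: Dict[str, float]
    reddit_sentiment: Dict[str, float]
    final_report: str
    # 多个并行节点写入同一个键时,LangGraph会用add(加法)归并各节点返回的耗时
    processing_time: Annotated[float, add]
```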
通常智能体节点共享统一状态模式。但有时需要设计不同状态模式。
3.2 不同状态模式
某个智能体可能需要与其他智能体不同的状态模式。比如搜索智能体只需跟踪查询和检索文档。在LangGraph中可通过两种方式实现:
◆ 定义具有独立状态模式的子图智能体。若子图与父图没有共享状态键(通道),需添加输入/输出转换逻辑
◆ 为智能体节点函数定义私有输入状态模式,该模式与全局图状态模式相隔离
3.3 共享消息列表
最常见的通信方式是通过共享状态通道(通常是消息列表)。这要求至少存在一个所有智能体共享的通道(键)。使用共享消息列表时需考虑:智能体应该共享完整思维过程还是仅共享最终结果?
◆ 共享完整历史
智能体共享完整的思维过程(即"scratchpad",草稿),通常表现为消息列表。优势是能帮助其他智能体做出更好的决策,提升系统整体的推理能力;缺点是随着复杂度增加,需要额外的内存管理策略。
◆ 共享最终结果
智能体保留私有的"scratchpad",仅共享最终结果。这种方式适合复杂系统或智能体数量较多的场景,此时需要为各智能体定义不同的状态模式。
对于工具调用的智能体:
◆ 监督者根据工具模式决定输入参数
◆ LangGraph支持运行时传递状态,子智能体可访问父级状态
4. 结论
多智能体LLM系统通过整合并行、顺序、路由器和聚合器等多样化架构模式,为解决复杂任务提供了强大的范式支持。
通过详细探讨共享状态、消息列表和工具调用等通信机制,我们揭示了智能体如何通过协作实现无缝协调。