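AI Agent Engineering in Practice: Idempotency and Observability Design from Demo to Production-Grade Systems
Core question: why does the demo run while production falls over?
Production brings challenges a demo never sees: unstable networks, API timeouts, unpredictable user input, and many tasks running concurrently. This article summarizes the key engineering practices from taking 3 AI Agent products into production.
1. Plan-and-Execute Architecture (LangGraph Implementation)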
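import operator
from typing import Annotated, List, TypedDict

from langchain_openai import ChatOpenAI  # assumed source of ChatOpenAI
from langgraph.graph import StateGraph, END

# parse_plan and execute_with_retry are project helpers that are not shown here.

class AgentState(TypedDict):
    task: str
    plan: List[str]
    current_step: int
    results: Annotated[List[str], operator.add]  # reducer: new results are appended
    error_count: int

def planner(state: AgentState) -> dict:
    # Model name kept as written in the original; use one available to you.
    llm = ChatOpenAI(model="gpt-5.5-turbo", temperature=0)
    plan = llm.invoke(f"Break the task into at most 5 executable steps: {state['task']}")
    # Initialize the counters so later nodes never read a missing key.
    return {"plan": parse_plan(plan.content), "current_step": 0, "error_count": 0}

def executor(state: AgentState) -> dict:
    step = state["plan"][state["current_step"]]
    try:
        result = execute_with_retry(step, max_retries=3)
    except Exception:
        # Retries exhausted: keep current_step unchanged and count the failure.
        return {"error_count": state["error_count"] + 1}
    return {"results": [result], "current_step": state["current_step"] + 1}

def human_review(state: AgentState) -> dict:
    # Placeholder node: the original routes here but never defines it.
    return {"results": ["escalated to human review"]}

def should_continue(state: AgentState) -> str:
    if state["current_step"] >= len(state["plan"]):
        return "end"
    if state["error_count"] >= 3:
        return "fallback"
    return "continue"

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("human_review", human_review)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")  # without this edge the planner leads nowhere
workflow.add_conditional_edges(
    "executor", should_continue,
    {"continue": "executor", "end": END, "fallback": "human_review"},
)
workflow.add_edge("human_review", END)
app = workflow.compile()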
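The planner above calls parse_plan, which the article does not show. As a rough sketch only, and assuming the model returns one step per line with an optional list marker in front, such a helper might look like the following. The function body, the max_steps parameter, and the set of recognized markers are all assumptions made for illustration, not the author's implementation.

```python
import re
from typing import List

# Leading list markers we strip: "Step 3:", "1.", "2)", "-", "*", "•".
_MARKER = re.compile(
    r"^\s*(?:step\s*\d+\s*[:.)]|\d+\s*[.)、:]|[-*•])\s*",
    flags=re.IGNORECASE,
)

def parse_plan(text: str, max_steps: int = 5) -> List[str]:
    """Turn raw LLM output into a list of step strings (hypothetical helper).

    Every non-empty line is treated as a step, so a preamble line such as
    "Here is the plan:" would be kept as a step; real output usually needs
    stricter prompting or structured output to avoid that.
    """
    steps = []
    for line in text.splitlines():
        cleaned = _MARKER.sub("", line).strip()
        if cleaned:
            steps.append(cleaned)
    # Enforce the "at most 5 steps" limit even if the model ignores it.
    return steps[:max_steps]
```

Truncating in code rather than trusting the prompt keeps the step limit a hard guarantee instead of a request to the model.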
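2. Idempotent Tool Calls (the Most Overlooked Part!)
When an Agent retries after a network timeout, the first call may already have succeeded, so the retry can send an email twice or write a duplicate row to the database.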
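import hashlib
import json

class IdempotentTool:
    def __init__(self, redis_client):
        self.redis = redis_client

    def execute(self, tool_name: str, params: dict):
        # Canonical JSON keeps the key stable for any key order, nested dicts included.
        payload = json.dumps(params, sort_keys=True, ensure_ascii=False)
        key = hashlib.md5(f"{tool_name}:{payload}".encode()).hexdigest()
        cache_key = f"tool_result:{key}"
        cached = self.redis.get(cache_key)
        if cached is not None:
            return json.loads(cached)  # return the cached result, do not execute again
        # Caveat: get-then-set is not atomic, so two concurrent callers can both get here.
        result = self._execute_tool(tool_name, params)  # actual tool dispatch, not shown
        self.redis.setex(cache_key, 86400, json.dumps(result))  # keep for 24 hours
        return result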
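The cache lookup above checks and then writes in two separate steps, so two concurrent retries can both miss the cache and both run the tool. One common way to close that gap is to claim the key atomically before executing. The sketch below illustrates the idea with an in-memory stand-in for Redis so it runs on its own. InMemoryStore, IdempotentExecutor, and InProgressError are hypothetical names. With redis-py, the claim would typically map to `set(key, value, nx=True, ex=ttl)`.

```python
import threading
from typing import Any, Callable

_PENDING = object()  # marker meaning "claimed, result not stored yet"

class InProgressError(RuntimeError):
    """Another caller holds the key and has not finished yet."""

class InMemoryStore:
    """Stand-in for Redis; set_if_absent plays the role of SET ... NX."""
    def __init__(self) -> None:
        self._data = {}
        self._lock = threading.Lock()

    def set_if_absent(self, key: str, value: Any) -> bool:
        with self._lock:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str) -> Any:
        with self._lock:
            return self._data.get(key)

    def delete(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)

class IdempotentExecutor:
    def __init__(self, store: InMemoryStore) -> None:
        self.store = store

    def run(self, idempotency_key: str, func: Callable[..., Any], *args: Any) -> Any:
        # Atomic claim: only one caller per key ever gets True here.
        if self.store.set_if_absent(idempotency_key, _PENDING):
            try:
                result = func(*args)
            except Exception:
                # Assumption: a raised error means the side effect did not happen,
                # so the key is released and a later retry may run the tool again.
                self.store.delete(idempotency_key)
                raise
            self.store.set(idempotency_key, result)
            return result
        value = self.store.get(idempotency_key)
        if value is _PENDING:
            raise InProgressError(idempotency_key)
        return value  # replay the stored result, no second side effect
```

Here the key is supplied by the caller (for example a task ID plus step index) instead of being derived from the parameters, so two intentionally identical calls in different tasks are not collapsed into one.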
3. Three Layers of Error Recovery
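import time

class TransientError(Exception):
    """Temporary failure (timeout, rate limit) that is worth retrying."""

class RobustAgent:
    # _call_tool, _get_fallback and _escalate_to_human are not shown in the original.
    def execute_step(self, step, context):
        # Layer 1: retry with exponential backoff (transient errors)
        for i in range(3):
            try:
                return self._call_tool(step, context)
            except TransientError:
                if i < 2:
                    time.sleep(2 ** i)  # wait 1s, then 2s; no wait after the last attempt
        # Layer 2: degrade to a fallback tool, if one exists
        fallback = self._get_fallback(step)
        if fallback:
            try:
                return self._call_tool(fallback, context)
            except TransientError:
                pass  # fall through to escalation
        # Layer 3: hand over to a human
        return self._escalate_to_human(step, context)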
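To make the three layers easier to test in isolation, the same flow can be written as a standalone function that takes the primary call, the fallback, and the escalation as arguments. This is a minimal sketch under assumptions: run_with_recovery and its parameters are hypothetical names, random jitter is added to the backoff (the original uses plain doubling), and the sleep function is injectable so tests do not actually wait.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

class TransientError(Exception):
    """Temporary failure that is worth retrying."""

def run_with_recovery(
    primary: Callable[[], T],
    escalate: Callable[[], T],
    fallback: Optional[Callable[[], T]] = None,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    # Layer 1: retry the primary call with exponential backoff plus jitter.
    for attempt in range(max_attempts):
        try:
            return primary()
        except TransientError:
            if attempt < max_attempts - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
                sleep(delay)
    # Layer 2: try the degraded path once, if there is one.
    if fallback is not None:
        try:
            return fallback()
        except Exception:
            pass  # any failure here goes to a human
    # Layer 3: human intervention.
    return escalate()
```

Only TransientError is retried. Any other exception from the primary call propagates immediately, which avoids burning retries on errors such as invalid parameters that will never succeed.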
4. Distributed Tracing with OpenTelemetry
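from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

def run_agent(task: str, plan: list):
    # execute_step is the project's step runner (not shown); its result is
    # assumed to expose .success and .token_count, as in the original.
    with tracer.start_as_current_span("agent.run") as span:
        span.set_attribute("task", task[:200])  # truncate long task text
        for i, step in enumerate(plan):
            # One child span per step, so a failed run can be traced to the exact step.
            with tracer.start_as_current_span(f"step.{i}") as s:
                result = execute_step(step)
                s.set_attribute("success", result.success)
                s.set_attribute("tokens", result.token_count)
                if not result.success:
                    s.set_status(Status(StatusCode.ERROR))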
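5. Seven-Item Production Deployment Checklist
- [x] Every tool call can be retried idempotently
- [x] Every tool has a 30s timeout
- [x] Token consumption per task has a hard upper limit
- [x] An Agent run is capped at 20 steps (guards against infinite loops)
- [x] Exceptions can trigger human review
- [x] The full trace is recorded and can be replayed after the fact
- [x] Agent side effects can be rolled back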
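Three of the items above (the 30s timeout, the token ceiling, and the 20-step cap) are simple enough to enforce in a few lines. The following is a sketch under assumptions, not the setup used in the products described: RunBudget, BudgetExceeded, and call_with_timeout are hypothetical names, and the 50,000-token default is an arbitrary placeholder since the article does not state a number.

```python
import concurrent.futures
from dataclasses import dataclass
from typing import Any, Callable

class BudgetExceeded(RuntimeError):
    """Raised when a run goes past its step or token limit."""

@dataclass
class RunBudget:
    max_steps: int = 20        # step cap from the checklist
    max_tokens: int = 50_000   # placeholder value, pick your own ceiling
    steps_used: int = 0
    tokens_used: int = 0

    def charge(self, tokens: int) -> None:
        """Record one finished step and its token usage, then enforce limits."""
        self.steps_used += 1
        self.tokens_used += tokens
        if self.steps_used > self.max_steps:
            raise BudgetExceeded(f"step limit {self.max_steps} exceeded")
        if self.tokens_used > self.max_tokens:
            raise BudgetExceeded(f"token limit {self.max_tokens} exceeded")

def call_with_timeout(func: Callable[..., Any], *args: Any, timeout_s: float = 30.0) -> Any:
    """Run func in a worker thread and stop waiting after timeout_s seconds.

    Raises concurrent.futures.TimeoutError on timeout. The worker thread is
    not killed and may still finish later, which is one more reason the tool
    itself has to be idempotent.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args)
    try:
        return future.result(timeout=timeout_s)
    finally:
        pool.shutdown(wait=False)  # do not block on a stuck worker
```

In an agent loop, `budget.charge(result.token_count)` would be called after each step, and `BudgetExceeded` would be treated like any other exception that routes the task to human review.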
Idempotency, error recovery, and observability: these three dimensions decide whether an Agent can really go to production.