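AI Agent Engineering in Practice: Idempotency and Observability Design from Demo to Production-Grade Systems

Core question: why does the demo run fine while production falls over?

Production brings challenges a demo never meets: unstable networks, API timeouts, unpredictable user input, and concurrent tasks. This article summarizes the key engineering practices drawn from taking three AI Agent products into production.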


1. Plan-and-Execute Architecture (LangGraph Implementation)

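import operator
from typing import Annotated, List, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    task: str
    plan: List[str]
    current_step: int
    results: Annotated[List[str], operator.add]  # reducer: each step's result is appended
    error_count: int

def planner(state):
    llm = ChatOpenAI(model="gpt-5.5-turbo", temperature=0)
    plan = llm.invoke(f"Break the task into at most 5 executable steps: {state['task']}")
    # parse_plan (defined elsewhere) turns the model's reply into a list of steps
    return {"plan": parse_plan(plan.content), "current_step": 0, "error_count": 0}

def executor(state):
    step = state["plan"][state["current_step"]]
    # execute_with_retry (defined elsewhere) runs one step with retries
    result = execute_with_retry(step, max_retries=3)
    return {"results": [result], "current_step": state["current_step"] + 1}

def should_continue(state):
    if state["current_step"] >= len(state["plan"]):
        return "end"
    # error_count must be incremented wherever step failures are detected,
    # otherwise this fallback route never fires
    if state["error_count"] >= 3:
        return "fallback"
    return "continue"

def human_review(state):
    # Placeholder node: hand the task over to a person and record that it happened
    return {"results": [f"escalated to human review at step {state['current_step']}"]}

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("human_review", human_review)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")  # without this edge the executor is never reached
workflow.add_conditional_edges("executor", should_continue,
    {"continue": "executor", "end": END, "fallback": "human_review"})
workflow.add_edge("human_review", END)
app = workflow.compile()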
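
The planner relies on a `parse_plan` helper that the article does not show. A minimal sketch of what it could look like is below, assuming the model replies with one step per line, optionally numbered or bulleted. The `max_steps` parameter and the marker patterns are assumptions made for illustration, not the original implementation.

```python
import re
from typing import List


def parse_plan(text: str, max_steps: int = 5) -> List[str]:
    """Turn a numbered or bulleted LLM reply into a clean list of step strings."""
    steps: List[str] = []
    for line in text.splitlines():
        # Strip a leading marker such as "1.", "2)", "3、", "-", "*" or "•".
        cleaned = re.sub(r"^\s*(?:\d+[.)、]|[-*•])\s*", "", line).strip()
        if cleaned:  # skip blank lines
            steps.append(cleaned)
    # Enforce the step cap even if the model ignores the instruction.
    return steps[:max_steps]
```

Truncating to `max_steps` in code keeps the plan bounded even when the model ignores the "at most 5 steps" instruction. A preamble line such as "Here is the plan:" would still be treated as a step, so a production parser would likely ask the model for structured output instead.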

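2. Idempotent Tool Calls (the Most Easily Overlooked Part!)

When the agent retries after a network timeout, the tool may already have run once, so an email can be sent twice or a database row written twice.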

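import hashlib
import json

class IdempotentTool:
    def __init__(self, redis_client):
        self.redis = redis_client

    def execute(self, tool_name: str, params: dict):
        # Canonical JSON with sorted keys yields the same key regardless of
        # dict ordering, including for nested params
        payload = json.dumps({"tool": tool_name, "params": params}, sort_keys=True)
        key = hashlib.md5(payload.encode()).hexdigest()
        cache_key = f"tool_result:{key}"
        cached = self.redis.get(cache_key)
        if cached is not None:
            return json.loads(cached)  # return the cached result, do not execute again

        # Note: get-then-set is not atomic, so two concurrent calls can both run the tool
        result = self._execute_tool(tool_name, params)  # actual dispatch, defined elsewhere
        self.redis.setex(cache_key, 86400, json.dumps(result))  # keep the result for 24 hours
        return result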
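
To make the deduplication behavior concrete without a Redis server, here is a small in-memory sketch of the same idea. The names `idempotency_key` and `InMemoryIdempotentExecutor` are hypothetical, and the lock-based store is only a stand-in for local experiments, not a replacement for the Redis-backed class.

```python
import hashlib
import json
import threading
from typing import Any, Callable, Dict


def idempotency_key(tool_name: str, params: Dict[str, Any]) -> str:
    """Derive a stable key: the same tool and params always hash identically."""
    canonical = json.dumps({"tool": tool_name, "params": params},
                           sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class InMemoryIdempotentExecutor:
    """Illustrative stand-in for a Redis-backed store (single process only)."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def execute(self, tool_name: str, params: Dict[str, Any],
                tool: Callable[..., Any]) -> Any:
        key = idempotency_key(tool_name, params)
        # Holding the lock across the call makes check-and-run atomic,
        # at the cost of serializing every tool call in this sketch.
        with self._lock:
            if key not in self._results:
                self._results[key] = tool(**params)
            return self._results[key]
```

Calling `execute` twice with the same tool name and parameters, in any key order, runs the side effect once and returns the stored result the second time. In a multi-process deployment the same atomicity would have to come from the shared store rather than from a local lock.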

3. Three Layers of Error Recovery

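import time

class TransientError(Exception):
    """Temporary failure (timeout, rate limit) that is worth retrying."""

class RobustAgent:
    def execute_step(self, step, context):
        # Layer 1: retry with exponential backoff (transient errors)
        for i in range(3):
            try:
                return self._call_tool(step, context)
            except TransientError:
                if i < 2:  # no point sleeping after the final attempt
                    time.sleep(2 ** i)

        # Layer 2: degrade to a fallback step
        fallback = self._get_fallback(step)
        if fallback:
            try:
                return self._call_tool(fallback, context)
            except TransientError:
                pass  # fallback failed too, continue to escalation

        # Layer 3: human intervention
        return self._escalate_to_human(step, context)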
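
The first layer can be pulled out into a reusable helper. The sketch below is one possible shape for it, assuming a `TransientError` exception class like the one the agent catches. It adds random jitter, which the article's loop does not use, so that many agents retrying at once do not hit the API in lockstep. The `sleep` parameter is injectable purely to keep tests fast.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Temporary failure (timeout, rate limit) that is worth retrying."""


def retry_with_backoff(call: Callable[[], T],
                       max_retries: int = 3,
                       base_delay: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `call` up to max_retries times, backing off exponentially with jitter."""
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            return call()
        except TransientError:
            if attempt == max_retries - 1:
                raise  # out of attempts: let the caller move to the next layer
            # Full jitter: wait a random time up to base_delay * 2^attempt.
            sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise AssertionError("unreachable")
```

Only `TransientError` is retried. Any other exception propagates immediately, which keeps permanent failures such as invalid parameters from burning retries.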

4. Distributed Tracing with OpenTelemetry

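from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def run_agent(task: str, plan: list):
    # plan is the step list produced by the planner; execute_step is defined elsewhere
    with tracer.start_as_current_span("agent.run") as span:
        span.set_attribute("task", task[:200])  # truncate to keep the attribute small
        for i, step in enumerate(plan):
            # One child span per step, so a slow or failed step is easy to find
            with tracer.start_as_current_span(f"step.{i}") as s:
                result = execute_step(step)
                s.set_attribute("success", result.success)
                s.set_attribute("tokens", result.token_count)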

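5. Seven-Item Production Deployment Checklist

  • [x] Every tool call supports idempotent retries
  • [x] Every tool has a 30 s timeout
  • [x] Token consumption per task has a hard cap
  • [x] Agent runs are capped at 20 steps (to prevent infinite loops)
  • [x] Exceptions can trigger human review
  • [x] Full traces are available for retrospection
  • [x] Agent side effects can be rolled back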
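
Three of the checklist items (the 30 s tool timeout, the token cap, and the 20-step cap) can be enforced with a few lines of guard code. The sketch below is one way to do it using only the standard library. The names `RunBudget`, `BudgetExceeded`, `ToolTimeout`, and `call_with_timeout`, as well as the 50,000-token default, are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from dataclasses import dataclass
from typing import Any, Callable


class BudgetExceeded(RuntimeError):
    """Raised when a run goes past its step or token limit."""


class ToolTimeout(RuntimeError):
    """Raised when a tool call does not finish in time."""


@dataclass
class RunBudget:
    max_steps: int = 20        # step cap from the checklist
    max_tokens: int = 50_000   # assumed value; pick one that fits your cost model
    steps_used: int = 0
    tokens_used: int = 0

    def charge(self, tokens: int) -> None:
        """Record one finished step and its token usage, enforcing both caps."""
        self.steps_used += 1
        self.tokens_used += tokens
        if self.steps_used > self.max_steps:
            raise BudgetExceeded(f"step limit {self.max_steps} exceeded")
        if self.tokens_used > self.max_tokens:
            raise BudgetExceeded(f"token limit {self.max_tokens} exceeded")


def call_with_timeout(tool: Callable[[], Any], timeout_s: float = 30.0) -> Any:
    """Run `tool` in a worker thread and stop waiting after timeout_s seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(tool)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise ToolTimeout(f"tool exceeded {timeout_s}s") from None
    finally:
        # Do not block on the worker; a timed-out thread keeps running
        # in the background because Python cannot forcibly kill it.
        pool.shutdown(wait=False)
```

The timeout only stops the caller from waiting. The abandoned call may still complete its side effect later, which is one more reason the idempotency and rollback items on the checklist matter.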

Idempotency, error recovery, and observability: these three dimensions determine whether an agent can truly go to production.