AI Agent 监控与可观测性:2025 生产实践指南

他山之石 2026-01-27 15:27:19

🎯 核心要点(TL;DR)

  • 监控五大关键指标: 成功率(目标 >99%)、响应延迟(p95 <3 秒)、Token 用量、错误率,以及业务层面的结果指标
  • 成本追踪至关重要: 日处理 1 亿+ 请求的团队通过监控 Token 用量和设置预算告警,每月节省 5 万美元以上
  • 分布式追踪揭示完整链路: 追踪每一次 LLM 调用、工具执行和决策点,问题排查效率提升 10 倍
  • 告警要基于持续异常,而非瞬时波动: 当成功率连续 5 分钟以上低于 95% 时才触发紧急告警,避免告警疲劳
  • 采用分层监控架构: Prometheus + Grafana 负责指标,Jaeger 负责链路追踪,LangSmith 等 Agent 专用工具负责行为分析
  • 使用带 Trace ID 的结构化日志: JSON 格式日志配合统一的 Trace ID,实现快速排查和根因定位

📋 目录

  1. 引言:为什么 AI Agent 需要可观测性
  2. 必须追踪的五大关键指标
  3. LLM 成本的监控与管控
  4. 结构化日志最佳实践
  5. 复杂 Agent 的分布式追踪
  6. 如何设计不会"狼来了"的告警系统
  7. 生产环境 Agent 故障排查
  8. 代码实战指南

为什么 AI Agent 需要可观测性

AI Agent 可观测性定义: AI Agent 可观测性是指从自主运行的 AI 系统中采集指标、日志和追踪数据,以理解其在生产环境中的内部状态和行为。与传统监控只追踪预定义指标不同,可观测性让团队能够对 Agent 的决策过程、资源消耗和故障模式提出任意问题——即使这些问题在开发阶段从未被预料到。

当你把 AI Agent 部署到生产环境时,你实际上是在释放一个能自主决策、调用工具、与用户交互的系统,而且不需要持续的人工干预。这种自主性带来了传统应用可观测性无法解决的独特监控挑战。

在传统软件中,你监控请求量、错误码和延迟就够了。但对于 2025 年的 AI Agent,你需要追踪的是:Agent 是否在做正确的决策、是否选择了合适的工具、是否控制在成本预算内、质量是否随时间保持稳定。这是本质性的区别:传统应用的故障是可预测的,会给出堆栈跟踪和错误码;而 Agent 可能"静默失败"——做出糟糕的决策或质量逐渐下滑,却不产生任何明显的错误。

📊 行业数据(2025)

来自大规模运行 AI Agent 团队的生产数据:

  • 67% 的生产 AI Agent 故障是被用户发现的,而非监控系统
  • 拥有完善可观测性的团队问题排查速度快 10 倍(平均解决时间:12 分钟 vs 2 小时)
  • 合理的成本监控平均每月每个生产 Agent 可避免 8,000 美元的意外 LLM 费用
  • 具备分布式追踪的系统平均故障恢复时间(MTTR)降低 73%
  • 监控 20+ 指标的生产团队可维持 99.9% 的可用性,而监控少于 10 个指标的团队仅能达到 95%

必须追踪的五大关键指标

生产 AI Agent 需要追踪几十个指标,但有五个是维护 2025 年系统可靠性和性能的绝对核心。这些指标构成了所有生产监控系统的基础。

1. 成功率与完成度指标

成功率衡量的是没有错误、成功完成的 Agent 请求占比。生产系统的目标成功率应达到 99% 或更高。这个指标与传统 HTTP 成功率不同,因为 Agent 可能返回 200 状态码,但由于决策失误、幻觉(Hallucination)或工具故障,实际上并未完成任务。

成功率的计算需要追踪三种状态:成功完成、失败请求(错误、超时、崩溃)和降级响应(完成但存在质量问题)。要在多个时间窗口(1 分钟、5 分钟、1 小时、24 小时)追踪成功率,以便同时识别突发事故和渐进式退化。

Python - 成功率追踪

from prometheus_client import Counter, Histogram
import time

# 定义指标
agent_requests_total = Counter(
    'agent_requests_total',
    'Total number of agent requests',
    ['agent_type', 'status']
)

agent_request_duration = Histogram(
    'agent_request_duration_seconds',
    'Time spent processing agent request',
    ['agent_type'],
    buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]
)

def track_agent_request(agent_type: str, success: bool, duration: float):
    """追踪 Agent 请求指标"""
    status = 'success' if success else 'failure'
    agent_requests_total.labels(agent_type=agent_type, status=status).inc()
    agent_request_duration.labels(agent_type=agent_type).observe(duration)

# 在 Agent 执行中使用
async def execute_agent(agent_type: str, input_data: dict):
    start_time = time.time()
    try:
        result = await agent.run(input_data)
        duration = time.time() - start_time
        track_agent_request(agent_type, success=True, duration=duration)
        return result
    except Exception as e:
        duration = time.time() - start_time
        track_agent_request(agent_type, success=False, duration=duration)
        raise

2. 响应延迟与性能

AI Agent 的延迟从初始请求到最终响应,包括所有 LLM 调用、工具执行和处理时间。2025 年生产系统的目标是 p50 延迟低于 1 秒,p95 延迟低于 3 秒。但可接受的延迟因场景而异:聊天机器人需要亚秒级响应,而分析型 Agent 可以容忍 10-30 秒。

要追踪多个百分位的延迟(p50、p90、p95、p99),因为平均值会掩盖异常值。p99 的单个慢请求可能暗示系统性问题,比如低效的工具调用或失控的循环。按 Agent 类型、用户群体和时段监控延迟,以识别规律。

3. Token 用量与单请求成本

Token 消耗直接关联成本,是生产环境 LLM 费用的主要驱动因素。截至 2025 年,GPT-4 的价格约为每 1000 输入 Token $0.03,每 1000 输出 Token $0.06。对于处理数百万请求的 Agent,未优化的 Token 使用可能导致每月数十万美元的成本。

Token 成本计算公式: 单请求成本 = (输入 Token 数 / 1000 × 输入价格) + (输出 Token 数 / 1000 × 输出价格)。举例:一个使用 GPT-4、包含 500 输入 Token 和 200 输出 Token 的请求:(500/1000 × $0.03) + (200/1000 × $0.06) = $0.015 + $0.012 = 每请求 $0.027。按每月 100 万请求计算,LLM 成本为 $27,000。

要追踪每请求 Token 用量、汇总日/月成本、监控每用户或每会话成本,并在多个阈值设置预算告警。日处理 1 亿+ 请求的团队通常通过 Token 监控识别和修复低效 Agent,每月节省 $50,000 以上。

Python - Token 与成本追踪

from prometheus_client import Counter, Gauge
import asyncio

# Token 和成本指标
tokens_used_total = Counter(
    'llm_tokens_used_total',
    'Total tokens used by LLM',
    ['model', 'token_type', 'agent_type']
)

cost_usd_total = Counter(
    'llm_cost_usd_total',
    'Total cost in USD',
    ['model', 'agent_type']
)

daily_cost_usd = Gauge(
    'llm_daily_cost_usd',
    'Current daily cost in USD',
    ['date']
)

# 2025 年定价(每 1K Token)
MODEL_PRICING = {
    'gpt-4': {'input': 0.03, 'output': 0.06},
    'gpt-4-turbo': {'input': 0.01, 'output': 0.03},
    'gpt-3.5-turbo': {'input': 0.0005, 'output': 0.0015}
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """计算 LLM API 调用成本"""
    pricing = MODEL_PRICING.get(model, MODEL_PRICING['gpt-4'])
    input_cost = (input_tokens / 1000) * pricing['input']
    output_cost = (output_tokens / 1000) * pricing['output']
    return input_cost + output_cost

def track_llm_usage(model: str, agent_type: str, input_tokens: int, output_tokens: int):
    """追踪 Token 用量和成本"""
    # 追踪 Token
    tokens_used_total.labels(
        model=model, 
        token_type='input', 
        agent_type=agent_type
    ).inc(input_tokens)
    
    tokens_used_total.labels(
        model=model, 
        token_type='output', 
        agent_type=agent_type
    ).inc(output_tokens)
    
    # 计算并追踪成本
    cost = calculate_cost(model, input_tokens, output_tokens)
    cost_usd_total.labels(model=model, agent_type=agent_type).inc(cost)
    
    return cost

4. 分类型错误率

并非所有错误都一样。按类别追踪错误以理解故障模式:LLM API 错误(限流、超时、服务不可用)、工具执行失败(API 错误、超时、无效响应)、Agent 逻辑错误(死循环、无效决策、约束违规)以及质量问题(幻觉、跑题、安全问题)。

每种错误类型需要不同的修复策略。LLM API 错误可能需要重试逻辑或备用模型,而 Agent 逻辑错误表明决策代码存在 Bug。质量问题可能需要 Prompt 工程或模型微调。

5. 业务指标与结果

技术指标并不能说明全部情况。要追踪与 Agent 目的相关的业务结果:销售 Agent 的转化率、客服 Agent 的问题解决率、效率 Agent 的任务完成率,以及通过反馈衡量的用户满意度。

一个 Agent 可能拥有出色的技术指标(99% 成功率、低延迟),但如果它做出糟糕的决策或提供无用的回复,仍然会在核心任务上失败。业务指标填补了技术性能与用户价值之间的鸿沟。

LLM 成本的监控与管控

LLM 成本是 2025 年生产 AI Agent 的主要运营支出。如果没有适当的监控和管控,成本可能因用量增加、低效 Prompt 或失控循环而意外飙升。有效的成本管理需要实时追踪、预算执行和优化策略。

📊 真实成本数据

基于数百个 Agent 的生产部署数据:

  • 平均单请求成本:$0.02-0.15,取决于模型和复杂度
  • 中等流量(100 万请求/月)的月成本:$20,000-150,000
  • 成本优化潜力:通过 Prompt 工程和缓存可降低 40-70%
  • 未监控的成本在 90 天内会增长 3-5 倍,原因是功能增加和用量增长

实施预算告警与防护机制

在多个层级设置预算告警,在成本飙升变得昂贵之前捕获它。创建三级告警:信息级告警在达到预期日预算的 100% 时触发,警告级告警在 150% 时触发需要调查,紧急告警在 200% 时触发可能自动限流或熔断。

监控表明问题的异常成本模式:每请求 Token 数突然飙升(可能是 Prompt 变更或失控循环)、特定用户的异常使用模式(可能是滥用)、错误率升高伴随重试(级联故障消耗 Token)、成本增速超过用户增速(效率下降)。

Python - 预算监控与告警

import asyncio
from datetime import datetime, timedelta
from typing import Dict, Optional
import logging

class CostMonitor:
    """监控和执行 LLM 成本预算"""
    
    def __init__(self, daily_budget_usd: float):
        self.daily_budget = daily_budget_usd
        self.current_day_cost = 0.0
        self.current_day = datetime.now().date()
        self.alert_thresholds = {
            'info': 1.0,      # 预算的 100%
            'warning': 1.5,   # 预算的 150%
            'critical': 2.0   # 预算的 200%
        }
        self.alerted = set()
        
    def add_cost(self, cost: float, metadata: Optional[Dict] = None) -> bool:
        """
        添加成本并检查预算。超出预算返回 False。
        """
        # 新的一天重置追踪
        today = datetime.now().date()
        if today != self.current_day:
            self.current_day = today
            self.current_day_cost = 0.0
            self.alerted.clear()
            
        self.current_day_cost += cost
        percentage = self.current_day_cost / self.daily_budget
        
        # 检查告警阈值
        for level, threshold in self.alert_thresholds.items():
            if percentage >= threshold and level not in self.alerted:
                self.send_alert(level, self.current_day_cost, percentage, metadata)
                self.alerted.add(level)
                
        # 返回是否允许请求(直到紧急级别才阻止)
        return percentage < self.alert_thresholds['critical']
    
    def send_alert(self, level: str, current_cost: float, percentage: float, 
                   metadata: Optional[Dict]):
        """通过监控系统发送告警"""
        alert_msg = (
            f"成本告警 [{level.upper()}]: 日 LLM 成本已达 "
            f"${current_cost:.2f}(${self.daily_budget:.2f} 预算的 {percentage:.1%})"
        )
        
        if level == 'critical':
            # 呼叫值班工程师
            logging.critical(alert_msg)
            # TODO: 集成 PagerDuty/Opsgenie
        elif level == 'warning':
            # 通知团队 Slack 频道
            logging.warning(alert_msg)
            # TODO: 发送 Slack 通知
        else:
            # 仅记录信息
            logging.info(alert_msg)

# 生产使用
cost_monitor = CostMonitor(daily_budget_usd=1000.0)

async def call_llm_with_budget(prompt: str, model: str = 'gpt-4'):
    """带预算执行的 LLM 调用"""
    # 检查预算是否允许请求
    if not cost_monitor.add_cost(0):  # 悲观检查
        raise Exception("日预算已超出 - 请求被阻止")
    
    # 调用 LLM
    response = await llm_client.complete(prompt, model=model)
    
    # 追踪实际成本
    cost = calculate_cost(model, response.input_tokens, response.output_tokens)
    cost_monitor.add_cost(cost, {
        'model': model,
        'tokens': response.input_tokens + response.output_tokens
    })
    
    return response

成本优化策略

通过策略性优化可降低 40-70% 的 LLM 成本。使用 Prompt 压缩在保持语义的同时减少不必要的 Token。实施语义缓存(Semantic Caching)存储和复用相似查询的响应。对简单任务使用更便宜的模型(用 GPT-3.5-turbo 替代 GPT-4)。实现流式响应(Streaming)改善感知延迟而不增加成本。在延迟要求允许时批量处理相似请求。

结构化日志最佳实践

AI Agent 的有效日志需要比传统应用更多的结构。每条日志都应该讲述 Agent 在想什么、做了什么决策、采取了什么行动的故事。JSON 格式的结构化日志支持在问题出现时快速查询、过滤和排查。

各阶段应记录什么

记录完整的 Agent 执行流程,提供足够的排查细节。在请求开始时,记录 Trace ID、用户 ID、会话 ID、输入查询或任务、时间戳。在 Agent 推理过程中,记录每个决策点、考虑和选择的工具、决策背后的推理、中间结果。对于每次 LLM 调用,记录使用的模型、消耗的 Token、Prompt 和响应、Temperature 等参数、延迟。对于工具执行,记录调用了哪个工具、输入参数、响应数据、执行时间、任何错误。在请求完成时,记录最终结果、总执行时间、总成本、成功/失败状态。

Python - 结构化日志实现

import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

class AgentLogger:
    """带追踪上下文的 AI Agent 结构化日志器"""
    
    def __init__(self, agent_type: str):
        self.agent_type = agent_type
        self.logger = logging.getLogger(f"agent.{agent_type}")
        self.logger.setLevel(logging.INFO)
        
        # 使用 JSON 格式化器进行结构化日志
        handler = logging.StreamHandler()
        handler.setFormatter(self.JSONFormatter())
        self.logger.addHandler(handler)
        
    class JSONFormatter(logging.Formatter):
        """格式化日志为 JSON"""
        def format(self, record):
            log_data = {
                'timestamp': datetime.utcnow().isoformat(),
                'level': record.levelname,
                'message': record.getMessage(),
            }
            
            # 如果存在则添加额外字段
            if hasattr(record, 'trace_id'):
                log_data['trace_id'] = record.trace_id
            if hasattr(record, 'agent_type'):
                log_data['agent_type'] = record.agent_type
            if hasattr(record, 'extra_data'):
                log_data.update(record.extra_data)
                
            return json.dumps(log_data)
    
    def log_request_start(self, trace_id: str, user_id: str, input_data: Dict):
        """记录 Agent 请求开始"""
        self.logger.info(
            f"Agent request started",
            extra={
                'trace_id': trace_id,
                'agent_type': self.agent_type,
                'extra_data': {
                    'event': 'request_start',
                    'user_id': user_id,
                    'input': input_data
                }
            }
        )
    
    def log_llm_call(self, trace_id: str, model: str, prompt: str, 
                     response: str, tokens: Dict, latency: float, cost: float):
        """记录 LLM API 调用"""
        self.logger.info(
            f"LLM call completed: {model}",
            extra={
                'trace_id': trace_id,
                'agent_type': self.agent_type,
                'extra_data': {
                    'event': 'llm_call',
                    'model': model,
                    'prompt': prompt[:500],  # 截断以控制日志大小
                    'response': response[:500],
                    'input_tokens': tokens['input'],
                    'output_tokens': tokens['output'],
                    'latency_ms': int(latency * 1000),
                    'cost_usd': round(cost, 4)
                }
            }
        )
    
    def log_tool_execution(self, trace_id: str, tool_name: str, 
                          input_params: Dict, result: Any, latency: float, 
                          success: bool):
        """记录工具执行"""
        level = logging.INFO if success else logging.ERROR
        self.logger.log(
            level,
            f"Tool execution: {tool_name} {'succeeded' if success else 'failed'}",
            extra={
                'trace_id': trace_id,
                'agent_type': self.agent_type,
                'extra_data': {
                    'event': 'tool_execution',
                    'tool_name': tool_name,
                    'input_params': input_params,
                    'result': str(result)[:500],
                    'latency_ms': int(latency * 1000),
                    'success': success
                }
            }
        )
    
    def log_agent_decision(self, trace_id: str, decision_point: str, 
                          reasoning: str, chosen_action: str):
        """记录带推理过程的 Agent 决策"""
        self.logger.info(
            f"Agent decision: {decision_point}",
            extra={
                'trace_id': trace_id,
                'agent_type': self.agent_type,
                'extra_data': {
                    'event': 'agent_decision',
                    'decision_point': decision_point,
                    'reasoning': reasoning,
                    'chosen_action': chosen_action
                }
            }
        )
    
    def log_request_complete(self, trace_id: str, success: bool, 
                           total_time: float, total_cost: float, 
                           result: Optional[Any] = None):
        """记录请求完成"""
        level = logging.INFO if success else logging.ERROR
        self.logger.log(
            level,
            f"Agent request {'completed' if success else 'failed'}",
            extra={
                'trace_id': trace_id,
                'agent_type': self.agent_type,
                'extra_data': {
                    'event': 'request_complete',
                    'success': success,
                    'total_time_ms': int(total_time * 1000),
                    'total_cost_usd': round(total_cost, 4),
                    'result': str(result)[:500] if result else None
                }
            }
        )

# 使用示例
logger = AgentLogger('customer_support_agent')
trace_id = str(uuid.uuid4())

# 请求开始时
logger.log_request_start(trace_id, user_id='user123', 
                         input_data={'query': '如何重置密码?'})

# LLM 调用时
logger.log_llm_call(trace_id, model='gpt-4', 
                   prompt='You are a helpful assistant...',
                   response='To reset your password...',
                   tokens={'input': 150, 'output': 80},
                   latency=1.2, cost=0.015)

# 请求完成时
logger.log_request_complete(trace_id, success=True, 
                           total_time=2.5, total_cost=0.025)

日志保留与存储

通过分层日志保留来平衡排查需求和存储成本。热日志(最近 7 天)保存在 Elasticsearch 等快速存储中,便于事故响应时快速访问。温日志(8-30 天)移至访问较慢的廉价存储。冷日志(31-365 天)归档到带压缩的对象存储如 S3。除非合规要求,超过 365 天的日志可删除。

对于日处理数百万请求的高流量生产系统,日志存储每月可能花费 $5,000-20,000。对常规操作实施日志采样(只记录 1-10% 的成功请求),同时 100% 记录失败、错误和异常模式。

复杂 Agent 的分布式追踪

分布式追踪提供了对 Agent 请求流经多个服务、LLM 调用和工具执行的完整可见性。每个操作被记录为一个 Span,包含时间信息、元数据以及与父操作的关系。这创建了一个详细的 Trace,准确显示 Agent 做了什么、按什么顺序、每步花了多长时间。

分布式追踪定义: 分布式追踪是一种追踪应用请求流经各种服务和组件的方法。对于 AI Agent,一个 Trace 捕获从初始用户查询到所有 LLM 调用、工具执行和决策点的完整执行路径,在每个步骤记录时间和上下文数据。这使得调试复杂的多步骤行为和识别性能瓶颈成为可能。

为 Agent 实现 OpenTelemetry

OpenTelemetry 是 2025 年分布式追踪的行业标准,提供与 Jaeger、Tempo、Zipkin 和商业平台兼容的供应商中立检测。实现追踪的方法是:为每个 Agent 请求创建根 Span,为每次 LLM 调用创建子 Span,为每次工具执行创建子 Span,为主要决策点创建子 Span。

Python - OpenTelemetry 追踪实现

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
import time

# 初始化 OpenTelemetry
resource = Resource.create({"service.name": "ai-agent-service"})
provider = TracerProvider(resource=resource)

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

class TracedAgent:
    """带分布式追踪的 AI Agent"""
    
    async def execute(self, user_query: str, user_id: str):
        """执行带完整追踪的 Agent"""
        # 为整个请求创建根 Span
        with tracer.start_as_current_span(
            "agent.execute",
            attributes={
                "agent.type": "customer_support",
                "user.id": user_id,
                "input.query": user_query[:100]  # 截断
            }
        ) as span:
            try:
                # 阶段 1:理解意图
                intent = await self.understand_intent(user_query)
                
                # 阶段 2:收集上下文
                context = await self.gather_context(intent)
                
                # 阶段 3:生成响应
                response = await self.generate_response(intent, context)
                
                # 标记成功
                span.set_attribute("agent.success", True)
                span.set_attribute("response.length", len(response))
                
                return response
                
            except Exception as e:
                # 标记失败
                span.set_attribute("agent.success", False)
                span.set_attribute("error.type", type(e).__name__)
                span.set_attribute("error.message", str(e))
                span.record_exception(e)
                raise
    
    async def understand_intent(self, query: str):
        """用 LLM 理解用户意图"""
        with tracer.start_as_current_span(
            "agent.understand_intent",
            attributes={"input.query": query[:100]}
        ) as span:
            start_time = time.time()
            
            # 调用 LLM
            result = await self.call_llm(
                prompt=f"Classify the intent of this query: {query}",
                model="gpt-4"
            )
            
            latency = time.time() - start_time
            span.set_attribute("llm.latency_ms", int(latency * 1000))
            span.set_attribute("intent.category", result.get("intent"))
            
            return result
    
    async def call_llm(self, prompt: str, model: str):
        """带追踪的 LLM API 调用"""
        with tracer.start_as_current_span(
            "llm.call",
            attributes={
                "llm.model": model,
                "llm.prompt": prompt[:200]
            }
        ) as span:
            start_time = time.time()
            
            # 模拟 LLM 调用
            response = await llm_client.complete(prompt, model=model)
            
            # 记录指标
            latency = time.time() - start_time
            span.set_attribute("llm.latency_ms", int(latency * 1000))
            span.set_attribute("llm.input_tokens", response.input_tokens)
            span.set_attribute("llm.output_tokens", response.output_tokens)
            span.set_attribute("llm.cost_usd", 
                             calculate_cost(model, response.input_tokens, 
                                          response.output_tokens))
            
            return response
    
    async def call_tool(self, tool_name: str, params: dict):
        """带追踪的外部工具调用"""
        with tracer.start_as_current_span(
            f"tool.{tool_name}",
            attributes={
                "tool.name": tool_name,
                "tool.params": str(params)
            }
        ) as span:
            start_time = time.time()
            
            try:
                result = await execute_tool(tool_name, params)
                latency = time.time() - start_time
                
                span.set_attribute("tool.success", True)
                span.set_attribute("tool.latency_ms", int(latency * 1000))
                
                return result
                
            except Exception as e:
                span.set_attribute("tool.success", False)
                span.set_attribute("error.type", type(e).__name__)
                span.record_exception(e)
                raise

读取和分析 Trace

使用 Trace 排查生产问题:识别瓶颈(哪些操作耗时最长)、理解故障序列(错误前发生了什么)、追踪决策流程(Agent 如何得出结论)、比较成功与失败请求以发现模式。

现代追踪工具如 Jaeger 提供 Trace 时间线可视化,在单一时间线上显示所有操作及其耗时。生产团队使用分布式追踪通常能将问题解决速度提高 10 倍,因为他们能准确看到 Agent 做了什么,而不是从日志中猜测。

如何设计不会"狼来了"的告警系统

有效的告警是在捕获真实问题和避免告警疲劳之间取得平衡。告警太多,团队开始忽视它们;太少,关键问题被遗漏。关键是基于系统行为设置适当的阈值,只对需要行动的持续问题告警。

分级告警策略

基于紧急程度和影响实施三级告警。紧急告警(立即呼叫)在以下情况触发:成功率连续 5 分钟以上低于 95%、日成本超过基线的 200%、完全系统故障或无法处理请求、数据丢失或安全事件。这些需要值班工程师立即响应。

警告级告警(工作时间通知)在以下情况触发:成功率 95-98% 持续 10 分钟以上、错误率升高(正常的 2 倍)持续 15 分钟以上、延迟退化(p95 >5 秒)持续 10 分钟以上、成本达到基线的 150-200%。这些需要调查但不需要立即呼叫。

信息级告警(记录后续审查)在以下情况触发:轻微指标偏差、完成的部署或配置变更、接近但未超过资源限制、可能表明新问题的异常模式。

Prometheus 告警规则 - 生产配置

groups:
  - name: ai_agent_alerts
    interval: 30s
    rules:
      # 紧急:成功率连续 5 分钟低于 95%
      - alert: AgentSuccessRateCritical
        expr: |
          (
            sum(rate(agent_requests_total{status="success"}[5m])) by (agent_type)
            /
            sum(rate(agent_requests_total[5m])) by (agent_type)
          ) < 0.95          
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Agent {{ $labels.agent_type }} 成功率严重过低"
          description: "成功率为 {{ $value | humanizePercentage }},已连续 5 分钟低于 95% 阈值"
          
      # 警告:成功率在 95-98% 之间
      - alert: AgentSuccessRateWarning
        expr: |
          (
            sum(rate(agent_requests_total{status="success"}[5m])) by (agent_type)
            /
            sum(rate(agent_requests_total[5m])) by (agent_type)
          ) < 0.98 and > 0.95          
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Agent {{ $labels.agent_type }} 成功率下降"
          description: "成功率为 {{ $value | humanizePercentage }},已连续 10 分钟低于 98%"
          
      # 紧急:日成本异常(超过基线 200%)
      - alert: LLMCostAnomaly
        expr: |
          llm_daily_cost_usd > (avg_over_time(llm_daily_cost_usd[7d]) * 2)          
        for: 30m
        labels:
          severity: critical
        annotations:
          summary: "LLM 成本达到正常基线的 2 倍"
          description: "当前日成本:${{ $value }},超过 7 天平均值的 200%"
          
      # 警告:延迟退化
      - alert: AgentLatencyHigh
        expr: |
          histogram_quantile(0.95, 
            rate(agent_request_duration_seconds_bucket[5m])
          ) > 5          
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Agent p95 延迟超过 5 秒"
          description: "{{ $labels.agent_type }} 的 p95 延迟为 {{ $value }}s"
          
      # 信息:接近限流
      - alert: LLMRateLimitApproaching
        expr: |
          rate(llm_requests_total[1m]) > 80          
        for: 5m
        labels:
          severity: info
        annotations:
          summary: "接近 LLM API 限流阈值"
          description: "请求速率为 {{ $value }} 请求/分钟,接近 100 请求/分钟限制"

告警集成与响应

将告警与事件管理平台集成:紧急告警接入 PagerDuty 或 Opsgenie,警告和信息级接入团队聊天(Slack、企业微信),需要跟进的非紧急问题接入工单系统(Jira、Linear),以及显示告警状态和历史的仪表板。

每个告警应包含响应所需的上下文:出了什么问题、有多严重、哪个服务或 Agent 受影响、当前指标值和阈值、调查步骤的 Runbook 链接。这通过消除猜测来减少从告警到解决的时间。

生产环境 Agent 故障排查

当 Agent 在生产环境失败时,你需要系统性的方法来快速识别根因。指标、日志和 Trace 的组合提供了完整的故障画像。

逐步排查流程

首先检查仪表板以识别范围(单个用户、Agent 类型还是全系统)、时间(何时开始、是否仍在持续)和模式(特定输入类型、时段、用户群体)。查看故障时间点附近的指标,寻找错误率、延迟、Token 用量或成本的异常。

从日志或指标中找到 Trace ID,在 Jaeger 或你的追踪工具中检查分布式 Trace。Trace 显示精确的操作序列、哪个步骤失败、每个操作的耗时、以及任何错误或异常。这立即将调查范围缩小到特定的组件或调用。

审查按 Trace ID 过滤的结构化日志,查看详细上下文:Agent 收到了什么输入、做了什么决策、LLM 返回了什么、调用了哪些工具及其响应、以及任何错误消息或堆栈跟踪。

将失败请求与成功请求比较以识别差异:不同的输入模式或边界情况、特定的工具组合失败、资源约束或超时、影响特定工具的外部 API 问题。

如有可能使用生产数据在开发环境重现问题,以验证根因、测试修复方案、防止回归。

⚠️ 常见排查误区

排查 Agent 故障时避免这些常见错误:

  • 不先看指标就看日志: 指标能显示问题是孤立的还是大范围的
  • 假设错误消息就是根因: 往往只是症状而非原因
  • 不用 Trace ID 关联日志: 没有它就无法跟踪请求流程
  • 在没有可观测性的情况下在生产环境排查: 如同在黑暗中摸索
  • 不记录发现以供未来参考: 团队会重复相同的调查

代码实战指南

本节提供在你的 Agent 系统中实现全面监控的生产就绪代码。所有示例使用行业标准工具,遵循日处理数百万请求系统的最佳实践。

完整监控配置

Python - 完整的 Agent 监控类

"""
AI Agent 的生产就绪监控。
整合指标、日志、追踪和成本追踪。
"""

import asyncio
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field

from prometheus_client import Counter, Histogram, Gauge, start_http_server
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
import structlog

# 初始化结构化日志
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

@dataclass
class AgentMetrics:
    """AI Agent 的 Prometheus 指标"""
    
    # 请求指标
    requests_total: Counter = field(default_factory=lambda: Counter(
        'agent_requests_total',
        'Total number of agent requests',
        ['agent_type', 'status']
    ))
    
    request_duration: Histogram = field(default_factory=lambda: Histogram(
        'agent_request_duration_seconds',
        'Request duration in seconds',
        ['agent_type'],
        buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]
    ))
    
    # LLM 指标
    llm_calls_total: Counter = field(default_factory=lambda: Counter(
        'agent_llm_calls_total',
        'Total LLM API calls',
        ['agent_type', 'model', 'status']
    ))
    
    llm_tokens_total: Counter = field(default_factory=lambda: Counter(
        'agent_llm_tokens_total',
        'Total tokens used',
        ['agent_type', 'model', 'token_type']
    ))
    
    llm_cost_usd: Counter = field(default_factory=lambda: Counter(
        'agent_llm_cost_usd_total',
        'Total LLM cost in USD',
        ['agent_type', 'model']
    ))
    
    daily_cost_usd: Gauge = field(default_factory=lambda: Gauge(
        'agent_llm_daily_cost_usd',
        'Current daily cost',
        ['date']
    ))
    
    # 工具指标
    tool_calls_total: Counter = field(default_factory=lambda: Counter(
        'agent_tool_calls_total',
        'Total tool invocations',
        ['agent_type', 'tool_name', 'status']
    ))
    
    tool_duration: Histogram = field(default_factory=lambda: Histogram(
        'agent_tool_duration_seconds',
        'Tool execution duration',
        ['agent_type', 'tool_name'],
        buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
    ))

class ProductionAgentMonitor:
    """
    AI Agent 的完整监控方案。
    整合指标、日志、追踪和成本追踪。
    """
    
    def __init__(
        self,
        agent_type: str,
        daily_budget_usd: float = 1000.0,
        enable_metrics: bool = True,
        enable_tracing: bool = True,
        jaeger_host: str = "localhost",
        jaeger_port: int = 6831
    ):
        self.agent_type = agent_type
        self.daily_budget = daily_budget_usd
        
        # 初始化指标
        if enable_metrics:
            self.metrics = AgentMetrics()
        
        # 初始化结构化日志
        self.logger = structlog.get_logger()
        
        # 初始化追踪
        if enable_tracing:
            provider = TracerProvider()
            jaeger_exporter = JaegerExporter(
                agent_host_name=jaeger_host,
                agent_port=jaeger_port,
            )
            provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
            trace.set_tracer_provider(provider)
            self.tracer = trace.get_tracer(__name__)
        else:
            self.tracer = None
        
        # 成本追踪
        self.daily_costs = {}
        self.current_date = datetime.now().date()
        
    async def execute_agent(
        self,
        input_data: Dict[str, Any],
        user_id: str,
        agent_function: callable
    ) -> Dict[str, Any]:
        """
        执行带完整监控的 Agent。
        
        Args:
            input_data: Agent 输入
            user_id: 用户标识
            agent_function: 运行 Agent 逻辑的异步函数
            
        Returns:
            带元数据的 Agent 响应
        """
        trace_id = str(uuid.uuid4())
        start_time = time.time()
        
        # 启动根 Span
        span = None
        if self.tracer:
            span = self.tracer.start_span(
                "agent.execute",
                attributes={
                    "agent.type": self.agent_type,
                    "user.id": user_id,
                    "trace.id": trace_id
                }
            )
        
        # 记录请求开始
        self.logger.info(
            "agent_request_started",
            trace_id=trace_id,
            agent_type=self.agent_type,
            user_id=user_id,
            input=input_data
        )
        
        try:
            # 执行 Agent
            result = await agent_function(
                input_data=input_data,
                monitor=self,
                trace_id=trace_id
            )
            
            # 计算指标
            duration = time.time() - start_time
            
            # 记录成功
            self.metrics.requests_total.labels(
                agent_type=self.agent_type,
                status='success'
            ).inc()
            
            self.metrics.request_duration.labels(
                agent_type=self.agent_type
            ).observe(duration)
            
            # 记录完成
            self.logger.info(
                "agent_request_completed",
                trace_id=trace_id,
                agent_type=self.agent_type,
                duration_ms=int(duration * 1000),
                success=True
            )
            
            if span:
                span.set_attribute("agent.success", True)
                span.set_attribute("agent.duration_ms", int(duration * 1000))
                span.end()
            
            return {
                "success": True,
                "result": result,
                "trace_id": trace_id,
                "duration": duration
            }
            
        except Exception as e:
            duration = time.time() - start_time
            
            # 记录失败
            self.metrics.requests_total.labels(
                agent_type=self.agent_type,
                status='failure'
            ).inc()
            
            self.metrics.request_duration.labels(
                agent_type=self.agent_type
            ).observe(duration)
            
            # 记录错误
            self.logger.error(
                "agent_request_failed",
                trace_id=trace_id,
                agent_type=self.agent_type,
                error_type=type(e).__name__,
                error_message=str(e),
                duration_ms=int(duration * 1000)
            )
            
            if span:
                span.set_attribute("agent.success", False)
                span.set_attribute("error.type", type(e).__name__)
                span.record_exception(e)
                span.end()
            
            raise
    
    def track_llm_call(
        self,
        trace_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency: float,
        success: bool = True
    ) -> float:
        """追踪 LLM API 调用指标和成本"""
        
        # 追踪 Token
        self.metrics.llm_tokens_total.labels(
            agent_type=self.agent_type,
            model=model,
            token_type='input'
        ).inc(input_tokens)
        
        self.metrics.llm_tokens_total.labels(
            agent_type=self.agent_type,
            model=model,
            token_type='output'
        ).inc(output_tokens)
        
        # 计算成本
        cost = self._calculate_cost(model, input_tokens, output_tokens)
        
        self.metrics.llm_cost_usd.labels(
            agent_type=self.agent_type,
            model=model
        ).inc(cost)
        
        # 追踪日成本
        today = str(datetime.now().date())
        if today not in self.daily_costs:
            self.daily_costs[today] = 0.0
        self.daily_costs[today] += cost
        self.metrics.daily_cost_usd.labels(date=today).set(self.daily_costs[today])
        
        # 追踪调用状态
        status = 'success' if success else 'failure'
        self.metrics.llm_calls_total.labels(
            agent_type=self.agent_type,
            model=model,
            status=status
        ).inc()
        
        # 记录日志
        self.logger.info(
            "llm_call",
            trace_id=trace_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=int(latency * 1000),
            cost_usd=round(cost, 4),
            success=success
        )
        
        return cost
    
    def track_tool_call(
        self,
        trace_id: str,
        tool_name: str,
        latency: float,
        success: bool = True
    ):
        """追踪工具执行指标"""
        
        status = 'success' if success else 'failure'
        self.metrics.tool_calls_total.labels(
            agent_type=self.agent_type,
            tool_name=tool_name,
            status=status
        ).inc()
        
        self.metrics.tool_duration.labels(
            agent_type=self.agent_type,
            tool_name=tool_name
        ).observe(latency)
        
        self.logger.info(
            "tool_call",
            trace_id=trace_id,
            tool_name=tool_name,
            latency_ms=int(latency * 1000),
            success=success
        )
    
    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """计算 LLM API 成本(2025 定价)"""
        pricing = {
            'gpt-4': {'input': 0.03, 'output': 0.06},
            'gpt-4-turbo': {'input': 0.01, 'output': 0.03},
            'gpt-3.5-turbo': {'input': 0.0005, 'output': 0.0015}
        }
        
        prices = pricing.get(model, pricing['gpt-4'])
        input_cost = (input_tokens / 1000) * prices['input']
        output_cost = (output_tokens / 1000) * prices['output']
        
        return input_cost + output_cost

# 使用示例
async def my_agent_logic(input_data: Dict, monitor: ProductionAgentMonitor, trace_id: str):
    """带监控的示例 Agent"""
    
    # 模拟 LLM 调用
    await asyncio.sleep(0.5)
    monitor.track_llm_call(
        trace_id=trace_id,
        model='gpt-4',
        input_tokens=200,
        output_tokens=150,
        latency=0.5,
        success=True
    )
    
    # 模拟工具调用
    await asyncio.sleep(0.2)
    monitor.track_tool_call(
        trace_id=trace_id,
        tool_name='search_database',
        latency=0.2,
        success=True
    )
    
    return {"answer": "Agent 返回的结果"}

# 初始化并运行
async def main():
    # 启动 Prometheus 指标服务器
    start_http_server(8000)
    
    # 创建监控器
    monitor = ProductionAgentMonitor(
        agent_type='customer_support',
        daily_budget_usd=1000.0
    )
    
    # 执行带监控的 Agent
    result = await monitor.execute_agent(
        input_data={'query': '如何重置密码?'},
        user_id='user123',
        agent_function=my_agent_logic
    )
    
    print(f"结果: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Grafana 仪表板配置

创建一目了然显示系统健康状况的综合仪表板。关键面板包括:成功率随时间变化(5 分钟窗口)、请求延迟(p50、p95、p99 百分位)、活跃请求数和吞吐量、分类型错误率、LLM Token 用量和成本(日趋势)、工具执行率和延迟、以及单请求成本趋势。

按角色组织仪表板:管理层仪表板显示业务指标和成本,工程仪表板显示技术指标和告警,SRE 仪表板显示系统健康和事故响应数据。

查看:https://orbitalai.in/orbitalai-optimized-monitoring-observability.html

快猫星云 联系方式 快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云
OpenSource
开源版
Flashcat
Flashcat
Flashduty
Flashduty