夜莺-Nightingale
夜莺V7
项目介绍 功能概览
部署升级 部署升级
数据接入 数据接入
告警管理 告警管理
数据查看 数据查看
功能介绍 功能介绍
API FAQ
夜莺V6
项目介绍 架构介绍
快速开始 快速开始
黄埔营
安装部署 安装部署
升级
采集器 采集器
使用手册 使用手册
API API
数据库表结构 数据库表结构
FAQ FAQ
开源生态
Prometheus
版权声明
第1章:天降奇兵 第1章:天降奇兵
第2章:探索PromQL 第2章:探索PromQL
第3章:Prometheus告警处理 第3章:Prometheus告警处理
第4章:Exporter详解 第4章:Exporter详解
第5章:数据与可视化 第5章:数据与可视化
第6章:集群与高可用 第6章:集群与高可用
第7章:Prometheus服务发现 第7章:Prometheus服务发现
第8章:监控Kubernetes 第8章:监控Kubernetes
第9章:Prometheus Operator 第9章:Prometheus Operator
参考资料

Event Update 事件处理器

概述

Event Update 是 Nightingale 告警系统中的一个事件处理器,它允许您通过 HTTP 调用外部服务来动态更新告警事件的内容。当告警事件流经此处理器时,处理器会将事件数据发送到指定的 URL,并根据外部服务的响应来更新事件信息。

工作原理

  1. 事件接收:处理器接收到告警事件
  2. 数据序列化:将事件对象转换为 JSON 格式
  3. HTTP 调用:通过 POST 请求将 JSON 数据发送到配置的 URL
  4. 响应处理:读取外部服务的响应
  5. 事件更新:将响应内容反序列化后更新原始事件
  6. 事件传递:返回更新后的事件继续流水线处理

配置说明

基础配置

URL(必填)

  • 说明:外部服务的 HTTP 接口地址
  • 格式:完整的 HTTP/HTTPS URL
  • 示例https://your-service.com/api/event-update

高级配置

授权配置

  • 授权用户名:HTTP Basic 认证的用户名
  • 授权密码:HTTP Basic 认证的密码
  • 使用场景:当外部服务需要身份验证时使用

HTTP Headers

  • 说明:自定义 HTTP 请求头
  • 格式:键值对形式
  • 默认包含Content-Type: application/json
  • 示例
    X-API-Key: your-api-key
    X-Custom-Header: custom-value
    

HTTP Proxy

  • 说明:HTTP 代理服务器地址
  • 格式http://proxy-host:porthttps://proxy-host:port
  • 使用场景:当需要通过代理访问外部服务时

Timeout

  • 说明:HTTP 请求超时时间
  • 单位:毫秒(ms)
  • 默认值:10000(10秒)
  • 建议:根据外部服务响应时间适当调整

TLS InsecureSkipVerify

  • 说明:是否跳过 TLS 证书验证
  • 默认值:关闭(验证证书)
  • 注意:仅在测试环境或私有证书时开启

外部服务接口规范

您的外部服务需要满足以下要求:

请求格式

  • 方法:POST
  • Content-Type:application/json
  • 请求体:完整的告警事件 JSON 对象

响应格式

  • Content-Type:application/json
  • 响应体:更新后的告警事件 JSON 对象
  • 状态码:建议返回 200

事件对象结构示例

{
  "id": "event-id",
  "rule_name": "规则名称",
  "metric": "指标名称",
  "severity": 2,
  "status": 1,
  "values": "告警值",
  "tags": {
    "host": "server01",
    "service": "web"
  },
  "annotations": {
    "summary": "告警摘要",
    "description": "详细描述"
  }
}

使用示例

示例 1:丰富告警信息

// 外部服务可以根据事件中的主机名查询 CMDB,
// 添加更多机器信息到 annotations 中
{ 
  ...
  "annotations": {
    "summary": "CPU使用率过高",
    "description": "服务器 server01 CPU使用率达到90%",
    "owner": "运维团队",
    "location": "北京机房A区",
    "contact": "admin@company.com"
  }
}

示例 2:动态调整告警级别

// 根据时间、业务影响等因素动态调整告警级别
{
  ...
  "severity": 1,  // 从 2 调整为 1(更高级别)
  "annotations": {
    "reason": "业务高峰期,提升告警级别"
  }
}

实际使用案例

Python HTTP 服务示例

以下是一个使用 Python 标准库的完整示例,演示如何创建一个外部服务来处理 Event Update 请求:

1. 创建 Python 服务文件 (event_processor.py)

from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging
from datetime import datetime
from urllib.parse import urlparse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EventUpdateHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        """Handle POST requests"""
        try:
            # Parse request path
            parsed_path = urlparse(self.path)
            
            if parsed_path.path == '/api/event-update':
                self.handle_event_update()
            else:
                self.send_error(404, "Not Found")
                
        except Exception as e:
            logger.error(f"Error handling request: {str(e)}")
            self.send_error(500, "Internal Server Error")
    
    def do_GET(self):
        """Handle GET requests"""
        self.send_error(404, "Not Found")
    
    def handle_event_update(self):
        """Handle event update requests"""
        try:
            # Read request body
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length == 0:
                self.send_error(400, "Empty request body")
                return
                
            post_data = self.rfile.read(content_length)
            event = json.loads(post_data.decode('utf-8'))
            
            logger.info(f"Received alert event: {json.dumps(event, ensure_ascii=False, indent=2)}")
            
            # Backup original event
            original_event = event.copy()
            
            # Example 1: Enrich alert information based on hostname
            # Use tags_map if available, otherwise parse tags array
            tags_dict = event.get('tags_map', {})
            if not tags_dict and 'tags' in event:
                # Parse tags array like ["key=value", "key2=value2"]
                for tag in event['tags']:
                    if '=' in tag:
                        key, value = tag.split('=', 1)
                        tags_dict[key] = value
            
            # Look for host identifier (could be 'host', 'ident', or 'instance')
            hostname = tags_dict.get('host') or tags_dict.get('ident') or tags_dict.get('instance')
            
            if hostname:
                # Simulate CMDB query to get host information
                host_info = get_host_info(hostname)
                
                # Update annotations
                if 'annotations' not in event:
                    event['annotations'] = {}
                
                event['annotations'].update({
                    'owner': host_info.get('owner', 'Unknown'),
                    'location': host_info.get('location', 'Unknown Location'),
                    'contact': host_info.get('contact', 'admin@company.com'),
                    'processed_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'processor': 'Python HTTP Service',
                    'host_identifier': hostname
                })
            
            # Example 2: Add custom tags to tags_map
            if 'tags_map' not in event:
                event['tags_map'] = {}
            
            event['tags_map']['processed'] = 'true'
            event['tags_map']['processor_version'] = '1.0'
            
            # Also update the tags array
            if 'tags' not in event:
                event['tags'] = []
            
            event['tags'].append('processed=true')
            event['tags'].append('processor_version=1.0')
            
            # Log processing result
            logger.info(f"Processing completed, returning updated event: {json.dumps(event, ensure_ascii=False, indent=2)}")
            
            # Return processed event
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            
            response_data = json.dumps(event, ensure_ascii=False).encode('utf-8')
            self.wfile.write(response_data)
            
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing error: {str(e)}")
            self.send_error(400, "Invalid JSON")
        except Exception as e:
            logger.error(f"Error processing event: {str(e)}")
            # Return original event on error to ensure alert pipeline doesn't break
            try:
                self.send_response(200)
                self.send_header('Content-Type', 'application/json; charset=utf-8')
                self.end_headers()
                response_data = json.dumps(original_event, ensure_ascii=False).encode('utf-8')
                self.wfile.write(response_data)
            except:
                self.send_error(500, "Internal Server Error")
    

    def log_message(self, format, *args):
        """Custom log format"""
        logger.info(f"{self.address_string()} - {format % args}")

def get_host_info(hostname):
    # Simulated host information database
    host_db = {
        'server01': {
            'owner': 'Operations Team',
            'location': 'Beijing Data Center Zone A',
            'contact': 'ops-team@company.com'
        },
        'server02': {
            'owner': 'Development Team',
            'location': 'Shanghai Data Center Zone B', 
            'contact': 'dev-team@company.com'
        },
    }
    
    return host_db.get(hostname, {
        'owner': 'Development Team',
        'location': 'Aliyun ECS Beijing Zone',
        'contact': 'admin@company.com'
    })

if __name__ == '__main__':
    server_host = '0.0.0.0'
    server_port = 5000
    
    print(f"Event processing endpoint: http://localhost:{server_port}/api/event-update")
    print("Press Ctrl+C to stop service")
    
    try:
        server = HTTPServer((server_host, server_port), EventUpdateHandler)
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping service...")
        server.shutdown()
        print("Service stopped")

2. 启动服务

python event_processor.py

服务启动后会在 http://localhost:5000 监听请求。

3. 在 Nightingale 中配置

在 Event Update 处理器配置页面中设置:

  • URL: http://localhost:5000/api/event-update

5. 测试配置

可以点击页面上的"测试"按钮来验证配置是否正确。

常见问题

Q: 如果外部服务不可用怎么办?

A: 处理器会记录错误日志,并返回原始事件继续处理,不会中断告警流水线。

Q: 外部服务可以拒绝某些事件吗?

A: 可以,外部服务返回原始事件对象即表示不做修改。

Q: 是否支持异步处理?

A: 当前版本为同步处理,外部服务需要在超时时间内返回响应。

Q: 如何调试配置问题?

A: 可以点击页面上的测试测试按钮

注意事项

  • 确保外部服务的高可用性,避免影响告警处理
  • 合理设置超时时间,平衡响应速度和服务稳定性
  • 定期监控外部服务的性能和可用性
  • 建议对外部服务进行充分测试后再投入生产使用
快猫星云 联系方式 快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云
OpenSource
开源版
Flashcat
Flashcat