Event Update 事件处理器

概述

Event Update 是 Nightingale 告警系统中的一个事件处理器,它允许您通过 HTTP 调用外部服务来动态更新告警事件的内容。当告警事件流经此处理器时,处理器会将事件数据发送到指定的 URL,并根据外部服务的响应来更新事件信息。

工作原理

  1. 事件接收:处理器接收到告警事件
  2. 数据序列化:将事件对象转换为 JSON 格式
  3. HTTP 调用:通过 POST 请求将 JSON 数据发送到配置的 URL
  4. 响应处理:读取外部服务的响应
  5. 事件更新:将响应内容反序列化后更新原始事件
  6. 事件传递:返回更新后的事件继续流水线处理

配置说明

基础配置

URL(必填)

  • 说明:外部服务的 HTTP 接口地址
  • 格式:完整的 HTTP/HTTPS URL
  • 示例https://your-service.com/api/event-update

高级配置

授权配置

  • 授权用户名:HTTP Basic 认证的用户名
  • 授权密码:HTTP Basic 认证的密码
  • 使用场景:当外部服务需要身份验证时使用

HTTP Headers

  • 说明:自定义 HTTP 请求头
  • 格式:键值对形式
  • 默认包含Content-Type: application/json
  • 示例
    X-API-Key: your-api-key
    X-Custom-Header: custom-value
    

HTTP Proxy

  • 说明:HTTP 代理服务器地址
  • 格式http://proxy-host:porthttps://proxy-host:port
  • 使用场景:当需要通过代理访问外部服务时

Timeout

  • 说明:HTTP 请求超时时间
  • 单位:毫秒(ms)
  • 默认值:10000(10秒)
  • 建议:根据外部服务响应时间适当调整

TLS InsecureSkipVerify

  • 说明:是否跳过 TLS 证书验证
  • 默认值:关闭(验证证书)
  • 注意:仅在测试环境或私有证书时开启

外部服务接口规范

您的外部服务需要满足以下要求:

请求格式

  • 方法:POST
  • Content-Type:application/json
  • 请求体:完整的告警事件 JSON 对象

响应格式

  • Content-Type:application/json
  • 响应体:更新后的告警事件 JSON 对象
  • 状态码:建议返回 200

事件对象结构示例

{
  "id": "event-id",
  "rule_name": "规则名称",
  "metric": "指标名称",
  "severity": 2,
  "status": 1,
  "values": "告警值",
  "tags": [
    "host=server01",
    "service=web"
  ],
  "annotations": {
    "summary": "告警摘要",
    "description": "详细描述"
  }
}

PS: 在外部服务中,所有字段都可以更新,夜莺后端会用 response 的内容对 event 进行 Unmarshal。 如果 response 返回 json 中只有 annotations 字段, Unmarshal 之后,只有 annotations 会被覆盖。

使用示例

示例 1:丰富告警信息

// 外部服务可以根据事件中的主机名查询 CMDB,
// 添加更多机器信息到 annotations 中
{ 
  ...
  "annotations": {
    "summary": "CPU使用率过高",
    "description": "服务器 server01 CPU使用率达到90%",
    "owner": "运维团队",
    "location": "北京机房A区",
    "contact": "admin@company.com"
  }
}

示例 2:动态调整告警级别

// 根据时间、业务影响等因素动态调整告警级别
{
  ...
  "severity": 1,  // 从 2 调整为 1(更高级别)
  "annotations": {
    "reason": "业务高峰期,提升告警级别"
  }
}

实际使用案例

Python HTTP 服务示例

以下是一个使用 Python 标准库的完整示例,演示如何创建一个外部服务来处理 Event Update 请求:

1. 创建 Python 服务文件 (event_processor.py)

from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging
from datetime import datetime
from urllib.parse import urlparse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EventUpdateHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        """Handle POST requests"""
        try:
            # Parse request path
            parsed_path = urlparse(self.path)
            
            if parsed_path.path == '/api/event-update':
                self.handle_event_update()
            else:
                self.send_error(404, "Not Found")
                
        except Exception as e:
            logger.error(f"Error handling request: {str(e)}")
            self.send_error(500, "Internal Server Error")
    
    def do_GET(self):
        """Handle GET requests"""
        self.send_error(404, "Not Found")
    
    def handle_event_update(self):
        """Handle event update requests"""
        try:
            # Read request body
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length == 0:
                self.send_error(400, "Empty request body")
                return
                
            post_data = self.rfile.read(content_length)
            event = json.loads(post_data.decode('utf-8'))
            
            logger.info(f"Received alert event: {json.dumps(event, ensure_ascii=False, indent=2)}")
            
            # Backup original event
            original_event = event.copy()
            
            # Example 1: Enrich alert information based on hostname
            # Use tags_map if available, otherwise parse tags array
            tags_dict = event.get('tags_map', {})
            if not tags_dict and 'tags' in event:
                # Parse tags array like ["key=value", "key2=value2"]
                for tag in event['tags']:
                    if '=' in tag:
                        key, value = tag.split('=', 1)
                        tags_dict[key] = value
            
            # Look for host identifier (could be 'host', 'ident', or 'instance')
            hostname = tags_dict.get('host') or tags_dict.get('ident') or tags_dict.get('instance')
            
            if hostname:
                # Simulate CMDB query to get host information
                host_info = get_host_info(hostname)
                
                # Update annotations
                if 'annotations' not in event:
                    event['annotations'] = {}
                
                event['annotations'].update({
                    'owner': host_info.get('owner', 'Unknown'),
                    'location': host_info.get('location', 'Unknown Location'),
                    'contact': host_info.get('contact', 'admin@company.com'),
                    'processed_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'processor': 'Python HTTP Service',
                    'host_identifier': hostname
                })
            
            # Example 2: Add custom tags to tags_map
            if 'tags_map' not in event:
                event['tags_map'] = {}
            
            event['tags_map']['processed'] = 'true'
            event['tags_map']['processor_version'] = '1.0'
            
            # Also update the tags array
            if 'tags' not in event:
                event['tags'] = []
            
            event['tags'].append('processed=true')
            event['tags'].append('processor_version=1.0')
            
            # Log processing result
            logger.info(f"Processing completed, returning updated event: {json.dumps(event, ensure_ascii=False, indent=2)}")
            
            # Return processed event
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            
            response_data = json.dumps(event, ensure_ascii=False).encode('utf-8')
            self.wfile.write(response_data)
            
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing error: {str(e)}")
            self.send_error(400, "Invalid JSON")
        except Exception as e:
            logger.error(f"Error processing event: {str(e)}")
            # Return original event on error to ensure alert pipeline doesn't break
            try:
                self.send_response(200)
                self.send_header('Content-Type', 'application/json; charset=utf-8')
                self.end_headers()
                response_data = json.dumps(original_event, ensure_ascii=False).encode('utf-8')
                self.wfile.write(response_data)
            except:
                self.send_error(500, "Internal Server Error")
    

    def log_message(self, format, *args):
        """Custom log format"""
        logger.info(f"{self.address_string()} - {format % args}")

def get_host_info(hostname):
    # Simulated host information database
    host_db = {
        'server01': {
            'owner': 'Operations Team',
            'location': 'Beijing Data Center Zone A',
            'contact': 'ops-team@company.com'
        },
        'server02': {
            'owner': 'Development Team',
            'location': 'Shanghai Data Center Zone B', 
            'contact': 'dev-team@company.com'
        },
    }
    
    return host_db.get(hostname, {
        'owner': 'Development Team',
        'location': 'Aliyun ECS Beijing Zone',
        'contact': 'admin@company.com'
    })

if __name__ == '__main__':
    server_host = '0.0.0.0'
    server_port = 5000
    
    print(f"Event processing endpoint: http://localhost:{server_port}/api/event-update")
    print("Press Ctrl+C to stop service")
    
    try:
        server = HTTPServer((server_host, server_port), EventUpdateHandler)
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping service...")
        server.shutdown()
        print("Service stopped")

2. 启动服务

python event_processor.py

服务启动后会在 http://localhost:5000 监听请求。

3. 在 Nightingale 中配置

在 Event Update 处理器配置页面中设置:

  • URL: http://localhost:5000/api/event-update

5. 测试配置

可以点击页面上的"测试"按钮来验证配置是否正确。

常见问题

Q: 如果外部服务不可用怎么办?

A: 处理器会记录错误日志,并返回原始事件继续处理,不会中断告警流水线。

Q: 外部服务可以拒绝某些事件吗?

A: 可以,外部服务返回原始事件对象即表示不做修改。

Q: 是否支持异步处理?

A: 当前版本为同步处理,外部服务需要在超时时间内返回响应。

Q: 如何调试配置问题?

A: 可以点击页面上的测试测试按钮

注意事项

  • 确保外部服务的高可用性,避免影响告警处理
  • 合理设置超时时间,平衡响应速度和服务稳定性
  • 定期监控外部服务的性能和可用性
  • 建议对外部服务进行充分测试后再投入生产使用

更新时间 2025-03-12

快猫星云 联系方式 快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云