Event Update 事件处理器
概述
Event Update 是 Nightingale 告警系统中的一个事件处理器,它允许您通过 HTTP 调用外部服务来动态更新告警事件的内容。当告警事件流经此处理器时,处理器会将事件数据发送到指定的 URL,并根据外部服务的响应来更新事件信息。
工作原理
- 事件接收:处理器接收到告警事件
- 数据序列化:将事件对象转换为 JSON 格式
- HTTP 调用:通过 POST 请求将 JSON 数据发送到配置的 URL
- 响应处理:读取外部服务的响应
- 事件更新:将响应内容反序列化后更新原始事件
- 事件传递:返回更新后的事件继续流水线处理
配置说明
基础配置
URL(必填)
- 说明:外部服务的 HTTP 接口地址
- 格式:完整的 HTTP/HTTPS URL
- 示例:
https://your-service.com/api/event-update
高级配置
授权配置
- 授权用户名:HTTP Basic 认证的用户名
- 授权密码:HTTP Basic 认证的密码
- 使用场景:当外部服务需要身份验证时使用
HTTP Headers
- 说明:自定义 HTTP 请求头
- 格式:键值对形式
- 默认包含:
Content-Type: application/json - 示例:
X-API-Key: your-api-key X-Custom-Header: custom-value
HTTP Proxy
- 说明:HTTP 代理服务器地址
- 格式:
http://proxy-host:port或https://proxy-host:port - 使用场景:当需要通过代理访问外部服务时
Timeout
- 说明:HTTP 请求超时时间
- 单位:毫秒(ms)
- 默认值:10000(10秒)
- 建议:根据外部服务响应时间适当调整
TLS InsecureSkipVerify
- 说明:是否跳过 TLS 证书验证
- 默认值:关闭(验证证书)
- 注意:仅在测试环境或私有证书时开启
外部服务接口规范
您的外部服务需要满足以下要求:
请求格式
- 方法:POST
- Content-Type:application/json
- 请求体:完整的告警事件 JSON 对象
响应格式
- Content-Type:application/json
- 响应体:更新后的告警事件 JSON 对象
- 状态码:建议返回 200
事件对象结构示例
{
"id": "event-id",
"rule_name": "规则名称",
"metric": "指标名称",
"severity": 2,
"status": 1,
"values": "告警值",
"tags": [
"host=server01",
"service=web"
],
"annotations": {
"summary": "告警摘要",
"description": "详细描述"
}
}
PS: 在外部服务中,所有字段都可以更新,夜莺后端会用 response 的内容对 event 进行 Unmarshal。 如果 response 返回 json 中只有 annotations 字段, Unmarshal 之后,只有 annotations 会被覆盖。
使用示例
示例 1:丰富告警信息
// 外部服务可以根据事件中的主机名查询 CMDB,
// 添加更多机器信息到 annotations 中
{
...
"annotations": {
"summary": "CPU使用率过高",
"description": "服务器 server01 CPU使用率达到90%",
"owner": "运维团队",
"location": "北京机房A区",
"contact": "admin@company.com"
}
}
示例 2:动态调整告警级别
// 根据时间、业务影响等因素动态调整告警级别
{
...
"severity": 1, // 从 2 调整为 1(更高级别)
"annotations": {
"reason": "业务高峰期,提升告警级别"
}
}
实际使用案例
Python HTTP 服务示例
以下是一个使用 Python 标准库的完整示例,演示如何创建一个外部服务来处理 Event Update 请求:
1. 创建 Python 服务文件 (event_processor.py)
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging
from datetime import datetime
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventUpdateHandler(BaseHTTPRequestHandler):
def do_POST(self):
"""Handle POST requests"""
try:
# Parse request path
parsed_path = urlparse(self.path)
if parsed_path.path == '/api/event-update':
self.handle_event_update()
else:
self.send_error(404, "Not Found")
except Exception as e:
logger.error(f"Error handling request: {str(e)}")
self.send_error(500, "Internal Server Error")
def do_GET(self):
"""Handle GET requests"""
self.send_error(404, "Not Found")
def handle_event_update(self):
"""Handle event update requests"""
try:
# Read request body
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_error(400, "Empty request body")
return
post_data = self.rfile.read(content_length)
event = json.loads(post_data.decode('utf-8'))
logger.info(f"Received alert event: {json.dumps(event, ensure_ascii=False, indent=2)}")
# Backup original event
original_event = event.copy()
# Example 1: Enrich alert information based on hostname
# Use tags_map if available, otherwise parse tags array
tags_dict = event.get('tags_map', {})
if not tags_dict and 'tags' in event:
# Parse tags array like ["key=value", "key2=value2"]
for tag in event['tags']:
if '=' in tag:
key, value = tag.split('=', 1)
tags_dict[key] = value
# Look for host identifier (could be 'host', 'ident', or 'instance')
hostname = tags_dict.get('host') or tags_dict.get('ident') or tags_dict.get('instance')
if hostname:
# Simulate CMDB query to get host information
host_info = get_host_info(hostname)
# Update annotations
if 'annotations' not in event:
event['annotations'] = {}
event['annotations'].update({
'owner': host_info.get('owner', 'Unknown'),
'location': host_info.get('location', 'Unknown Location'),
'contact': host_info.get('contact', 'admin@company.com'),
'processed_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'processor': 'Python HTTP Service',
'host_identifier': hostname
})
# Example 2: Add custom tags to tags_map
if 'tags_map' not in event:
event['tags_map'] = {}
event['tags_map']['processed'] = 'true'
event['tags_map']['processor_version'] = '1.0'
# Also update the tags array
if 'tags' not in event:
event['tags'] = []
event['tags'].append('processed=true')
event['tags'].append('processor_version=1.0')
# Log processing result
logger.info(f"Processing completed, returning updated event: {json.dumps(event, ensure_ascii=False, indent=2)}")
# Return processed event
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
response_data = json.dumps(event, ensure_ascii=False).encode('utf-8')
self.wfile.write(response_data)
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error: {str(e)}")
self.send_error(400, "Invalid JSON")
except Exception as e:
logger.error(f"Error processing event: {str(e)}")
# Return original event on error to ensure alert pipeline doesn't break
try:
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
response_data = json.dumps(original_event, ensure_ascii=False).encode('utf-8')
self.wfile.write(response_data)
except:
self.send_error(500, "Internal Server Error")
def log_message(self, format, *args):
"""Custom log format"""
logger.info(f"{self.address_string()} - {format % args}")
def get_host_info(hostname):
# Simulated host information database
host_db = {
'server01': {
'owner': 'Operations Team',
'location': 'Beijing Data Center Zone A',
'contact': 'ops-team@company.com'
},
'server02': {
'owner': 'Development Team',
'location': 'Shanghai Data Center Zone B',
'contact': 'dev-team@company.com'
},
}
return host_db.get(hostname, {
'owner': 'Development Team',
'location': 'Aliyun ECS Beijing Zone',
'contact': 'admin@company.com'
})
if __name__ == '__main__':
server_host = '0.0.0.0'
server_port = 5000
print(f"Event processing endpoint: http://localhost:{server_port}/api/event-update")
print("Press Ctrl+C to stop service")
try:
server = HTTPServer((server_host, server_port), EventUpdateHandler)
server.serve_forever()
except KeyboardInterrupt:
print("\nStopping service...")
server.shutdown()
print("Service stopped")
2. 启动服务
python event_processor.py
服务启动后会在 http://localhost:5000 监听请求。
3. 在 Nightingale 中配置
在 Event Update 处理器配置页面中设置:
- URL:
http://localhost:5000/api/event-update
5. 测试配置
可以点击页面上的"测试"按钮来验证配置是否正确。
常见问题
Q: 如果外部服务不可用怎么办?
A: 处理器会记录错误日志,并返回原始事件继续处理,不会中断告警流水线。
Q: 外部服务可以拒绝某些事件吗?
A: 可以,外部服务返回原始事件对象即表示不做修改。
Q: 是否支持异步处理?
A: 当前版本为同步处理,外部服务需要在超时时间内返回响应。
Q: 如何调试配置问题?
A: 可以点击页面上的测试测试按钮
注意事项
- 确保外部服务的高可用性,避免影响告警处理
- 合理设置超时时间,平衡响应速度和服务稳定性
- 定期监控外部服务的性能和可用性
- 建议对外部服务进行充分测试后再投入生产使用