事件更新處理器 — 透過呼叫外部 HTTP 服務動態更新告警事件內容。
概述
Event Update 是 Nightingale 告警系統中的一個事件處理器,它允許您透過 HTTP 呼叫外部服務來動態更新告警事件的內容。當告警事件流經此處理器時,處理器會將事件資料傳送到指定的 URL,並根據外部服務的回應來更新事件資訊。
工作原理
- 事件接收:處理器接收到告警事件
- 資料序列化:將事件物件轉換為 JSON 格式
- HTTP 呼叫:透過 POST 請求將 JSON 資料傳送到設定的 URL
- 回應處理:讀取外部服務的回應
- 事件更新:將回應內容反序列化後更新原始事件
- 事件傳遞:傳回更新後的事件繼續流水線處理
設定說明
基礎設定
URL(必填)
- 說明:外部服務的 HTTP 介面位址
- 格式:完整的 HTTP/HTTPS URL
- 範例:
https://your-service.com/api/event-update
進階設定
授權設定
- 授權使用者名稱:HTTP Basic 認證的使用者名稱
- 授權密碼:HTTP Basic 認證的密碼
- 使用場景:當外部服務需要身分驗證時使用
HTTP Headers
- 說明:自訂 HTTP 請求標頭
- 格式:鍵值對形式
- 預設包含:
Content-Type: application/json - 範例:
X-API-Key: your-api-key X-Custom-Header: custom-value
HTTP Proxy
- 說明:HTTP 代理伺服器位址
- 格式:
http://proxy-host:port或https://proxy-host:port - 使用場景:當需要透過代理存取外部服務時
Timeout
- 說明:HTTP 請求逾時時間
- 單位:毫秒(ms)
- 預設值:10000(10 秒)
- 建議:根據外部服務回應時間適度調整
TLS InsecureSkipVerify
- 說明:是否略過 TLS 憑證驗證
- 預設值:關閉(驗證憑證)
- 注意:僅在測試環境或私有憑證時開啟
外部服務介面規範
您的外部服務需要符合以下要求:
請求格式
- 方法:POST
- Content-Type:application/json
- 請求主體:完整的告警事件 JSON 物件
回應格式
- Content-Type:application/json
- 回應主體:更新後的告警事件 JSON 物件
- 狀態碼:建議傳回 200
事件物件結構範例
{
"id": "event-id",
"rule_name": "规则名称",
"metric": "指标名称",
"severity": 2,
"status": 1,
"values": "告警值",
"tags": [
"host=server01",
"service=web"
],
"annotations": {
"summary": "告警摘要",
"description": "详细描述"
}
}
PS:在外部服務中,所有欄位都可以更新,夜鶯後端會用 response 的內容對 event 進行 Unmarshal。 如果 response 傳回的 JSON 中只有 annotations 欄位,Unmarshal 之後,只有 annotations 會被覆寫。
使用範例
範例 1:豐富告警資訊
// 外部服务可以根据事件中的主机名查询 CMDB,
// 添加更多机器信息到 annotations 中
{
...
"annotations": {
"summary": "CPU使用率过高",
"description": "服务器 server01 CPU使用率达到90%",
"owner": "运维团队",
"location": "北京机房A区",
"contact": "admin@company.com"
}
}
範例 2:動態調整告警等級
// 根据时间、业务影响等因素动态调整告警级别
{
...
"severity": 1, // 从 2 调整为 1(更高级别)
"annotations": {
"reason": "业务高峰期,提升告警级别"
}
}
實際使用案例
Python HTTP 服務範例
以下是一個使用 Python 標準函式庫的完整範例,示範如何建立一個外部服務來處理 Event Update 請求:
1. 建立 Python 服務檔案(event_processor.py)
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging
from datetime import datetime
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventUpdateHandler(BaseHTTPRequestHandler):
def do_POST(self):
"""Handle POST requests"""
try:
# Parse request path
parsed_path = urlparse(self.path)
if parsed_path.path == '/api/event-update':
self.handle_event_update()
else:
self.send_error(404, "Not Found")
except Exception as e:
logger.error(f"Error handling request: {str(e)}")
self.send_error(500, "Internal Server Error")
def do_GET(self):
"""Handle GET requests"""
self.send_error(404, "Not Found")
def handle_event_update(self):
"""Handle event update requests"""
try:
# Read request body
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_error(400, "Empty request body")
return
post_data = self.rfile.read(content_length)
event = json.loads(post_data.decode('utf-8'))
logger.info(f"Received alert event: {json.dumps(event, ensure_ascii=False, indent=2)}")
# Backup original event
original_event = event.copy()
# Example 1: Enrich alert information based on hostname
# Use tags_map if available, otherwise parse tags array
tags_dict = event.get('tags_map', {})
if not tags_dict and 'tags' in event:
# Parse tags array like ["key=value", "key2=value2"]
for tag in event['tags']:
if '=' in tag:
key, value = tag.split('=', 1)
tags_dict[key] = value
# Look for host identifier (could be 'host', 'ident', or 'instance')
hostname = tags_dict.get('host') or tags_dict.get('ident') or tags_dict.get('instance')
if hostname:
# Simulate CMDB query to get host information
host_info = get_host_info(hostname)
# Update annotations
if 'annotations' not in event:
event['annotations'] = {}
event['annotations'].update({
'owner': host_info.get('owner', 'Unknown'),
'location': host_info.get('location', 'Unknown Location'),
'contact': host_info.get('contact', 'admin@company.com'),
'processed_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'processor': 'Python HTTP Service',
'host_identifier': hostname
})
# Example 2: Add custom tags to tags_map
if 'tags_map' not in event:
event['tags_map'] = {}
event['tags_map']['processed'] = 'true'
event['tags_map']['processor_version'] = '1.0'
# Also update the tags array
if 'tags' not in event:
event['tags'] = []
event['tags'].append('processed=true')
event['tags'].append('processor_version=1.0')
# Log processing result
logger.info(f"Processing completed, returning updated event: {json.dumps(event, ensure_ascii=False, indent=2)}")
# Return processed event
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
response_data = json.dumps(event, ensure_ascii=False).encode('utf-8')
self.wfile.write(response_data)
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error: {str(e)}")
self.send_error(400, "Invalid JSON")
except Exception as e:
logger.error(f"Error processing event: {str(e)}")
# Return original event on error to ensure alert pipeline doesn't break
try:
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
response_data = json.dumps(original_event, ensure_ascii=False).encode('utf-8')
self.wfile.write(response_data)
except:
self.send_error(500, "Internal Server Error")
def log_message(self, format, *args):
"""Custom log format"""
logger.info(f"{self.address_string()} - {format % args}")
def get_host_info(hostname):
# Simulated host information database
host_db = {
'server01': {
'owner': 'Operations Team',
'location': 'Beijing Data Center Zone A',
'contact': 'ops-team@company.com'
},
'server02': {
'owner': 'Development Team',
'location': 'Shanghai Data Center Zone B',
'contact': 'dev-team@company.com'
},
}
return host_db.get(hostname, {
'owner': 'Development Team',
'location': 'Aliyun ECS Beijing Zone',
'contact': 'admin@company.com'
})
if __name__ == '__main__':
server_host = '0.0.0.0'
server_port = 5000
print(f"Event processing endpoint: http://localhost:{server_port}/api/event-update")
print("Press Ctrl+C to stop service")
try:
server = HTTPServer((server_host, server_port), EventUpdateHandler)
server.serve_forever()
except KeyboardInterrupt:
print("\nStopping service...")
server.shutdown()
print("Service stopped")
2. 啟動服務
python event_processor.py
服務啟動後會在 http://localhost:5000 監聽請求。
3. 在 Nightingale 中設定
在 Event Update 處理器設定頁面中設定:
- URL:
http://localhost:5000/api/event-update
5. 測試設定
可以點選頁面上的「測試」按鈕來驗證設定是否正確。
常見問題
Q:如果外部服務無法使用怎麼辦?
A:處理器會記錄錯誤日誌,並傳回原始事件繼續處理,不會中斷告警流水線。
Q:外部服務可以拒絕某些事件嗎?
A:可以,外部服務傳回原始事件物件即表示不做修改。
Q:是否支援非同步處理?
A:目前版本為同步處理,外部服務需要在逾時時間內傳回回應。
Q:如何除錯設定問題?
A:可以點選頁面上的測試按鈕。
注意事項
- 確保外部服務的高可用性,避免影響告警處理
- 合理設定逾時時間,平衡回應速度與服務穩定性
- 定期監控外部服務的效能與可用性
- 建議對外部服務進行充分測試後再投入正式環境使用