事件更新處理器 — 透過呼叫外部 HTTP 服務動態更新告警事件內容。

概述

Event Update 是 Nightingale 告警系統中的一個事件處理器,它允許您透過 HTTP 呼叫外部服務來動態更新告警事件的內容。當告警事件流經此處理器時,處理器會將事件資料傳送到指定的 URL,並根據外部服務的回應來更新事件資訊。

工作原理

  1. 事件接收:處理器接收到告警事件
  2. 資料序列化:將事件物件轉換為 JSON 格式
  3. HTTP 呼叫:透過 POST 請求將 JSON 資料傳送到設定的 URL
  4. 回應處理:讀取外部服務的回應
  5. 事件更新:將回應內容反序列化後更新原始事件
  6. 事件傳遞:傳回更新後的事件繼續流水線處理

設定說明

基礎設定

URL(必填)

  • 說明:外部服務的 HTTP 介面位址
  • 格式:完整的 HTTP/HTTPS URL
  • 範例https://your-service.com/api/event-update

進階設定

授權設定

  • 授權使用者名稱:HTTP Basic 認證的使用者名稱
  • 授權密碼:HTTP Basic 認證的密碼
  • 使用場景:當外部服務需要身分驗證時使用

HTTP Headers

  • 說明:自訂 HTTP 請求標頭
  • 格式:鍵值對形式
  • 預設包含Content-Type: application/json
  • 範例
    X-API-Key: your-api-key
    X-Custom-Header: custom-value
    

HTTP Proxy

  • 說明:HTTP 代理伺服器位址
  • 格式http://proxy-host:porthttps://proxy-host:port
  • 使用場景:當需要透過代理存取外部服務時

Timeout

  • 說明:HTTP 請求逾時時間
  • 單位:毫秒(ms)
  • 預設值:10000(10 秒)
  • 建議:根據外部服務回應時間適度調整

TLS InsecureSkipVerify

  • 說明:是否略過 TLS 憑證驗證
  • 預設值:關閉(驗證憑證)
  • 注意:僅在測試環境或私有憑證時開啟

外部服務介面規範

您的外部服務需要符合以下要求:

請求格式

  • 方法:POST
  • Content-Type:application/json
  • 請求主體:完整的告警事件 JSON 物件

回應格式

  • Content-Type:application/json
  • 回應主體:更新後的告警事件 JSON 物件
  • 狀態碼:建議傳回 200

事件物件結構範例

{
  "id": "event-id",
  "rule_name": "规则名称",
  "metric": "指标名称",
  "severity": 2,
  "status": 1,
  "values": "告警值",
  "tags": [
    "host=server01",
    "service=web"
  ],
  "annotations": {
    "summary": "告警摘要",
    "description": "详细描述"
  }
}

PS:在外部服務中,所有欄位都可以更新,夜鶯後端會用 response 的內容對 event 進行 Unmarshal。 如果 response 傳回的 JSON 中只有 annotations 欄位,Unmarshal 之後,只有 annotations 會被覆寫。

使用範例

範例 1:豐富告警資訊

// 外部服务可以根据事件中的主机名查询 CMDB,
// 添加更多机器信息到 annotations 中
{ 
  ...
  "annotations": {
    "summary": "CPU使用率过高",
    "description": "服务器 server01 CPU使用率达到90%",
    "owner": "运维团队",
    "location": "北京机房A区",
    "contact": "admin@company.com"
  }
}

範例 2:動態調整告警等級

// 根据时间、业务影响等因素动态调整告警级别
{
  ...
  "severity": 1,  // 从 2 调整为 1(更高级别)
  "annotations": {
    "reason": "业务高峰期,提升告警级别"
  }
}

實際使用案例

Python HTTP 服務範例

以下是一個使用 Python 標準函式庫的完整範例,示範如何建立一個外部服務來處理 Event Update 請求:

1. 建立 Python 服務檔案(event_processor.py)

from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging
from datetime import datetime
from urllib.parse import urlparse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EventUpdateHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        """Handle POST requests"""
        try:
            # Parse request path
            parsed_path = urlparse(self.path)
            
            if parsed_path.path == '/api/event-update':
                self.handle_event_update()
            else:
                self.send_error(404, "Not Found")
                
        except Exception as e:
            logger.error(f"Error handling request: {str(e)}")
            self.send_error(500, "Internal Server Error")
    
    def do_GET(self):
        """Handle GET requests"""
        self.send_error(404, "Not Found")
    
    def handle_event_update(self):
        """Handle event update requests"""
        try:
            # Read request body
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length == 0:
                self.send_error(400, "Empty request body")
                return
                
            post_data = self.rfile.read(content_length)
            event = json.loads(post_data.decode('utf-8'))
            
            logger.info(f"Received alert event: {json.dumps(event, ensure_ascii=False, indent=2)}")
            
            # Backup original event
            original_event = event.copy()
            
            # Example 1: Enrich alert information based on hostname
            # Use tags_map if available, otherwise parse tags array
            tags_dict = event.get('tags_map', {})
            if not tags_dict and 'tags' in event:
                # Parse tags array like ["key=value", "key2=value2"]
                for tag in event['tags']:
                    if '=' in tag:
                        key, value = tag.split('=', 1)
                        tags_dict[key] = value
            
            # Look for host identifier (could be 'host', 'ident', or 'instance')
            hostname = tags_dict.get('host') or tags_dict.get('ident') or tags_dict.get('instance')
            
            if hostname:
                # Simulate CMDB query to get host information
                host_info = get_host_info(hostname)
                
                # Update annotations
                if 'annotations' not in event:
                    event['annotations'] = {}
                
                event['annotations'].update({
                    'owner': host_info.get('owner', 'Unknown'),
                    'location': host_info.get('location', 'Unknown Location'),
                    'contact': host_info.get('contact', 'admin@company.com'),
                    'processed_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'processor': 'Python HTTP Service',
                    'host_identifier': hostname
                })
            
            # Example 2: Add custom tags to tags_map
            if 'tags_map' not in event:
                event['tags_map'] = {}
            
            event['tags_map']['processed'] = 'true'
            event['tags_map']['processor_version'] = '1.0'
            
            # Also update the tags array
            if 'tags' not in event:
                event['tags'] = []
            
            event['tags'].append('processed=true')
            event['tags'].append('processor_version=1.0')
            
            # Log processing result
            logger.info(f"Processing completed, returning updated event: {json.dumps(event, ensure_ascii=False, indent=2)}")
            
            # Return processed event
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            
            response_data = json.dumps(event, ensure_ascii=False).encode('utf-8')
            self.wfile.write(response_data)
            
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing error: {str(e)}")
            self.send_error(400, "Invalid JSON")
        except Exception as e:
            logger.error(f"Error processing event: {str(e)}")
            # Return original event on error to ensure alert pipeline doesn't break
            try:
                self.send_response(200)
                self.send_header('Content-Type', 'application/json; charset=utf-8')
                self.end_headers()
                response_data = json.dumps(original_event, ensure_ascii=False).encode('utf-8')
                self.wfile.write(response_data)
            except:
                self.send_error(500, "Internal Server Error")
    

    def log_message(self, format, *args):
        """Custom log format"""
        logger.info(f"{self.address_string()} - {format % args}")

def get_host_info(hostname):
    # Simulated host information database
    host_db = {
        'server01': {
            'owner': 'Operations Team',
            'location': 'Beijing Data Center Zone A',
            'contact': 'ops-team@company.com'
        },
        'server02': {
            'owner': 'Development Team',
            'location': 'Shanghai Data Center Zone B', 
            'contact': 'dev-team@company.com'
        },
    }
    
    return host_db.get(hostname, {
        'owner': 'Development Team',
        'location': 'Aliyun ECS Beijing Zone',
        'contact': 'admin@company.com'
    })

if __name__ == '__main__':
    server_host = '0.0.0.0'
    server_port = 5000
    
    print(f"Event processing endpoint: http://localhost:{server_port}/api/event-update")
    print("Press Ctrl+C to stop service")
    
    try:
        server = HTTPServer((server_host, server_port), EventUpdateHandler)
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping service...")
        server.shutdown()
        print("Service stopped")

2. 啟動服務

python event_processor.py

服務啟動後會在 http://localhost:5000 監聽請求。

3. 在 Nightingale 中設定

在 Event Update 處理器設定頁面中設定:

  • URLhttp://localhost:5000/api/event-update

5. 測試設定

可以點選頁面上的「測試」按鈕來驗證設定是否正確。

常見問題

Q:如果外部服務無法使用怎麼辦?

A:處理器會記錄錯誤日誌,並傳回原始事件繼續處理,不會中斷告警流水線。

Q:外部服務可以拒絕某些事件嗎?

A:可以,外部服務傳回原始事件物件即表示不做修改。

Q:是否支援非同步處理?

A:目前版本為同步處理,外部服務需要在逾時時間內傳回回應。

Q:如何除錯設定問題?

A:可以點選頁面上的測試按鈕。

注意事項

  • 確保外部服務的高可用性,避免影響告警處理
  • 合理設定逾時時間,平衡回應速度與服務穩定性
  • 定期監控外部服務的效能與可用性
  • 建議對外部服務進行充分測試後再投入正式環境使用

參考資料

快猫星云 联系方式 快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云