记一次categraf插件开发
王玉松@快猫星云
2024-09-24 10:06:10
Categraf简介
-
Categraf 旨在成为一个全能型的监控代理,支持通过 remote_write 协议写入 Prometheus、M3DB、VictoriaMetrics 和 InfluxDB 等时序数据库。其特点是只采集数值数据,保持稳定的标签结构,并通过高效采集机制减少高基数数据问题。采用纯 Go 语言编写,便于部署和扩展。Categraf 还提供常用的采集器、监控大盘和告警规则,方便用户直接使用,未来计划集成日志和追踪功能,并持续迭代,成为开源社区的一部分。
-
下载地址: https://github.com/flashcatcloud/categraf/releases/tag/v0.3.79
-
./categraf –help
相关代码:
var (
appPath string
configDir = flag.String("configs", osx.GetEnv("CATEGRAF_CONFIGS", "conf"), "Specify configuration directory.(env:CATEGRAF_CONFIGS)")
debugMode = flag.Bool("debug", false, "Is debug mode?")
debugLevel = flag.Int("debug-level", 0, "debug level")
testMode = flag.Bool("test", false, "Is test mode? print metrics to stdout")
interval = flag.Int64("interval", 0, "Global interval(unit:Second)")
showVersion = flag.Bool("version", false, "Show version.")
inputFilters = flag.String("inputs", "", "e.g. cpu:mem:system")
install = flag.Bool("install", false, "Install categraf service")
remove = flag.Bool("remove", false, "Remove categraf service")
start = flag.Bool("start", false, "Start categraf service")
stop = flag.Bool("stop", false, "Stop categraf service")
status = flag.Bool("status", false, "Show categraf service status")
update = flag.Bool("update", false, "Update categraf binary")
updateFile = flag.String("update_url", "", "new version for categraf to download")
)
配置插件
一个插件一般会有多个实例,查看这个配置插件的文件, 比如input.apache/apache.toml
:
[[instances]]
# https://statuslist.app/apache/apache-status-page-simple-setup-guide/
# scrape_uri = "http://localhost/server-status/?auto"
# host_override = ""
# insecure = false
# custom_headers = {}
# log_level = "info"
字段简介:
- scrape_uri: 采集 Apache 状态页面的 URL
- host_override: 覆盖默认的主机头,用于当采集主机与实际主机不一致时。
- insecure设置为true 时,禁用 SSL 证书验证,适用于自签名证书的情况。
- custom_headers: 自定义 HTTP 头部(例如身份验证)。
- log_level: 设置日志级别,默认值为 “info”,控制日志输出的详细程度。
主要流程
- 我们在inputs里面配置了新的插件采集器
- 通过 init() 函数,Categraf 框架会自动初始化并将该采集器注册到系统中。
- 初始化阶段:MetricsAgent 设置了过滤器和输入读取器,用于管理和跟踪数据采集器。
- 注册采集器:类似 Apache 的插件通过 RegisterInput 注册。该过程加载配置并创建采集器实例。
- 启动采集:注册后,采集器(例如 Apache)初始化其导出器,并通过 Gather() 函数采集指标数据。
- 指标数据传递:采集到的指标数据被传递到 SampleList,它用于存储 Prometheus 可以抓取的指标数据。
安全采集,放心使用:
func (ins *Instance) Gather(slist *types.SampleList) {
// collect
err := inputs.Collect(ins.e, slist)
if err != nil {
ins.Println("E! failed to collect metrics:", err)
}
}
在代码中,ins.e(即 exporter.Exporter 实例)通过调用 inputs.Collect(ins.e, slist) 函数收集指标,slist 是 SampleList 的实例,负责保存采集到的样本数据。
// SafeList is a thread-safe list
type SafeList[T any] struct {
sync.RWMutex
L *list.List
}
func NewSafeList[T any]() *SafeList[T] {
return &SafeList[T]{L: list.New()}
}
SafeList 被认为是线程安全的,因为它使用了 sync.RWMutex 读写锁来保护底层列表操作。
- 写操作锁:像 PushFront、PopBack 和 RemoveAll 这样的修改操作使用写锁 (Lock),确保同一时间只有一个 goroutine 可以修改列表,避免并发修改冲突。
- 读操作锁:对于只读操作,如 Len,使用读锁 (RLock),允许多个读操作同时进行,而不会相互阻塞。 这防止了竞争条件,确保多个 goroutine 安全访问列表。
实战
我们演示如何将apache exporter fork到Categraf插件中
- 配置参数:一般是网络端口配置
# scrape_uri = "http://localhost/server-status/?auto"
# host_override = ""
# insecure = false
# custom_headers = {}
# log_level = "info"
- 初始化插件
// 定义了一个Apache插件类型,嵌套了config.PluginConfig,用于保存插件的配置信息。
type Apache struct {
config.PluginConfig
Instances []*Instance `toml:"instances"`
}
// 定义了一个指向Instance结构体的指针数组,表示该插件可能有多个实例(Instance)。
type Instance struct {
config.InstanceConfig
LogLevel string `toml:"log_level"`
exporter.Config
e *exporter.Exporter //用于保存导出器实例的指针。这个指针会在Init()方法中进行初始化。
}
// 确保Apache结构体实现了inputs.Input接口。
var _ inputs.Input = new(Apache)
// 确保Instance结构体实现了inputs.SampleGatherer接口。
var _ inputs.SampleGatherer = new(Instance)
// 确保Apache结构体实现了inputs.InstancesGetter接口。
var _ inputs.InstancesGetter = new(Apache)
// inputs.Add()函数用于注册插件,将Apache插件添加到插件系统中。inputs.Add()需要提供插件的名称和一个初始化函数,这里是返回一个新的Apache实例。
func init() {
inputs.Add(inputName, func() inputs.Input {
return &Apache{}
})
}
func (ins *Instance) Init() error {
// 通过exporter.New()函数创建一个新的导出器实例
e, err := exporter.New(logger, &ins.Config)
if err != nil {
return fmt.Errorf("could not instantiate apache lag exporter: %v", err)
}
ins.e = e
return nil
}
- 采集中 将exporter服务推入到SampleList
func (ins *Instance) Gather(slist *types.SampleList) {
//收集指标数据的入口函数。
//调用Collect函数,将ins.e(即exporter.Exporter实例)传递给Collect函数,收集Apache的指标数据。
inputs.Collect(ins.e, slist)
}
func Collect(e prometheus.Collector, slist *types.SampleList, constLabels ...map[string]string) error {
//创建一个metricChan通道,用于接收Prometheus的指标数据。
metricChan := make(chan prometheus.Metric, capMetricChan)
go func() {
//e.Collect(metricChan):以异步的方式(在新的goroutine中)调用e的Collect方法,将指标数据传入通道metricChan。
e.Collect(metricChan)
close(metricChan)
}()
//通过range循环遍历metricChan,获取每个指标metric。
for metric := range metricChan {
desc := metric.Desc()
dtoMetric := &dto.Metric{}
err := metric.Write(dtoMetric)
//.........
//slist.PushSample():将Counter、Gauge或Untyped类型的指标推送到样本列表。
switch {
case dtoMetric.Counter != nil:
slist.PushSample("", desc.Name(), *dtoMetric.Counter.Value, labels)
case dtoMetric.Gauge != nil:
slist.PushSample("", desc.Name(), *dtoMetric.Gauge.Value, labels)
}
}
return nil
}
// 用于从Apache实例中收集原始数据并处理这些数据
func (e *Exporter) collect(ch chan<- prometheus.Metric) error {
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
return fmt.Errorf("error scraping Apache: %w", err)
}
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 1)
scanner := bufio.NewScanner(bytes.NewReader(data))
for scanner.Scan() {
key, v := splitkv(scanner.Text())
if err != nil {
continue
}
switch {
}
return nil
}
// 实现了prometheus.Collector接口中的Collect方法,负责从Exporter中收集数据并发送给Prometheus。
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.mutex.Lock() // To protect metrics from concurrent collects.
defer e.mutex.Unlock()
if err := e.collect(ch); err != nil {
level.Error(e.logger).Log("msg", "Error scraping Apache:", "err", err)
e.scrapeFailures.Inc()
e.scrapeFailures.Collect(ch)
}
}
- Metrics Agent MetricsAgent:负责管理输入插件及其配置。通过 RegisterInput 方法加载和注册各个采集器(如 Apache、Redis 等)的配置。
InputReader:负责启动并定期调用 gatherOnce(),从每个插件中采集数据。采集的数据被处理后,转发到 writer 模块进行写入。
func (r *InputReader) gatherOnce() {
defer func() {
if rc := recover(); rc != nil {
log.Println("E!", r.inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
}
}()
// plugin level, for system plugins
slist := types.NewSampleList()
inputs.MayGather(r.input, slist)
sl := r.input.Process(slist)
if r.input.Type() == "debug" {
arr := sl.PopBackAll()
debug.ReportMetrics(r.input.UUID(), arr)
} else {
r.forward(sl)
}
instances := inputs.MayGetInstances(r.input)
if len(instances) == 0 {
return
}
atomic.AddUint64(&r.runCounter, 1)
for i := 0; i < len(instances); i++ {
if !instances[i].Initialized() {
continue
}
r.waitGroup.Add(1)
go func(ins inputs.Instance) {
defer r.waitGroup.Done()
it := ins.GetIntervalTimes()
if it > 0 {
counter := atomic.LoadUint64(&r.runCounter)
if counter%uint64(it) != 0 {
return
}
}
insList := types.NewSampleList()
inputs.MayGather(ins, insList)
sl := ins.Process(insList)
if ins.Type() == "debug" {
arr := sl.PopBackAll()
debug.ReportMetrics(ins.UUID(), arr)
} else {
r.forward(sl)
}
}(instances[i])
}
r.waitGroup.Wait()
}
所以我们写完采集器将采集器的包导到metricsAgent里,然后会在初始化时读取该采集器服务。
结果打印
- 手动输入命令: ./categraf –test –inputs ExporterServerName