记一次categraf插件开发

王玉松@快猫星云 2024-09-24 10:06:10

Categraf简介

  • Categraf 旨在成为一个全能型的监控代理,支持通过 remote_write 协议写入 Prometheus、M3DB、VictoriaMetrics 和 InfluxDB 等时序数据库。其特点是只采集数值数据,保持稳定的标签结构,并通过高效采集机制减少高基数数据问题。采用纯 Go 语言编写,便于部署和扩展。Categraf 还提供常用的采集器、监控大盘和告警规则,方便用户直接使用,未来计划集成日志和追踪功能,并持续迭代,成为开源社区的一部分。

  • 下载地址: https://github.com/flashcatcloud/categraf/releases/tag/v0.3.79

  • ./categraf –help help

相关代码:


var (
	appPath      string
	configDir    = flag.String("configs", osx.GetEnv("CATEGRAF_CONFIGS", "conf"), "Specify configuration directory.(env:CATEGRAF_CONFIGS)")
	debugMode    = flag.Bool("debug", false, "Is debug mode?")
	debugLevel   = flag.Int("debug-level", 0, "debug level")
	testMode     = flag.Bool("test", false, "Is test mode? print metrics to stdout")
	interval     = flag.Int64("interval", 0, "Global interval(unit:Second)")
	showVersion  = flag.Bool("version", false, "Show version.")
	inputFilters = flag.String("inputs", "", "e.g. cpu:mem:system")
	install      = flag.Bool("install", false, "Install categraf service")
	remove       = flag.Bool("remove", false, "Remove categraf service")
	start        = flag.Bool("start", false, "Start categraf service")
	stop         = flag.Bool("stop", false, "Stop categraf service")
	status       = flag.Bool("status", false, "Show categraf service status")
	update       = flag.Bool("update", false, "Update categraf binary")
	updateFile   = flag.String("update_url", "", "new version for categraf to download")
)

配置插件

一个插件一般会有多个实例,查看这个配置插件的文件, 比如input.apache/apache.toml:

[[instances]]

# https://statuslist.app/apache/apache-status-page-simple-setup-guide/
# scrape_uri = "http://localhost/server-status/?auto"
# host_override = ""
# insecure = false
# custom_headers = {}
# log_level = "info"

字段简介:

  • scrape_uri: 采集 Apache 状态页面的 URL
  • host_override: 覆盖默认的主机头,用于当采集主机与实际主机不一致时。
  • insecure设置为true 时,禁用 SSL 证书验证,适用于自签名证书的情况。
  • custom_headers: 自定义 HTTP 头部(例如身份验证)。
  • log_level: 设置日志级别,默认值为 “info”,控制日志输出的详细程度。

主要流程

  • 我们在inputs里面配置了新的插件采集器
  • 通过 init() 函数,Categraf 框架会自动初始化并将该采集器注册到系统中。
  • 初始化阶段:MetricsAgent 设置了过滤器和输入读取器,用于管理和跟踪数据采集器。
  • 注册采集器:类似 Apache 的插件通过 RegisterInput 注册。该过程加载配置并创建采集器实例。
  • 启动采集:注册后,采集器(例如 Apache)初始化其导出器,并通过 Gather() 函数采集指标数据。
  • 指标数据传递:采集到的指标数据被传递到 SampleList,它用于存储 Prometheus 可以抓取的指标数据。

安全采集,放心使用:

func (ins *Instance) Gather(slist *types.SampleList) {

	//  collect
	err := inputs.Collect(ins.e, slist)
	if err != nil {
		ins.Println("E! failed to collect metrics:", err)
	}
}

在代码中,ins.e(即 exporter.Exporter 实例)通过调用 inputs.Collect(ins.e, slist) 函数收集指标,slist 是 SampleList 的实例,负责保存采集到的样本数据。


// SafeList is a thread-safe list
type SafeList[T any] struct {
	sync.RWMutex
	L *list.List
}

func NewSafeList[T any]() *SafeList[T] {
	return &SafeList[T]{L: list.New()}
}

SafeList 被认为是线程安全的,因为它使用了 sync.RWMutex 读写锁来保护底层列表操作。

  • 写操作锁:像 PushFront、PopBack 和 RemoveAll 这样的修改操作使用写锁 (Lock),确保同一时间只有一个 goroutine 可以修改列表,避免并发修改冲突。
  • 读操作锁:对于只读操作,如 Len,使用读锁 (RLock),允许多个读操作同时进行,而不会相互阻塞。 这防止了竞争条件,确保多个 goroutine 安全访问列表。

实战

我们演示如何将apache exporter fork到Categraf插件中

  • 配置参数:一般是网络端口配置

# scrape_uri = "http://localhost/server-status/?auto"
# host_override = ""
# insecure = false
# custom_headers = {}
# log_level = "info"
  • 初始化插件
// 定义了一个Apache插件类型,嵌套了config.PluginConfig,用于保存插件的配置信息。
type Apache struct {
	config.PluginConfig
	Instances []*Instance `toml:"instances"`
}
// 定义了一个指向Instance结构体的指针数组,表示该插件可能有多个实例(Instance)。
type Instance struct {
	config.InstanceConfig
	LogLevel string `toml:"log_level"`
	exporter.Config

	e *exporter.Exporter //用于保存导出器实例的指针。这个指针会在Init()方法中进行初始化。
}

// 确保Apache结构体实现了inputs.Input接口。
var _ inputs.Input = new(Apache)
// 确保Instance结构体实现了inputs.SampleGatherer接口。
var _ inputs.SampleGatherer = new(Instance)
// 确保Apache结构体实现了inputs.InstancesGetter接口。
var _ inputs.InstancesGetter = new(Apache)

// inputs.Add()函数用于注册插件,将Apache插件添加到插件系统中。inputs.Add()需要提供插件的名称和一个初始化函数,这里是返回一个新的Apache实例。
func init() {
	inputs.Add(inputName, func() inputs.Input {
		return &Apache{}
	})
}


func (ins *Instance) Init() error {
// 通过exporter.New()函数创建一个新的导出器实例
	e, err := exporter.New(logger, &ins.Config)

	if err != nil {
		return fmt.Errorf("could not instantiate apache lag exporter: %v", err)
	}

	ins.e = e
	return nil

}
  • 采集中 将exporter服务推入到SampleList
func (ins *Instance) Gather(slist *types.SampleList) {

	//收集指标数据的入口函数。
	//调用Collect函数,将ins.e(即exporter.Exporter实例)传递给Collect函数,收集Apache的指标数据。
	inputs.Collect(ins.e, slist)
	
}


func Collect(e prometheus.Collector, slist *types.SampleList, constLabels ...map[string]string) error {
	
	//创建一个metricChan通道,用于接收Prometheus的指标数据。
	metricChan := make(chan prometheus.Metric, capMetricChan)
	go func() {
	//e.Collect(metricChan):以异步的方式(在新的goroutine中)调用e的Collect方法,将指标数据传入通道metricChan。

		e.Collect(metricChan)
		close(metricChan)
	}()
	//通过range循环遍历metricChan,获取每个指标metric。
	for metric := range metricChan {
			desc := metric.Desc()
	

		dtoMetric := &dto.Metric{}
		err := metric.Write(dtoMetric)
	
		//.........

		//slist.PushSample():将Counter、Gauge或Untyped类型的指标推送到样本列表。
		switch {
		case dtoMetric.Counter != nil:
			slist.PushSample("", desc.Name(), *dtoMetric.Counter.Value, labels)
		case dtoMetric.Gauge != nil:
			slist.PushSample("", desc.Name(), *dtoMetric.Gauge.Value, labels)
		}
	}

	return nil
}

// 用于从Apache实例中收集原始数据并处理这些数据
func (e *Exporter) collect(ch chan<- prometheus.Metric) error {
	if err != nil {
		ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
		return fmt.Errorf("error scraping Apache: %w", err)
	}
	ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 1)

	scanner := bufio.NewScanner(bytes.NewReader(data))

	for scanner.Scan() {
		key, v := splitkv(scanner.Text())
		if err != nil {
			continue
		}

		switch {
}		

	return nil
}

// 实现了prometheus.Collector接口中的Collect方法,负责从Exporter中收集数据并发送给Prometheus。
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
	e.mutex.Lock() // To protect metrics from concurrent collects.
	defer e.mutex.Unlock()
	if err := e.collect(ch); err != nil {
		level.Error(e.logger).Log("msg", "Error scraping Apache:", "err", err)
		e.scrapeFailures.Inc()
		e.scrapeFailures.Collect(ch)
	}
}
  • Metrics Agent MetricsAgent:负责管理输入插件及其配置。通过 RegisterInput 方法加载和注册各个采集器(如 Apache、Redis 等)的配置。

InputReader:负责启动并定期调用 gatherOnce(),从每个插件中采集数据。采集的数据被处理后,转发到 writer 模块进行写入。

func (r *InputReader) gatherOnce() {
	defer func() {
		if rc := recover(); rc != nil {
			log.Println("E!", r.inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
		}
	}()

	// plugin level, for system plugins
	slist := types.NewSampleList()
	inputs.MayGather(r.input, slist)

	sl := r.input.Process(slist)
	if r.input.Type() == "debug" {
		arr := sl.PopBackAll()
		debug.ReportMetrics(r.input.UUID(), arr)
	} else {
		r.forward(sl)
	}

	instances := inputs.MayGetInstances(r.input)
	if len(instances) == 0 {
		return
	}

	atomic.AddUint64(&r.runCounter, 1)

	for i := 0; i < len(instances); i++ {
		if !instances[i].Initialized() {
			continue
		}
		r.waitGroup.Add(1)
		go func(ins inputs.Instance) {
			defer r.waitGroup.Done()

			it := ins.GetIntervalTimes()
			if it > 0 {
				counter := atomic.LoadUint64(&r.runCounter)
				if counter%uint64(it) != 0 {
					return
				}
			}

			insList := types.NewSampleList()
			inputs.MayGather(ins, insList)
			sl := ins.Process(insList)
			if ins.Type() == "debug" {
				arr := sl.PopBackAll()
				debug.ReportMetrics(ins.UUID(), arr)
			} else {
				r.forward(sl)
			}
		}(instances[i])
	}

	r.waitGroup.Wait()
}

所以我们写完采集器将采集器的包导到metricsAgent里,然后会在初始化时读取该采集器服务。

结果打印

  • 手动输入命令: ./categraf –test –inputs ExporterServerName 打印采集结果
快猫星云 联系方式 快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云
OpenSource
开源版
Flashcat
Flashcat
FlashDuty
Flashduty