Categraf log-agent 配置说明

说明 Categraf 日志采集代理的配置方法,覆盖 logs.toml 中的发送目标、压缩、TLS、Kafka 参数、容器日志采集规则与多行处理。

版本说明

categraf 从 0.1.2 版本开始支持发送日志到日志网关(http/tcp),0.1.8 版本开始支持发送日志到 Kafka。请使用 0.1.8 以上的版本进行日志采集。

⚠️ 开源夜莺不提供 HTTP/TCP 日志接收网关,即开源方案下 categraf 只能把日志发给 Kafka,后续自行从 Kafka 里消费、存储,整个链路跟开源夜莺无关。

conf/logs.toml 配置说明

[logs]
# 在 http 发送日志模式下生效,用于鉴权, 其他模式下占位符
api_key = "TCXDuFMmYKPBaruXzWeDwyKFpk4gxslz"

# 是否开启 log-agent
enable = true

# 日志发送到哪
send_to = "47.111.118.27:9090" 

# 接收日志的后端类型:tcp / http / kafka
send_type = "http"

# 日志对应的 topic,在 Kafka 模式下生效
topic = "test_log_topic" 

# 是否支持压缩:http只支持gzip压缩。 kafka支持多种压缩, 需要配合compression_codec指定。
use_compression = false 

# gzip压缩级别:0 表示不压缩,1-9 表示压缩级别
compression_level=0

# Kafka 支持的压缩: none / gzip / snappy / lz4 / zstd 
compression_codec="none"

# 是否采用 TLS 发送
send_with_tls = false 

# 批量发送的等待时间,单位为秒
batch_wait = 5 

# categraf 记录日志偏移的目录,需要有写权限
run_path = "xxxx" 

# 最大打开的文件数
open_files_limit = 100

# 扫描目录的周期,单位为秒,默认为10秒
scan_period = 10

# udp采集的帧大小
frame_size = 10

## 读取日志缓冲区,行数,默认100
chan_size = 1000

## 有多少协程处理日志规则
pipeline=10

# 默认0,表示按照读取顺序串行写入kafka,如果对日志顺序有要求,请保持该默认配置。 
# 当前都是stream模式, 配置为0,表示发送一条日志,等收到kafka ack后再写入下一条。
# 这个配置不为0,表示异步发送,一个连接上允许异步发送的最大请求个数,然后block等待, 推荐设置为100。
batch_max_concurrence = 0

# 发送缓冲区的大小(行数),如果设置比chan_size小,会自动设置为跟chan_size相同
batch_max_size=1000

# 每次最大发送的内容上限,默认1000000 Byte(与batch_max_size先到者触发发送)
batch_max_content_size=1000000

## 指定kafka版本
kafka_version="3.3.2"
# kafka producer client timeout in seconds
producer_timeout= 10

# 指定写入kafka的分区策略,不配置采用的是random策略
# hash 则是按照采集端 hostname+file path(或者 tcp/udp port 或者 container id) 进行 hash
# partition_strategy="hash"

# #是否开启sasl模式
# sasl_enable = false
# sasl_user = "admin"
# sasl_password = "admin"
# 支持 PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
# v0.3.88版本开始支持 SCRAM-SHA-256/SCRAM-SHA-512
# sasl_mechanism= "PLAIN"
# v1
# sasl_version=1
# set true
# sasl_handshake = true
# optional
# sasl_auth_identity=""

# 开源版本v0.3.40, 企业版v0.3.51及以上版本新增
# 是否开启 pod stdout/stderr 日志采集
# 设置为 false,则不采集 pod 日志
# 设置为 true,则根据 collect_container_all 配置项决定,是采集所有 pod,还是只采集打了相应 annotation 的 pod
enable_collect_container=false

# 设置为 true,无论 pod annotation 是否配置采集,stdout/stderr 都会被采集
# 设置为 false,则 pod annotation 配置了采集的 pod ,其 stdout/stderr 会被采集
collect_container_all = false 

# 开源版本v0.3.83 企业版v0.3.155及以上版本新增
# 容器日志解析, 支持的参数: docker/containerd/podman
# default:dockerd
# docker: 按照json格式解析
# containerd: 按照text格式解析
# podman: 按照text格式解析
container_logs_parser="dockerd"


# 只采集哪些 pod 的 stdout/stderr
container_include=[""] 
# 排除哪些 pod 的 stdout/stderr
container_exclude=[""] 

  # 全局处理规则, 该处不支持多行合并。多行日志合并需要在logs.items中配置
  [[logs.processing_rules]] 
  type = "exclude_at_match"
  name = "exclude_xxx_users"
  pattern="\\w+@flashcat.cloud" 

  # 日志采集配置
  [[logs.items]]
  # 日志类型 file, 还支持tcp和udp类型,当type为tcp和udp时,一定要配置port
  type = "file" 
  # 字符编码支持,可以选 utf-8 gb18030 gb2312 hz-gb2312 gbk big5,默认为 utf-8
  encoding = "utf-8" 
  # 日志路径,支持通配符,用通配符,默认从最新位置开始采集
  # type 为 file 时,path 是必选参数
  path="xxxx" 
  # 当type为tcp和udp时,port是必选参数
  # port = 8080
  # 日志的默认 label,标识日志来源的模块
  source = "tomcat" 
  # 日志的默认 label,标识日志来源的服务
  service = "my_service"
  # 其他额外想加的自定义 label 
  tags = ["monitoring=test", "k=v"] 
  # kafka 模式下,给单个日志处理项配置不同于全局的 topic
  # 如果不配置,则使用全局topic 
  topic = "xxxx"  

    # 日志处理规则
    [[logs.items.log_processing_rules]]
      type = "exclude_at_match"
      name = "exclude_xxx_users"
      pattern="\\w+@flashcat.cloud"

开启日志采集

enable=true 标识日志采集打开

推送鉴权

api_key 是给日志网关推送时,用于鉴权,后期saas产品的主要模式。推送到自建kafka集群不需要配置,当成占位符保持就可。

推送目标地址

send_type 表示推送目标地址的类型,支持kafka/tcp/http,send_to 表示推送的目标地址。 当send_type 为kafka类型时,send_to地址支持逗号分割的broker地址列表,topic选项为指定的kafka topic。

日志压缩

use_compression 表示压缩发送,http下只支持gzip压缩。

compression_codec 表示kafka压缩类型,支持none gzip snappy lz4 zstd (开源版 v0.3.58/企业版v0.3.97开始支持)

compression_level 表示压缩级别,0表示不压缩,1-9表示压缩级别

开启tls

send_with_tls 是否开启tls模式

批量等待

日志等待batch_wait 秒之后再发送

偏移目录

run_path 用于记录日志采集的偏移,categraf重启后,会按照上次的偏移继续采集

open_files_limit

categraf单个进程 最大同时打开的日志限制,默认100个文件

扫描周期

scan_period 表示扫描日志目录的周期

udp采集帧大小

frame_size udp采集的帧大小

采集所有Pod的标准输出

开启

  • enable_collect_container=true 表示开启 pod stdout/stderr日志采集, 根据 collect_container_all 决定采集所有pod,还是采集只打了annotation的. 如果设置为false,则不采集pod日志
  • collect_container_all=true enable_collect_container设置为true的前提下, 该选项也设置为true,表示采集所有pod的stdout/stderr, 不管pod是否配置了annotation

只采集某些pod的stdout/stderr

container_include 规则可以用

  • name:log-demo.* 表示采集container name匹配log-demo字符的pod
  • image:xxx.* 表示采集image 匹配 xxx的pod
  • kube_namespace:xxxx.* 表示采集namespace 匹配xxx下的pod
  • pod_name:xxxx.* 表示采集pod_name 匹配pod xxxx #企业版v0.4.37开始支持的特性

如果你想通过name来采集某个deployment的下所有pod的stdout/stderr,那么需要包含所有container(包括init container的name)。 如果你的deployment输出如下

containerStatuses:
  - containerID: containerd://8e9893a7339b421ef97277cfdc3cac475845f3e79d4603ecc7c69a3efe1fd68e
    ...
    name: istio-proxy
    ...
    state:
      ...
  - containerID: containerd://4e4a1a8bacde48bcc4344fbeb5985a17f16ba279a063a1bd8321c41a5be59bf6
    ...
    name: service-gw
    ...
    state:
      ...
  initContainerStatuses:
  - containerID: containerd://86b81fcb1a356a3f128d60d307e1b7ef711c928c52483635e65c1059a0c82bed
    ...
    name: istio-init
    ...
    state:
      ...
  - containerID: containerd://5f98bc3e2da2c07379579aa34747751085b8fdaa07c076a11a0ea27696220518
    ...
    name: opentelemetry-auto-instrumentation
    state:
      ...

那么你需要这样配置container_include=["name:istio-init|opentelemetry-auto-instrumentation|istio-proxy|service-gw"] 才能采集到这个deployment对应pod的日志,配置少一个container的name, 都会导致service-gw的日志采集不到。

新增pod_name过滤规则后,则可以这样配置container_include=["pod_name:service-gw.*"] 如果只想采集其中某一个pod的日志,那可以这样配置container_include=["pod_name:service-gw-85fdd48d85-h82fw"]

  • 这里说匹配,是表示正则匹配的意思
  • 如果只采集某些pod的日志,需要将container_exclude=["name:.*"] 同时配置上。

排除指定pod的日志采集

container_exclude 规则可以用

  • name:log-demo.* 表示排除container name匹配log-demo字符的pod
  • image:xxx.* 表示排除image 匹配 xxx的pod
  • kube_namespace:xxxx.* 表示排除namespace 匹配xxx下的pod
  • pod_name:xxxx.* 表示排除pod name 为 xxxx的pod #企业版v0.4.37开始支持的特性

例,以下配置表示只采集namespacemonitoringflashcat下pod的stdout/stderr

collect_container_all = true
container_include = ["kube_namespace:monitoring", "kube_namespace:flashcat"]
container_exclude = ["name:.*"]

container_include container_exclude括号中的多个规则是或的关系,categraf判断一个pod的日志是否要采集时,首先通过container_include规则包含当前pod不,如果包含,则立即将pod日志采集走; 如果不包含,则通过container_exclude中的规则判断是否要排除当前的pod日志,如果不排除掉,则采集;如果排除,则不采集。

通过pod annotation来指定采集pod的stdout/stderr

#表示当前的pod需要采集stdout/stderr
categraf/logs.stdout.collect="true" 

# 如果logs.toml中配置了send_type="kafka",这个选项表示当前的pod的stdout/stderr 写入kafka的topic
categraf/logs.stdout.topic="test" 

#表示日志中会将label 和annotaion中以abc为前缀的标签附加到日志tag中
categraf/tags.prefix="abc" 

该选项的值是processing_rules json序列化之后的字符串。
categraf/logs.stdout.processing_rules="[{\"type\":\"exclude_at_match\",\"name\":\"exclude_xxx_users\",\"pattern\":\"\\\\w+@flashcat.cloud\"}]"

具体采集项

logs.items.type 表示采集的类型,当前主要支持file 和 tcp/udp, journald(依赖cgo需要联系我们编译)。

像网络设备的日志、syslog这类日志采集,可以将logs.items.type配置为tcp或者udp,并指定port,此时categraf会自动添加源IP作为日志的tag

type=file时,path是必选参数。 type=tcp或者type=udp ,port是必选参数

以采集文件为例: path表示文件路径,支持统配符。 /path/to/*/*.log 可以采集/path/to/a/*.log ,不能采集 /path/to/a.log

后面的选项会作为日志的label发送,source 标识日志来源模块 ,service标识日志来源的服务。 tags可以附加更多的标签。

[[logs.items]]
type = "file"
path="xxxx" #日志路径,支持统配符,用统配符,默认从最新位置开始采集
source = "tomcat" # 日志的label 标识日志来源的模块
service = "my_service" #日志的label 标识日志来源的服务
tags = ["monitoring=test", "k=v"] # 其他额外想加的tag

如果你想用udp方式接收日志,可以参考如下的配置样例

 [[logs.items]]
  type = "udp"
  port = 5141
  source = "udp"
  service = "network-device"

多行日志和替换处理

单个日志采集 logs.items 和全局 logs 都可以配置“日志处理规则”,单个日志处理规则为logs.items.log_processing_rules,全局处理规则为 logs.processing_rules

exclude_at_match

type = "exclude_at_match"
name = "exclude_xxx_users"
pattern="\\w+@flashcat.cloud"

表示日志中匹配到@flashcat.cloud 的行不发送。

include_at_match

type = "include_at_match"
name = "include_demo"
pattern="^2022*"

表示日志中匹配到2022开头的行才发送。

mask_sequences

type = "mask_sequences"
name = "mask_phone_number"
replace_placeholder = "[186xxx]"
pattern="186\\d{8}"

表示186的手机号会被[186xxx] 代替。

multi_line (不支持全局处理规则)

type = "multi_line"
name = "new_line_with_date"
pattern="\\d{4}-\\d{2}-\\d{2}" (多行规则不需要添加^ ,代码会自动添加)

表示以日期为日志的开头,多行的日志合并为一行进行采集。

企业版v0.4.34及以上版本支持按日期格式自动扫描日志文件

如果日志的path配置如下

[[logs.items]]
type = "file"
path = "/home/flashcat/logs/%{+yyyy}/*/???.log"
...

那日志/home/flashccat/logs/2025/09/abc.log 和 /home/flashccat/logs/2025/08/123.log 则会被收集。 通配符* ?可以直接使用ls 来验证。

path支持的日期格式,以当天是2025 09 29 15:04:05为例

%{+yy}           # 两位年份 25
%{+yyyy}         # 4位年份 2025
%{+MM}           # 月份09
%{+M}            # 月份9, 不带0
%{+dd}           # 日期29
%{+d}            # 日期29 不带0
%{+HH}           # 15 (24小时制)
%{+hh}           # 03 (12小时制)
%{+mm}           # 04 分钟 
%{+ss}           # 05 秒 
%{+SSS}          # 000毫秒
%{+yyyy-MM-dd HH:mm:ss}     # 2025-09-29 15:04:05
%{+ISO8601}      # 2025-09-29T15:04:05.123Z

更新时间 2024-09-20

快猫星云 联系方式 快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云 联系方式
快猫星云