Categraf log-agent 配置说明
版本说明
categraf 从 0.1.2 版本开始支持发送日志到日志网关(http/tcp),0.1.8 版本开始支持发送日志到 Kafka。请使用 0.1.8 以上的版本进行日志采集。
conf/logs.toml
配置说明
[logs]
# 在 http 发送日志模式下生效,用于鉴权, 其他模式下占位符
api_key = "TCXDuFMmYKPBaruXzWeDwyKFpk4gxslz"
# 是否开启 log-agent
enable = true
# 日志发送到哪
send_to = "47.111.118.27:9090"
# 接收日志的后端类型:tcp / http / kafka
send_type = "http"
# 日志对应的 topic,在 Kafka 模式下生效
topic = "test_log_topic"
# 是否支持压缩:http只支持gzip压缩。 kafka支持多种压缩, 需要配合compression_codec指定。
use_compression = false
# gzip压缩级别:0 表示不压缩,1-9 表示压缩级别
compression_level=0
# Kafka 支持的压缩: none / gzip / snappy / lz4 / zstd
compression_codec="none"
# 是否采用 TLS 发送
send_with_tls = false
# 批量发送的等待时间,单位为秒
batch_wait = 5
# categraf 记录日志偏移的目录,需要有写权限
run_path = "xxxx"
# 最大打开的文件数
open_files_limit = 100
# 扫描目录的周期,单位为秒,默认为10秒
scan_period = 10
# udp采集的帧大小
frame_size = 10
## 读取日志缓冲区,行数,默认100
chan_size = 1000
## 有多少协程处理日志规则
pipeline=10
# 默认0,表示按照读取顺序串行写入kafka,如果对日志顺序有要求,请保持该默认配置。
# 当前都是stream模式, 配置为0,表示发送一条日志,等收到kafka ack后再写入下一条。
# 这个配置不为0,表示异步发送,一个连接上允许异步发送的最大请求个数,然后block等待, 推荐设置为100。
batch_max_concurrence = 0
# 发送缓冲区的大小(行数),如果设置比chan_size小,会自动设置为跟chan_size相同
batch_max_size=1000
# 每次最大发送的内容上限,默认1000000 Byte(与batch_max_size先到者触发发送)
batch_max_content_size=1000000
## 指定kafka版本
kafka_version="3.3.2"
# kafka producer client timeout in seconds
producer_timeout= 10
# 指定写入kafka的分区策略,不配置采用的是random策略
# hash 则是按照采集端 hostname+file path(或者 tcp/udp port 或者 container id) 进行 hash
# partition_strategy="hash"
# #是否开启sasl模式
# sasl_enable = false
# sasl_user = "admin"
# sasl_password = "admin"
# 支持 PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
# v0.3.88版本开始支持 SCRAM-SHA-256/SCRAM-SHA-512
# sasl_mechanism= "PLAIN"
# v1
# sasl_version=1
# set true
# sasl_handshake = true
# optional
# sasl_auth_identity=""
# 开源版本v0.3.40, 企业版v0.3.51及以上版本新增
# 是否开启 pod stdout/stderr 日志采集
# 设置为 false,则不采集 pod 日志
# 设置为 true,则根据 collect_container_all 配置项决定,是采集所有 pod,还是只采集打了相应 annotation 的 pod
enable_collect_container=false
# 设置为 true,无论 pod annotation 是否配置采集,stdout/stderr 都会被采集
# 设置为 false,则 pod annotation 配置了采集的 pod ,其 stdout/stderr 会被采集
collect_container_all = false
# 开源版本v0.3.83 企业版v0.3.155及以上版本新增
# 容器日志解析, 支持的参数: docker/containerd/podman
# default:dockerd
# docker: 按照json格式解析
# containerd: 按照text格式解析
# podman: 按照text格式解析
container_logs_parser="dockerd"
# 只采集哪些 pod 的 stdout/stderr
container_include=[""]
# 排除哪些 pod 的 stdout/stderr
container_exclude=[""]
# 全局处理规则, 该处不支持多行合并。多行日志合并需要在logs.items中配置
[[logs.processing_rules]]
type = "exclude_at_match"
name = "exclude_xxx_users"
pattern="\\w+@flashcat.cloud"
# 日志采集配置
[[logs.items]]
# 日志类型 file, 还支持tcp和udp类型,当type为tcp和udp时,一定要配置port
type = "file"
# 字符编码支持,可以选 utf-8 gb18030 gb2312 hz-gb2312 gbk big5,默认为 utf-8
encoding = "utf-8"
# 日志路径,支持通配符,用通配符,默认从最新位置开始采集
# type 为 file 时,path 是必选参数
path="xxxx"
# 当type为tcp和udp时,port是必选参数
# port = 8080
# 日志的默认 label,标识日志来源的模块
source = "tomcat"
# 日志的默认 label,标识日志来源的服务
service = "my_service"
# 其他额外想加的自定义 label
tags = ["monitoring=test", "k=v"]
# kafka 模式下,给单个日志处理项配置不同于全局的 topic
# 如果不配置,则使用全局topic
topic = "xxxx"
# 日志处理规则
[[logs.items.log_processing_rules]]
type = "exclude_at_match"
name = "exclude_xxx_users"
pattern="\\w+@flashcat.cloud"
开启日志采集
enable=true 标识日志采集打开
推送鉴权
api_key 是给日志网关推送时,用于鉴权,后期saas产品的主要模式。推送到自建kafka集群不需要配置,当成占位符保持就可。
推送目标地址
send_type 表示推送目标地址的类型,支持kafka/tcp/http,send_to 表示推送的目标地址。 当send_type 为kafka类型时,send_to地址支持逗号分割的broker地址列表,topic选项为指定的kafka topic。
日志压缩
use_compression 表示压缩发送,http下只支持gzip压缩。
compression_codec 表示kafka压缩类型,支持none gzip snappy lz4 zstd (开源版 v0.3.58/企业版v0.3.97开始支持)
compression_level 表示压缩级别,0表示不压缩,1-9表示压缩级别
开启tls
send_with_tls 是否开启tls模式
批量等待
日志等待batch_wait 秒之后再发送
偏移目录
run_path 用于记录日志采集的偏移,categraf重启后,会按照上次的偏移继续采集
open_files_limit
categraf单个进程 最大同时打开的日志限制,默认100个文件
扫描周期
scan_period 表示扫描日志目录的周期
udp采集帧大小
frame_size udp采集的帧大小
采集所有Pod的标准输出
开启
enable_collect_container=true
表示开启 pod stdout/stderr日志采集, 根据 collect_container_all 决定采集所有pod,还是采集只打了annotation的. 如果设置为false,则不采集pod日志collect_container_all=true
enable_collect_container设置为true的前提下, 该选项也设置为true,表示采集所有pod的stdout/stderr, 不管pod是否配置了annotation
只采集某些pod的stdout/stderr
container_include
规则可以用
name:log-demo.*
表示采集container name
匹配log-demo字符的podimage:xxx.*
表示采集image
匹配 xxx的podkube_namespace:xxxx.*
表示采集namespace
匹配xxx下的pod
注:
- 这里说匹配,是表示正则匹配的意思
- 如果只采集某些pod的日志,需要将
container_exclude=["name:.*"]
同时配置上。
排除指定pod的日志采集
container_exclude
规则可以用
name:log-demo.*
表示排除container name
匹配log-demo字符的podimage:xxx.*
表示排除image
匹配 xxx的podkube_namespace:xxxx.*
表示排除namespace
匹配xxx下的pod
例,以下配置表示只采集namespace
为monitoring
和flashcat
下pod的stdout/stderr
collect_container_all = true
container_include = ["kube_namespace:monitoring", "kube_namespace:flashcat"]
container_exclude = ["name:.*"]
通过pod annotation来指定采集pod的stdout/stderr
#表示当前的pod需要采集stdout/stderr
categraf/logs.stdout.collect="true"
# 如果logs.toml中配置了send_type="kafka",这个选项表示当前的pod的stdout/stderr 写入kafka的topic
categraf/logs.stdout.topic="test"
#表示日志中会将label 和annotaion中以abc为前缀的标签附加到日志tag中
categraf/tags.prefix="abc"
该选项的值是processing_rules json序列化之后的字符串。
categraf/logs.stdout.processing_rules="[{\"type\":\"exclude_at_match\",\"name\":\"exclude_xxx_users\",\"pattern\":\"\\\\w+@flashcat.cloud\"}]"
具体采集项
logs.items.type
表示采集的类型,当前主要支持file 和 tcp/udp, journald(依赖cgo需要联系我们编译)。
像网络设备的日志、syslog这类日志采集,可以将logs.items.type
配置为tcp或者udp,并指定port,此时categraf会自动添加源IP作为日志的tag
type=file时,path是必选参数。 type=tcp或者type=udp ,port是必选参数
以采集文件为例: path
表示文件路径,支持统配符。 /path/to/*/*.log
可以采集/path/to/a/*.log
,不能采集 /path/to/a.log
后面的选项会作为日志的label发送,source
标识日志来源模块 ,service
标识日志来源的服务。 tags
可以附加更多的标签。
[[logs.items]]
type = "file"
path="xxxx" #日志路径,支持统配符,用统配符,默认从最新位置开始采集
source = "tomcat" # 日志的label 标识日志来源的模块
service = "my_service" #日志的label 标识日志来源的服务
tags = ["monitoring=test", "k=v"] # 其他额外想加的tag
多行日志和替换处理
单个日志采集 logs.items 和全局 logs 都可以配置“日志处理规则”,单个日志处理规则为logs.items.logs_processing_rules
,全局处理规则为 logs.processing_rules
exclude_at_match
type = "exclude_at_match"
name = "exclude_xxx_users"
pattern="\\w+@flashcat.cloud"
表示日志中匹配到@flashcat.cloud 的行不发送。
include_at_match
type = "include_at_match"
name = "include_demo"
pattern="^2022*"
表示日志中匹配到2022开头的行才发送。
mask_sequences
type = "mask_sequences"
name = "mask_phone_number"
replace_placeholder = "[186xxx]"
pattern="186\\d{8}"
表示186的手机号会被[186xxx] 代替。
multi_line (不支持全局处理规则)
type = "multi_line"
name = "new_line_with_date"
pattern="\\d{4}-\\d{2}-\\d{2}" (多行规则不需要添加^ ,代码会自动添加)
表示以日期为日志的开头,多行的日志合并为一行进行采集。