fluentd实践

Fluentd在日志收集和日志缓存方面有很大的应用,fluentd组件很多,这里主要针对微服务架构的部分组件做一些例子

关于fluend安装和日志直接聚合为Prometheus提供数据查看这里

手动抛指标给fluentd

echo '{"message":"hello"}' | fluent-cat debug.log --host testserver --port 24225

直接使用tail读取日志并指定key数据格式

<source>
  @type tail
  path /a.log
  pos_file /a.pos
  tag a.log
  format /^(?<count>\d+)$/
  types count:integer
  read_from_head true
</source>

这里的type可用的格式为integer (“int” would NOT work!)、string、bool、float、time、time、array read_from_head表示从头开始读

一个tag复制成多份不同tag分别处理

  • 安装rewrite_tag_filter
  gem install fluent-plugin-rewrite-tag-filter
  • 配置文件如下

    <match a.*.*>
      @type copy
      <store>
        @type rewrite_tag_filter
        <rule>
        key message
        tag aaa.${tag}
        pattern .*
        </rule>
      </store>
      <store>
        @type rewrite_tag_filter
        <rule>
          key message
          tag bbb.${tag_parts[1]}
          pattern .*
        </rule>
      </store>
    </match>
    

    这里使用了fluentd的placeholder属性${tag_parts[1]}表示取第1个用.分隔参数,如a.b.c。rewrite_tag_filter还有其他作用看字面意思就能领会了哦,这里不做赘述。

fluentd处理docker日志包装的多行报错

  • docker对日志包装了一层json,导致读起来比较头疼,fluentd现在提供了如下解决方案,如果你使用fluent-bit可以参考此处

  • 首先读取json格式的docker log

  <source>
    @type tail
    path /a.log
    pos_file /a.pos
    tag a.b.log
    format json
  </source>
  • 安装fluent-plugin-concat
  gem install fluent-plugin-concat
  • 联合多行日志
  <filter a.b.log>
    @type concat
    key log
    multiline_start_regexp /^\d{4}\-\d{2}\-\d{2}.*/
    # stream_identity_key container_id
    # multiline_end_regexp xxx
  </filter>

stream_identity_keymultiline_end_regexp看字面意思就行了

  • 编辑文件b.log
  {"log":"2017-11-09 08:54:27.148  INFO [devops-service,,,] 1 --- [           main] c.h.h.c.devops.DevopsServiceApplication  : Started DevopsServiceApplication in 50.341 seconds (JVM running for 53.521)\n","stream":"stdout","time":"2017-11-09T08:54:27.149216543Z"}
  {"log":"2017-11-09 08:54:39.856  INFO [devops-service,,,] 1 --- [0.1-8061-exec-1] o.a.c.c.C.[Tomcat-1].[localhost].[/]     : Initializing Spring FrameworkServlet 'dispatcherServlet'\n","stream":"stdout","time":"2017-11-09T08:54:39.856714983Z"}i
  {"log":"\u00009INFO [devops-service,,,] 1 --- [0.1-8061-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started\n","stream":"stdout","time":"2017-11-09T08:54:39.85678153Z"}
  {"log":"2017-11-09 08:54:39.881  INFO [devops-service,,,] 1 --- [0.1-8061-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 25 ms\n","stream":"stdout","time":"2017-11-09T08:54:39.881980003Z"}
  • 执行下面命令测试
  cat b.log >> a.log
  • 查看结果变成了两条,成功!
  1970-01-01 00:33:37.856714983 +0000 a.b.log: {"log":"2017-11-09 08:54:27.148  INFO [devops-service,,,] 1 --- [           main] c.h.h.c.devops.DevopsServiceApplication  : Started DevopsServiceApplication in 50.341 seconds (JVM running for 53.521)\n","stream":"stdout"}
  1970-01-01 00:33:37.881980003 +0000 a.b.log: {"log":"2017-11-09 08:54:39.856  INFO [devops-service,,,] 1 --- [0.1-8061-exec-1] o.a.c.c.C.[Tomcat-1].[localhost].[/]     : Initializing Spring FrameworkServlet 'dispatcherServlet'\n\n\u00009INFO [devops-service,,,] 1 --- [0.1-8061-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started\n","stream":"stdout"}

发送给kafka(zookeeper)

  • 安装kafka插件
  gem install fluent-plugin-kafka zookeeper
  • 配置使用kafka输出
  <match *.**>
    @type kafka
    brokers "share.hd.wenqi.us:9092"
    default_topic "test"
    get_kafka_client_log true
    output_include_tag true
    output_data_type json
  </match>

这里kafka可以替换给kafka_buffered,多个brokers使用,号分隔,其他可选参数点击这里

从kafka中读取

  • 安装kafka插件
  gem install fluent-plugin-kafka zookeeper
  • 安装rewrite_tag_filter
  gem install fluent-plugin-rewrite-tag-filter
  • 配置使用kafka输入
  <source>
    @type kafka
    brokers "share.hd.wenqi.us:9092"
    topics "test"
    format "json"
    message_key "message"
    add_prefix "xx"
    add_suffix "yy"
  </source>
  <match xx.test.yy>
    @type rewrite_tag_filter
    <rule>
      key tag
      tag ab.b.c
      pattern "ab.b.c"
    </rule>
  </match>
  <match **>
     @type stdout
  </match>

修改record

  • 安装record_modifier

    gem install fluent-plugin-record-modifier
    
  • 修改把记录中的key:message改为log

    <filter **>
    @type record_modifier
    remove_keys "message"
    <record>
      log ${record['message']}
    </record>
    </filter>
    
  • message加后缀irx

    <filter **>
    @type record_modifier
    <record>
      message ${record['message']}_irx
    </record>
    </filter>
    

fluentd处理中文编码

windows的日志可能存在着不同于Linux常规的编码,fluentd提供了读文件指定编码的方法

  • 编辑source
  <source>
    @type tail
    path /jdb20171110.txt
    pos_file /ab.pos
    read_from_head true
    tag a.b.log
    format none
    from_encoding 'utf-16le'
    encoding UTF-8
  </source>

即可将utf-16le的编码转换为UTF-8

VinkDong

open to open