V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
37Y37
V2EX  ›  程序员

Logstash 读取 Kafka 数据写入 HDFS

  •  
  •   37Y37 · 2019-03-27 09:21:02 +08:00 · 1627 次点击
    这是一个创建于 2100 天前的主题,其中的信息可能已经有所发展或是发生改变。

    强大的功能,丰富的插件,让 logstash 在数据处理的行列中出类拔萃

    通常日志数据除了要入 ES 提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇 ELK 的文章介绍过利用 logstash 将 kafka 的数据写入到 elasticsearch 集群,这篇文章将会介绍如何通过 logstash 将数据写入 HDFS

    本文所有演示均基于 logstash 6.6.2 版本

    数据收集

    logstash 默认不支持数据直接写入 HDFS,官方推荐的 output 插件是webhdfs,webhdfs 使用 HDFS 提供的 API 将数据写入 HDFS 集群

    插件安装

    插件安装比较简单,直接使用内置命令即可

    # cd /home/opt/tools/logstash-6.6.2
    # ./bin/logstash-plugin install logstash-output-webhdfs
    

    配置 hosts

    HDFS 集群内通过主机名进行通信所以 logstash 所在的主机需要配置 hadoop 集群的 hosts 信息

    # cat /etc/hosts
    192.168.107.154 master01
    192.168.107.155 slave01
    192.168.107.156 slave02
    192.168.107.157 slave03
    

    如果不配置 host 信息,可能会报下边的错

    [WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items
    

    logstash 配置

    kafka 里边的源日志格式可以参考这片文章:ELK 日志系统之使用 Rsyslog 快速方便的收集 Nginx 日志

    logstash 的配置如下:

    # cat config/indexer_rsyslog_nginx.conf
    input {
        kafka {
            bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092"
            topics => ["rsyslog_nginx"]
            codec => "json"
        }
    }
    
    filter {
        date {
            match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]
            target => "time_local"
        }
    
        ruby {
            code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))"
        }
    
        ruby {
            code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))"
        }
    }
    
    output {
        webhdfs {
            host => "master01"
            port => 50070
            user => "hadmin"
            path => "/logs/nginx/%{index.date}/%{index.hour}.log"
            codec => "json"
        }
        stdout { codec => rubydebug }
    }
    

    logstash 配置文件分为三部分:input、filter、output

    input指定源在哪里,我们是从 kafka 取数据,这里就写 kafka 集群的配置信息,配置解释:

    • bootstrap_servers:指定 kafka 集群的地址
    • topics:需要读取的 topic 名字
    • codec:指定下数据的格式,我们写入的时候直接是 json 格式的,这里也配置 json 方便后续处理

    filter可以对 input 输入的内容进行过滤或处理,例如格式化,添加字段,删除字段等等

    • 这里我们主要是为了解决生成 HDFS 文件时因时区不对差 8 小时导致的文件名不对的问题,后边有详细解释

    output指定处理过的日志输出到哪里,可以是 ES 或者是 HDFS 等等,可以同时配置多个,webhdfs 主要配置解释:

    • host:为 hadoop 集群 namenode 节点名称
    • user:为启动 hdfs 的用户名,不然没有权限写入数据
    • path:指定存储到 HDFS 上的文件路径,这里我们每日创建目录,并按小时存放文件
    • stdout:打开主要是方便调试,启动 logstash 时会在控制台打印详细的日志信息并格式化方便查找问题,正式环境建议关闭

    webhdfs 还有一些其他的参数例如compression,flush_size,standby_host,standby_port等可查看官方文档了解详细用法

    启动 logstash

    # bin/logstash -f config/indexer_rsyslog_nginx.conf
    

    因为 logstash 配置中开了stdout输出,所以能在控制台看到格式化的数据,如下:

    {
                   "server_addr" => "172.18.90.17",
               "http_user_agent" => "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2 like Mac OS X) AppleWebKit/602.3.12 (KHTML, like Gecko) Mobile/14C92 Safari/601.1 wechatdevtools/1.02.1902010 MicroMessenger/6.7.3 Language/zh_CN webview/ token/e7b92168159736c30401a55589317d8c",
                   "remote_addr" => "172.18.101.0",
                        "status" => 200,
                  "http_referer" => "https://ops-coffee.cn/wx02935bb29080a7b4/devtools/page-frame.html",
        "upstream_response_time" => "0.056",
                          "host" => "ops-coffee.cn",
                   "request_uri" => "/api/community/v2/news/list",
                  "request_time" => 0.059,
               "upstream_status" => "200",
                      "@version" => "1",
          "http_x_forwarded_for" => "192.168.106.100",
                    "time_local" => 2019-03-18T11:03:45.000Z,
               "body_bytes_sent" => 12431,
                    "@timestamp" => 2019-03-18T11:03:45.984Z,
                    "index.date" => "20190318",
                    "index.hour" => "19",
                "request_method" => "POST",
                 "upstream_addr" => "127.0.0.1:8181"
    }
    

    查看 hdfs 发现数据已经按照定义好的路径正常写入

    $ hadoop fs -ls /logs/nginx/20190318/19.log
    -rw-r--r--   3 hadmin supergroup       7776 2019-03-18 19:07 /logs/nginx/20190318/19.log
    

    至此 kafka 到 hdfs 数据转储完成

    遇到的坑

    HDFS 按小时生成文件名不对

    logstash 在处理数据时会自动生成一个字段@timestamp,默认情况下这个字段存储的是 logstash 收到消息的时间,使用的是 UTC 时区,会跟国内的时间差 8 小时

    我们 output 到 ES 或者 HDFS 时通常会使用类似于rsyslog-nginx-%{+YYYY.MM.dd}这样的变量来动态的设置 index 或者文件名,方便后续的检索,这里的变量YYYY使用的就是@timestamp中的时间,因为时区的问题生成的 index 或者文件名就差 8 小时不是很准确,这个问题在 ELK 架构中因为全部都是用的 UTC 时间且最终 kibana 展示时会自动转换我们无需关心,但这里要生成文件就需要认真对待下了

    这里采用的方案是解析日志中的时间字段time_local,然后根据日志中的时间字段添加两个新字段index.dateindex.hour来分别标识日期和小时,在 output 的时候使用这两个新加的字段做变量来生成文件

    logstash filter 配置如下:

    filter {
        # 匹配原始日志中的 time_local 字段并设置为时间字段
        # time_local 字段为本地时间字段,没有 8 小时的时间差
        date {
            match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]
            target => "time_local"
        }
    
        # 添加一个 index.date 字段,值设置为 time_local 的日期
        ruby {
            code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))"
        }
    
        # 添加一个 index.hour 字段,值设置为 time_local 的小时
        ruby {
            code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))"
        }
    }
    

    output 的 path 中配置如下

    path => "/logs/nginx/%{index.date}/%{index.hour}.log"
    

    HDFS 记录多了时间和 host 字段

    在没有指定 codec 的情况下,logstash 会给每一条日志添加时间和 host 字段,例如:

    源日志格式为

    ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
    

    经过 logstash 处理后多了时间和 host 字段

    2019-03-19T06:28:07.510Z %{host}  ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
    

    如果不需要我们可以指定最终的 format 只取 message,解决方法为在 output 中添加如下配置:

    codec => line {
        format => "%{message}"
    }
    

    同时 output 到 ES 和 HDFS

    在实际应用中我们需要同时将日志数据写入 ES 和 HDFS,那么可以直接用下边的配置来处理

    # cat config/indexer_rsyslog_nginx.conf
    input {
        kafka {
            bootstrap_servers => "localhost:9092"
            topics => ["rsyslog_nginx"]
            codec => "json"
        }
    }
    
    filter {
        date {  
            match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        }
    
        ruby {
            code => "event.set('index.date', event.get('@timestamp').time.localtime.strftime('%Y%m%d'))"
        }
    
        ruby {
            code => "event.set('index.hour', event.get('@timestamp').time.localtime.strftime('%H'))"
        }
    
    
    }
    
    output {
        elasticsearch {
            hosts => ["192.168.106.203:9200"]
            index => "rsyslog-nginx-%{+YYYY.MM.dd}"
        }
    
        webhdfs {
            host => "master01"
            port => 50070
            user => "hadmin"
            path => "/logs/nginx/%{index.date}/%{index.hour}.log"
            codec => "json"
        }
    }
    

    这里我使用 logstash 的 date 插件将日志中的"time_local"字段直接替换为了 @timestamp,这样做有什么好处呢?

    logstash 默认生成的 @timestamp 字段记录的时间是 logstash 接收到消息的时间,这个时间可能与日志产生的时间不同,而我们往往需要关注的时间是日志产生的时间,且在 ELK 架构中 Kibana 日志输出的默认顺序就是按照 @timestamp 来排序的,所以往往我们需要将默认的 @timestamp 替换成日志产生的时间,替换方法就用到了 date 插件,date 插件的用法如下

    date {  
        match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
    }
    

    match:匹配日志中的时间字段,这里为 time_local

    target:将 match 匹配到的时间戳存储到给定的字段中,默认不指定的话就存到 @timestamp 字段

    另外还有参数可以配置:timezone,locale,tag_on_failure等,具体可查看官方文档


    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5212 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 07:15 · PVG 15:15 · LAX 23:15 · JFK 02:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.