V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
iian
V2EX  ›  Elasticsearch

Elasticsearch 新增字段匹配查询的问题

  •  
  •   iian · 200 天前 · 943 次点击
    这是一个创建于 200 天前的主题,其中的信息可能已经有所发展或是发生改变。
    想使用 es 对上网日志进行分析,流控设备记录的用户访问日志可输出给 kafka ,日志中有用的信息如下:
    时间 域名 URI 账号

    目前思路是通过 logstash 读 kafka 数据,拆分后进 es 中建索引,但是最终想统计的是账号所在的部门对某个站点的访问量,例如:1 个月内,技术部,访问 www.163.com 的次数。

    现在索引里面只有账号信息,账号和部门的对应关系在其他数据库中,应该如何把部门信息与账号匹配后存在 es 中?

    现在想到的两种方式(但是不知道是否可以以及如何实现)

    1.将账号和部门信息存在文件或 redis 里,logstash 有多个 input ,同时从 kafka 和文件读,从 kafka 读一条日志的时候,用账号去匹配部门,然后一起写到 es 中,如果可以,需要用 logstash 如何来实现?

    2.logstash 正常处理日志进 es ,在 es 中新增一个部门字段,然后用账号匹配部门信息(不知道如何实现),写到这个新字段里。这样应该用到 es 的什么功能?

    Elasticsearch 新手,望不吝赐教。
    6 条回复    2023-10-11 09:12:02 +08:00
    justest123
        1
    justest123  
       200 天前
    如果决定使用 kafka -> logstash -> elasticsearch 的方案,结合我以前的经验,大概率是可以在 logstash 这一环节补充账号对应的部门信息的(最近几年没怎么实际用过 logstash 了,不敢打保票)。

    先回答你的两种方式:

    第一种,多个 input 同时读取,这种是不可行的,对多个 input 来说,它们采集到的数据是相互独立的,没有办法结合。

    第二种,es 应该要新增部门字段,但这个字段比较难在写入文档的时候从账号关联到部门,印象里 es 有个 script 脚本功能,但好像都是用在更新、查询的时候,能不能用在文档写入阶段就不懂了(→_→ 有没有大佬有实际应用的案例能长长见识。

    最后,关于怎么实现账号找部门:logstash 的插件分三类,input filter output ,可以尝试 logstash-filter-ruby 这个 filter 插件来写 ruby 代码。

    1. 如果 input 插件读取到的日志信息是 json 格式的,可以用一下 logstash-filter-json 插件,将内容先解析出来。

    2. logstash-filter-ruby 插件中拿到账号,如果可以将账号和部门信息存在文件里,就可以写 ruby 代码读取本地文件,找到部门,将部门字段同时写进 logstash 的 event 对象里。

    3. filter 结束,output 环节照常,es 中新增一个部门字段,写入即可。
    baozhibo
        2
    baozhibo  
       200 天前
    这个我们用的 flink 解决,从 kafka 读日志,flink 日志泛化异步查询 redis 部门员工信息,输出到 es 去。这样 es 里展示的日志就是都有部门信息了。
    iian
        3
    iian  
    OP
       200 天前
    @justest123 #1 logstash-filter-ruby 插件的方式我再看看说明如何来实现
    iian
        4
    iian  
    OP
       200 天前
    @baozhibo #2 flink 的方式我查查资料,是否使用 logstash 倒是无所谓,只要能实现从 kafka 读,中间过程匹配出部门信息,最后写到 es 中就行。
    justest123
        5
    justest123  
       200 天前
    @iian 简单写了个,测试了下可以用,但只能读本地文件确实没直接实时读 redis 方便。

    ```
    input {
    file {
    path => "D:/logstash-test/input.txt"
    }
    }

    filter {
    ruby {
    init => '
    # 引入 json ,方便操作
    require "json"

    # 从本地文件中读取,解析后初始化一个 hash ,key 为 userId ,value 为部门 id
    @@userDepMap = Hash.new
    File.open("D:/logstash-test/filter.txt", "r").each_line do |line|
    userDepArray = line.split(",")
    @@userDepMap[userDepArray[0]] = userDepArray[1]
    end
    '
    code => '
    # 从 message 中拿到消息本身,转 json
    msg = event.get("message")
    msgJson = JSON.parse(msg)

    # 从消息中拿到 userId ,从 hash 中找到对应的部门
    user = msgJson["user"].to_s
    dep = @@userDepMap[user]

    # 将部门保存到消息 json 中
    msgJson["dep"] = dep

    # 将最新的 json 转字符串,重新设置回 event 中
    event.set("message", JSON.generate(msgJson))
    '
    }
    }

    output {
    stdout {

    }
    file {
    path => "D:/logstash-test/output.txt"
    }
    }

    ```
    iian
        6
    iian  
    OP
       199 天前 via iPhone
    @justest123 感谢🙏,我测试看看。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2756 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 11:58 · PVG 19:58 · LAX 04:58 · JFK 07:58
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.