12/07
2015

ELK Stack Advent Calendar Day 7


用 Logstash 接收 Kafka 里的业务日志再写入 Elasticsearch 已经成为一个常见的选择。但是大多数人随后就会碰到一个问题:logstash-input-kafka 的性能上不去!

这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。

Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。

那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout

hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:

inputs:
  - Kafka:
    codec: plain
    encoding: UTF8 # defaut UTF8
    topic: 
      app: 2
    consumer_settings:
      group.id: hangout
      zookeeper.connect: 192.168.1.200:2181
      auto.commit.interval.ms: "1000"
      socket.receive.buffer.bytes: "1048576"
      fetch.message.max.bytes: "1048576"
      num.consumer.fetchers: "4"
  - Kafka:
    codec: json
    topic: 
      web: 1
    consumer_settings:
      group.id: hangout
      zookeeper.connect: 192.168.1.201:2181
      auto.commit.interval.ms: "5000"

filters:
  - Grok:
    match:
      - '^(?<logtime>\S+) (?<user>.+) (-|(?<level>\w+)) %{DATA:msg}$'
    remove_fields: ['message']
  - Add:
    fields:
      test: 'abcd'
    if:
      - '<#if message??>true</#if>'
      - '<#if message?contains("liu")>true<#elseif message?contains("warn")>true</#if>'
  - Date:
    src: logtime
    formats:
      - 'ISO8601'
    remove_fields: ['logtime']
  - Lowercase:
    fields: ['user']
  - Add:
    fields:
      me: 'I am ${user}'
  - Remove:
    fields:
      - logtime
  - Trim:
    fields:
      - user
  - Rename:
    fields:
      me: he
      user: she
  - Gsub:
    fields:
      she: ['c','CCC']
      he: ['(^\w+)|(\w+$)','XXX']
  - Translate:
    source: user
    target: nick
    dictionary_path: /tmp/app.dic
  - KV:
    source: msg
    target: kv
    field_split: ' '
    value_split: '='
    trim: '\t\"'
    trimkey: '\"'
    include_keys: ["a","b","xyz","12"]
    exclude_keys: ["b","c"] # b in excluded
    tag_on_failure: "KVfail"
    remove_fields: ['msg']
  - Convert:
    fields:
      cs_bytes: integer
      time_taken: float
  - URLDecode:
    fields: ["query1","query2"]

outputs:
  - Stdout:
    if:
      - '<#if user=="childe">true</#if>'
  - Elasticsearch:
    cluster: hangoutcluster
    hosts:
      - 192.168.1.200
    index: 'hangout-%{user}-%{+YYYY.MM.dd}'
    index_type: logs # default logs
    bulk_actions: 20000 #default 20000
    bulk_size: 15 # default 15 MB
    flush_interval: 10 # default 10 seconds
    concurrent_requests: 0 # default 0, concurrent_requests设置成大于0的数, 意思着多线程处理, 以我应用的经验,还有是一定OOM风险的,强烈建议设置为0
  - Kafka:
    broker_list: 192.168.1.200:9092
    topic: test2

其 pipeline 设计和 Logstash 不同的是:整个 filter 和 output 流程,都在 Kafka 的 consumer 线程中完成。所以,并发线程数完全是有 Kafka 的 partitions 设置来控制的。

实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。

blog comments powered by Disqus