logstash消费阿里云kafka消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash消费阿里云kafka消息相关的知识,希望对你有一定的参考价值。

logstash版本: 5.5.3 及以后
logstash消费阿里云kafka信息并返回到elasticsearch系统

配置信息解析:

        bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"]  #kafka系统的连接地址
        client_id => ‘tt‘   #客户端上传到es时,新增字段
        group_id => "CID-LOG"   #kafka分组的信息
        auto_offset_reset => "latest" #从最新的偏移量开始消费
        consumer_threads => 5
        #decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中
        topics => ["alikafka-cid-log"] #//数组类型,可配置多个topic
        type => "bhy" #//所有插件通用属性,尤其在input里面配置多个数据源时很有用
        security_protocol => "SASL_SSL"  #kafka连接阿里云的协议
        sasl_mechanism => "ONS"          #kafka阿里云的消费机制, logstash中默认的是 GSSAPI
        jaas_path => "/data/logstash/config/kafka_client_jaas.conf"  # ONS登录信息的路径
        ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘  #证书
        ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘#信任证书
        ssl_keystore_password => "xxxx"      #证书密码
        ssl_truststore_password => "xxxx"    #证书密码

关键信息:
logstash使用ONS机制连接kafka时,需要需要用到一些额外的jar包,可以把开发所使用的jar包,都放到 /data/logstash/vendor/jruby/lib/ 下面。

我的配置模板:

input{
      kafka {
        bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"]
        client_id => ‘tt‘
        group_id => "CID-LOG"
        auto_offset_reset => "latest" #从最新的偏移量开始消费
        consumer_threads => 5
        #decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中
        topics => ["alikafka-cid-log"] #//数组类型,可配置多个topic
        type => "bhy" #//所有插件通用属性,尤其在input里面配置多个数据源时很有用
        security_protocol => "SASL_SSL"
        sasl_mechanism => "ONS"
        jaas_path => "/data/logstash/config/kafka_client_jaas.conf"
        ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘
        ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘
        ssl_keystore_password => "xx"
        ssl_truststore_password => "xx"
      }
}

output {
   elasticsearch {
       hosts => ["es-ip:9200"]
       user => ["xxxx"]
       password => ["xxxx"]
       index => ["services"]
   }
   stdout {
       codec=>plain
   }
}

java参数优化路径:

        config/jvm.options

以上是关于logstash消费阿里云kafka消息的主要内容,如果未能解决你的问题,请参考以下文章

logstash 消费数据到kafka异常

阿里云消息队列 Kafka-消息检索实践

Kafka配置SSL(云环境)

Kafka与Logstash的数据采集对接

阿里云日志消费

阿里云日志消费