Real-Time Log Processing: Kafka + Storm + Elasticsearch Integration

Posted by EffectiveMind



Overview

  1. Kafka is a distributed streaming platform for publishing and subscribing to streams of records, similar to a message queue or an enterprise messaging system.

  2. Storm is a distributed real-time computation system used for real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Its core concepts are covered in the Storm tutorial.

  3. Elasticsearch is a highly available, open-source full-text search and analytics engine.

This article integrates the three frameworks in a Java program: data is read from Kafka, processed by a Storm topology, and finally written to Elasticsearch.

Integration Approach

  1. Storm ships with spout support for Kafka: by configuring a KafkaSpout, data is read from a given Kafka topic and consumer group and fed into the Storm topology.

  2. elasticsearch-storm is the official Elastic integration package for Storm; by configuring an EsBolt, data can be written to the Elasticsearch cluster in batches.

Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.1</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.clojure</groupId>
                <artifactId>clojure</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-storm</artifactId>
        <version>6.6.1</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.12</version>
    </dependency>
    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
        <exclusions>
            <exclusion>
                <groupId>commons-logging</groupId>
                <artifactId>commons-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

Storm Topology Entry Point (Example)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.elasticsearch.storm.EsBolt;

public class StormEsDemoApplication {

    // placeholder env flag; in practice resolve this from your own configuration
    private static final String SERVICE_ENV = System.getProperty("service.env", "dev");

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // placeholder topic name; replace with your actual Kafka topic
        String topicName = "topic_name";

        // set es bolt properties
        Map<String, String> esConf = new HashMap<>(1);
        esConf.put("es.input.json", "true");

        // build topology according to topics
        builder.setSpout("spoutId", new KafkaSpout(getKafkaSpoutConfig(topicName, "groupId")), 4);

        // add logic bolt to the topology (LogicBolt is user-implemented; a sketch is given below)
        builder.setBolt("logicBoltId", new LogicBolt(), 2)
                .shuffleGrouping("spoutId");

        // add es bolt to the topology; the constructor's first argument is the target index (template)
        EsBolt esBolt = new EsBolt("target", esConf);
        builder.setBolt("esBoltId", esBolt, 2)
                .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)
                .shuffleGrouping("logicBoltId");

        Config stormConf = new Config();
        stormConf.setDebug(false);
        stormConf.put(Config.WORKER_HEAP_MEMORY_MB, 1024);
        stormConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 1024);
        stormConf.setNumAckers(4);
        stormConf.setNumWorkers(4);
        stormConf.setMessageTimeoutSecs(30);
        stormConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

        // set esBolt properties in the storm config
        stormConf.put("es.nodes", "127.0.0.1");
        stormConf.put("es.port", "5555");
        stormConf.put("es.storm.bolt.flush.entries.size", 500);

        // when running in the dev environment, start the topology in local mode
        if ("dev".equals(SERVICE_ENV)) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("app_log_persistence", stormConf, builder.createTopology());
        } else {
            StormSubmitter.submitTopology("topo_name", stormConf, builder.createTopology());
        }
    }

    /**
     * Initialize the Kafka spout config.
     *
     * @param topicName topic name
     * @param groupId   group id
     * @return SpoutConfig
     */
    private static SpoutConfig getKafkaSpoutConfig(String topicName, String groupId) {
        BrokerHosts brokerHosts = new ZkHosts("kafka_ZK_SERVERs", "broker_root");
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topicName, "consumer_root", groupId);
        // ZooKeeper servers where the spout stores consumer offsets
        // (falls back to the Storm cluster's ZooKeeper when not configured)
        List<String> zkServers = new ArrayList<>(5);
        spoutConf.zkServers = zkServers;
        spoutConf.zkPort = 2189;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        return spoutConf;
    }
}

Summary

The Storm topology reads data from Kafka through a spout; tuples then flow, according to the configured groupings, into your own logic bolt, and finally the EsBolt provided by the official Elasticsearch package is wired into the topology and configured so that documents are written to Elasticsearch in batches. The code above does not include the logic bolt implementation, which depends on your own business; a minimal sketch is given below. If, after packaging and deploying the topology, you hit a "Cannot flush non-initialized write operation" exception, see my other blog post for the fix.
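For completeness, here is a minimal sketch of what such a logic bolt could look like. It assumes the KafkaSpout emits plain log lines via StringScheme (which declares the field name "str"), and that the downstream EsBolt, with es.input.json set to true as configured above, receives a single-field tuple holding a JSON string. The emitted field name and the JSON structure are illustrative only; adapt the parsing to your own log format.

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class LogicBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // "str" is the field name declared by storm-kafka's StringScheme
        String line = input.getStringByField("str");
        try {
            // wrap the raw log line into a JSON document; real business logic
            // would parse the line into structured fields instead
            String json = "{\"message\":\"" + line.replace("\\", "\\\\").replace("\"", "\\\"")
                    + "\",\"timestamp\":" + System.currentTimeMillis() + "}";
            // anchor the emitted tuple so acking works end to end
            collector.emit(input, new Values(json));
            collector.ack(input);
        } catch (Exception e) {
            // let the spout replay the message on failure
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // a single JSON-string field consumed by the EsBolt
        declarer.declare(new Fields("json"));
    }
}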
