Real-Time Log Processing: Integrating Kafka, Storm, and ElasticSearch
Posted by EffectiveMind
Overview
Kafka is a distributed streaming platform for publishing and subscribing to streams of records, similar to a message queue or an enterprise messaging system.
Storm is a distributed real-time computation system used for real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Its core concepts are covered in the Storm tutorial.
ElasticSearch is a highly available, open-source full-text search and analytics engine.
This article integrates the three frameworks in a Java program: data is read from Kafka, processed by a Storm topology, and finally flows into ElasticSearch.
Integration Approach
Storm provides spout support for Kafka integration: a configured KafkaSpout reads data from the specified Kafka topic and consumer group and feeds it into the Storm topology.
elasticsearch-storm is Elastic's official integration package between ES and Storm; configuring its EsBolt enables bulk writes into an ES cluster. The full wiring is shown in the entry-point example below.
Maven Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <artifactId>log4j</artifactId>
                <groupId>log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>zookeeper</artifactId>
                <groupId>org.apache.zookeeper</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.1</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <artifactId>clojure</artifactId>
                <groupId>org.clojure</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.2.1</version>
        <exclusions>
            <exclusion>
                <artifactId>zookeeper</artifactId>
                <groupId>org.apache.zookeeper</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-storm</artifactId>
        <version>6.6.1</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.12</version>
    </dependency>
    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-logging</artifactId>
                <groupId>commons-logging</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
Storm Entry-Point Code Example
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.elasticsearch.storm.EsBolt;

public class StormEsDemoApplication {

    // placeholders: resolve these from your own environment/configuration
    private static final String SERVICE_ENV = System.getProperty("service.env", "dev");
    private static final String topicName = "app_log_topic";

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // set EsBolt properties: tuples already carry JSON documents
        Map<String, String> esConf = new HashMap<>(1);
        esConf.put("es.input.json", "true");

        // build the topology: read from Kafka with 4 spout executors
        builder.setSpout("spoutId", new KafkaSpout(getKafkaSpoutConfig(topicName, "groupId")), 4);

        // add the business-specific logic bolt to the topology (sketched below)
        builder.setBolt("logicBoltId", new LogicBolt(), 2)
                .shuffleGrouping("spoutId");

        // add the es bolt to the topology; its target is the index (template) to write to.
        // The tick-tuple frequency makes the bolt flush its buffered documents periodically.
        EsBolt esBolt = new EsBolt("target", esConf);
        builder.setBolt("esBoltId", esBolt, 2)
                .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)
                .shuffleGrouping("logicBoltId");

        Config stormConf = new Config();
        stormConf.setDebug(false);
        stormConf.put(Config.WORKER_HEAP_MEMORY_MB, 1024);
        stormConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 1024);
        stormConf.setNumAckers(4);
        stormConf.setNumWorkers(4);
        stormConf.setMessageTimeoutSecs(30);
        stormConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

        // EsBolt connection settings are read from the topology config
        stormConf.put("es.nodes", "127.0.0.1");
        stormConf.put("es.port", "5555");
        // flush the bulk buffer once 500 entries are queued
        stormConf.put("es.storm.bolt.flush.entries.size", 500);

        // on the dev environment run in local mode, otherwise submit to the cluster
        if ("dev".equals(SERVICE_ENV)) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("app_log_persistence", stormConf, builder.createTopology());
        } else {
            StormSubmitter.submitTopology("topo_name", stormConf, builder.createTopology());
        }
    }

    /**
     * Initialize the Kafka spout configuration.
     *
     * @param topicName topic name
     * @param groupId   consumer group id
     * @return SpoutConfig
     */
    private static SpoutConfig getKafkaSpoutConfig(String topicName, String groupId) {
        // ZooKeeper connect string of the Kafka cluster and the broker path under it
        BrokerHosts brokerHosts = new ZkHosts("kafka_ZK_SERVERs", "broker_root");
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topicName, "consumer_root", groupId);
        // ZooKeeper servers where the spout stores consumer offsets
        // (defaults to the ZooKeeper configured for Storm itself); fill in your hosts
        List<String> zkServers = new ArrayList<>(5);
        spoutConf.zkServers = zkServers;
        spoutConf.zkPort = 2189;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        return spoutConf;
    }
}
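The LogicBolt referenced above is where the business-specific processing lives. Below is a minimal sketch, assuming StringScheme's default output field "str" and that the incoming Kafka messages are already JSON documents, so they can be forwarded to EsBolt as-is under es.input.json=true; the class and field names are illustrative, not part of any library.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class LogicBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // StringScheme emits a single field named "str" holding the raw Kafka message
        String rawLog = input.getStringByField("str");
        // assumption: the message is already a JSON document; real code would
        // parse, filter, and enrich the log line here
        collector.emit(new Values(rawLog));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // with es.input.json=true, EsBolt indexes this field's value as the document
        declarer.declare(new Fields("json"));
    }
}

BaseBasicBolt acks each tuple automatically after execute returns, which keeps the at-least-once semantics simple for a stateless transformation like this.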
Summary
The Storm program reads data from Kafka through the spout; tuples flow, via the configured grouping, to the custom logic bolt, and the EsBolt provided by the official ES integration is then wired into the topology with the parameters above to achieve bulk writes. The entry-point example does not include the LogicBolt implementation, which you should write for your own business logic; a minimal sketch is given above. If, after packaging and deploying the program, you hit a "Cannot flush non-initialized write operation" exception, see my other blog post for a fix.
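For deployment, a typical flow is to build the topology jar with Maven and submit it with the storm CLI; the jar name and main class below are placeholders for this example:

mvn clean package
storm jar target/storm-es-demo.jar StormEsDemoApplication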