Kafka Streams 开发单词计数应用
Posted 鄙人阿彬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Streams 开发单词计数应用相关的知识,希望对你有一定的参考价值。
1、添加依赖(IDEA工具操作)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
2、编写代码(IDEA工具操作)
根据上述业务流程分析得出,单词数据通过自定义处处理器接收并执行相应业务计算,因此创建LogProcessor类,并且继承StreamsAPI中的P ocessor接口,在Processor接口中,定义了以下3个方法。
(1)init(ProcessorContextprocessorContext):初如治化上下文对象。
(2)process(Key,Value):每接收到一条消息时,都会调用该方法处理并更新状态进行存储。
(3)close():关闭处理器,这里可以做一些资源清理理工作。 Kafka Streams单词计数详细代码如下所示。
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.HashMap;
public class LogProcessor implements Processor<byte[],byte[]>
private ProcessorContext processorContext;
public void init(ProcessorContext processorContext)
this.processorContext = processorContext;
public void process(byte[] key, byte[] value)
String inputOri = new String(value);
HashMap <String,Integer>map = new HashMap<String, Integer>();
int times = 1;
if (inputOri.contains(""))
//截取字段
String[] words = inputOri.split(" ");
for (String word : words)
if (map.containsKey(word))
map.put(word,map.get(word)+1);
else
map.put(word,times);
inputOri = map.toString();
processorContext.forward(key,inputOri.getBytes());
public void close()
单词计数的业务功能开发完之后,Kafka Streams 需要编写一个运行主程序的类App,来测试LogProcessor业务程序,具体代码如下:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class App
public static void main(String[] args)
//声明来源主题
String fromTopic = "test1";
//声明目标主题
String toTopic = "test2";
//设置参数信息
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"master:9092,slave1:9092,slave2:9092");
//实例化StreamsConfig对象
StreamsConfig config = new StreamsConfig(props);
//构建拓扑结构
Topology topology = new Topology();
//添加原处理节点,为原处理节点指定名称和他订阅的主题
topology.addSource("SOURCE",fromTopic)
//添加自定义处理节点,指定处理器类和上一节点的名称
.addProcessor("PROCESSOR", new ProcessorSupplier()
public Processor get()
return new LogProcessor();
,"SOURCE")
//添加目标处理节点,需要指定目标处理节点和上一节点的名称
.addSink("SINK",toTopic,"PROCESSOR");
//实例化KafkaStreams对象
KafkaStreams streams = new KafkaStreams(topology,config);
streams.start();
3、执行测试
代码编写完后,在master节点创建test1和test2主题,命令如下:
创建来源主题
kafka-topics.sh --create \\
--topic test1 \\
--partitions 3 \\
--replication-factor 2 \\
--zookeeper master:2181,slave1:2181,slave2:2181
创建目标主题
kafka-topics.sh --create \\
--topic test2 \\
--partitions 3 \\
--replication-factor 2 \\
--zookeeper master:2181,slave1:2181,slave2:2181
成功创建好目标主题后,分别在master节点和slave1节点启动生产者服务和消费者服务。
启动生产者服务命令如下:
kafka-console-procuder.sh \\
--broker-list master:9092,slave1:9092,slave2:9092 \\
--topic test1
启动消费者服务命令如下:
kafka-console-consumer.sh \\
--from-beginning \\
--topic test2 \\
--bootstrap-server master:9092,slave1:9092,slave2:9092
在生产者服务节点master输入数据:
到IDEA中运行创建好的App文件中的代码,运行结果如下:
以上两张图可以看到Kafka Streams所需的测试环境已经配置完成了。
以上是关于Kafka Streams 开发单词计数应用的主要内容,如果未能解决你的问题,请参考以下文章
如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?
Kafka Streams应用程序在kafka服务器上打开了太多文件