Kafka Streams 开发单词计数应用

Posted 鄙人阿彬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Streams 开发单词计数应用相关的知识,希望对你有一定的参考价值。

1、添加依赖(IDEA工具操作)

<dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-streams</artifactId>

            <version>2.0.0</version>

        </dependency>

2、编写代码(IDEA工具操作)

根据上述业务流程分析得出,单词数据通过自定义处处理器接收并执行相应业务计算,因此创建LogProcessor类,并且继承StreamsAPI中的P ocessor接口,在Processor接口中,定义了以下3个方法。

(1)init(ProcessorContextprocessorContext):初如治化上下文对象。

(2)process(Key,Value):每接收到一条消息时,都会调用该方法处理并更新状态进行存储。

(3)close():关闭处理器,这里可以做一些资源清理理工作。 Kafka Streams单词计数详细代码如下所示。

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.HashMap;

public class LogProcessor implements Processor<byte[],byte[]>

    private ProcessorContext processorContext;

    public void init(ProcessorContext processorContext)

        this.processorContext = processorContext;

    

    public void process(byte[] key, byte[] value)

        String inputOri = new String(value);

        HashMap <String,Integer>map = new HashMap<String, Integer>();

        int times = 1;

        if (inputOri.contains(""))

            //截取字段

            String[] words = inputOri.split(" ");

            for (String word : words)

                if (map.containsKey(word))

                    map.put(word,map.get(word)+1);

                else

                    map.put(word,times);

                

            

        

        inputOri = map.toString();

        processorContext.forward(key,inputOri.getBytes());

    

    public void close()

    

单词计数的业务功能开发完之后,Kafka Streams 需要编写一个运行主程序的类App,来测试LogProcessor业务程序,具体代码如下:

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

public class App

    public static void main(String[] args)

        //声明来源主题

        String fromTopic = "test1";

        //声明目标主题

        String toTopic = "test2";

        //设置参数信息

        Properties props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,

                "master:9092,slave1:9092,slave2:9092");

        //实例化StreamsConfig对象

        StreamsConfig config = new StreamsConfig(props);

        //构建拓扑结构

        Topology topology = new Topology();

        //添加原处理节点,为原处理节点指定名称和他订阅的主题

        topology.addSource("SOURCE",fromTopic)

                //添加自定义处理节点,指定处理器类和上一节点的名称

        .addProcessor("PROCESSOR", new ProcessorSupplier()

            public Processor get()

                return new LogProcessor();

            

        ,"SOURCE")

                //添加目标处理节点,需要指定目标处理节点和上一节点的名称

                .addSink("SINK",toTopic,"PROCESSOR");

        //实例化KafkaStreams对象

        KafkaStreams streams = new KafkaStreams(topology,config);

        streams.start();

    

3、执行测试

 代码编写完后,在master节点创建test1和test2主题,命令如下:

创建来源主题

kafka-topics.sh --create \\

--topic test1 \\

--partitions 3 \\

--replication-factor 2 \\

--zookeeper master:2181,slave1:2181,slave2:2181

 

创建目标主题

kafka-topics.sh --create \\

--topic test2 \\

--partitions 3 \\

--replication-factor 2 \\

--zookeeper master:2181,slave1:2181,slave2:2181

 

成功创建好目标主题后,分别在master节点和slave1节点启动生产者服务和消费者服务。

启动生产者服务命令如下:

kafka-console-procuder.sh \\

--broker-list master:9092,slave1:9092,slave2:9092 \\

--topic test1

 

启动消费者服务命令如下:

kafka-console-consumer.sh \\

--from-beginning \\

--topic test2 \\

--bootstrap-server master:9092,slave1:9092,slave2:9092

 

 

在生产者服务节点master输入数据:

 

到IDEA中运行创建好的App文件中的代码,运行结果如下: 

 

 

以上两张图可以看到Kafka Streams所需的测试环境已经配置完成了。

以上是关于Kafka Streams 开发单词计数应用的主要内容,如果未能解决你的问题,请参考以下文章

如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

Kafka Streams开发入门

Kafka Streams State Store

Kafka Streams应用程序在kafka服务器上打开了太多文件

Kafka Streams - 根据 Streams 数据发送不同的主题

Kafka ETL 的应用及架构解析|告别 Kafka Streams,让轻量级流处理更加简单