流式计算-Kafka Stream

Posted 曱甴崽

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了流式计算-Kafka Stream相关的知识,希望对你有一定的参考价值。

前面说了Java8的流,这里还说流处理,既然是流,比如水流车流,肯定得有流的源头,源可以有多种,可以自建,也可以从应用端获取,今天就拿非常经典的Kafka做源头来说事,比如要来一套应用日志实时分析框架,或者是高并发实时流处理框架,正是Kafka的拿手好戏。

环境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1

难度:新手--战士--老兵--大师

目标:

  1. 理解kafka原理
  2. Linux下kafka集群安装
  3. 使用kafka操作流式处理

说明:

第一部分——原理


1.先看看Kafka,目前kafka的发展已超出消息中间件的范畴,趋于向流平台靠拢,先总结如下:

1.1 Scala语言编写,若作为消息中间件,并发10W+级别,大于其他MQ;

1.2 必须有Zookeeper做协调,ZK保存消费者/生产者状态信息,使得两端非常轻量化;使用发布/订阅模式,所有消息按主题(topic)分类,使用pull模式消费消息;

2301

1.3 每条消息由key + value + timestamp构成,其中key用于计算目的发送分区(partition),消息记录由不可变(immutable)的顺序式Append log文件持久化消息,Append写方式是高吞吐率的重要支撑之一!偏移量(offset)标识消息在文件中的位置,下图来自官网:

流式计算(二)-Kafka Stream
2302

1.4 每条消息不论是否已被消费都将保存一个设定的时间,这是和RabbitMQ的显著差异;消费者仅需保存消息offset信息,可按顺序消费(一个topic只有一个partition),也能进行非顺序式回溯,但随机读写性能差;多个consumer消费互不影响,这也是高并发的支撑之一!下图来自官网:

流式计算(二)-Kafka Stream
2303

1.5 每个topic的所有消息,均衡(或指定)写入多个分区(partition),分区分布在不同的broker上,每个分区使用主(Leader)+从(Follower)多节点,这样的好处,一是分区文件大小和负载可控,增强单个topic的数据承载量,二是适应并行处理;Leader负责读/写,Followers仅复制备份,Leader不可用时,自动选举Follower转为主:

流式计算(二)-Kafka Stream
2304

1.6 每个Consumer实例都属于一个消费者组(consumer group),多个Consumer实例可以存在于不同的进程或机器上(Consumer实例可类比于java类的实例对象),一个消息记录只会发送给有对应主题订阅的消费者组中的一个Consumer实例!在一个消费者组中,每个分区至多只能发送到同一消费者的一个实例上,但一个消费者实例可以消费多个分区,因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息,所以分区(partition)数必须大于等于消费者组中的实例数量。下图中,具有2个server的kafka集群,拥有同一个topic的4个分区,并对接2个消费者组,如果A或B组中Consumer都是同一消费者的实例,则轮询均衡消费,若同组都是不同的消费者实例,则相当于广播消息,下图来自官网:

流式计算(二)-Kafka Stream
2305

1.7 缺少事务特性,没有接收确认和消费确认ACK机制,也没有RocketMQ的二阶段提交。

1.8 使用场景,下图来自官网,这也让我想起了kafka的几个圈圈的图标:

流式计算(二)-Kafka Stream
2306
  • 常规消息系统:消息系统一般有 queuepublish-subscribe两种模式,queue模式下,多个consumer可以并行地各自处理一部分消息,增加吞吐量和速度,但不能一个消息多分发,因为消息被消费掉就不存在了。publish-subscribe模式下,可以广播一个消息给多个订阅者,但无法扩大吞吐量,kafka的consumer group概念下既能并行也能分发!我认为事实上kafka并没有使用队列这个数据结构,因没有先进先出的概念!
  • 实时流处理 :对接KstreamAPI,可以实现流式处理,状态计算。
  • 分布式流式数据储存:分区+副本的磁盘存储方式可以实现高可用,低延时,大数据量下无性能衰减,kafka还具有仅当所有主从复制全部完成时才算写入成功的确认机制,从而可作为commit log存储系统。

第二部分——安装


虽然window下也可使用kafka,但我想生产环境下都是使用linux,我使用RHEL8.0虚拟机,JDK11的安装,略!

流式计算(二)-Kafka Stream
2307

2.1 先进行Zookeeper安装,虽然kafka新版本已经自带ZK,但我还是推荐单独安装ZK,配置和功能独立,步骤比较清晰,且如果是ZK集群,更建议单独配置,为节省篇幅,此部分非重点我就简述了,下载apache-zookeeper-3.5.5-bin.tar.gz,创建/usr/zookeeper目录,cp到该目录,tar命令解压,创建data和logs目录,用于保存zk的数据和log日志,根据zoo_sample.cfg复制一个zoo.cfg文件,并vim编辑如下图,顺带研究下zk的配置:

流式计算(二)-Kafka Stream
2308

然后配置linux环境变量,

[root@localhost ~]# vim /etc/profile
流式计算(二)-Kafka Stream
2309

保存后使用source命令,使配置生效:

[root@localhost ~]# source /etc/profile

ZK启动命令,会自动使用zoo.cfg配置文件:

[root@localhost apache-zookeeper-3.5.5-bin]# ./bin/zkServer.sh start

成功后状态:

流式计算(二)-Kafka Stream
2350

其他ZK管理命令:

  • /查看服务状态: ./zkServer.sh status
  • /停止服务: ./zkServer.sh stop
  • /重启服务: ./zkServer.sh restart
  • /使用ZKCli连接服务器: ./zkCli.sh -server 127.0.0.1:2181,

我本地zkCli实例如下:

流式计算(二)-Kafka Stream
2310

2.2 安装kafka,下载kafka_2.12-2.3.1.tgz,创建/usr/kafka目录,cp到此目录,解压,得到kafka_2.12-2.3.1目录,进入此目录,先看配置,这里有consumer、producer和server三个properties配置文件:

流式计算(二)-Kafka Stream
2311

使用命令启动:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/ server.properties

如下为启动kafka成功:

流式计算(二)-Kafka Stream
2312

再回到zkCli下ls命令查看下,发现创建了很多node,用于保存kafka运行上下文信息:

流式计算(二)-Kafka Stream
2313

新开一个terminal,创建一个topic,指定replication副本因子为1,即复制0份,分区partitions数量指定为 1:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic biao

列出存在的topic:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
流式计算(二)-Kafka Stream
2314

创建另一个topic :

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

下图中创建了一个topic:test,使用本机kafka做集群识别,前面使用zk做集群识别,--bootstrap-server和--zookeeper参数效果一样。再模拟producer,该topic下发送两行消息,默认条件下,每行为一个消息记录:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
流式计算(二)-Kafka Stream
2315

再另开一个terminal,模拟consumer,此terminal输出将会和producer输入一致:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
流式计算(二)-Kafka Stream
2316

Ctrl + C 退出程序。

流式计算(二)-Kafka Stream
2317

2.3 以上为单ZK单kafka搭建,下面搭建单ZK多kafka实例环境:复制出3份配置文件:

[root@localhost kafka_2.12-2.3.1]# cp config/server.properties  config/server-0.properties
[root@localhost kafka_2.12-2.3.1]# cp config/server.properties config/server-1.properties
[root@localhost kafka_2.12-2.3.1]# cp config/server.properties config/server-2.properties

以server-1.properties为例,其他数字依次修改即可:

broker.id=1  #集群内必须唯一
listeners=PLAINTEXT://:9093 #Socket监听地址,没写hostname/IP即为listen所有IP
log.dirs=/tmp/kafka-logs-1 #log目录,每个实例独立,防止互相覆盖
zookeeper.connect #ZK注册地址,因为是单ZK,三个实例一样
流式计算(二)-Kafka Stream
2318

在三个单独的terminal下启动三个实例:

[root@localhost ~]# cd /usr/kafka/kafka_2.12-2.3.1
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/server-0.properties
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/server-1.properties
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/server-2.properties

单独的terminal下创建topic:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

这里:指定replication副本因子为3,即复制2份,分区partitions数量指定为1,

查看topic的详细信息:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
流式计算(二)-Kafka Stream
2319

另一个例子:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic replicated-xiao
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-xiao
流式计算(二)-Kafka Stream
2320

以上每行说明一个partition,

  • "Leader":leader节点,负责读写,一个partition下的leader是随机选取的;
  • "replicas":列出所有同步保存append log文件的节点,不论主从角色和状态是否有效;
  • "isr" :意为"in-sync",即当前有主从同步的有效节点列表;

模拟producer,并输入几行信息:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic replicated-xiao
>xie
>xiaobiao
>hell world
流式计算(二)-Kafka Stream
2321

新terminal下,模拟consumer:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --from-beginning --topic replicated-xiao

Consumer窗口输出内容会和producer窗口输入内容保持一致:

流式计算(二)-Kafka Stream
2322

容错测试,关闭broker-1实例:

[root@localhost ~]# ps -aux | grep server-1.properties
流式计算(二)-Kafka Stream
2323
[root@localhost ~]# kill 21753

或者直接到server-1界面CTRL+C关闭,效果一样:

流式计算(二)-Kafka Stream
2324

对比上面的图,可以看到Leader发生变化,Isr 里都没有1了:

流式计算(二)-Kafka Stream
2325

再使用consumer读取记录,效果一样,可见容错机制启用了主从替代:

流式计算(二)-Kafka Stream
2326

如果再启动server-1,可见主从替换后,不会恢复:

流式计算(二)-Kafka Stream
2327

到此,kafka集群创建结束!

第三部分——应用


创建一个Springboot+gradle项目,命名为kafka-stream02,

3.1 应用测试01:位于包com.biao.kafka下,实现kafka消息的发送和消费:

流式计算(二)-Kafka Stream
2328

build.gradle中的核心依赖为:

compile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '2.2.1.RELEASE'
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.3.3.RELEASE'

创建消息发送者com.biao.kafka.Producer:

@Component
//@Slf4j
public class Producer {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

private Logger log = LoggerFactory.getLogger(Producer.class);
private String time = LocalDateTime.now().toString();
private final String msg = "THIS IS MESSAGE CONTENT " + time;

public void send() throws InterruptedException {
log.info("send message is {}",this.msg);
Thread.sleep(1000L);
// kafkaTemplate.sendDefault() 为异步方法,返回 ListenerFuture<T>,
kafkaTemplate.send("HelloWorld","test-key",this.msg);
}
}

以上核心为kafkaTemplate的API, 可以使用kafkaTemplate.send(topic,key,value)同步方法发送消息,或者kafkaTemplate. sendDefault()异步方法发送,

再创建消费者com.biao.kafka.Consumer,使用@KafkaListener注解标识一个topic的监听方法:

@Component
//@Slf4j
public class Consumer {

private Logger log = LoggerFactory.getLogger(Consumer.class);

@KafkaListener(id = "foo",groupId = "test-consumer-group",topics = "HelloWorld")
public void listen(ConsumerRecord<?,?> records){
Optional<?> msg = Optional.ofNullable(records.value());
if (msg.isPresent()){
Object data = msg.get();
log.info("ConsumerRecord >>>>>> {}", records);
log.info("Record Data >>>>>> {}", data);
}
}
}

创建入口类 com.biao.kafka.KafkaApplication:

@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) throws InterruptedException {
System.out.println("KafkaApplication started >>>>>>");
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class,args);
Producer producer = context.getBean(Producer.class);
producer.send();
}
}

配置文件application.properties,请关注下Serializer和Deserializer:

#以下这些值也可以在运行时通过参数指定
#============== kafka ===================
# 指定kafka 代理地址,可以多个,用逗号隔开
spring.kafka.bootstrap-servers=192.168.1.204:9092
# 运行com.biao.wordcount.WordCountApplication时使用,我换了一个linux虚拟机
#spring.kafka.bootstrap-servers=192.168.1.221:9092

#=============== provider =======================
spring.kafka.producer.retries=2
# 每次批量发送消息的数量,kafka是使用流模拟批量处理,每次提交都是批处理方式
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

运行程序即可看到结果,这里使用Springboot的DI机制启动运行了consumer和producer,注意关闭linux的防火墙或打开9092端口:

流式计算(二)-Kafka Stream
2329

再到kafka服务器上验证一下是否真的发送成功:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh -bootstrap-server 192.168.1.204:9092 --from-beginning --topic HelloWorld
流式计算(二)-Kafka Stream
2330

3.2 应用测试02,包com.biao.pipe下,实现一个流处理逻辑,开启一个流传输管道,将一个topic的内容传输到另一个topic中,代码com.biao.pipe.PipeApplication:

public class PipeApplication {
public static void main(String[] args) {
System.out.println("PipeApplication starting .........");
Properties props = new Properties();
// StreamsConfig已经预定义了很多参数名称,运行时console会输出所有StreamsConfig values
// 这里没有使用springboot的application.properties来配置
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
// kafka流都是byte[],必须有序列化,
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

// kafka流计算是一个各broker连接的拓扑结构,以下使用builder来构造拓扑
final StreamsBuilder builder = new StreamsBuilder();
// 构建一个KStream流对象,元素是<String, String>类型的key-value对值,
KStream<String, String> source = builder.stream("streams-plaintext-input");
// 将前面的topic:"streams-plaintext-input"写入另一个topic:"streams-pipe-output"
source.to("streams-pipe-output");
// 以上两行等同以下一行
// builder.stream("streams-plaintext-input").to("streams-pipe-output");

// 查看具体构建的拓扑结构
final Topology topology = builder.build();
System.out.println(topology.describe());

final KafkaStreams streams = new KafkaStreams(topology,props);
// 控制运行次数,一次后就结束
final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try{
streams.start();
latch.await();
}catch (Throwable e){
System.exit(1);
}
System.exit(0);
}
}
流式计算(二)-Kafka Stream
2331

解释:以上构造了有2个处理节点的kafka流计算拓扑结构,源节点:KSTREAM-SOURCE-0000000000,汇聚(Sink)节点:KSTREAM-SINK-0000000001,源节点持续的读取topic为streams-plaintext-input的有序记录并输送到下游Sink节点,Sink节点再将记录写入topic为streams-pipe-output的流,--> 和 <-- 指示左右端对象的上游和下游关系,图中有换行,导致显示不连贯拓扑展示如下:

流式计算(二)-Kafka Stream
2332

3.3 应用测试03,包com.biao.linesplit下,创建一个无状态的流处理逻辑,读取一个topic的记录,并将文本行按空格分开,再传输到另一个topic,代码 com.biao.linesplit.LineSplitApplication:

public class LineSplitApplication {
public static void main(String[] args) {
System.out.println("LineSplitApplication starting .........");
Properties props = new Properties();
// StreamsConfig已经预定义了很多参数名称,运行时console会输出所有StreamsConfig values
// 这里没有使用springboot的application.properties来配置
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-line-split");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
// kafka流都是byte[],必须有序列化,不同的对象使用不同的序列化器
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

// kafka流计算是一个各broker连接的拓扑结构,以下使用builder来构造拓扑
final StreamsBuilder builder = new StreamsBuilder();
// 构建一个KStream流对象,元素是<String, String>类型的key-value对值,
KStream<String, String> source = builder.stream("streams-plaintext-input");
/*
// 以source为输入,产生一条新流words,这里使用了流的扁平化语法,我的前篇文章有讲此基础
KStream<String, String > words = source.flatMapValues(value -> Arrays.asList("\\W+"));
// 将前面的topic:"streams-plaintext-input"写入另一个topic:"streams-pipe-output"
words.to("streams-pipe-output");*/


// 以上两行使用stream链式语法+lambda等同以下一行,我的前篇文章有讲此基础
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");

// 查看具体构建的拓扑结构
final Topology topology = builder.build();
System.out.println(topology.describe());

final KafkaStreams streams = new KafkaStreams(topology,props);
// 控制运行次数,一次后就结束
final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try{
streams.start();
latch.await();
}catch (Throwable e){
System.exit(1);
}
System.exit(0);
}
}
流式计算(二)-Kafka Stream
2333

解释:以上构造了有3个处理节点的kafka流计算拓扑结构,源节点:KSTREAM-SOURCE-0000000000,处理节点:KSTREAM-FLATMAPVALUES-0000000001,汇聚节点:KSTREAM-SINK-0000000002,处理节点从源节点取得流元素,进行处理,再将结果传输给汇聚节点,注意这个过程是无状态的,拓扑展示如下:

流式计算(二)-Kafka Stream
2334

3.4 应用测试04,包com.biao.wordcount下,构建一个无限流处理逻辑,读取一个topic,统计文本单词数,最终输出到另一个topic,代码com.biao.wordcount.WordApplication:

public class WordCountApplication {
public static void main(String[] args) {
System.out.println("WordCountApplication starting .........");
Properties props = new Properties();
// StreamsConfig已经预定义了很多参数名称,运行时console会输出所有StreamsConfig values
// 这里没有使用springboot的application.properties来配置
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-word-count");
// kafka虚拟机linux地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
// kafka流都是byte[],必须有序列化,不同的对象使用不同的序列化器
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

// kafka流计算是一个各node连接的拓扑结构,以下使用builder来构造拓扑
final StreamsBuilder builder = new StreamsBuilder();
// 构建一个KStream流对象,元素是<String, String>类型的key-value对值,topic:streams-plaintext-input
KStream<String, String> source = builder.stream("streams-plaintext-input");

// 以下使用stream链式语法+lambda,具体分开的过程语句我就不写了
// flatMapValues将text line使用空格分隔成words
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy(((key, value) -> value))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(),Serdes.Long()));

// 查看具体构建的拓扑结构
final Topology topology = builder.build();
System.out.println(topology.describe());

final KafkaStreams streams = new KafkaStreams(topology,props);
// 控制运行次数,一次后就结束
final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try{
streams.start();
latch.await();
}catch (Throwable e){
System.exit(1);
}
System.exit(0);
}
}
流式计算(二)-Kafka Stream
2335

解释:最重要一点即此WordCountApplication仅是一个逻辑处理单元,可以理解为一个流水线车间,里面有两条流水线对来料加工再输出加工品。以上可以看出,有两个不连通的拓扑结构,第一个拓扑无状态,其汇聚节点KSTREAM-SINK-0000000005写入到topic: counts-store-repartition,这个topic又作为第二个拓扑的源,此中间topic的作用是因分组聚合运算”打乱”流元素的顺序。插入的节点Processor: KSTREAM-FILTER-0000000005是过滤掉分组聚合key值为空的记录。

第二个拓扑有状态,即生成并保存了计算中间值,因为要做分组统计,分组聚合运算节点KSTREAM-AGGREGATE-0000000003保存状态使用了counts-store,即程序中指定的值。对流中每个元素统计时,会先去保存的状态数据中去查找匹配,如果有则累加一,然后再写入counts-store。每个被更新的统计值都再传输到处理节点KTABLE-TOSTREAM-0000000007,此节点作用是将统计更新的值再解析成新流。最后传输给汇聚节点KSTREAM-SINK-0000000008。以上可见流处理的思想和逻辑,内部迭代确实很强大!拓扑图如下:

流式计算(二)-Kafka Stream
2336

应用04运行步骤:

第一步,启动ZK,再启动kafka,注意先修改config/server.properties 中listeners=PLAINTEXT:// 192.168.1.221:9092:

[root@localhost kafka_2.12-2.3.1]#  ./bin/kafka-server-start.sh config/server.properties

第二步,运行com.biao.wordcount.WordCountApplication,启动kafka流处理车间。

topic数据写入放在包com.biao.wordcount.producer,当然也可以直接在kafka server中使用命令行写入,我这里是为了演示多种代码操作模式。配置类com.biao.wordcount.producer.KafkaConfig,这里使用了kafka的API配置方式,分别配置了topic,producer和consumer的相应参数,并生成Bean对象,请对比application.properties方式:

@Configuration
@EnableKafka
public class KafkaConfig {

@Bean
public KafkaTemplate<Integer,String > kafkaTemplate(){
return new KafkaTemplate<>(this.producerFactory());
}

// topic
@Bean
public KafkaAdmin admin(){
Map<String,Object> configs = new HashMap<>(16);
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
return new KafkaAdmin(configs);
}

@Bean
// NewTopic(String name, int numPartitions, short replicationFactor)
// kafka中每个topic只需创建一次,
public NewTopic topic(){
return new NewTopic("streams-plaintext-input",1, (short) 1);
}

@Bean
// NewTopic(String name, int numPartitions, short replicationFactor)
// kafka中每个topic只需创建一次,
public NewTopic topic2(){
return new NewTopic("streams-wordcount-output",1, (short) 1);
}

// producer
@Bean
public Map<String,Object> producerConfigs(){
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
props.put("acks","all");
props.put("retries",2);
props.put("batch.size",16384);
props.put("linger.ms",1);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// props.put("key.converter","org.apache.kafka.connect.storage.IntegerConverter");
// props.put("value.converter","org.apache.kafka.connect.storage.StringConverter");
return props;
}

@Bean
public ProducerFactory<Integer,String> producerFactory(){
return new DefaultKafkaProducerFactory<>(this.producerConfigs());
}

// consumer
@Bean
public Map<String,Object> consumerConfigs(){
HashMap<String,Object> props = new HashMap<>(16);
props.put("bootstrap.servers","192.168.1.221:9092");
props.put("group.id","foo");
props.put("enable.auto.commit","true");
// WordCountApplication 的consumer消费对象是统计的结果 key-value
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.LongDeserializer");
props.put("formatter","kafka.tools.DefaultMessageFormatter");
props.put("print.key","true");
props.put("value.key","true");
// props.put("key.converter","org.apache.kafka.connect.storage.IntegerConverter");
// props.put("value.converter","org.apache.kafka.connect.storage.StringConverter");
return props;
}

@Bean
public ConsumerFactory<Integer,String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(this.consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
return factory;
}

@Bean
public SimpleConsumer simpleConsumerLister(){
return new SimpleConsumer();
}
}

定义消费者,com.biao.wordcount.producer.SimpleConsumer:

@Component
public class SimpleConsumer {
private Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
private final CountDownLatch countDownLatch = new CountDownLatch(1);

@KafkaListener(id = "foo",topics = "streams-wordcount-output")
public void listen(byte[] records){
System.out.println("records is >>>> "+ records);
this.countDownLatch.countDown();
log.debug("consume successfully!");
}
//在WordCountApplication实例中,无法打印流结果,因为需要格式化
/* public void listen(ConsumerRecord<?,?> records){
Optional<?> msg = Optional.ofNullable(records.value());
if (msg.isPresent()){
Object data = msg.get();
log.info("Consumer Record >>>>>> {}", records);
log.info("Record Data >>>>>> {}", data);
}
}*/

}

定义生产者,并作为启动类,com.biao.wordcount.producer.KafakaProducer:

@SpringBootApplication
public class KafakaProducer {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(KafkaConfig.class);
// KafkaTemplate<Integer,String> kafkaTemplate = (KafkaTemplate<Integer, String>) context.getBean(KafkaTemplate.class);
KafkaTemplate<Integer,String> kafkaTemplate = (KafkaTemplate<Integer, String>) context.getBean(KafkaTemplate.class);
LocalDateTime time = LocalDateTime.now();
String data = "MSG CONTENT -> " + time ;
// send(String topic, K key, @Nullable V data)
ListenableFuture<SendResult<Integer,String>> send = kafkaTemplate.send("streams-plaintext-input", 1, data);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println(">>>>>>> kafka message send failure");
}

@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println(">>>>>>> kafka message send successfully");
}
});
}
}

第三步,运行com.biao.wordcount.producer.KafakaProducer, 启动topic数据写入,kafka中验证如下:

流式计算(二)-Kafka Stream
2337

如果多次运行导致测试数据太多,影响结果查看,可以先删除topic及其数据,若当前topic有使用过即有传输过信息:并没有真正删除topic只是把这个topic标记为删除(marked for deletion),要彻底删除需到ZK中删除相应的目录:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic HelloWorld
Topic HelloWorld is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

第四步,在kafka server上查看最终word统计结果,命令:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.221:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
流式计算(二)-Kafka Stream
2338

后记:

1.关于有状态无状态,复杂问题简单化!无状态对象本身只是个纯粹的处理逻辑,不依赖上下文信息,也不改变上下文信息,比如FUNC(x+y),只要有输入x和y,就输出相加值,对程序“无害”;有状态指会保留上下文,如统计单词数,必须保留每次计算的中间结果,用于下次累加,有状态对象会破坏程序运行现场,不利于并发和共享。

2.如遇到程序出错:

[AdminClient clientId=adminclient-1] Error connecting to node dubbo204.domain:9092 (id: 0 rack: null)
流式计算(二)-Kafka Stream
2340

3.添加lombok依赖

providedCompile group: 'org.projectlombok', name: 'lombok', version: '1.18.10'

遇到编译错误:

Could not find method providedCompile() for arguments [{group=org.projectlombok, name=lombok, version=1.18.10}]

因providedCompile必须配合 war插件,修改build.gradle:

流式计算(二)-Kafka Stream
2341

4.运行WordCountApplication 报错:

org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

这是序列化问题,必须使用正确的序列化器处理对应的数据,如IntegerDeserializer只能反序列化Integer对象,StringSerializer用于序列化String对象。

5.RHEL8.0版本可用性还是不错的,比7要流畅很多,很多命令都变了,我开的共享:https://pan.baidu.com/s/19gkx07hQ6TuN9UyNWHmChQ 提取码:bg69,绝对保证可用,之前我也下载了几次都是损坏的,每次6.62G大小,快哭了。

2342

总结:kafka API,分为Producer,Consumer,Stream,Connect和AdminClient。Producer/Consumer分别用于管理生产者和消费者,Stream则是自带的KStream,可以类比JDK8的Stream来理解,即在输出到最终sink前进行流式计算,且很多方法使用类似,Connect是用于kafka连接到输入/输出,支持很多类型,如DB,file,redis,ELK等。AdminClient则管理topic/broker等。KStream+kafka强强联手,可以预计未来会干出一番大事!

推荐阅读:



以上是关于流式计算-Kafka Stream的主要内容,如果未能解决你的问题,请参考以下文章

流式计算-Kafka Stream

流式计算-Kafka Stream

流处理Kafka Stream-Spark Streaming-Storm流式计算框架比较选型

介绍一位分布式流处理新贵:Kafka Stream

Kafka核心API——Stream API

Kafka核心API——Stream API