Knowledge Point 16 -- Integrating Kafka with Spring Boot
Posted by 尘世壹俗人
This post builds on the project from Knowledge Point 15 and shows how to connect Spring Boot to Kafka. It is a long one, so please read through to the end. If you don't have a Kafka cluster yet, see the cluster setup guides on my home page:
--> 大数据原生集群本地测试环境搭建三
Step 1: add the Maven dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Step 2: edit the Spring Boot configuration file
spring.kafka.bootstrap-servers=192.168.88.186:9092,192.168.88.187:9092,192.168.88.188:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries=0
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.group-id=test
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Step 3: create a test class to get familiar with the producer
package com.wy.scjg;

import com.wy.scjg.bean.User;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;

@SpringBootTest
public class KfKTest {

    @Resource
    private KafkaTemplate kafkaTemplate;

    @Test
    void pro_test() {
        User user = new User();
        user.setName("张三");
        user.setAge(20);
        // send the user's string form to the "test" topic
        kafkaTemplate.send("test", user.toString());
    }
}
Open a console consumer on the server to consume the test topic (replace the cluster addresses with your own):
./bin/kafka-console-consumer.sh --bootstrap-server hdp1:9092,hdp2:9092,hdp3:9092 --topic test
Run the test class and watch what happens:
![](https://image.cha138.com/20230402/c972166cb4714d208adc080bbe2bebc8.jpg)
As the screenshot shows, the message was sent successfully.
Step 4: spring-kafka actually offers two ways to produce. The previous step used the default, asynchronous sending: after handing a message off to the cluster, the producer goes on preparing the next one while the acknowledgment arrives in the background. With synchronous sending, the producer does not move on after a send; it waits for the cluster's success acknowledgment before preparing the next message. Here is synchronous sending; modify the send method in the test class:
@Test
void pro_test() throws ExecutionException, InterruptedException, TimeoutException {
    User user = new User();
    user.setName("张三");
    user.setAge(20);
    // send() returns a future; get() blocks until the broker acknowledges or 3 seconds pass
    ListenableFuture<SendResult<String, Object>> sendResult = kafkaTemplate.send("test", user.toString());
    SendResult<String, Object> result = sendResult.get(3, TimeUnit.SECONDS);
    System.out.println("Observed result ------- " + result.getProducerRecord().value());
}
Run it again and the cluster receives the message once more:
![](https://image.cha138.com/20230402/411a48b4950a461492bf6f8d09d9c8c0.jpg)
and our console shows the result we waited for:
![](https://image.cha138.com/20230402/f930144345b24668a09858d93800c77e.jpg)
Step 5: with the producer working, let's look at the consumer. A consumer needs the framework running behind it, so we won't use a test class; we'll let it live in a configuration class instead (if you prefer, you can also put it in its own package and register it with @Component):
package com.wy.scjg.config;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class ConsumerConfig {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(id = "test1", topics = "test")
    public void listen(String mes) {
        logger.info("Received data ------------- {}", mes);
    }
}
Open a console producer on the server:
./bin/kafka-console-producer.sh --broker-list hdp1:9092,hdp2:9092,hdp3:9092 --topic test
Run the project, send a message from the console producer, and watch the application log:
![](https://image.cha138.com/20230402/b416ac72dca9405c9f01797052d0ff13.jpg)
The consumer side needs two more beans, though; here they are:
package com.wy.scjg.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@Configuration
public class ConsumerConfig {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // no group specified: defaults to the group-id from the Spring Boot config
    @KafkaListener(id = "test1", topics = "test")
    public void listen(String mes) {
        logger.info("Received data ------------- {}", mes);
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public NewTopic topic() {
        // topic name, partition count, replica count
        return new NewTopic("test", 1, (short) 1);
    }
}
Let me explain what these two beans are for. NewTopic makes the framework create the topic when you consume one that doesn't exist yet; if you use it at work, make sure the replica count matches your requirements, it will almost never be 1. RecordMessageConverter works around a bug I hit long ago where this bean was missing, so if you run into the same thing you can define one like this.
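On spring-kafka 2.3+, TopicBuilder gives a fluent way to declare the same kind of bean; a sketch with a made-up topic name, sized more like a production topic:

@Bean
public NewTopic ordersTopic() {
    // hypothetical topic: 3 partitions, 2 replicas; adjust to your cluster
    return TopicBuilder.name("orders")
            .partitions(3)
            .replicas(2)
            .build();
}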
This is not the only way to write a consumer, though; you can also write it like this:
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // no group specified: defaults to the group-id from the Spring Boot config file
    @KafkaListener(topics = "test")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message: {}", msg);
        }
    }
}
Step 6: earlier we listened for the result of a synchronous send. Have you wondered how to listen to asynchronous sends? You might say that listening synchronously amounts to the same thing, but it is fundamentally different: the advantage of asynchronous sending is that it never blocks on the result. For that we register a configuration class that listens asynchronously:
package com.wy.scjg.config;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Configuration
public class ProSendYBLen {

    private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    @Resource
    KafkaTemplate kafkaTemplate;

    // register the listener on the template
    @PostConstruct
    private void listener() {
        kafkaTemplate.setProducerListener(new ProducerListener() {
            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                logger.info("Message received ----- message = {}", producerRecord.value());
            }

            @Override
            public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
                logger.error("Send failed -------- message = {}", producerRecord.value());
            }
        });
    }
}
To see the effect we need a producer Controller: if we kept using the test class, the test process would exit right after sending and would have nothing to do with the running project.
package com.wy.scjg.controller;

import com.wy.scjg.bean.User;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

@Controller
@RequestMapping("/kfk")
public class KfkController {

    @Resource
    private KafkaTemplate kafkaTemplate;

    @RequestMapping("/send")
    @ResponseBody
    public String pro_test() throws ExecutionException, InterruptedException, TimeoutException {
        User user = new User();
        user.setName("张三");
        user.setAge(20);
        kafkaTemplate.send("test", user.toString());
        return "Send complete";
    }
}
Note: if you have been following the series, adjust the interceptor settings first, or it will interfere with the demo.
![](https://image.cha138.com/20230402/e0603a4631d842dd80c0d98306fea76c.jpg)
Then run the project and send a request to the Kafka controller; you should see output like this in the console:
![](https://image.cha138.com/20230402/2b19d6a69a4b4fb5943030da6bd0764f.jpg)
At this point, please don't tie yourself in knots over how to write a listener for the consumer: a consumer process is itself a listener, so there is nothing extra to write the way there is for the producer.
Step 7: this step is pure theory; feel free to skip it.
We know producers and consumers have a serialization format; by default it is the String pair org.apache.kafka.common.serialization.StringSerializer / StringDeserializer, and since what we send is almost always a string, that covers most needs. But serialization is not limited to String: there are Long and various other types; take a look at the classes in the serialization package, shown below.
![](https://image.cha138.com/20230402/35dbe846387b42fdb60204429f402e0a.jpg)
Around the web you will see people writing their own serialization classes: say the data you send is already JSON, so you define a producer-side class that implements the org.apache.kafka.common.serialization.Serializer interface and overrides public byte[] serialize(String topic, Object data), plus a consumer-side class that implements the Deserializer interface and overrides public Object deserialize(String topic, byte[] data), and finally wire both into the spring.kafka configuration. This works in principle, but in real development we don't bother: it isn't necessary, and when it misbehaves it is hard to debug. Be careful, too, not to implement java.io.Serializable by mistake: that interface has no serialize method, so the @Override will not compile.
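For reference, a producer-side JSON serializer would look roughly like this; a sketch assuming Jackson is on the classpath, with a made-up class name:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// hypothetical serializer: object -> JSON bytes
public class JsonValueSerializer implements Serializer<Object> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize value for topic " + topic, e);
        }
    }
}

The consumer side mirrors it with a Deserializer<Object> whose deserialize(String topic, byte[] data) calls mapper.readValue, and the two are wired in through the spring.kafka.producer.value-serializer and spring.kafka.consumer.value-deserializer keys.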
Step 8: when producing and consuming we can target a specific partition. When sending there are two ways to pick one:
kafkaTemplate.send("test", 0, key, "");
kafkaTemplate.send("test", key, "")
When consuming you can also pin the partitions to read, though only by naming them directly; the topicPartitions attribute of @KafkaListener does this (topics and topicPattern select whole topics, not partitions):
@KafkaListener(topicPartitions = @TopicPartition(topic = "test", partitions = { "0" }))
If you want custom routing instead, you need your own partitioning strategy class:
package com.wy.scjg.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // keys starting with "0" go to partition 0, everything else to partition 1
        String keyStr = key + "";
        if (keyStr.startsWith("0")) {
            return 0;
        } else {
            return 1;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
Then define a custom Kafka configuration class:
package com.wy.scjg.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MyPartitionTemplate {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    KafkaTemplate kafkaTemplate;

    @PostConstruct
    public void setKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // plug in the custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
    }

    public KafkaTemplate getKafkaTemplate() {
        return kafkaTemplate;
    }
}
A word on @PostConstruct: it marks a non-static method that is called right after Spring Boot has injected the object, which makes it the place to finish configuring it.
With this in place you no longer use KafkaTemplate directly; you inject your own MyPartitionTemplate instead. Let's adapt the producer test class from earlier:
package com.wy.scjg;

import com.wy.scjg.config.MyPartitionTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

@SpringBootTest
public class KfKTest {

    @Autowired
    private MyPartitionTemplate myPartitionTemplate;

    @Test
    void pro_test() throws ExecutionException, InterruptedException, TimeoutException {
        // note: an explicit partition argument bypasses the partitioner;
        // send("test", "0", "...") would let MyPartitioner pick the partition
        myPartitionTemplate.getKafkaTemplate().send("test", 0, "0", "data starting with 0");
    }
}
The consumer configuration class has to change accordingly. While changing it, to echo the earlier section, I rewrote the consumer in the other style; take note of the difference.
package com.wy.scjg.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;

import java.util.Optional;

@Configuration
public class ConsumerConfig {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // listen only on partition 0 of the test topic
    @KafkaListener(id = "test1", topicPartitions = @TopicPartition(topic = "test", partitions = { "0" }))
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            logger.info("message: {}", optional.get());
        }
    }
}

Integrating Kafka with Spring Boot
A recent project needed the Kafka messaging middleware, so here is a simple record of the setup for reuse in other projects.
Add the dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configuration file (these are custom kafka.* keys, read via @Value by the configuration classes below rather than by Spring Boot's spring.kafka auto-configuration):
kafka.consumer.servers=127.0.0.1:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.group.id=kafka-test-group
kafka.consumer.concurrency=10
kafka.producer.servers=127.0.0.1:9092
kafka.producer.retries=1
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
Producer configuration class
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
Consumer configuration class
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(8);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
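By default, @KafkaListener looks up the container factory bean named kafkaListenerContainerFactory, which is exactly what the bean above is called; you can also reference it explicitly through the containerFactory attribute. A minimal sketch:

@KafkaListener(topics = {"test-topic"}, containerFactory = "kafkaListenerContainerFactory")
public void consume(String message) {
    // handled by one of the 10 concurrent containers the factory above creates
}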
Producer class
@Component
public class KafkaProducer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String message) {
logger.info("on message:{}", message);
kafkaTemplate.send(topic, message);
}
}
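To exercise the producer you might expose it through a small controller; a sketch assuming spring-boot-starter-web is on the classpath (the class name and path are made up):

@RestController
public class KafkaSendController {

    @Autowired
    private KafkaProducer kafkaProducer; // the producer component above

    @GetMapping("/kafka/send")
    public String send(@RequestParam String msg) {
        kafkaProducer.sendMessage("test-topic", msg);
        return "sent";
    }
}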
Consumer class
@Component
public class VideoCosConsumer {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = {"test-topic"})
public void consumerMessage(String message) {
logger.info("on message:{}", message);
}
}
That is the whole Spring Boot + Kafka integration. Spring leaves us code movers with less and less to do; even copy-paste hardly counts anymore, all that's left is assembling the entity classes we need.