Spring Boot整合Kafka
Posted Charge8
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot整合Kafka相关的知识,希望对你有一定的参考价值。
一、Spring Boot整合Kafka
创建 SpringBoot项目,引入 kafka依赖:
<!-- Springboot整合 Kafka使用。注意:版本一致 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
1、yaml配置文件
在配置文件中,配置Kafka服务信息。
spring:
application:
name: kafka-springboot
# kafka配置信息
kafka:
bootstrap-servers: 192.168.xxx.xxx:9092 # 集群用逗号分隔
producer: # 生产者
retries: 3 # 失败重试次数
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer: # 消费者
group-id: default-group # 消费组
enable-auto-commit: false
# auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: MANUAL_IMMEDIATE
配置参数不了解的,请查看之前文章,或者查看官方文档。
这里主要说明一下 listener配置参数:
- RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
- BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
- TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
- COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
- COUNT_TIME:TIME | COUNT 有一个条件满足时提交
- MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
- MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交
作用就是影响消费者监听器(ListenerConsumer)处理之后提交动作。一般我们选用手动提交。
2、生产者
使用 KafkaTemplate类的 send方法
发送消费。
2.1 分区发送
@Service
public class Producer
private final static String TOPIC_NAME = "my-springboot-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMsg(int orderId)
//1 构建消息数据
Order order = new Order(orderId, "订单-" + orderId, 1000.00);
//2.发送消息
// 未指定分区发送
kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
// 指定分区发送
//kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
2.2 同步发送
/**
* 同步发送
* @param orderId
*/
public void syncSendMsg(int orderId)
//1 构建消息数据
Order order = new Order(orderId, "订单-" + orderId, 1000.00);
//2.发送消息
// 未指定分区发送
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
SendResult<String, String> sendResult = null;
try
sendResult = future.get();
catch (InterruptedException e)
e.printStackTrace();
catch (ExecutionException e)
e.printStackTrace();
RecordMetadata metadata = sendResult.getRecordMetadata();
ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
2.3 异步发送
1)首先需要自定义一个生产者监听器类
自定义生产者发送结果监听类,需要实现 ProducerListener类
。
- 发送消息成功则会回调用 onSuccess方法,
- 发送消息失败则会回调用 onError方法。
@Component
public class MyKafkaProducerSendResultListener implements ProducerListener
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata)
System.out.println("消息发送成功 : " + producerRecord.toString());
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception)
System.out.println("消息发送成功,exception=" + exception.getMessage());
2)添加生产者监听器类之后在发送消息
@Autowired
private MyKafkaProducerSendResultListener myKafkaProducerSendResultListener;
/**
* 异步发送
* @param orderId
*/
public void asyncSendMsg(int orderId)
//1 构建消息数据
Order order = new Order(orderId, "订单-" + orderId, 1000.00);
//2.发送消息
// 添加发送回调监听
kafkaTemplate.setProducerListener(myKafkaProducerSendResultListener);
// 未指定分区发送
kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
3、消费者
使用 @KafkaListener注解
来注入消费者。常见参数如下:
@KafkaListener(
groupId = "mySpringBootGroup",
topicPartitions =
@TopicPartition(topic = "topic1", partitions = "0", "1"),
@TopicPartition(
topic = "topic2",
partitions = "0",
partitionOffsets = @PartitionOffset(
partition = "1",
initialOffset = "100"))
,
concurrency = "6"
)
- group-id:表示消费组,如果没有指定,则会使用配置文件中设置的默认的groupId。
- topicPartitions:一个消费组可以消费多个主题分区
- TopicPartition:主题分区相关
- concurrency:同组下的消费者个数,必须小于等于分区总数,大于意义不大,没必要大于。
3.1 多消费组消费主题
如果一个主题要被多个消费组消费,那么我们使用 @KafkaListener注解来注入多个消费者即可。
@Component
public class MyConsumer
@KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup")
public void listenConsumerGroup1(ConsumerRecord<String, String> record, Acknowledgment ack)
String value = record.value();
//System.out.println(record);
System.out.printf("ConsumerGroup1收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
//手动提交offset
ack.acknowledge();
//配置多个消费组
@KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup2")
public void listenConsumerGroup2(ConsumerRecord<String, String> record, Acknowledgment ack)
String value = record.value();
System.out.printf("ConsumerGroup2收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
//手动提交offset
ack.acknowledge();
4、启动项目
启动项目,从打印信息看出,每个消费者都会输出其配置信息。
单元测试ok:
有了之前 Kafka客户端API的使用,Spring Boot整合Kafka使用就更加简单了。
传送门(Kafka客户端API):https://blog.csdn.net/qq_42402854/article/details/124994563
– 求知若饥,虚心若愚。
以上是关于Spring Boot整合Kafka的主要内容,如果未能解决你的问题,请参考以下文章