Spring Boot整合Kafka

Posted Charge8

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot整合Kafka相关的知识,希望对你有一定的参考价值。

一、Spring Boot整合Kafka

创建 SpringBoot项目,引入 kafka依赖:

        <!-- Springboot整合 Kafka使用。注意:版本一致   -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.1</version>
        </dependency>

1、yaml配置文件

在配置文件中,配置Kafka服务信息。

spring:
  application:
    name: kafka-springboot
  # kafka配置信息
  kafka:
    bootstrap-servers: 192.168.xxx.xxx:9092 # 集群用逗号分隔
    producer: # 生产者
      retries: 3 # 失败重试次数
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: default-group # 消费组
      enable-auto-commit: false
      # auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: MANUAL_IMMEDIATE

配置参数不了解的,请查看之前文章,或者查看官方文档。

这里主要说明一下 listener配置参数:

  • RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  • BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  • TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  • COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  • COUNT_TIME:TIME | COUNT 有一个条件满足时提交
  • MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
  • MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交

作用就是影响消费者监听器(ListenerConsumer)处理之后提交动作。一般我们选用手动提交。

2、生产者

使用 KafkaTemplate类的 send方法发送消费。

2.1 分区发送

@Service
public class Producer 
    private final static String TOPIC_NAME = "my-springboot-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMsg(int orderId) 
        //1 构建消息数据
        Order order = new Order(orderId, "订单-" + orderId, 1000.00);

        //2.发送消息
        // 未指定分区发送
        kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        // 指定分区发送
        //kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));

    

2.2 同步发送

    /**
     * 同步发送
     * @param orderId
     */
    public void syncSendMsg(int orderId) 
        //1 构建消息数据
        Order order = new Order(orderId, "订单-" + orderId, 1000.00);

        //2.发送消息
        // 未指定分区发送
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        SendResult<String, String> sendResult = null;
        try 
            sendResult = future.get();
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (ExecutionException e) 
            e.printStackTrace();
        
        RecordMetadata metadata = sendResult.getRecordMetadata();
        ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
        System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    

2.3 异步发送

1)首先需要自定义一个生产者监听器类

自定义生产者发送结果监听类,需要实现 ProducerListener类

  • 发送消息成功则会回调用 onSuccess方法,
  • 发送消息失败则会回调用 onError方法。
@Component
public class MyKafkaProducerSendResultListener implements ProducerListener 

	@Override
	public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) 
		System.out.println("消息发送成功 : " + producerRecord.toString());
	

	@Override
	public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) 
		System.out.println("消息发送成功,exception=" + exception.getMessage());
	

2)添加生产者监听器类之后在发送消息

    @Autowired
    private MyKafkaProducerSendResultListener myKafkaProducerSendResultListener;
    /**
     * 异步发送
     * @param orderId
     */
    public void asyncSendMsg(int orderId) 
        //1 构建消息数据
        Order order = new Order(orderId, "订单-" + orderId, 1000.00);

        //2.发送消息
        // 添加发送回调监听
        kafkaTemplate.setProducerListener(myKafkaProducerSendResultListener);
        // 未指定分区发送
        kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
    

3、消费者

使用 @KafkaListener注解来注入消费者。常见参数如下:

@KafkaListener(
    groupId = "mySpringBootGroup", 
    topicPartitions = 
        @TopicPartition(topic = "topic1", partitions = "0", "1"),
        @TopicPartition(
            topic = "topic2", 
            partitions = "0",
            partitionOffsets = @PartitionOffset(
                partition = "1", 
                initialOffset = "100"))
          ,
        concurrency = "6"
)
  • group-id:表示消费组,如果没有指定,则会使用配置文件中设置的默认的groupId。
  • topicPartitions:一个消费组可以消费多个主题分区
  • TopicPartition:主题分区相关
  • concurrency:同组下的消费者个数,必须小于等于分区总数,大于意义不大,没必要大于。

3.1 多消费组消费主题

如果一个主题要被多个消费组消费,那么我们使用 @KafkaListener注解来注入多个消费者即可。

@Component
public class MyConsumer 

    @KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup")
    public void listenConsumerGroup1(ConsumerRecord<String, String> record, Acknowledgment ack) 
        String value = record.value();
        //System.out.println(record);
        System.out.printf("ConsumerGroup1收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
        //手动提交offset
        ack.acknowledge();
    

    //配置多个消费组
    @KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup2")
    public void listenConsumerGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) 
        String value = record.value();
        System.out.printf("ConsumerGroup2收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
        //手动提交offset
        ack.acknowledge();
    

4、启动项目

启动项目,从打印信息看出,每个消费者都会输出其配置信息。


单元测试ok:

有了之前 Kafka客户端API的使用,Spring Boot整合Kafka使用就更加简单了。

传送门(Kafka客户端API):https://blog.csdn.net/qq_42402854/article/details/124994563

– 求知若饥,虚心若愚。

以上是关于Spring Boot整合Kafka的主要内容,如果未能解决你的问题,请参考以下文章

Kafka学习--spring boot 整合kafka

spring boot+kafka整合(未完待续)

kafka学习Spring Boot 整合 Kafka

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

知识点16--spring boot整合kafka

spring boot怎么启动kafka