SpringBoot整合Kafka下

Posted 楚天阔.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Kafka下相关的知识,希望对你有一定的参考价值。

当消费端是批量接收消息,配置中的自动提交需要关闭,同时要把手动提交打开

kafka:
    ###########【Kafka集群】###########
    bootstrap-servers: 192.168.188.128:9092
    producer:
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
#      properties:
#        #自定义分区器
#        partitioner.class: com.example.demo.kafka.CustomizePartitioner

    consumer:
      group-id: javagroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50
      # 批量消费每次最多消费多少条消息
    listener:
      type: batch
      ack-mode: manual_immediate

注意:批量消费和批量发送是无关的,可以一条条发送批量消费也可以批量发送批量消费
当批量发送时:
Producer:

代码如下:

@Data
public class KafkaMessage 
    public Integer index;
    public String id;
    public String value;


@GetMapping("BatchSend")
    public void sendProducerRecord()
        for(int i=0; i<100; i++) 
            KafkaMessage kafkaMessage = new KafkaMessage();
            kafkaMessage.setIndex(i);
            kafkaMessage.setId(UUID.randomUUID().toString());
            kafkaMessage.setValue("producerRecord " + i);
            ProducerRecord<String, Object> producerRecord =
                    new ProducerRecord<>("topic1", "key1", JSONObject.toJSONString(kafkaMessage));
            kafkaTemplate.send(producerRecord);
        
    

Consumer:

代码:

@KafkaListener(id = "consumer1",groupId = "javagroup", topics = "topic1")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) 
        System.out.println(">>>批量消费一次,records.size()="+records.size());
        for (ConsumerRecord<?, ?> record : records) 
            System.out.println("****"+record.value());
        
    

ApiPost发送请求:


当Producer发送一条消息,Consumer批量消费时:
Producer:

    @GetMapping("Producer")
    public void sendMessage(String normalMessage)
        kafkaTemplate.send("topic1",normalMessage);
    

Consumer不变
ApiPost发送请求:


注意:如果把配置文件里批量消费关掉

但是Consumer还是批量接收时,会报错:

因此批量消费时务必修改配置文件中的批量监听以及手动提交
当配置文件中批量监控以及手动提交开启

而Consumer为简单消费时:

无论批量发送还是简单发送都会报错;

总结:
配置关闭,批量消费报错
配置打开,简单消费报错
因此配置和消费要保持一致

当Producer批量发送,配置文件批量监听手动提交都关闭,而Consumer为单体消费时:

@KafkaListener(topics = "topic1")
    public void onMessage1(ConsumerRecord<?,?>record)
        System.out.println("简单消费: "+record.topic()+"_"+record.partition()+"_"+"_"+record.value());
    


二.回调:
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

//    带回掉的Producer  .addCallback(SuccessCallback<>,FailureCallback<>)
    @GetMapping("CallBackOne")
    public void sendMessage2(String callBackMessage)
        kafkaTemplate.send("topic1",callBackMessage).addCallback(successCallBack->
            String topic = successCallBack.getRecordMetadata().topic();
            int partition = successCallBack.getRecordMetadata().partition();
            long offset = successCallBack.getRecordMetadata().offset();
            System.out.println("发送消息成功: "+topic+"-"+partition+"-"+offset);
        ,failureCallBack->
            System.out.println("发送消息失败:"+failureCallBack.getMessage());
        );
    


第二种写法:

//带回调的Producer addCallback(new ListenableFutureCallback<SendResult<String, Object>>() )
    @GetMapping("CallBackTwo")
    public void sendMessage3(String callBackMessage)
        kafkaTemplate.send("topic2",callBackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() 
            @Override
            public void onFailure(Throwable ex) 
                System.out.println("发送消息失败: "+ex.getMessage());
            

            @Override
            public void onSuccess(SendResult<String, Object> result) 
                System.out.println("发送消息成功: "+"-"+result.getRecordMetadata().offset());
            
        );
    


补充:全局回调
注意:这样设置以后,该单例的kafkaTemplate就有了一个回调,注意是全局回调,只要一个实例中设置,全局生效

@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener 

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) 
        System.out.println("Message send success : " + producerRecord.toString());

    

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) 
        System.out.println("Message send error : " + producerRecord.toString());
    

    @Autowired(required = false)
    private KafkaSendResultHandler producerListener;
    //全局回调
    @GetMapping("GlobalCallBack")
    public void testProducerListen() throws InterruptedException 
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send("topic1", "test producer listen");
        Thread.sleep(1000);
    


此时如果调用另一个接口:


此接口虽然未设置回调,但由于是全局回调,所以回调依然生效

三:自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

public class CustomizePartitioner implements Partitioner 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 
        // 得到 topic 的 partitions 信息
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        System.out.println("partitionInfoList"+partitionInfoList);
        System.out.println("topic: "+topic+"-"+"key: "+key.toString()+"-"+"value: "+value.toString());
        int size = partitionInfoList.size();
        System.out.println("分区有: "+size+"个");
        //模拟某客服
        if (key.toString().equals("10000")||key.toString().equals("11111"))
            //放到最后一个分区中
            return size-1;
        
        String phoneNum= key.toString();
        return phoneNum.substring(0,3).hashCode()%(size-1);
    

    @Override
    public void close() 

    

    @Override
    public void configure(Map<String, ?> map) 

    

编写一个Kafka生产者来使用自定义分区器:

/**
 * @program:demo
 * @description:测试自定义分区器Producer
 * @auth0r: SYP
 * @create: Creted on 2022-05-16 11:38
 **/
public class PartitionerProducer
    private static final String[] PHONE_NUMS=new String[]"10000", "10000", "11111", "13700000003", "13700000004",
            "10000", "15500000006", "11111", "15500000008",
            "17600000009", "10000", "17600000011"
    ;

    public static void main(String[] args) throws ExecutionException, InterruptedException 
        //定义配置信息
        Properties props = new Properties();
        //kafka地址,多个地址用逗号分隔
        props.put("bootstrap.servers","192.168.188.128:9092");
        props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
        props.put("retries", 3);// 请求失败重试的次数
        props.put("batch.size", 16384);// batch的大小
        props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
        props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量

        //设置分区器
        props.put("partitioner.class","com.example.demo.kafka.CustomizePartitioner");
        //设置序列化类,可以写类的全路径
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int count=0;
        int length=PHONE_NUMS.length;

        while (count<10)
            Random rand = new Random();
            String phoneNum=PHONE_NUMS[rand.nextInt(length)];
            ProducerRecord<String, String> record = new ProducerRecord<>("topic4", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            String result="phoneNum ["+record.value()+"] has been sent to partition " +metadata.partition();
            System.out.println(result);
            Thread.sleep(500);
            count++;
        
        producer.close();
    


编写一个消费者来消费数据:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @program:demo
 * @description:自定义分区器Consumer
 * @auth0r: SYP
 * @create: Creted on 2022-05-18 09:42
 **/
public class PartitionerConsumer 
    private static KafkaConsumer<String,String> consumer;
    private final static String TOPIC="topic4";

    public PartitionerConsumer() 
        Properties props = new Properties();
        props.put("bootstrap.servers","192.168.188.128:9092");
        //每个消费者分配独立的组号
        props.put("group.id","javagroup");
        //如果value合法,则自动提交偏移量
        props.put("enable.auto.commit", "true");
        //设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put("session.timeout.ms", "30000");
        //自动重置offset
        //earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
        //latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer=new KafkaConsumer<String,String>(props);
    

    public void consume()
        //消费消息
        consumer.subscribe(Arrays.asList(TOPIC));
        try 
            while (true)
                //拉取records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String,String> record: records)
                    System.out.printf("消费端"+"partition=%d, offset=%d, key=%s, value=%s",record.partition(),
                            record.offset(),record.key(),record.value());
                    System.out.println();
                
            
        finally 
            consumer.close();
        
    

    public static void main(String[] args) 
        new PartitionerConsumer().consume();
    

我们可以看一下

topic4下面有3个partition

分别运行生产者和消费者代码并查看测试结果:

四:kafka事务提交:
如果在发送时需要创建事务.可以使用kafkaTemplate的executeIntransaction方法来声明事务:

    @GetMapping("transaction")
    public void snedMessage7()

        // 声明事务:后面报错消息不会发出去
//        kafkaTemplate.executeInTransaction(operationsCallBack->
//            operationsCallBack.send("topic1","test executeInTransaction");
//            throw new RuntimeException("fail");
//        );
        //不声明事务:后面报错但前面消息已经发送成功了
        kafkaTemplate.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    

调用接口可以发现不声明事务时,后面报错但前面消息已经发送成功了

声明事务:

@GetMapping("transaction")
    @Transactional
    public void snedMessage7()

        // 声明事务:后面报错消息不会发出去
        kafkaTemplate以上是关于SpringBoot整合Kafka下的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合Kafka

SpringBoot整合Kafka

Springboot整合kafka(*)

三.Kafka入门到精通-SpringBoot整合Kafka(上)

SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQSpringBoot 整合 RabbitMQSpringBoot 整合 Kafka)

springboot 整合kafka