消费消息+不自动提交

Posted abuduri

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消费消息+不自动提交相关的知识,希望对你有一定的参考价值。

依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

代码

package com.perfect.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class KafkaComsumerTest {

    @Test
    public void cunsumertest(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        //关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

        //latest,earliest
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");


        props.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata1");

        KafkaConsumer<String,String> c = new KafkaConsumer<String, String>(props);

        c.subscribe(Collections.singletonList("test2"));

        while(true){

         ConsumerRecords records = c.poll(100);

         records.forEach(System.out::println);
        }

       // c.close();
    }
}

 

以上是关于消费消息+不自动提交的主要内容,如果未能解决你的问题,请参考以下文章

kafka重复消费的原因

32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

kafka的Java客户端消费者

kafka的Java客户端消费者

Kafka - 如何在使用高级消费者的每条消息后提交偏移量?