Kafka 消费者API

Posted IT备忘录

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 消费者API相关的知识,希望对你有一定的参考价值。

消费者api,自动提交offset

public class MyConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        //连接的集群
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //开启自动提交(消费偏移量)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //自动提交的延迟
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
        //KV的反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc");

        //消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅主题
        kafkaConsumer.subscribe(Collections.singletonList("first"));

        while (true){
            //获取数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //解析数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
            }
        }

    }
}

 

手动提交offset,同步提交

public class ConsumerOffsetSync {
    public static void main(String[] args) {

        Properties props = new Properties();
        //连接的集群
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //关闭自动提交(消费偏移量)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //KV的反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc1");

        //重置offset。
        //earliest:从头开始消费,触发的条件1,换组;条件2:保留的offset指向的数据已经不存在
        //latest:默认值,消费最新的数据。
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        //消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅主题
        kafkaConsumer.subscribe(Collections.singletonList("first"));

        while (true){
            //获取数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //解析数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
            }

            //同步提交,当前线程会阻塞直到 offset 提交成功
            kafkaConsumer.commitSync();
        }

    }
}

 

手动提交offset,异步提交

//异步提交
kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed for" +
                offsets);
    }
});

 

以上是关于Kafka 消费者API的主要内容,如果未能解决你的问题,请参考以下文章

kafka 消费者进行消费数据的各种场景的API(你值得一看)

消息队列kafka java API, 新版旧版消费代码

永远运行 kafka 消费者(新的消费者 API)

我应该使用啥:Kafka Stream 或 Kafka 消费者 API 或 Kafka 连接

2021年大数据Kafka:❤️Kafka的java API编写❤️

2021年大数据Kafka:❤️Kafka的java API编写❤️