kafka 消费者进行消费数据的各种场景的API(你值得一看)

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 消费者进行消费数据的各种场景的API(你值得一看)相关的知识,希望对你有一定的参考价值。

一 kafka消费端的参数

 

二 实现案例

2.1 订阅某个主题

创建一个独立消费者,消费 kafka-ljf 主题中数据。 注意: 在消费者 API 代码中必须配置消费者组 id 。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id 2.消费者代码
package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @ClassName: ConsumerTopicDemo
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/10 14:02:05
 * @Version: V1.0
 **/
public class ConsumerTopicDemo 
    public static void main(String[] args) 
        // 1.创建消费者的配置对象
        Properties properties = new Properties();
        // 2.给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new
                KafkaConsumer<String, String>(properties);
        // 注册要消费的主题(可以消费多个主题)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true) 
            // 设置 1s 中消费一批数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) 
                System.out.println(consumerRecord);
            
        
    


3.执行生产者产生数据

 4.消费数据,观察

2.2 订阅某个主题下的某个分区

需求:创建一个独立消费者,消费 kafka-ljf主题 0 号分区的数据。

2.代码

 

package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @ClassName: ConsumerPartitionDemo
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/10 14:55:31
 * @Version: V1.0
 **/
public class ConsumerPartitionDemo 
    public static void main(String[] args) 
        // 1.创建消费者的配置对象
        Properties properties = new Properties();
        // 2.给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(必须),名字可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"beijing");
        KafkaConsumer<String, String> kafkaConsumer = new
                KafkaConsumer<>(properties);
        // 消费某个主题的某个分区数据,0号分区
        ArrayList<TopicPartition> topicPartitions = new
                ArrayList<>();
        topicPartitions.add(new TopicPartition("kafka-ljf", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true)
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord :
                    consumerRecords) 
                System.out.println(consumerRecord);
            
        
    

 3.生产者生产数据

 4.消费者消费

可见只消费了0号分区上的数据 

2.3  消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

1.consumer代码复制一份,变为两个消费者

 

2. 消费者2:

 3.生产者:

 

4.查看消费者消费信息

 5.查看消费者2消费信息

 结论:即可看到两个消费者在消费不同 分区的数据。消费者一消费分区1的数据,消费者2消费分区2的数据。

2.4  指定offset消费

auto.offset.reset = earliest | latest | none   其中 默认是 latest Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量 时(例如该数据已被删除),该怎么办? 1 earliest :自动将偏移量重置为最早的偏移量,相当于   --from-beginning (2) latest (默认值) :自动将偏移量重置为最新偏移量。 (3) none :如果未找到消费者组的先前偏移量,则向消费者抛出异常。

 (4)任意指定 offset 位移开始消费

代码:

具体代码:

package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/**
 * @ClassName: ConsumerSeekDemo
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/10 16:08:01
 * @Version: V1.0
 **/
public class ConsumerSeekDemo 
    public static void main(String[] args) 
        // 0 配置信息
        Properties properties = new Properties();
        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key value 反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        // 1 创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new
                KafkaConsumer<>(properties);
        // 2 订阅一个主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment= new HashSet<>();
        while (assignment.size() == 0) 
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = kafkaConsumer.assignment();
        
        // 遍历所有分区,并指定 offset 从 10 的位置开始消费
        for (TopicPartition tp: assignment) 
            kafkaConsumer.seek(tp, 10);
        
        // 3 消费该主题数据
        while (true) 
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) 
                System.out.println(consumerRecord);
            
        
    


 结果:

可以看到都是从0,1分区中,offset为10的位置开始查询的。 

注意:每次执行完,需要修改消费者组名;

2.4  指定offset设置为earliest

auto.offset.reset = earliest | latest | none   其中默认是 latest。本案例设置为earliest。

注意:每次执行完,需要修改消费者组名;每次执行要起一个不同的消费组的名字

代码

 

package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @ClassName: ConsumerDefineOffset
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/10 16:30:01
 * @Version: V1.0
 **/
public class ConsumerDefineOffset 
    public static void main(String[] args) 
        // 1.创建消费者的配置对象
        Properties properties = new Properties();
        // 2.给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        //设置读取的offset的位置
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // 配置消费者组(必须),名字可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"shanghai");//注意:每次执行完,需要修改消费者组名;
        KafkaConsumer<String, String> kafkaConsumer = new
                KafkaConsumer<>(properties);
        // 消费某个主题的某个分区数据,0号分区
        ArrayList<TopicPartition> topicPartitions = new
                ArrayList<>();
        topicPartitions.add(new TopicPartition("kafka-ljf", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true)
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord :
                    consumerRecords) 
                System.out.println(consumerRecord);
            
        
    

3.执行结果

 2.5  指定时间消费数据

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。 例如要求按照时间消费前一天的数据,怎么处理?
package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * @ClassName: ConsumerRangeTime
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/04/10 16:53:36
 * @Version: V1.0
 **/
public class ConsumerRangeTime 
    public static void main(String[] args) 
        Properties properties = new Properties();
        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // key value 反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");
        // 1 创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new
                KafkaConsumer<>(properties);
        // 2 订阅一个主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) 
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = kafkaConsumer.assignment();
        
        HashMap<TopicPartition, Long> timestampToSearch = new
                HashMap<>();
        // 封装集合存储,每个分区对应一天前的数据
        for (TopicPartition topicPartition : assignment) 
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 5 * 24 * 3600 * 1000);
        
        // 获取从 1 天前开始消费的每个分区的 offset
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                kafkaConsumer.offsetsForTimes(timestampToSearch);
        // 遍历每个分区,对每个分区设置消费时间。
        for (TopicPartition topicPartition : assignment) 
            OffsetAndTimestamp offsetAndTimestamp =
                    offsets.get(topicPartition);
            // 根据时间指定开始消费的位置
            if (offsetAndTimestamp != null)
                kafkaConsumer.seek(topicPartition,
                        offsetAndTimestamp.offset());
            
        
        // 3 消费该主题数据
        while (true) 
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord :
                    consumerRecords) 
                System.out.println(consumerRecord);
            
        
    


结果:

 

以上是关于kafka 消费者进行消费数据的各种场景的API(你值得一看)的主要内容,如果未能解决你的问题,请参考以下文章

解开Kafka神秘的面纱:kafka架构与应用场景

我应该使用啥:Kafka Stream 或 Kafka 消费者 API 或 Kafka 连接

springboot中实现kafa指定offset消费

社区福利 | Kafka通过Flume传输数据到HBase

分布式场景下Kafka消息顺序性的思考

Kafka核心API——Consumer消费者