Kafka Consumer APIs for Every Data-Consumption Scenario (Worth a Read)
Posted by 健康平安的活着
1. Kafka consumer-side parameters
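Before the examples, here is a minimal sketch (an illustrative summary, not an exhaustive list) of the consumer-side parameters the examples below rely on, with the stock Kafka defaults noted in comments:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

Properties props = new Properties();
// Broker address list; required.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// Consumer group id; required when using subscribe().
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// Key/value deserializers; required.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
// Where to start when the group has no committed offset:
// earliest | latest | none (default: latest).
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// Automatic offset commits (default: true, every 5000 ms).
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// Upper bound on records returned by a single poll() (default: 500).
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");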
2. Implementation examples
2.1 Subscribing to a topic
1. Requirement: create a standalone consumer that consumes data from the kafka-ljf topic.
Note: in consumer API code the consumer group id must be configured. Only a consumer started from the command line without a group id gets a random group id filled in automatically.
2. Consumer code:
package com.ljf.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @ClassName: ConsumerTopicDemo
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/10 14:02:05
* @Version: V1.0
**/
public class ConsumerTopicDemo {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters to the configuration object
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Configure the deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Configure the consumer group (required); the group name is arbitrary
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        // Create the consumer object
        KafkaConsumer<String, String> kafkaConsumer =
                new KafkaConsumer<String, String>(properties);
        // Register the topics to consume (more than one topic is allowed)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        // Pull data and print it
        while (true) {
            // Poll a batch of data, waiting up to 1 s
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            // Print the consumed records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3. Run a producer to generate test data.
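Any producer writing to kafka-ljf will do; for instance, a minimal sketch like the following (class name and message contents are illustrative, not from the original):

package com.ljf.spring.boot.demo.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send a handful of test messages to the kafka-ljf topic
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("kafka-ljf", "test-" + i));
        }
        producer.close();
    }
}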
4. Consume the data and observe the output.
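One caveat about the poll loop above: it never closes the consumer. A common pattern, shown here as an optional sketch rather than part of the original code, is to trigger wakeup() from a shutdown hook and close the consumer when the blocked poll() throws WakeupException:

// Hypothetical shutdown handling wrapped around the poll loop above.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    kafkaConsumer.wakeup(); // makes a blocked poll() throw WakeupException
    try { mainThread.join(); } catch (InterruptedException ignored) { }
}));
try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown; nothing to do
} finally {
    // commits offsets (with auto-commit enabled) and leaves the group cleanly
    kafkaConsumer.close();
}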
2.2 Subscribing to a specific partition of a topic
1. Requirement: create a standalone consumer that consumes the data in partition 0 of the kafka-ljf topic.
2. Code:
package com.ljf.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @ClassName: ConsumerPartitionDemo
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/10 14:55:31
* @Version: V1.0
**/
public class ConsumerPartitionDemo {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters to the configuration object
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Configure the deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Configure the consumer group (required); the group name is arbitrary
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Consume a specific partition of a topic: partition 0 here
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("kafka-ljf", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3. The producer generates data.
4. The consumer consumes it.
As the output shows, only the data on partition 0 is consumed.
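A note on the API used here: assign() pins the consumer to explicit partitions and bypasses consumer-group rebalancing, unlike subscribe(). To read partitions 0 and 1, for instance, the assignment list above would simply gain one more entry (an illustrative sketch, not from the original):

ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("kafka-ljf", 0));
topicPartitions.add(new TopicPartition("kafka-ljf", 1));
kafkaConsumer.assign(topicPartitions);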
2.3 Consumer group example
Test that each partition of a topic can be consumed by only one consumer within a consumer group.
1. Copy the consumer code to create a second consumer.
2. Consumer 2:
3. The producer:
4. Check consumer 1's consumed output:
5. Check consumer 2's consumed output:
Conclusion: the two consumers consume data from different partitions. Consumer 1 consumes the data on partition 1, and consumer 2 consumes the data on partition 2.
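To make the assignment visible in each consumer's output, subscribe() can optionally take a ConsumerRebalanceListener; this is an addition for illustration, not part of the original code:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;

kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before a rebalance takes partitions away from this consumer
        System.out.println("Revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after a rebalance hands partitions to this consumer
        System.out.println("Assigned: " + partitions);
    }
});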
2.4 Consuming from a specified offset
auto.offset.reset = earliest | latest | none; the default is latest. This setting decides what happens when Kafka has no initial offset for the consumer group (the group is consuming for the first time) or when the committed offset no longer exists on the server (for example, the data has been deleted):
(1) earliest: automatically reset the offset to the earliest offset, equivalent to --from-beginning.
(2) latest (the default): automatically reset the offset to the latest offset.
(3) none: throw an exception to the consumer if no previous offset is found for the consumer group.
(4) Alternatively, start consuming from an arbitrary, manually specified offset, which is what this example demonstrates.
Code:
package com.ljf.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
/**
* @ClassName: ConsumerSeekDemo
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/10 16:08:01
* @Version: V1.0
**/
public class ConsumerSeekDemo {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key and value deserialization
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // Fetch the consumer's partition assignment (seek() needs it first)
            assignment = kafkaConsumer.assignment();
        }
        // Iterate over all assigned partitions and start consuming from offset 10
        for (TopicPartition tp : assignment) {
            kafkaConsumer.seek(tp, 10);
        }
        // 3. Consume the topic's data
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
Result:
As the output shows, consumption starts at offset 10 in both partition 0 and partition 1.
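Besides seek(tp, offset), KafkaConsumer also provides seekToBeginning() and seekToEnd() over a collection of partitions; either could replace the seek loop above when you want the ends of the log rather than a fixed offset (a sketch, pick one):

// Jump to the earliest available offset of every assigned partition...
kafkaConsumer.seekToBeginning(assignment);
// ...or to the latest offset, so only newly produced records are consumed.
kafkaConsumer.seekToEnd(assignment);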
Note: the consumer group name must be changed after each run.
2.5 Setting auto.offset.reset to earliest
auto.offset.reset = earliest | latest | none; the default is latest. This example sets it to earliest.
Note: use a new consumer group name for every run. auto.offset.reset only applies when the group has no committed offset, so a group that has already committed offsets resumes from them rather than from the beginning.
Code:
package com.ljf.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @ClassName: ConsumerDefineOffset
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/10 16:30:01
* @Version: V1.0
**/
public class ConsumerDefineOffset {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters to the configuration object
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Configure the deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Where to start reading when the group has no committed offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Configure the consumer group (required); the group name is arbitrary
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "shanghai"); // note: change the group name after each run
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Consume a specific partition of a topic: partition 0 here
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("kafka-ljf", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3. Execution result:
2.6 Consuming data from a specified point in time
In production, you may discover that the last few hours of consumed data are bad and need to be consumed again, starting from a point in time. For example, how do you re-consume the previous day's data?
package com.ljf.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
* @ClassName: ConsumerRangeTime
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/04/10 16:53:36
* @Version: V1.0
**/
public class ConsumerRangeTime {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key and value deserialization
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // Fetch the consumer's partition assignment (required before seeking)
            assignment = kafkaConsumer.assignment();
        }
        // Map each assigned partition to the target timestamp: one day ago
        // (86,400,000 ms; the long literal avoids int overflow for larger windows)
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000L);
        }
        // For each partition, look up the offset of the first message at or after that time
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                kafkaConsumer.offsetsForTimes(timestampToSearch);
        // Seek each partition to the offset found for its timestamp
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            // offsetsForTimes() yields null for a partition with no message at or after the timestamp
            if (offsetAndTimestamp != null) {
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }
        // 3. Consume the topic's data
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
Result:
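The requirement was "the previous day's data", and the timestamps passed to offsetsForTimes() are plain epoch milliseconds; if you want calendar-aligned boundaries rather than now minus 24 hours, java.time makes the conversion explicit (a sketch; the system default zone is an assumption):

import java.time.LocalDate;
import java.time.ZoneId;

// Midnight at the start of yesterday, in the local time zone, as epoch millis.
long yesterdayMillis = LocalDate.now()
        .minusDays(1)
        .atStartOfDay(ZoneId.systemDefault())
        .toInstant()
        .toEpochMilli();
timestampToSearch.put(topicPartition, yesterdayMillis);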