kafka基础篇——kafka消费者客户端
Posted 敲代码的小小酥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基础篇——kafka消费者客户端相关的知识,希望对你有一定的参考价值。
一、入门程序
先上代码,从代码入手,讲解kafka消费者客户端的细节。
public class HelloKafkaConsumer {
public static void main(String[] args) {
//设置消费者属性
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
//反序列化器,与生产者的序列化器相对应
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
//设置消费者的消费者群组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
try {
//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)
consumer.subscribe(Collections.singletonList("Hello World"));
//kafka消费者是通过拉取的方式获得服务端消息
while(true){
//循环调用poll方法,获取数据。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
record.offset(),record.key(),record.value()));
}
}
} finally {
consumer.close();
}
}
二、消费者属性
消费者有以下属性可以进行设置,在ConsumerConfig类里定义了这些属性:
- auto.offset.reset
如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办。值有以下几种:
earliest:自动将偏移量重置为最早偏移量
latest:自动将偏移量重置为最新偏移量
none:抛出异常 - enable.auto.commit
表明消费者是否自动提交偏移 默认值true。这个的作用是消费者获取到数据后,进行业务处理,当业务处理操作失败时,我们可以选择不提交偏移量,这样的话下次还可以读取这个数据。 - max.poll.records
控制每次poll方法返回的的记录数量 默认值500 - partition.assignment.strategy
分区分配给消费者的策略。系统提供两种策略。默认为Range, 把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)。RoundRobin是把主题的分区循环分配给消费者。我们也可以继承PartitionAssignor类自定义分区策略。
三、消费者群组
共同订阅一个主题的消费者们,构成一个消费者群组。在一个主题下,一个分区只能被消费者群组里的一个消费者所消费,而不能被多个消费,但是可以被不同群组的消费者所消费。一个消费者,可以消费一个主题下的多个分区,也可以消费不同主题下的分区。
四、偏移量和提交
消费者读取到分区数据到哪里了,通过偏移量来记录。将读取到信息的偏移量上传给kafka服务器,称之为提交。kafka服务中_consumer_offset 的特殊主题记录了消费者读取的偏移量。需要注意的是,偏移量的提交,提交的是最后一刻的偏移量,消费者一下读取多条数据,偏移量只是提交最后的那个偏移量,而不是每条数据的偏移量都读取。
消费者提交偏移量的方式:
自动提交:自动提交即enable.auto.comnit为true。自动提交我们无需关注,kafka每隔几秒的时候提交一次偏移量。
自动提交的弊端:1.当一个消费者挂了时,如果该消费者的偏移量已经提交了,但是消息的业务处理还没处理完,那么,这种情况下我们会认为是消息丢失了。
2.我们处理完业务了,偏移量还没提交,此时消费者挂掉了,那么会造成数据的重复消费问题。
手动提交:手动提交可以在我们处理完业务后,进行偏移量的提交。如果业务处理失败,我们可以选择不提交,重复读取业务处理失败的消息。手动提交分为同步提交和异步提交。同步提交,线程会等待偏移量提交成功。异步提交不会阻塞线程,可以在回调函数里判断成功和失败。代码如下:
public class SyncAndAsync {
public static void main(String[] args) {
/*消息消费者*/
Properties properties = KafkaConst.consumerConfig("SyncAndAsync",
StringDeserializer.class,
StringDeserializer.class);
/*取消自动提交*/
properties.put("enable.auto.commit",false);
KafkaConsumer<String,String> consumer
= new KafkaConsumer<String, String>(properties);
try {
consumer.subscribe(Collections.singletonList(
"Hello World"));
while(true){
ConsumerRecords<String, String> records
= consumer.poll(500);
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format(
"主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//do our work
}
//业务处理的时候,异步提交。
//consumer.commitAsync();
/*允许执行回调*/
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(
Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if(exception!=null){
System.out.print("Commmit failed for offsets ");
System.out.println(offsets);
exception.printStackTrace();
}
}
});
}
} catch (CommitFailedException e) {
System.out.println("Commit failed:");
e.printStackTrace();
} finally {
try {
//最后,再同步提交一次,保证都提交了。
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}
五、多线程安全问题
消费者在多线程环境下是不安全的,即多个线程公用一个消费者的话,消息是不安全的。所以我们不要将一个消费者,让多个线程使用。做到一个线程使用一个消费者。
六、群组协调
消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个 消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息, 只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。
七、分区再均衡
当有新的消费者加入群组或者某个消费者挂掉时,或者主题的分区数量发生变化时,都会触发分区再均衡。分区再均衡就是分区与消费者的映射关系重新洗牌。在再均衡期间,消费者无法读取数据,会造成一段时间的停顿。发生分区再均衡后,消费者会先读取_consumer_offset主题中读取分区的最后偏移量,然后再往后进行读取。
需要注意的是,在发生分区再均衡前,消费者会进行一次清理工作:提交偏移量,避免发生消息丢失等问题。
理解: 消费者群组是针对主题而言的。虽然消费者都是和分区进行对接,而且提交的偏移量都是分区的偏移量。但是这个偏移量是这个消费者群组整体的读取偏移量。所以,当发生分区再均衡,一个分区换成这个消费者群组其他的消费者消费时,也是按该分区在该群组的偏移量去继续进行的。
八、再均衡监听器
ConsumerRebalancelistener是kafka为我们提供的再均衡监听器接口,在分区再均衡发生之前和发生之后,我们可以做一些我们自己的业务,ConsumerRebalancelistener源码如下:
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> var1);
void onPartitionsAssigned(Collection<TopicPartition> var1);
}
其中,onPartitionsRevoked是再均衡之前,执行的方法。onPartitionsAssigned是分区再均衡完成之后,执行的方法。从上面的方法定义中我们可以看到,参数是TopicPartition的集合。它是消费者所订阅的分区。我们可以遍历分区,将偏移量进行提交,以确保再均衡前,把所有的偏移量都提交了,同时,可以把偏移量存在数据库中,确保偏移量的正确性。
同理,分区再均衡之后,遍历消费者订阅的分区,调用seek方法,指定分区再均衡之前的偏移量(从数据库中获取),进行继续消费。
九、处理失败数据再处理思路
当消费者获取到数据并进行业务处理时,如果业务处理失败了,我们不要提交它的偏移量。但是kafka不止返回一条数据,会返回多条记录。消费者在循环遍历数据的时候,如果前一个偏移量数据处理失败了,但是后一个偏移量的数据处理成功了。那么kafka会提交后一个数据的偏移量。这样的话,前一个失败的数据,也就无法读取了。针对这种情况,我们可以改变思路。我们将处理失败的数据的分区,偏移量等信息保存到数据库中,另外单独开启一个消费者,专门去处理这些失败的数据。
在kafka中,我们一定要有结合关系型数据库的思路,来配合kafka的偏移量来解决我们的实际业务问题。
十、spring整合kafka消费者客户端
与spring整合生产者类似,只不过spring配置文件里的配置需要配消费者的。这里我们需要注意一点,在配置生产者时,我们最终得到productTemplate类,通过这个类去调用生产者的方法,发送消息,我们无需自己定义生产者类。而在消费者中,spring整合kafka后,是以监听的形式在接收消息,我们需要手动实现消费者类监听消息后对消息的业务处理,并实现MessageListener接口。接口的onMessage方法就是监听到消费者接收消息触发的方法,在该方法里,就是我们拿接收到的数据做业务处理的方法。
spring配置文件配置具体如下:
<!-- 1.定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}" />
<entry key="group.id" value="spring-kafka-group" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 2.创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<!-- 消费者自行确认-3.定义消费实现类,实现MessageListener接口,定义onMessage方法,监听消息并对消息进行业务处理 -->
<bean id="kafkaConsumerServiceAck" class="xxx.xxx.KafkaConsumer" />
<!-- 4.消费者容器配置信息,监听哪些主题,相当于原生API中的subscribe方法 -->
<bean id="containerProperties"
class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics">
<list>
<value>kafka-spring-topic</value>
</list>
</constructor-arg>
<property name="messageListener" ref="kafkaConsumerService"></property>
</bean>
<!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
<bean id="messageListenerContainer"
class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart" >
<constructor-arg ref="consumerFactory" />
<constructor-arg ref="containerProperties" />
<!-- 并发几个消费者 -->
<property name="concurrency" value="3" />
</bean>
<!-- 上面我们配置了一个消费者群组,我们还可以配置多个消费者群组,下面我们配置手动提交偏移量的消费者群组 -->
<!-- 消费者自行确认-1.定义consumer的参数 -->
<bean id="consumerPropertiesAck" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}" />
<entry key="group.id" value="spring-kafka-group-ack" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="enable.auto.commit" value="false"/>
</map>
</constructor-arg>
</bean>
<!-- 消费者自行确认-2.创建consumerFactory bean -->
<bean id="consumerFactoryAck"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
<constructor-arg>
<ref bean="consumerPropertiesAck" />
</constructor-arg>
</bean>
<!-- 消费者自行确认-3.定义消费实现类 -->
<bean id="kafkaConsumerServiceAck" class="xxx.xxx.KafkaConsumerAck" />
<!-- 消费者自行确认-4.消费者容器配置信息 -->
<bean id="containerPropertiesAck"
class="org.springframework.kafka.listener.ContainerProperties">
<!-- topic -->
<constructor-arg name="topics">
<list>
<value>kafka-spring-topic-b</value>
</list>
</constructor-arg>
<property name="messageListener" ref="kafkaConsumerServiceAck" />
<!-- 消费者自行确认模式 -->
<property name="ackMode" value="MANUAL_IMMEDIATE"></property>
</bean>
<!-- 消费者自行确认-5.消费者并发消息监听容器,执行doStart()方法 -->
<bean id="messageListenerContainerAck"
class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart" >
<constructor-arg ref="consumerFactoryAck" />
<constructor-arg ref="containerPropertiesAck" />
<property name="concurrency" value="3" />
</bean>
下面我们看需要我们手动定义的处理接收消息的MessageListener实现类:
//普通消费者,自动提交偏移量
public class KafkaConsumer implements MessageListener<String,String> {
public void onMessage(ConsumerRecord<String, String> data) {
String name = Thread.currentThread().getName();
System.out.println(name+"|"+String.format(
"主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
data.topic(),data.partition(),data.offset(),
data.key(),data.value()));
}
}
//手动提交偏移量
public class KafkaConsumerAck implements AcknowledgingMessageListener<String,String> {
public void onMessage(ConsumerRecord<String, String> data,
Acknowledgment acknowledgment) {
String name = Thread.currentThread().getName();
System.out.println(name+"|"+String.format(
"主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
data.topic(),data.partition(),data.offset(),
data.key(),data.value()));
//手动确认
acknowledgment.acknowledge();
}
}
这样,我们就配置好了spring环境下的kafka消费者。由此可以看到,在消费者中,我们只需手动实现处理消息的业务逻辑,其他的都通过配置即可。
十一、springboot整合kafka消费者
首先在application.properties配置文件中配置kafka的属性信息:
#============== kafka ===================
kafka.consumer.zookeeper.connect=localhost:2181/kafka-one
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.servers=localhost:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
然后,定义配置类:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean ena以上是关于kafka基础篇——kafka消费者客户端的主要内容,如果未能解决你的问题,请参考以下文章
Kafka3.x核心速查手册二客户端使用篇-1从基础的客户端说起