kafka基础篇——kafka消费者客户端

Posted 敲代码的小小酥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基础篇——kafka消费者客户端相关的知识,希望对你有一定的参考价值。

一、入门程序

先上代码,从代码入手,讲解kafka消费者客户端的细节。

public class HelloKafkaConsumer {

    public static void main(String[] args) {
        //设置消费者属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        //反序列化器,与生产者的序列化器相对应
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        //设置消费者的消费者群组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            //消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)
            consumer.subscribe(Collections.singletonList("Hello World"));
            //kafka消费者是通过拉取的方式获得服务端消息
            while(true){
            //循环调用poll方法,获取数据。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
                            record.offset(),record.key(),record.value()));
                }
            }
        } finally {
            consumer.close();
        }

    }

二、消费者属性

消费者有以下属性可以进行设置,在ConsumerConfig类里定义了这些属性:

  • auto.offset.reset
    如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办。值有以下几种:
    earliest:自动将偏移量重置为最早偏移量
    latest:自动将偏移量重置为最新偏移量
    none:抛出异常
  • enable.auto.commit
    表明消费者是否自动提交偏移 默认值true。这个的作用是消费者获取到数据后,进行业务处理,当业务处理操作失败时,我们可以选择不提交偏移量,这样的话下次还可以读取这个数据。
  • max.poll.records
    控制每次poll方法返回的的记录数量 默认值500
  • partition.assignment.strategy
    分区分配给消费者的策略。系统提供两种策略。默认为Range, 把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)。RoundRobin是把主题的分区循环分配给消费者。我们也可以继承PartitionAssignor类自定义分区策略。

三、消费者群组

共同订阅一个主题的消费者们,构成一个消费者群组。在一个主题下,一个分区只能被消费者群组里的一个消费者所消费,而不能被多个消费,但是可以被不同群组的消费者所消费。一个消费者,可以消费一个主题下的多个分区,也可以消费不同主题下的分区。
群组只有一个消费者,则主题下的分区都由这个消费者消费

根据分配策略,将分区分配给消费者
分区和消费者正好一对一
多出来的消费者会空闲
多消费者群组

四、偏移量和提交

消费者读取到分区数据到哪里了,通过偏移量来记录。将读取到信息的偏移量上传给kafka服务器,称之为提交。kafka服务中_consumer_offset 的特殊主题记录了消费者读取的偏移量。需要注意的是,偏移量的提交,提交的是最后一刻的偏移量,消费者一下读取多条数据,偏移量只是提交最后的那个偏移量,而不是每条数据的偏移量都读取。
消费者提交偏移量的方式:
自动提交:自动提交即enable.auto.comnit为true。自动提交我们无需关注,kafka每隔几秒的时候提交一次偏移量。
自动提交的弊端:1.当一个消费者挂了时,如果该消费者的偏移量已经提交了,但是消息的业务处理还没处理完,那么,这种情况下我们会认为是消息丢失了。
2.我们处理完业务了,偏移量还没提交,此时消费者挂掉了,那么会造成数据的重复消费问题。

手动提交:手动提交可以在我们处理完业务后,进行偏移量的提交。如果业务处理失败,我们可以选择不提交,重复读取业务处理失败的消息。手动提交分为同步提交和异步提交。同步提交,线程会等待偏移量提交成功。异步提交不会阻塞线程,可以在回调函数里判断成功和失败。代码如下:

public class SyncAndAsync {
    public static void main(String[] args) {
        /*消息消费者*/
        Properties properties = KafkaConst.consumerConfig("SyncAndAsync",
                StringDeserializer.class,
                StringDeserializer.class);
        /*取消自动提交*/
        properties.put("enable.auto.commit",false);

        KafkaConsumer<String,String> consumer
                = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList(
                    "Hello World"));
            while(true){
                ConsumerRecords<String, String> records
                        = consumer.poll(500);
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    //do our work
                }
                //业务处理的时候,异步提交。
                //consumer.commitAsync();
                /*允许执行回调*/
                consumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(
                            Map<TopicPartition, OffsetAndMetadata> offsets,
                            Exception exception) {
                        if(exception!=null){
                            System.out.print("Commmit failed for offsets ");
                            System.out.println(offsets);
                            exception.printStackTrace();
                        }
                    }
                });
            }
        } catch (CommitFailedException e) {
            System.out.println("Commit failed:");
            e.printStackTrace();
        } finally {
            try {
                //最后,再同步提交一次,保证都提交了。
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

五、多线程安全问题

消费者在多线程环境下是不安全的,即多个线程公用一个消费者的话,消息是不安全的。所以我们不要将一个消费者,让多个线程使用。做到一个线程使用一个消费者。

六、群组协调

消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个 消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息, 只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

七、分区再均衡

当有新的消费者加入群组或者某个消费者挂掉时,或者主题的分区数量发生变化时,都会触发分区再均衡。分区再均衡就是分区与消费者的映射关系重新洗牌。在再均衡期间,消费者无法读取数据,会造成一段时间的停顿。发生分区再均衡后,消费者会先读取_consumer_offset主题中读取分区的最后偏移量,然后再往后进行读取。
需要注意的是,在发生分区再均衡前,消费者会进行一次清理工作:提交偏移量,避免发生消息丢失等问题。
理解: 消费者群组是针对主题而言的。虽然消费者都是和分区进行对接,而且提交的偏移量都是分区的偏移量。但是这个偏移量是这个消费者群组整体的读取偏移量。所以,当发生分区再均衡,一个分区换成这个消费者群组其他的消费者消费时,也是按该分区在该群组的偏移量去继续进行的。

八、再均衡监听器

ConsumerRebalancelistener是kafka为我们提供的再均衡监听器接口,在分区再均衡发生之前和发生之后,我们可以做一些我们自己的业务,ConsumerRebalancelistener源码如下:

public interface ConsumerRebalanceListener {
    void onPartitionsRevoked(Collection<TopicPartition> var1);

    void onPartitionsAssigned(Collection<TopicPartition> var1);
}

其中,onPartitionsRevoked是再均衡之前,执行的方法。onPartitionsAssigned是分区再均衡完成之后,执行的方法。从上面的方法定义中我们可以看到,参数是TopicPartition的集合。它是消费者所订阅的分区。我们可以遍历分区,将偏移量进行提交,以确保再均衡前,把所有的偏移量都提交了,同时,可以把偏移量存在数据库中,确保偏移量的正确性。
同理,分区再均衡之后,遍历消费者订阅的分区,调用seek方法,指定分区再均衡之前的偏移量(从数据库中获取),进行继续消费。

九、处理失败数据再处理思路

当消费者获取到数据并进行业务处理时,如果业务处理失败了,我们不要提交它的偏移量。但是kafka不止返回一条数据,会返回多条记录。消费者在循环遍历数据的时候,如果前一个偏移量数据处理失败了,但是后一个偏移量的数据处理成功了。那么kafka会提交后一个数据的偏移量。这样的话,前一个失败的数据,也就无法读取了。针对这种情况,我们可以改变思路。我们将处理失败的数据的分区,偏移量等信息保存到数据库中,另外单独开启一个消费者,专门去处理这些失败的数据。
在kafka中,我们一定要有结合关系型数据库的思路,来配合kafka的偏移量来解决我们的实际业务问题。

十、spring整合kafka消费者客户端

与spring整合生产者类似,只不过spring配置文件里的配置需要配消费者的。这里我们需要注意一点,在配置生产者时,我们最终得到productTemplate类,通过这个类去调用生产者的方法,发送消息,我们无需自己定义生产者类。而在消费者中,spring整合kafka后,是以监听的形式在接收消息,我们需要手动实现消费者类监听消息后对消息的业务处理,并实现MessageListener接口。接口的onMessage方法就是监听到消费者接收消息触发的方法,在该方法里,就是我们拿接收到的数据做业务处理的方法。
spring配置文件配置具体如下:

<!-- 1.定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="spring-kafka-group" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>
     <!-- 2.创建consumerFactory bean -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>
     <!-- 消费者自行确认-3.定义消费实现类,实现MessageListener接口,定义onMessage方法,监听消息并对消息进行业务处理 -->
    <bean id="kafkaConsumerServiceAck" class="xxx.xxx.KafkaConsumer" />

<!-- 4.消费者容器配置信息,监听哪些主题,相当于原生API中的subscribe方法 -->
    <bean id="containerProperties"
          class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics">
            <list>
                <value>kafka-spring-topic</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService"></property>

    </bean>
    <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <!--  并发几个消费者 -->
        <property name="concurrency" value="3" />
    </bean>

<!-- 上面我们配置了一个消费者群组,我们还可以配置多个消费者群组,下面我们配置手动提交偏移量的消费者群组 -->
 <!-- 消费者自行确认-1.定义consumer的参数 -->
    <bean id="consumerPropertiesAck" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="spring-kafka-group-ack" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="enable.auto.commit" value="false"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 消费者自行确认-2.创建consumerFactory bean -->
    <bean id="consumerFactoryAck"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
        <constructor-arg>
            <ref bean="consumerPropertiesAck" />
        </constructor-arg>
    </bean>

<!-- 消费者自行确认-3.定义消费实现类 -->
    <bean id="kafkaConsumerServiceAck" class="xxx.xxx.KafkaConsumerAck" />
     <!-- 消费者自行确认-4.消费者容器配置信息 -->
    <bean id="containerPropertiesAck"
          class="org.springframework.kafka.listener.ContainerProperties">
        <!-- topic -->
        <constructor-arg name="topics">
            <list>
                <value>kafka-spring-topic-b</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerServiceAck" />
        <!-- 消费者自行确认模式 -->
        <property name="ackMode" value="MANUAL_IMMEDIATE"></property>
    </bean>
<!-- 消费者自行确认-5.消费者并发消息监听容器,执行doStart()方法 -->
    <bean id="messageListenerContainerAck"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart" >
        <constructor-arg ref="consumerFactoryAck" />
        <constructor-arg ref="containerPropertiesAck" />
        <property name="concurrency" value="3" />
    </bean>



下面我们看需要我们手动定义的处理接收消息的MessageListener实现类:

//普通消费者,自动提交偏移量
public class KafkaConsumer  implements MessageListener<String,String> {

    public void onMessage(ConsumerRecord<String, String> data) {
        String name = Thread.currentThread().getName();
        System.out.println(name+"|"+String.format(
                "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                data.topic(),data.partition(),data.offset(),
                data.key(),data.value()));
    }
}

//手动提交偏移量
public class KafkaConsumerAck implements AcknowledgingMessageListener<String,String> {

    public void onMessage(ConsumerRecord<String, String> data,
                          Acknowledgment acknowledgment) {
        String name = Thread.currentThread().getName();
        System.out.println(name+"|"+String.format(
                "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                data.topic(),data.partition(),data.offset(),
                data.key(),data.value()));
        //手动确认
        acknowledgment.acknowledge();

    }
}

这样,我们就配置好了spring环境下的kafka消费者。由此可以看到,在消费者中,我们只需手动实现处理消息的业务逻辑,其他的都通过配置即可。

十一、springboot整合kafka消费者

首先在application.properties配置文件中配置kafka的属性信息:

#============== kafka ===================
kafka.consumer.zookeeper.connect=localhost:2181/kafka-one
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=localhost:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

然后,定义配置类:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean ena

以上是关于kafka基础篇——kafka消费者客户端的主要内容,如果未能解决你的问题,请参考以下文章

kafka基础篇——kafka服务端原理及工作机制详解

Kafka3.x核心速查手册二客户端使用篇-2分组消费机制

Kafka3.x核心速查手册二客户端使用篇-2分组消费机制

Kafka3.x核心速查手册二客户端使用篇-1从基础的客户端说起

Kafka3.x核心速查手册二客户端使用篇-1从基础的客户端说起

技术分享|kafka 协议分析 基础篇