kafka的使用详解

Posted 咕噜_咕噜

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka的使用详解相关的知识,希望对你有一定的参考价值。

kafka的安装

1.安装jdk

 

2.安装ZooKeeper

 

3.安装kafka

 

4.验证、启动和关闭

  启动ZooKeeper:zkServer.sh start

  验证启动状态:zkServer.sh status

  启动kafka:kafka-server-start.sh  server.properties(该配置文件的路径,能找到改文件既可)

  关闭kafka:kafka-server-stop.sh

kafka的使用1(单机使用)

1.主题管理

通过kafka-topic.sh 脚本,加上相关参数完成主题的管理,包含创建主题、查看分区、删除主题等相关操作

如:

列出现有主题:kafka-topics.sh --list --zookeeper localhost:2181/myKafka

创建主题:kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1

查看分区:kafka-topics.sh --zookeeper localhost:2181/myKafka --list
删除主题:kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1

2.生产消息

开启生产者:kafka-console-producer.sh --topic topic_1 --broker-list localhost:9092

3.消费消息

开启消费者:

  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1  --from-beginning(从头消费,不按照偏移量消费)

kafka的实战(通过API使用kafka)

kafka整合springboot以及核心参数的使用

kafka基本使用以及设计原理


上一篇主要讲了kafka的基本安装以及使用,这一篇主要讲解与springboot的整合,以及各个参数的使用,底层的基本原理等

一,springboot整合kafka以及参数详解

需要的依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

yml文件如下

server:
  port: 8080
spring:
  kafka:
    #bootstrap-servers: 175.178.75.153:9092,175.178.75.153:9093,175.178.75.153:9094
    bootstrap-servers: 175.178.75.153:9092
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE

kafka客户端代码填写

@RestController
public class KafkaController 
    //定义主题
    private final static String TOPIC_NAME = "zhs-topic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping("/send")
    public void send() 
        //主题名 , 分区 , k/v
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    

kafka消费端代码的填写

@Component
public class MyConsumer 
    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = 
     *             @TopicPartition(topic = "topic1", partitions = "0", "1"),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     ,concurrency = "6")
     *  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     * @param record
     */
    @KafkaListener(topics = "zhs-topic",groupId = "zhsGroup")
    public void listenZhsGroup(ConsumerRecord<String, String> record, Acknowledgment ack) 
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手动提交offset
        //ack.acknowledge();
    
    //配置多个消费组
    /*@KafkaListener(topics = "my-replicated-topic",groupId = "zhsGroup")
    public void listenZhsGroup(ConsumerRecord<String, String> record, Acknowledgment ack) 
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    */

这样的话,就可以简单的实现一个kafka的消息发送了。

二,kafka核心参数详解

2.1,生产者核心参数详解

1,生产者发出消息的持久化参数

spring:
	kafka:		
		producer:	
			acks: 1

(1)当ack为0时,表示producer不需要broker确认收到消息的回复,直接将消息发送到集群上面的leader即可,不管消息是否发送成功,就可以继续发送下一条消息,性能最高,但是也有一个缺点,就是很容易丢失消息。适用于海量日志场景,偶尔丢一两条消息无所谓
(2)当acks为1时,至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一 条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。默认使用acks为1,通过后序补偿机制来解决这个消息丢失的问题。
(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略 会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。

2,kafka客户端发送消息失败重试次数

spring:
	kafka:		
		producer:	
			retries: 1

在消息发送失败的情况下,会进行重试的操作,默认重试间隔为100ms,当然也可以手动设置,通过重试来保证消息的可靠性。
但是重试也可能会造成消息重复发送,broker重复接收消息的情况,因此需要消费者那边做好接收消息的幂等性的问题,如通过日志,或者redis判断消息是否接收过。

3,发送消息的本地缓冲区

spring:
	kafka:		
		producer:	
			buffer-memory: 33554432
            batch-size: 16384

在高并发的情况下,生产者每次可能会生产大量的消息,首先会将消息发送的一个本地的缓冲区buffer-memory里面,然后通过这个batch批量的去本地缓存中拉取消息,默认大小为16k,也就是说当这个batch满了大小16kb,就会将消息发送到broker里面。当然如果batch没满16kb,也不会一直等待,可以通过一个LINGER_MS来设置他的一个batch的存活时间,如设置100ms,就是只要到了100ms就会发送一批数据,不管这个batch是否装满。

2.2,生产者核心参数详解

1,通过poll长轮询去拉取消息
和一般的发布订阅模式不一样,一般的发布订阅模式是通过这个push去推送消息,而kafka消费端是通过poll主动去broker里面拉取消息。

2,消息是否自动提交

spring:
	kafka:		
		consumer:	
			enable-auto-commit: false
            #enable-auto-commit: true
            #auto-commit-interval: 10ms  #自动提交的时间

如果enable-auto-commit被设置为true,则消息会被自动提交,kafka底层会有一个offset来记录消息被消费的偏移量的位置,存储在broker里面,下次去消费的时候就可以知道从哪里开始消费。会通过一个auto-commit-interval来设置自动提交的时间。默认会设置手动提交。

3,消息手动提交的方式

consumer.commitSync();

手动同步提交offset的方式,消费端会通过长轮询的方式,主动去broker里面poll拉取数据。获取到消息之后,会将这个offset写入到这个broker里面的topic之后,才会走后面的业务逻辑。一般这个同步提交这个offset使用的比较多。

consumer.commitAsync(new OffsetCommitCallback() 
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) 
                        if (exception != null) 
                            System.err.println("Commit failed for " + offsets);
                            System.err.println("Commit failed exception: " + exception.getStackTrace());
                        
                    
                );

异步的方式提交这个offset,如果出现异常,就会进行重试的操作。

4,消费方式

 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

(1)根据分区指定消费

//订阅主题,也可以批量订阅多个主题
 consumer.subscribe(Arrays.asList(TOPIC_NAME));
 //消费指定分区
 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

(2)消息回溯消费的方式消费,就是可以重复消费

//消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

(3)指定哪个主题下面的偏移量进行消费

//指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

(4)指定时间消费

5,消息的处理能力问题设置

//一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
/*
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
会将其踢出消费组,将分区分配给别的consumer消费
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

6,当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费

spring:
	kafka:		
		consumer:	
			auto-offset-reset: earliest

(1)latest(默认) :只消费自己启动之后发送到主题的消息,从最后一条消息开始消费,即只消费新来的消息。
(2)earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

三,kafka设计原理详解

3.1,kafka的核心总控制器controller

在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。

controller作用
1,当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
2,当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
3,当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。
4,controller同时也会监听这个broker和topic,一旦有什么变化,就会作出相应的响应

controller选举机制
在kafka集群启动的时候,会选举一台broker作为controller来管理整个集群。
1,如果集群中的机器是一台一台启动的,那么就会选择第一台机器作为这个controller
2,如果是同时启动,选举的过程是集群中每个broker都会在zookeeper上创建一个 /controller 临时节点,zookeeper会保证有且仅有一个broker能创建成功,这个broker就会成为集群的总控器controller。

当这个controller角色的broker宕机了,此时zookeeper临时节点会消失,集群里其他broker会一直监听这个临时节 点,发现临时节点消失了,就竞争再次创建临时节点,zookeeper又会保证有一个broker成为新的controller。当所有结点停掉之后,zookeeper里面的/controller就会消失不见。

3.2,Partition副本选举Leader机制

当controller感知到分区leader所在的broker挂了,controller会从ISR备份列表里挑第一个broker作为leader,因为第一个broker最先放进ISR 列表,可能是同步数据最多的副本。

副本在进入ISR列表有两个条件:
1,副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通
2,副本能复制leader上的所有写操作,并且不能落后太多

3.3,offset消息记录机制

每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是 consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。

3.4,消费者Rebalance分区分配策略

就是说如果说分区数和这个消费者数量不一致,或者说这个分区数或者消费者数发生改变时,那么就会触发这个rebalance重新平衡的一个分配策略。主要的平衡分区分配策略有三种,分别是:range、round-robin、sticky。

假设存在一个主题有10个分区(0-9),现在有三个consumer消费:
range策略:就是按照分区序号排序,假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消 费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。 比如分区0-3给一个consumer1,分区4-6给一个consumer2,分区7~9给一个consumer3。
round-robin策略:就是轮询分配,比如分区0、3、6、9给一个consumer1,分区1、4、7给一个consumer2,分区2、5、 8给一个consumer3
sticky策略:初始时分配策略与round-robin类似,但是在rebalance的时候,需要保证如下两个原则。
1,分区的分配要尽可能均匀 。
2,分区的分配尽可能与上次分配的保持相同。 当两者发生冲突时,第一个目标优先于第二个目标 。

前面两种策略和最后这一种策略的区别就是,前面两种策略只有存在变动,那么整体都会发生变化,如一个consumer出现宕机的情况,那么所有的consumer和这个分区都会进行一个重新分配;但是,最后一种这个分区策略不需要重新进行这个分配,如果前面两个消费者已经分区好了,就不会去改变这两个原始的分配状态,只需要将这个最后消费者对应的分区直接加到前面两个消费者上面即可。

四,线上问题以及解决方案

4.1,消息丢失问题

消息发送端:当acks设置为0或者1时,都可能造成消息丢失的情况。因此可以将acks设置成-1或者all
消息消费端:消息自动提交的时候可能会造成消息丢失的情况,因此可以将消息设置成手动提交。

4.2,消息重复消费问题

消息发送端:发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
消息消费端: 如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的,如通过redis的分布式锁可以实现。

4.3,消息乱序问题

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第 一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了 所以,是否一定要配置重试要根据业务情况而定。
也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送 端到消费端全链路有序。 kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

如果是在多个分区里面,要保证消息消费的顺序消费,可以有以下设计思路:
1,设置一个countDownLautch,收到全部消息,然后设置好这个序号1,2,3,通过这个序号去进行排队消费
2,设置多个内存队列,类似于rabbitMq的路由模式,消费者通过不同的路由去消费。

4.4,消息积压问题

1,线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。 此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,由于一个分区对应一个消费者,因此直接增加的消费者也不能对这个分区里面的消息进行消费,因此可以将这个消息快速转发到其他topic主题里面,在这个topic里面设置很多的分区,然后在启动多个消费者去消费这个新主题的分区,这样就可以大大的增加消费的速度了。
2,由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。 此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

4.5,延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多。 比如 :
1,在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理, 这时就可以使用延时队列来处理这些订单了。
2,订单完成1小时后通知用户进行评价。
实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一 般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处 理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对 应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

4.6,消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。

4.7,消息传递保障问题

at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。
at least once(消费者至少收到一次消息,1-多次):ack = all 可以实现。 exactly once(消费者刚好收到一次消息):
at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实 现。

kafka生产者的幂等性:因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加 上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。 具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和 Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再 接收。

4.8,kafka高性能原因

1,磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
2,数据传输的零拷贝
3,读写数据的批量batch处理以及压缩传输

4.9,kafka事务

Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,而Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,不能用来做分布式事务。

4.10,kafka和rabbitMq最大区别

最大的区别就是kafka将这个rabbitMq的各个队列做了一个拆分,里面多了很多这个broker,主题和分区,kafka主要是实现了分布式存储。

以上是关于kafka的使用详解的主要内容,如果未能解决你的问题,请参考以下文章

kafka整合springboot以及核心参数的使用

Kafka 详解------简介

Kafka 监听器详解

一文详解Kafka API

kafka 消费者详解

Kafka核心概念详解