从KafkaProducer源码学习异步发送,缓冲区管理,NIO编程。

Posted jazon@

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从KafkaProducer源码学习异步发送,缓冲区管理,NIO编程。相关的知识,希望对你有一定的参考价值。

KafkaProducer初始化参数

  • clientId没主动设置clientId时,后台都会生成一个client.id,producer-自增长的数字,producer-1。

  • partitioner决定消息路由到Topic的哪个分区里去的

  • metadata组件,生产端拉取Topic的元数据,包括Topic有哪些分区,分区的Leader位于哪个broker,有一个metadata.max.age参数默认是五分钟,强制重新刷新数据。

  • request.max.size默认是1mb,一次请求最大为1mb。buffer.memory默认是32mb,异步用的缓冲区。max.block.ms用于控制send方法阻塞多久,默认60s。

  • 核心组件:RecordAccumulator,缓冲区,负责消息的复杂的缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb),设置一个linger.ms,如果在指定时间范围内,都没凑出来一个batch把这条消息发送出去,那么到了这个linger.ms指定的时间,比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去。

  • 核心行为:始化的时候,直接调用Metadata组件的方法,去broker上拉取了一次集群的元数据过来,后面每隔5分钟会默认刷新一次集群元数据,但是在发送消息的时候,如果没找到某个Topic的元数据,一定也会主动去拉取一次的

  • 核心组件:网络通信的组件,NetworkClient,一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)

  • 核心组件:Sender线程,负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1,只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动。

  • 核心组件:序列化组件,拦截器组件

集群元数据存储

KafkaProducer在初始化的时候是不会去拉取集群的元数据的,做了一个最最基本的初始化,也就是仅仅把我们配置的那个broker的地址放了进去,在客户端缓存集群元数据的时候,采用了哪些数据结构。

  • List<Node>,Kafka Broker节点,一台机器
  • unautorhizedTopics,没有被授权访问的Topic的列表,就是kafka是可以支持权限控制的,如果你的客户端没有被授权访问某个Topic,那么就会放在这个列表里。
  • Map<TopicParittion, PartitionInfo>,TopicPartition就代表了一个分区,里面就是他的topic的名字,以及他在topic里的分区号;PartitioinInfo,就代表了分区的详细信息,属于哪个topic,分区号,每个分区都有多个副本,Leader在哪个broker上,followers在哪些broker上,ISR列表,都在里面。
  • partitionsByTopic,每个topic有哪些分区
  • availablePartitionsByTopic,每个topic有哪些当前可用的分区,如果某个分区没有leader是存活的,此时那个分区就不可用了。
  • partitionsByNode,每个broker上放了哪些分区。
  • nodesById,broker.id -> Node

Producer.Send()

  • 回调自定义的拦截器
  • 同步阻塞等待获取topic元数据

如果你要往一个topic里发送消息,必须是得有这个topic的元数据的,你必须要知道这个topic有哪些分区,然后根据Partitioner组件去选择一个分区,然后知道这个分区对应的leader所在的broker,才能跟那个broker建立连接,发送消息。调用同步阻塞的方法,去等待先得获取到那个topic对应的元数据,如果此时客户端还没缓存那个topic的元数据,那么一定会发送网络请求到broker去拉取那个topic的元数据过来,但是下一次就可以直接根据缓存好的元数据来发送了

  • 序列化key和value

你的key和value可以是各种各样的类型,比如说String、Double、Boolean,或者是自定义的对象,但是如果要发送消息到broker,必须对这个key和value进行序列化,把那些类型的数据转换成byte[]字节数组的形式

  • 基于获取到的topic元数据,使用Partitioner组件获取消息对应的分区
  • 检查要发送的这条消息是否超出了请求最大大小,以及内存缓冲最大大小
  • 设置好自定义的callback回调函数以及对应的interceptor拦截器的回调函数
  • 将消息添加到内存缓冲里去,RecordAccumulator组件负责的
  • 如果某个分区对应的batch填满了,或者是新创建了一个batch,此时就会唤醒Sender线程,让他来进行工作,负责发送batch

Topic元数据细粒度按需加载及阻塞等待

在这里插入图片描述

如果元数据拉取成功,那么version会加一,所以在唤醒后只需要判断当前version是不是大于之前的version就可以判定元数据是否拉取成功。如果超时还没判定成功,则认为是元数据拉取失败。

Sender线程初始化

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        configureThread(name, daemon);
    }

    private void configureThread(final String name, boolean daemon) {
        setDaemon(daemon);
        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                log.error("Uncaught exception in thread '{}':", name, e);
            }
        });
    }

如果没指定分区key是如何对消息负载均衡分发到分区的

counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

初始值是一个随机的integer类型的数字,接下来默认是递增的,一定会保证是一个正整数,就是比如说topic有5个分区,就会对这个递增的数字(23),对topic的分区数量进行取模。

Kafka的内存缓冲区

Kafka实现了一个BufferPool,缓冲池,可以利用它申请内存。

/**
有人的消息是52kb,超出了16kb,分配的那个ByteBuffer就会是52kb,如果对52kb的ByteBuffer进行处理,当deallocate的时候他会直接释放掉这块内存,不去加入到free,让gc掉,avaialbeMemory给加回去
*/
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        // 要释放的ByteBuffer会放入free缓存起来,避免重复向操作系统申请
        // 这里只会将poolableSize的放入,也就是等于batch.size的
        // 下次申请size等于poolableSize的ByteBuf就可以从free直接返回
        this.free = new ArrayDeque<>();
        this.waiters = new ArrayDeque<>();
        this.totalMemory = memory;
        this.nonPooledAvailableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
        MetricName rateMetricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
                                                   metricGrpName,
                                                   "The total time an appender waits for space allocation.");
        this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
    }

batches是Kafka实现的一个并发安全的Map即CopyOrWriteMap类,在put方法上加了synchronized,并且是使用的copy进行put。batches的Key是TopicPartition,value是Deque<ProducerBatch>。当尝试将一条记录写入到batch失败时,就会申请ByteBuf以创建新的Batch,并且将创建的Batch放入Dequeue中。

一条消息是如何按照二进制协议规范写入Batch的ByteBuf中的

offset | size | crc | magic | attibutes | timestamp | key size | key | value size | value

是严格的按照二进制协议的规范,他规范里规定了,就是先是几个字节的offset,然后是几个字节的size,然后是几个字节的crc,接着是几个字节的magic,以此类推,他就是完全按照规范来写入ByteBuffer里去的,可以看到他最最底层的写入ByteBuffer的IO流的方式。ByteBufferOutputStream包裹了ByteBuffer,持有一个针对ByteBuffer的输出流,接着会把ByteBufferOutputStream给包裹在一个压缩流里,gzip、lz4、snappy,如果是包裹在压缩流里,写入的时候会先进入压缩流的缓冲区,压缩流会把一条消息放在缓冲区里,用压缩算法给压缩了,再写入底层的ByteBufferOutputStream里去,如果是非压缩的模式,最最普通的情况下,就是DataOutputStream包裹了ByteBufferOutputSteram,然后写入数据,Long、Byte、String,都会在底层转换为字节进入到ByteBuffer里去。

判断Batch是否还有足够空间写入一条记录

public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
        if (isFull())
            return false;

        // We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
        if (numRecords == 0)
            return true;

        final int recordSize;
        if (magic < RecordBatch.MAGIC_VALUE_V2) {
            recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
        } else {
            int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
            long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
            recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
        }

        // Be conservative and not take compression of the new record into consideration.
        return this.writeLimit >= estimatedBytesWritten() + recordSize;
    }

KafkaSender线程在做什么

1.确定哪些partition有已经写满的batch,batch创建的时间已经超过了linger.ms,此时就有可以发送出去的batch了,收集出来的PartitionLeader所在的broker。

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

2.如果有一些partition对应的元数据没拉取到,此时就必须标识一下,必须要在后面去尝试拉取元数据。

3.检查一下是否准备好可以向那些Borker发送数据了,就是说如果此时还没跟某个Broker建立好连接,必须在这里把长连接准备好,TCP连接,然后才可以把数据发送过去,直接就是基于最底层的NIO来开发的。

4.你有很多Partiton可以发送数据,有一些Partition Leader是在同一个Broker上,此时按照Broker对Partition进行分组,找到一个Broker对应的多个Partition的Batch,如果一个batch已经在内存缓冲里停留超过60s,超时不要了。

5.对每个Broker都创建一个ClientRequest,包括了多个Batch,就是在这个Broker上的多个LeaderPartition所对应的Batch,聚合起来组成一个ClientRequest,形成一个请求,将他设置到Sender变量中,等待NetWorkClient发送。

6.通过NetWorkClient走底层的网络通信,把每个Broker的ClientRequest给发送过去就可以了,poll方法,他是负责实际的 进行网络IO通信操作的一个核心的方法,负责发送数据出去,也包括读取响应回来。

Batch何时判定为可以发送出去

ProducerBatch batch = deque.peekFirst();
if (batch != null) {
    long waitedTimeMs = batch.waitedTimeMs(nowMs);
    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
    boolean full = deque.size() > 1 || batch.isFull();
    boolean expired = waitedTimeMs >= timeToWaitMs;
    boolean sendable = full || expired || exhausted || closed || flushInProgress();
    if (sendable && !backingOff) {
        readyNodes.add(leader);
    } else {
        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
        // Note that this results in a conservative estimate since an un-sendable partition may have
        // a leader that will later be found to have sendable data. However, this is good enough
        // since we'll just wake up and then sleep again for the remaining time.
        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
    }
}

刚开始的时候,默认情况下,发送一个Batch,肯定是不涉及到重试,attempts就一定是0,一定没有进入重试的状态。waitedTimeMs:当前时间减去上一次发送这个Batch的时间,假设一个Batch从来没有发送过,此时当前时间减去这个Batch被创建出来的那个时间,这个Batch从创建开始到现在已经等待了多久了。timeToWaitMs:这个Batch从创建开始算起,最多等待多久就必须去发送,如果是在重试的阶段,这个时间就是重试间隔,但是在非重试的初始阶段,就是linger.ms的时间(100ms)。full:Batch是否已满,如果说Dequeue里超过一个Batch了,说明这个peekFirst返回的Batch就一定是已经满的,另外就是如果假设Dequeue里只有一个Batch,但是判断发现这个Batch达到了16kb的大小,也是已满。expired:当前Batch已经等待的时间大于等于前面计算的最多只能等待的时间。如果linger.ms默认是0,就意味着说,只要Batch创建出来了,在这个地方一定是expired = true。sendable:综合上述所有条件来判断,这个Batch是否需要发送出去,如果Bach已满必须得发送,如果Batch没有写满但是expired也必须得发送出去,如果说Batch没有写满而且也没有expired,但是内存已经消耗完毕也要发送,flushInProgress()就是客户端关闭了,此时也会发送。这里判断成功,是将Node即broker加入到readNodes中。

判定readyNodes里哪些node是可以发送数据过去的

(1)有一个Broker连接状态的缓存,先查一下这个缓存,当前这个Broker是否已经建立了连接了,如果是的话,才可以继续判断其他的条件。

(2)Selector,你大概可以认为底层封装的就是Java NIO的 Selector,但凡是看过我的NIO课程,跟着做NIO研发分布式文件系统,Selector上要注册很多Channel,每个Channel就代表了跟一个Broker建立的连接。

(3)inFlightRequests,有一个参数可以设置这个东西,默认是对同一个Broker同一时间最多容忍5个请求发送过去但是还没有收到响应,所以如果对一个Broker已经发送了5个请求,都没收到响应,此时就不可以继续发送了。

Producer与broker连接

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        ensureNotRegistered(id);
        SocketChannel socketChannel = SocketChannel.open();
        SelectionKey key = null;
        try {
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            // 异步的,不一定真一下连上了
            boolean connected = doConnect(socketChannel, address);
            // 向selector表示对连接感兴趣,如果上一步此时没连上,在selector有个事件出来的
            key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

            if (connected) {
                // OP_CONNECT won't trigger for immediately connected channels
                log.debug("Immediately connected to node {}", id);
                immediatelyConnectedKeys.add(key);
                key.interestOps(0);
            }
        } catch (IOException | RuntimeException e) {
            if (key != null)
                immediatelyConnectedKeys.remove(key);
            channels.remove(id);
            socketChannel.close();
            throw e;
        }
    }

工业级NIO底层应该设置哪些网络参数

// 非阻塞
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
// 长连接
socket.setKeepAlive(true);
// 发送缓冲区
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    socket.setSendBufferSize(sendBufferSize);
// 接受缓冲区
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    socket.setReceiveBufferSize(receiveBufferSize);
// 非延迟,送出去的数据包立马就是通过网络传输过去
socket.setTcpNoDelay(true);

在不断轮询的poll方法中如何将request发送出去

如果说已经发送完毕数据了,那么就可以取消对OP_WRITE事件的关注,否则如果一个Request的数据都没发送完毕,此时还需要保持对OP_WRITE事件的关注,而且如果发送完毕了,就会放到completedSends里面去。

对于已经发送给Broker的请求会进行什么样的后续处理

expectResponse应该是通过acks计算出来的,如果说acks = 0的话,也就是不需要对一个请求接收响应,此时expectResponse应该就是false,这个时候直接就会把这个Request从inFlightRequests里面移出去,直接就可以返回一个响应了,其实就是做一个回调。

Kafka的生产端发送时如何处理拆包问题

如果说一个请求对应的ByteBuffer中的二进制字节数据一次write没有全部发送完毕,如果说一次请求没有发送完毕,此时肯定remaining是大于0,此时就不会取消对OP_WRITE事件的监听,假设此时针对某个Broker是说,此时是可以再次发送一个Request了,必须得先判断一下,这个Broker上一次发送的Request请求是否发送完毕了,那个request中的数据是否发送完了呢?即使发送完毕了,还得限制为最多只发送5个request是没有收到响应的,如果说上一次 request出现了类似拆包的问题,一次请求没有发送完毕,此时下次就不会继续往这个broker发送请求了,但是此时针对这个broker还是保持着OP_WRITE的监听,下次调用poll,会发现对这个broker可以再次执行WRITABLE事件,最终会再次对SocketChannel调用write方法,把ByteBuffer里剩余的数据继续往Broker去写,上述的过程重复多次,一定会把这个请求发送完毕的。

Kafka的生产端在读取数据时如何解决粘包问题

要解决粘包问题,就是每个响应中间必须插入一个特殊的几个字节的分隔符,一般来说用作分隔符比如很经典的就是在响应消息前面先插入4个字节(integer类型的)代表响应消息自己本身数据大小的数字

响应消息1,199个字节;响应消息2,238个字节;响应消息3,355个字节

199响应消息(1)238响应消息(2)355响应消息(3)

此时会从channel中读取4个字节的数字,写入到一个叫变量名称为size的ByteBuffer(4个字节),就是如果已经读取到了4个字节,position就会变成4,就会跟limit是一样的,此时就代表着size ByteBuffer的4个字节已经读满了,意味着size读到了,可以直接读实际数据了。ByteBuffer.rewind,把position设置为0,一个ByteBuffer写满之后,调用rewind,把position重置为0,此时就可以从ByteBuffer里读取数据了。ByteBuffer.getInt(),就会默认从ByteBuffer当前position的位置获取4个字节,转换为一个int类型的数字返回给你。接下来就会直接把channel里的一条响应消息的数据读取到一个跟他的大小一致的ByteBuffer中去,粘包问题的解决,就是完美的通过每条消息基于一个4个字节的int数字(他们自己的大小)来进行分割。拆包,假如说size是4个字节,你一次read就读取到了2个字节,连size都没有读取完毕,出现了拆包,此时怎么办呢?或者你读取到了一个size,199个字节,但是在读取响应消息的时候,就读取到了162个字节,拆包问题,响应消息没有读取完毕。对于前者szie buffer还没读满,会接着读size buffer,对于后者,buffer没读完,也要接着读。

如果broker响应为异常,producer将如何处理

判断重试次数,将batch重新加入给放回到Accumulator里的Queue去,会直接放到头部去。

重新在内存缓冲里入队的Batch在什么时机下会判定可以重试

其实就是前面判断batch能否发送的代码,已经考虑了重试的情况了。

如果一个inFlightRequest一直没有收到响应,会怎么处理

如果说发现有节点对请求是超时响应的,过了60s还没响应,此时会关闭掉跟那个Broker的连接,认为那个Broker已经故障了 ,做很多内存数据结构的清理,再次标记为需要去重新拉取元数据

整体流程

在这里插入图片描述

以上是关于从KafkaProducer源码学习异步发送,缓冲区管理,NIO编程。的主要内容,如果未能解决你的问题,请参考以下文章

4深潜KafkaProducer —— RecordAccumulator精析

源码分析 Kafka 消息发送流程(文末附流程图)

源码分析 Kafka 消息发送流程(文末附流程图)

kafka源码分析Metadata的数据结构与读取更新策略

95-40-014-生产者-KafkaProducer

关于高并发下kafka producer send异步发送耗时问题的分析