从KafkaProducer源码学习异步发送,缓冲区管理,NIO编程。
Posted jazon@
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从KafkaProducer源码学习异步发送,缓冲区管理,NIO编程。相关的知识,希望对你有一定的参考价值。
KafkaProducer初始化参数
-
clientId没主动设置clientId时,后台都会生成一个client.id,producer-自增长的数字,producer-1。
-
partitioner决定消息路由到Topic的哪个分区里去的
-
metadata组件,生产端拉取Topic的元数据,包括Topic有哪些分区,分区的Leader位于哪个broker,有一个metadata.max.age参数默认是五分钟,强制重新刷新数据。
-
request.max.size默认是1mb,一次请求最大为1mb。buffer.memory默认是32mb,异步用的缓冲区。max.block.ms用于控制send方法阻塞多久,默认60s。
-
核心组件:RecordAccumulator,缓冲区,负责消息的复杂的缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb),设置一个linger.ms,如果在指定时间范围内,都没凑出来一个batch把这条消息发送出去,那么到了这个linger.ms指定的时间,比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去。
-
核心行为:始化的时候,直接调用Metadata组件的方法,去broker上拉取了一次集群的元数据过来,后面每隔5分钟会默认刷新一次集群元数据,但是在发送消息的时候,如果没找到某个Topic的元数据,一定也会主动去拉取一次的
-
核心组件:网络通信的组件,NetworkClient,一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)
-
核心组件:Sender线程,负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1,只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动。
-
核心组件:序列化组件,拦截器组件
集群元数据存储
KafkaProducer在初始化的时候是不会去拉取集群的元数据的,做了一个最最基本的初始化,也就是仅仅把我们配置的那个broker的地址放了进去,在客户端缓存集群元数据的时候,采用了哪些数据结构。
- List<Node>,Kafka Broker节点,一台机器
- unautorhizedTopics,没有被授权访问的Topic的列表,就是kafka是可以支持权限控制的,如果你的客户端没有被授权访问某个Topic,那么就会放在这个列表里。
- Map<TopicParittion, PartitionInfo>,TopicPartition就代表了一个分区,里面就是他的topic的名字,以及他在topic里的分区号;PartitioinInfo,就代表了分区的详细信息,属于哪个topic,分区号,每个分区都有多个副本,Leader在哪个broker上,followers在哪些broker上,ISR列表,都在里面。
- partitionsByTopic,每个topic有哪些分区
- availablePartitionsByTopic,每个topic有哪些当前可用的分区,如果某个分区没有leader是存活的,此时那个分区就不可用了。
- partitionsByNode,每个broker上放了哪些分区。
- nodesById,broker.id -> Node
Producer.Send()
- 回调自定义的拦截器
- 同步阻塞等待获取topic元数据
如果你要往一个topic里发送消息,必须是得有这个topic的元数据的,你必须要知道这个topic有哪些分区,然后根据Partitioner组件去选择一个分区,然后知道这个分区对应的leader所在的broker,才能跟那个broker建立连接,发送消息。调用同步阻塞的方法,去等待先得获取到那个topic对应的元数据,如果此时客户端还没缓存那个topic的元数据,那么一定会发送网络请求到broker去拉取那个topic的元数据过来,但是下一次就可以直接根据缓存好的元数据来发送了
- 序列化key和value
你的key和value可以是各种各样的类型,比如说String、Double、Boolean,或者是自定义的对象,但是如果要发送消息到broker,必须对这个key和value进行序列化,把那些类型的数据转换成byte[]字节数组的形式
- 基于获取到的topic元数据,使用Partitioner组件获取消息对应的分区
- 检查要发送的这条消息是否超出了请求最大大小,以及内存缓冲最大大小
- 设置好自定义的callback回调函数以及对应的interceptor拦截器的回调函数
- 将消息添加到内存缓冲里去,RecordAccumulator组件负责的
- 如果某个分区对应的batch填满了,或者是新创建了一个batch,此时就会唤醒Sender线程,让他来进行工作,负责发送batch
Topic元数据细粒度按需加载及阻塞等待
如果元数据拉取成功,那么version会加一,所以在唤醒后只需要判断当前version是不是大于之前的version就可以判定元数据是否拉取成功。如果超时还没判定成功,则认为是元数据拉取失败。
Sender线程初始化
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}
private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in thread '{}':", name, e);
}
});
}
如果没指定分区key是如何对消息负载均衡分发到分区的
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
初始值是一个随机的integer类型的数字,接下来默认是递增的,一定会保证是一个正整数,就是比如说topic有5个分区,就会对这个递增的数字(23),对topic的分区数量进行取模。
Kafka的内存缓冲区
Kafka实现了一个BufferPool,缓冲池,可以利用它申请内存。
/**
有人的消息是52kb,超出了16kb,分配的那个ByteBuffer就会是52kb,如果对52kb的ByteBuffer进行处理,当deallocate的时候他会直接释放掉这块内存,不去加入到free,让gc掉,avaialbeMemory给加回去
*/
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
this.poolableSize = poolableSize;
this.lock = new ReentrantLock();
// 要释放的ByteBuffer会放入free缓存起来,避免重复向操作系统申请
// 这里只会将poolableSize的放入,也就是等于batch.size的
// 下次申请size等于poolableSize的ByteBuf就可以从free直接返回
this.free = new ArrayDeque<>();
this.waiters = new ArrayDeque<>();
this.totalMemory = memory;
this.nonPooledAvailableMemory = memory;
this.metrics = metrics;
this.time = time;
this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
MetricName rateMetricName = metrics.metricName("bufferpool-wait-ratio",
metricGrpName,
"The fraction of time an appender waits for space allocation.");
MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
metricGrpName,
"The total time an appender waits for space allocation.");
this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
}
batches是Kafka实现的一个并发安全的Map即CopyOrWriteMap类,在put方法上加了synchronized,并且是使用的copy进行put。batches的Key是TopicPartition,value是Deque<ProducerBatch>。当尝试将一条记录写入到batch失败时,就会申请ByteBuf以创建新的Batch,并且将创建的Batch放入Dequeue中。
一条消息是如何按照二进制协议规范写入Batch的ByteBuf中的
offset | size | crc | magic | attibutes | timestamp | key size | key | value size | value
是严格的按照二进制协议的规范,他规范里规定了,就是先是几个字节的offset,然后是几个字节的size,然后是几个字节的crc,接着是几个字节的magic,以此类推,他就是完全按照规范来写入ByteBuffer里去的,可以看到他最最底层的写入ByteBuffer的IO流的方式。ByteBufferOutputStream包裹了ByteBuffer,持有一个针对ByteBuffer的输出流,接着会把ByteBufferOutputStream给包裹在一个压缩流里,gzip、lz4、snappy,如果是包裹在压缩流里,写入的时候会先进入压缩流的缓冲区,压缩流会把一条消息放在缓冲区里,用压缩算法给压缩了,再写入底层的ByteBufferOutputStream里去,如果是非压缩的模式,最最普通的情况下,就是DataOutputStream包裹了ByteBufferOutputSteram,然后写入数据,Long、Byte、String,都会在底层转换为字节进入到ByteBuffer里去。
判断Batch是否还有足够空间写入一条记录
public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
if (isFull())
return false;
// We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
if (numRecords == 0)
return true;
final int recordSize;
if (magic < RecordBatch.MAGIC_VALUE_V2) {
recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
} else {
int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
}
// Be conservative and not take compression of the new record into consideration.
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
KafkaSender线程在做什么
1.确定哪些partition有已经写满的batch,batch创建的时间已经超过了linger.ms,此时就有可以发送出去的batch了,收集出来的PartitionLeader所在的broker。
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
2.如果有一些partition对应的元数据没拉取到,此时就必须标识一下,必须要在后面去尝试拉取元数据。
3.检查一下是否准备好可以向那些Borker发送数据了,就是说如果此时还没跟某个Broker建立好连接,必须在这里把长连接准备好,TCP连接,然后才可以把数据发送过去,直接就是基于最底层的NIO来开发的。
4.你有很多Partiton可以发送数据,有一些Partition Leader是在同一个Broker上,此时按照Broker对Partition进行分组,找到一个Broker对应的多个Partition的Batch,如果一个batch已经在内存缓冲里停留超过60s,超时不要了。
5.对每个Broker都创建一个ClientRequest,包括了多个Batch,就是在这个Broker上的多个LeaderPartition所对应的Batch,聚合起来组成一个ClientRequest,形成一个请求,将他设置到Sender变量中,等待NetWorkClient发送。
6.通过NetWorkClient走底层的网络通信,把每个Broker的ClientRequest给发送过去就可以了,poll方法,他是负责实际的 进行网络IO通信操作的一个核心的方法,负责发送数据出去,也包括读取响应回来。
Batch何时判定为可以发送出去
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
刚开始的时候,默认情况下,发送一个Batch,肯定是不涉及到重试,attempts就一定是0,一定没有进入重试的状态。waitedTimeMs:当前时间减去上一次发送这个Batch的时间,假设一个Batch从来没有发送过,此时当前时间减去这个Batch被创建出来的那个时间,这个Batch从创建开始到现在已经等待了多久了。timeToWaitMs:这个Batch从创建开始算起,最多等待多久就必须去发送,如果是在重试的阶段,这个时间就是重试间隔,但是在非重试的初始阶段,就是linger.ms的时间(100ms)。full:Batch是否已满,如果说Dequeue里超过一个Batch了,说明这个peekFirst返回的Batch就一定是已经满的,另外就是如果假设Dequeue里只有一个Batch,但是判断发现这个Batch达到了16kb的大小,也是已满。expired:当前Batch已经等待的时间大于等于前面计算的最多只能等待的时间。如果linger.ms默认是0,就意味着说,只要Batch创建出来了,在这个地方一定是expired = true。sendable:综合上述所有条件来判断,这个Batch是否需要发送出去,如果Bach已满必须得发送,如果Batch没有写满但是expired也必须得发送出去,如果说Batch没有写满而且也没有expired,但是内存已经消耗完毕也要发送,flushInProgress()就是客户端关闭了,此时也会发送。这里判断成功,是将Node即broker加入到readNodes中。
判定readyNodes里哪些node是可以发送数据过去的
(1)有一个Broker连接状态的缓存,先查一下这个缓存,当前这个Broker是否已经建立了连接了,如果是的话,才可以继续判断其他的条件。
(2)Selector,你大概可以认为底层封装的就是Java NIO的 Selector,但凡是看过我的NIO课程,跟着做NIO研发分布式文件系统,Selector上要注册很多Channel,每个Channel就代表了跟一个Broker建立的连接。
(3)inFlightRequests,有一个参数可以设置这个东西,默认是对同一个Broker同一时间最多容忍5个请求发送过去但是还没有收到响应,所以如果对一个Broker已经发送了5个请求,都没收到响应,此时就不可以继续发送了。
Producer与broker连接
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
// 异步的,不一定真一下连上了
boolean connected = doConnect(socketChannel, address);
// 向selector表示对连接感兴趣,如果上一步此时没连上,在selector有个事件出来的
key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", id);
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
if (key != null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();
throw e;
}
}
工业级NIO底层应该设置哪些网络参数
// 非阻塞
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
// 长连接
socket.setKeepAlive(true);
// 发送缓冲区
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
// 接受缓冲区
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
// 非延迟,送出去的数据包立马就是通过网络传输过去
socket.setTcpNoDelay(true);
在不断轮询的poll方法中如何将request发送出去
如果说已经发送完毕数据了,那么就可以取消对OP_WRITE事件的关注,否则如果一个Request的数据都没发送完毕,此时还需要保持对OP_WRITE事件的关注,而且如果发送完毕了,就会放到completedSends里面去。
对于已经发送给Broker的请求会进行什么样的后续处理
expectResponse应该是通过acks计算出来的,如果说acks = 0的话,也就是不需要对一个请求接收响应,此时expectResponse应该就是false,这个时候直接就会把这个Request从inFlightRequests里面移出去,直接就可以返回一个响应了,其实就是做一个回调。
Kafka的生产端发送时如何处理拆包问题
如果说一个请求对应的ByteBuffer中的二进制字节数据一次write没有全部发送完毕,如果说一次请求没有发送完毕,此时肯定remaining是大于0,此时就不会取消对OP_WRITE事件的监听,假设此时针对某个Broker是说,此时是可以再次发送一个Request了,必须得先判断一下,这个Broker上一次发送的Request请求是否发送完毕了,那个request中的数据是否发送完了呢?即使发送完毕了,还得限制为最多只发送5个request是没有收到响应的,如果说上一次 request出现了类似拆包的问题,一次请求没有发送完毕,此时下次就不会继续往这个broker发送请求了,但是此时针对这个broker还是保持着OP_WRITE的监听,下次调用poll,会发现对这个broker可以再次执行WRITABLE事件,最终会再次对SocketChannel调用write方法,把ByteBuffer里剩余的数据继续往Broker去写,上述的过程重复多次,一定会把这个请求发送完毕的。
Kafka的生产端在读取数据时如何解决粘包问题
要解决粘包问题,就是每个响应中间必须插入一个特殊的几个字节的分隔符,一般来说用作分隔符比如很经典的就是在响应消息前面先插入4个字节(integer类型的)代表响应消息自己本身数据大小的数字
响应消息1,199个字节;响应消息2,238个字节;响应消息3,355个字节
199响应消息(1)238响应消息(2)355响应消息(3)
此时会从channel中读取4个字节的数字,写入到一个叫变量名称为size的ByteBuffer(4个字节),就是如果已经读取到了4个字节,position就会变成4,就会跟limit是一样的,此时就代表着size ByteBuffer的4个字节已经读满了,意味着size读到了,可以直接读实际数据了。ByteBuffer.rewind,把position设置为0,一个ByteBuffer写满之后,调用rewind,把position重置为0,此时就可以从ByteBuffer里读取数据了。ByteBuffer.getInt(),就会默认从ByteBuffer当前position的位置获取4个字节,转换为一个int类型的数字返回给你。接下来就会直接把channel里的一条响应消息的数据读取到一个跟他的大小一致的ByteBuffer中去,粘包问题的解决,就是完美的通过每条消息基于一个4个字节的int数字(他们自己的大小)来进行分割。拆包,假如说size是4个字节,你一次read就读取到了2个字节,连size都没有读取完毕,出现了拆包,此时怎么办呢?或者你读取到了一个size,199个字节,但是在读取响应消息的时候,就读取到了162个字节,拆包问题,响应消息没有读取完毕。对于前者szie buffer还没读满,会接着读size buffer,对于后者,buffer没读完,也要接着读。
如果broker响应为异常,producer将如何处理
判断重试次数,将batch重新加入给放回到Accumulator里的Queue去,会直接放到头部去。
重新在内存缓冲里入队的Batch在什么时机下会判定可以重试
其实就是前面判断batch能否发送的代码,已经考虑了重试的情况了。
如果一个inFlightRequest一直没有收到响应,会怎么处理
如果说发现有节点对请求是超时响应的,过了60s还没响应,此时会关闭掉跟那个Broker的连接,认为那个Broker已经故障了 ,做很多内存数据结构的清理,再次标记为需要去重新拉取元数据
整体流程
以上是关于从KafkaProducer源码学习异步发送,缓冲区管理,NIO编程。的主要内容,如果未能解决你的问题,请参考以下文章