[Kafka] Kafka NIO
Posted by 九师兄
1. Preface
First of all, thanks to 王昊@二代 for his book; he is heading to Alibaba, and I wish him great success. Back to the topic: I have been reading Kafka's NIO-related code on and off for a few days and still find it heavy going, mainly because I have forgotten the low-level NIO details. Most projects use a framework such as Netty to hide those details, so the business layer only needs to listen on a port, establish connections, accept requests, process them, and write responses. Why does Kafka wrap the interfaces itself instead of using an off-the-shelf Netty? An explanation I found online: "Performance and no dependency-hell is great!" In short: performance, plus not wanting to pull in too many dependencies. It also says something about how capable the Kafka authors are.
The basic Java NIO building blocks are Selector, Channel and ByteBuffer. Kafka's wrappers around them are described below.
If your NIO is rusty, these are worth reading first: https://www.jianshu.com/p/0d497fe5484a
https://segmentfault.com/a/1190000012316621
If you are already familiar with NIO you can skip them.
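For reference, here is a minimal, standalone sketch of the raw Java NIO primitives (Selector, SocketChannel, ByteBuffer) that the Kafka classes below wrap. The host, port and payload are placeholders, not anything taken from Kafka.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class PlainNioClient {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);                        // non-blocking: connect() returns immediately
        channel.connect(new InetSocketAddress("localhost", 9092));
        channel.register(selector, SelectionKey.OP_CONNECT);

        while (selector.select(1000) > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable() && channel.finishConnect()) {
                    // connection established: switch interest to reads and write a payload
                    key.interestOps(SelectionKey.OP_READ);
                    channel.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
                } else if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    channel.read(buf);
                }
            }
            selector.selectedKeys().clear();
        }
    }
}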
2. Network-related classes
Selectable is the interface for the network operations, and Selector is the concrete implementation (covered in detail later); it includes sending requests, receiving responses, establishing connections, disconnecting, and so on.
/**
 * An interface for asynchronous, multi-channel network I/O
 * Selectable is the interface for Kafka's NIO network operations.
 * Selector is the concrete implementation: sending requests, receiving responses,
 * establishing and closing connections, etc.
 */
public interface Selectable
...
/*
 * Kafka's wrapper around the Java NIO interfaces. It implements the Selectable interface
 * and is responsible for the concrete connect, write and read operations.
 */
public class Selector implements Selectable, AutoCloseable {

    public static final long NO_IDLE_TIMEOUT_MS = -1;
    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;

    private enum CloseMode {
        GRACEFUL(true),            // process outstanding staged receives, notify disconnect
        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification

        boolean notifyDisconnect;

        CloseMode(boolean notifyDisconnect) {
            this.notifyDisconnect = notifyDisconnect;
        }
    }

    private final Logger log;
    // the underlying java.nio.channels.Selector used to monitor network events
    private final java.nio.channels.Selector nioSelector;
    // maps node id -> KafkaChannel
    private final Map<String, KafkaChannel> channels;
    private final Set<KafkaChannel> explicitlyMutedChannels;
    // flag indicating the memory pool is exhausted
    private boolean outOfMemory;
    // sends completed during the current poll
    private final List<Send> completedSends;
    // receives completed during the current poll
    private final List<NetworkReceive> completedReceives;
    // holds all receives read during a single OP_READ event, per channel
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    // connections for which, as a client, SocketChannel#connect returned true immediately
    private final Set<SelectionKey> immediatelyConnectedKeys;
    // channels that are in the process of closing
    private final Map<String, KafkaChannel> closingChannels;
    private Set<SelectionKey> keysWithBufferedRead;
    // connections found to be disconnected during the current poll
    private final Map<String, ChannelState> disconnected;
    // connections newly established during the current poll
    private final List<String> connected;
    // nodes to which a send failed
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    // builder used to create KafkaChannel instances
    private final ChannelBuilder channelBuilder;
    // maximum size of a single receive
    private final int maxReceiveSize;
    private final boolean recordTimePerConnection;
    private final IdleExpiryManager idleExpiryManager;
    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    private final MemoryPool memoryPool;
    private final long lowMemThreshold;
    private final int failedAuthenticationDelayMs;
    // indicates if the previous call to poll was able to make progress in reading already-buffered data.
    // this is used to prevent tight loops when memory is not available to read any more data
    private boolean madeReadProgressLastPoll = true;
Send is the interface for data to be sent; implementations provide completed() to report whether the send has finished and writeTo() to write the data into a Channel.
/**
 * This interface models the in-progress sending of data to a specific destination
 */
public interface Send {

    /**
     * The id for the destination of this send
     */
    String destination();

    /**
     * Is this send complete?
     */
    boolean completed();

    /**
     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
     * to be completely written
     * @param channel The Channel to write to
     * @return The number of bytes written
     * @throws IOException If the write fails
     */
    long writeTo(GatheringByteChannel channel) throws IOException;

    /**
     * Size of the send
     */
    long size();
}
The Send interface has several implementations, such as RecordsSend, MultiRecordsSend and ByteBufferSend. Take ByteBufferSend as an example:
/**
 * A send backed by an array of byte buffers
 */
public class ByteBufferSend implements Send {

    // id of the destination node
    private final String destination;
    // total size in bytes of the buffers
    private final int size;
    // the content to send (note that the size of a single Kafka transfer is bounded)
    protected final ByteBuffer[] buffers;
    private int remaining;
    private boolean pending = false;

    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        // size is the total number of bytes in the buffers
        this.size = remaining;
    }

    @Override
    public String destination() {
        return destination;
    }

    // complete once there is nothing left to send and the transport layer has no pending writes
    @Override
    public boolean completed() {
        return remaining <= 0 && !pending;
    }

    @Override
    public long size() {
        return this.size;
    }

    // write the bytes into the channel
    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        // the actual write is delegated to channel.write(), which decouples Kafka's Send from the
        // concrete channel implementation
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        // after each write, check whether the transport layer still has buffered (pending) writes
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
}
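As a usage sketch (not Kafka's own code): a ByteBufferSend can be driven to completion against any GatheringByteChannel, for example a plain non-blocking SocketChannel. Kafka itself writes through its TransportLayer instead; the node id "node-1" and the `socketChannel` variable here are assumed for illustration.

// imports assumed: java.nio.ByteBuffer, java.nio.charset.StandardCharsets, java.nio.channels.SocketChannel
ByteBufferSend send = new ByteBufferSend("node-1",
        ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
while (!send.completed()) {
    // each call writes as much as the socket accepts; a non-blocking channel may need several calls
    long written = send.writeTo(socketChannel);
    if (written == 0)
        break; // socket buffer full for now; retry when OP_WRITE fires again
}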
NetworkSend extends ByteBufferSend and prefixes the content with a 4-byte field holding the content size (the 4 bytes themselves are not counted); see the comments below.
/**
 * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
 */
public class NetworkSend extends ByteBufferSend {

    public NetworkSend(String destination, ByteBuffer buffer) {
        // destination is the broker id; buffer holds the message to transmit
        super(destination, sizeDelimit(buffer));
    }

    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        // Builds a new array whose first buffer carries the size of the second. The resulting byte
        // stream is buffer1 + buffer2, where buffer2 is the message sent to the broker and buffer1
        // is a 4-byte integer holding buffer2's size. When the broker reads the stream it first
        // reads these 4 bytes to learn how many bytes to read next.
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }

    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        // write the given size as 4 bytes in the buffer's byte order and advance the position by 4
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }
}
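A small example of the resulting wire format (the node id "1" and the 3-byte payload are placeholders): a payload of N bytes becomes 4 + N bytes on the wire, with the first 4 bytes carrying N in network byte order.

// imports assumed: java.nio.ByteBuffer
ByteBuffer payload = ByteBuffer.wrap(new byte[] {0x01, 0x02, 0x03}); // N = 3
NetworkSend networkSend = new NetworkSend("1", payload);
// size() counts the 4-byte length prefix plus the payload: 4 + 3 = 7
System.out.println(networkSend.size());
// bytes on the wire: 00 00 00 03 | 01 02 03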
The counterpart of Send is Receive, which models data read from a Channel. NetworkReceive implements the Receive interface and wraps a ByteBuffer, representing the data of one request. When NetworkReceive reads from a connection it first reads the 4-byte header containing the message length, then allocates a ByteBuffer of that size and reads the message body into it.
/**
 * This interface models the in-progress reading of data from a channel to a source identified by an integer id
 */
public interface Receive extends Closeable
...

/**
 * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
 */
public class NetworkReceive implements Receive
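To illustrate the read side, here is a simplified sketch of the 4-byte size-delimited framing that NetworkReceive implements. The real class additionally enforces maxReceiveSize and allocates from a MemoryPool, so this is not Kafka's actual code; for simplicity the channel is assumed to be blocking.

// imports assumed: java.io.EOFException, java.io.IOException, java.nio.ByteBuffer, java.nio.channels.ReadableByteChannel
static ByteBuffer readFrame(ReadableByteChannel channel) throws IOException {
    // 1. read the 4-byte size header
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    while (sizeBuffer.hasRemaining())
        if (channel.read(sizeBuffer) < 0)
            throw new EOFException("connection closed while reading frame size");
    sizeBuffer.flip();
    int size = sizeBuffer.getInt(); // N, the length of the body that follows

    // 2. allocate a buffer of exactly N bytes and read the body into it
    ByteBuffer body = ByteBuffer.allocate(size);
    while (body.hasRemaining())
        if (channel.read(body) < 0)
            throw new EOFException("connection closed while reading frame body");
    body.flip();
    return body;
}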
As noted earlier, Kafka's Selector is a wrapper over the NIO selector, while the actual connection work is done by the Channel. KafkaChannel wraps not only the SocketChannel but also Kafka's own Authenticator and the read/write-related NetworkReceive and Send. There is an extra indirection in the middle, TransportLayer, which abstracts plaintext and encrypted channels (TransportLayer provides different subclasses depending on the network protocol) while presenting a uniform interface to KafkaChannel; a good application of the strategy pattern. A simplified sketch of this pattern follows the field listing below.
public class KafkaChannel {
    /**
     * Mute States for KafkaChannel:
     */
    ....

    private final String id;
    // Wraps the underlying SocketChannel and SelectionKey. Depending on the protocol, TransportLayer
    // is implemented by different subclasses (PlaintextTransportLayer, SslTransportLayer) while
    // exposing a uniform interface to KafkaChannel -- a good application of the strategy pattern.
    private final TransportLayer transportLayer;
    private final Authenticator authenticator;
    // Tracks accumulated network thread time. This is updated on the network thread.
    // The values are read and reset after each response is sent.
    private long networkThreadTimeNanos;
    private final int maxReceiveSize;
    private final MemoryPool memoryPool;
    private NetworkReceive receive;
    private Send send;
    // Track connection and mute state of channels to enable outstanding requests on channels to be
    // processed after the channel is disconnected.
    private boolean disconnected;
    private ChannelMuteState muteState;
    private ChannelState state;
    private SocketAddress remoteAddress;
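A highly simplified, hypothetical sketch of the strategy pattern described above: the channel talks only to a transport-layer interface and never cares whether the bytes go out in plaintext or over TLS. The real TransportLayer interface has many more methods (handshake, SelectionKey management, principal, etc.), and the names used here (SimpleTransportLayer, PlaintextLayer, SimpleChannel) are made up for illustration.

// imports assumed: java.io.IOException, java.nio.ByteBuffer, java.nio.channels.SocketChannel
interface SimpleTransportLayer {
    long write(ByteBuffer[] srcs) throws IOException; // a TLS implementation would encrypt before writing
    int read(ByteBuffer dst) throws IOException;
}

class PlaintextLayer implements SimpleTransportLayer {
    private final SocketChannel socketChannel;
    PlaintextLayer(SocketChannel socketChannel) { this.socketChannel = socketChannel; }
    public long write(ByteBuffer[] srcs) throws IOException { return socketChannel.write(srcs); }
    public int read(ByteBuffer dst) throws IOException { return socketChannel.read(dst); }
}

class SimpleChannel {
    // the channel only knows the interface; which strategy it gets is decided by a builder
    private final SimpleTransportLayer transportLayer;
    SimpleChannel(SimpleTransportLayer transportLayer) { this.transportLayer = transportLayer; }
    long send(ByteBuffer[] buffers) throws IOException { return transportLayer.write(buffers); }
}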
These are the main classes of the network layer. The class diagram comes from: https://blog.csdn.net/chunlongyu/article/details/52651960
With the diagram in front of you things are much clearer. When I first read the code I wondered why there are so many classes, the wrapping felt complicated, and the call chain is deep; it gets better after a few more readings. Thanks to the original author.
3. Send flow
KafkaProducer.doSend();
Sender.run();
NetworkClient.poll() (NetworkClient.doSend());
Selector.poll();
From the perspective of sending a message, the main flow involved is shown above. RecordAccumulator (part 3) and Sender (part 1) were covered in earlier posts; logically we should start from NetworkClient, but for reasons of space the focus here is on Selector.
3.1 Selector.connect()
NetworkClient generally hands its requests to Selector, which uses NIO in asynchronous, non-blocking mode to perform the concrete connect, write and read operations.
Let's look at the connection process first. When the client connects to a node it creates a SocketChannel to the server, and Selector maintains a KafkaChannel for each target node.
// Selector.java
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    ensureNotRegistered(id);
    // create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    try {
        // configure the socket
        configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
        boolean connected = doConnect(socketChannel, address);
        // register for the CONNECT event
        SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", id);
            // remember the key in the immediately-connected set
            immediatelyConnectedKeys.add(key);
            key.interestOps(0); // stop listening for the CONNECT event on this connection
        }
    } catch (IOException | RuntimeException e) {
        socketChannel.close();
        throw e;
    }
}
private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
        throws IOException {
    // non-blocking mode
    socketChannel.configureBlocking(false);
    // obtain the socket and set its options (this is the client side)
    Socket socket = socketChannel.socket();
    // keep the connection alive
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);       // SO_SNDBUF size
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize); // SO_RCVBUF size
    socket.setTcpNoDelay(true);
}
// Visible to allow test cases to override. In particular, we use this to implement a blocking connect
// in order to simulate "immediately connected" sockets.
protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
    try {
        // SocketChannel.connect() initiates a TCP connection to the remote end. Because the channel is
        // non-blocking, the connection may not be fully established (the 3-way handshake may not have
        // completed) when the method returns: it returns true if the connection is already established,
        // false otherwise. When server and client run on the same machine it may return true. Later,
        // finishConnect() is used to confirm whether the connection was really established.
        return channel.connect(address);
    } catch (UnresolvedAddressException e) {
        throw new IOException("Can't resolve address: " + address, e);
    }
}
private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
    // register the socketChannel with the nioSelector, with interest in OP_CONNECT
    SelectionKey key = socketChannel.register(nioSelector, interestedOps);
    // build a KafkaChannel and attach it to the key
    KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
    // bind the node id to the KafkaChannel and track it in `channels`
    this.channels.put(id, channel);
    if (idleExpiryManager != null)
        idleExpiryManager.update(channel.id(), time.nanoseconds());
    return key;
}
private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
    try {
        // create the KafkaChannel
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
        // attach the KafkaChannel to the key
        key.attach(channel);
        return channel;
    } catch (Exception e) {
        try {
            socketChannel.close();
        } finally {
            key.cancel();
        }
        throw new IOException("Channel could not be created for socket " + socketChannel, e);
    }
}
Note: because the channel is non-blocking, socketChannel.connect() only initiates a connection and may return before the connection is actually established. To confirm that the connection has been established, finishConnect() must be called afterwards.
KafkaChannel.java
// finishConnect() is invoked as the handler when key.isConnectable() fires
public boolean finishConnect() throws IOException {
    //we need to grab remoteAddr before finishConnect() is called otherwise
    //it becomes inaccessible if the connection was refused.
    SocketChannel socketChannel = transportLayer.socketChannel();
    if (socketChannel != null)
        remoteAddress = socketChannel.getRemoteAddress();
    // has the connection been fully established?
    boolean connected = transportLayer.finishConnect();
    if (connected) {
        // connection established: update the channel state
        if (ready())
            state = ChannelState.READY;
        else if (remoteAddress != null)
            state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
        else
            state = ChannelState.AUTHENTICATE;
    }
    return connected;
}
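To show where finishConnect() fits, here is a hedged, simplified sketch of the relevant branch of the Selector poll loop; in the real code this lives in Selector.pollSelectionKeys() together with the read/write handling and error handling, which are omitted here.

for (SelectionKey key : nioSelector.selectedKeys()) {
    // the KafkaChannel was attached to the key in registerChannel()
    KafkaChannel channel = (KafkaChannel) key.attachment();
    if (key.isConnectable() && channel.finishConnect()) {
        // the 3-way handshake has completed; the transport layer switches interest from
        // OP_CONNECT to OP_READ, and the connection is reported via the `connected` list
        this.connected.add(channel.id());
    }
}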
3.2 Selector.send()
Selector.send() caches the previously created RequestSend object in the KafkaChannel's send field and starts watching for OP_WRITE on that connection; no network I/O happens yet. The RequestSend is only written out on the next call to Selector.poll(). If the channel's send field still holds a RequestSend that has not been fully sent, an exception is thrown to prevent it from being overwritten: each KafkaChannel can only send one Send per poll.
Selector.java
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 * @param send The request to send
 */
public void send(Send send) {
    String connectionId = send.destination();
    // find the channel this request belongs to
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // the connection is closing: record the send as failed
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        this.failedSends.add(connectionId);
    } else {
        try {
            // stash the send on the channel; nothing is sent yet, and only one send can be in flight
            channel.setSend(send);
        } catch (Exception e) {
            // update the state for consistency, the channel will be discarded after `close`
            channel.state(ChannelState.FAILED_SEND);
            // ensure notification via `disconnected` when `failedSends` are processed in the next poll
            this.failedSends.add(connectionId);
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                        connectionId, e);
                throw e;
            }
        }
    }
}
Once the client's Send is set on the KafkaChannel, the channel's TransportLayer registers the WRITE event on the SelectionKey. With CONNECT and WRITE events registered on the key, the real work happens when the Selector's poll loop sees those events fire. The basic flow is: start a Send request -> register OP_WRITE -> keep writing... -> the Send completes -> cancel OP_WRITE.
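The following is a condensed sketch of that OP_WRITE lifecycle, adapted from KafkaChannel's setSend()/send() pair (slightly simplified; the real code also carries the connection id in the exception message).

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    // start watching for writability on this connection
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

private boolean send(Send send) throws IOException {
    // may write only part of the buffers on a non-blocking socket
    send.writeTo(transportLayer);
    if (send.completed())
        // everything written: stop watching OP_WRITE
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}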