Repost: "Kafka Producer Study Notes 8": NIO

1 Preface

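First, thanks to Wang Hao (@二代) for his book; he is leaving for Alibaba, and I wish him fortune and success soon. Back to the topic: I spent several days, on and off, reading Kafka's NIO code and still found it hard going, mainly because I had forgotten the low-level NIO details. That is why most people use a framework such as Netty to hide those details, so that the business layer only has to listen on a port, establish connections, accept requests, process them, and write responses. Why did Kafka wrap the interfaces itself instead of using Netty? An explanation I found online: "Performance and no dependency-hell is great!" In short, performance, and not wanting to pull in too many dependencies. It also shows how capable Kafka's authors are.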

The basic building blocks of Java NIO are Selector, Channel, and ByteBuffer. Kafka's corresponding wrappers are as follows.




2 Network-related classes

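Selectable is the interface for network operations, and Selector is the concrete implementation (covered in detail later). It covers sending requests, receiving responses, establishing connections, and closing connections.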

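/**
 * An interface for asynchronous, multi-channel network I/O
 * Selectable is the interface for Kafka's NIO network operations.
 * Selector is the concrete implementation; it covers sending requests, receiving responses,
 * establishing connections, and closing connections.
 */
public interface Selectable { /* ... */ }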

/**
 * Kafka's wrapper around the Java NIO interfaces. It implements Selectable and is
 * responsible for the actual connect, write, and read operations.
 */
public class Selector implements Selectable, AutoCloseable {
    public static final long NO_IDLE_TIMEOUT_MS = -1;
    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;

    private enum CloseMode {
        GRACEFUL(true),            // process outstanding staged receives, notify disconnect
        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification

        boolean notifyDisconnect;

        CloseMode(boolean notifyDisconnect) {
            this.notifyDisconnect = notifyDisconnect;
        }
    }

    private final Logger log;
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, KafkaChannel> channels;
    private final Set<KafkaChannel> explicitlyMutedChannels;
    private boolean outOfMemory;
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    private final Set<SelectionKey> immediatelyConnectedKeys;
    private final Map<String, KafkaChannel> closingChannels;
    private Set<SelectionKey> keysWithBufferedRead;
    private final Map<String, ChannelState> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final ChannelBuilder channelBuilder;
    private final int maxReceiveSize;
    private final boolean recordTimePerConnection;
    private final IdleExpiryManager idleExpiryManager;
    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    private final MemoryPool memoryPool;
    private final long lowMemThreshold;
    private final int failedAuthenticationDelayMs;
    //indicates if the previous call to poll was able to make progress in reading already-buffered data.
    //this is used to prevent tight loops when memory is not available to read any more data
    private boolean madeReadProgressLastPoll = true;
    // ...
}

Send is the interface for data to be sent. Implementations provide completed() to report whether the send has finished and writeTo() to write the data to a Channel.

/**
 * This interface models the in-progress sending of data to a specific destination
 * The interface for data to be sent.
 */
public interface Send {
    /**
     * The id for the destination of this send
     */
    String destination();

    /**
     * Is this send complete?
     */
    boolean completed();

    /**
     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
     * to be completely written
     * @param channel The Channel to write to
     * @return The number of bytes written
     * @throws IOException If the write fails
     */
    long writeTo(GatheringByteChannel channel) throws IOException;

    /**
     * Size of the send
     */
    long size();
}


/**
 * A send backed by an array of byte buffers
 */
public class ByteBufferSend implements Send {
    private final String destination;
    private final int size;
    protected final ByteBuffer[] buffers;
    private int remaining;
    private boolean pending = false;

    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        this.size = remaining;
    }

    public String destination() {
        return destination;
    }

    public boolean completed() {
        return remaining <= 0 && !pending;
    }

    public long size() {
        return this.size;
    }

    public long writeTo(GatheringByteChannel channel) throws IOException {
        // The actual write is done by channel.write, which decouples Kafka's Send from the channel
        // (each implementation can vary without affecting the other).
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        // After every write, check whether the transport layer still has pending writes.
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
}
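
The Javadoc above notes that one send may need several writeTo() calls. The following is a minimal sketch of that situation, not Kafka code: `ThrottledChannel` and `PartialWriteSketch` are hypothetical names, and the channel is an in-memory stand-in that accepts at most a fixed number of bytes per call, roughly the way a non-blocking socket with a full send buffer would.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a socket: accepts at most `limit` bytes per write call.
final class ThrottledChannel implements GatheringByteChannel {
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int limit;
    private boolean open = true;

    ThrottledChannel(int limit) { this.limit = limit; }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(limit, src.remaining());
        for (int i = 0; i < n; i++)
            sink.write(src.get());
        return n;
    }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) {
        // Drain the buffers in order until the per-call budget is used up.
        long total = 0;
        for (int i = offset; i < offset + length && total < limit; i++) {
            ByteBuffer src = srcs[i];
            while (src.hasRemaining() && total < limit) {
                sink.write(src.get());
                total++;
            }
        }
        return total;
    }

    @Override
    public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }

    @Override
    public boolean isOpen() { return open; }

    @Override
    public void close() { open = false; }

    byte[] bytes() { return sink.toByteArray(); }
}

final class PartialWriteSketch {
    // Same remaining-bytes bookkeeping as ByteBufferSend; returns the number of write calls needed.
    static int drain(GatheringByteChannel channel, ByteBuffer... buffers) {
        long remaining = 0;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        int calls = 0;
        try {
            while (remaining > 0) {
                remaining -= channel.write(buffers);
                calls++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        ThrottledChannel channel = new ThrottledChannel(4);
        ByteBuffer head = ByteBuffer.wrap("kafka-".getBytes(StandardCharsets.UTF_8));
        ByteBuffer tail = ByteBuffer.wrap("producer".getBytes(StandardCharsets.UTF_8));
        int calls = drain(channel, head, tail);
        System.out.println(calls + " calls: " + new String(channel.bytes(), StandardCharsets.UTF_8));
    }
}
```

In Kafka the loop is not a tight `while`: each writeTo() call happens in a later poll once the channel is writable again, which is why the send has to carry its own `remaining` counter.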


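/**
 * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
 * Extends ByteBufferSend and prepends 4 bytes holding the content size (not counting those 4 bytes).
 */
public class NetworkSend extends ByteBufferSend {

    public NetworkSend(String destination, ByteBuffer buffer) {
        super(destination, sizeDelimit(buffer));
    }

    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }

    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        // Write the 4 bytes of the given size at the buffer's current position, using its current
        // byte order, then advance the position by 4.
        sizeBuffer.putInt(size);
        // Reset the position to 0 so the 4 bytes can be read back out when the buffer is written.
        sizeBuffer.rewind();
        return sizeBuffer;
    }
}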

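The counterpart of Send is Receive, which represents data read from a Channel. NetworkReceive implements the Receive interface and wraps a ByteBuffer; it represents one request packet. When reading from a connection, NetworkReceive first reads the 4-byte header, which carries the message length, then allocates a ByteBuffer of exactly that size and reads the message body.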

/**
 * This interface models the in-progress reading of data from a channel to a source identified by an integer id
 * Data read from a Channel.
 */
public interface Receive extends Closeable { /* ... */ }

/**
 * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
 */
public class NetworkReceive implements Receive { /* ... */ }
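
The size-delimited format shared by NetworkSend and NetworkReceive can be illustrated with plain JDK classes. This is a rough sketch under simplifying assumptions: `FramingSketch`, `frame`, and `readFrame` are hypothetical names, and the read side blocks until the frame is complete, whereas NetworkReceive resumes a partial read across polls.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class FramingSketch {
    // Prefix the payload with its length as a 4-byte big-endian int (the length excludes the prefix).
    static byte[] frame(byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(4 + payload.length);
        out.putInt(payload.length); // ByteBuffer defaults to big-endian, i.e. network byte order
        out.put(payload);
        return out.array();
    }

    // Read the 4-byte size first, then allocate a buffer of exactly that size for the body.
    static byte[] readFrame(ReadableByteChannel channel, int maxSize) {
        try {
            ByteBuffer size = ByteBuffer.allocate(4);
            readFully(channel, size);
            size.flip();
            int length = size.getInt();
            if (length < 0 || length > maxSize)
                throw new IllegalStateException("Invalid frame size " + length);
            ByteBuffer body = ByteBuffer.allocate(length);
            readFully(channel, body);
            return body.array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0)
                throw new EOFException("Channel closed before the frame was complete");
        }
    }

    public static void main(String[] args) {
        byte[] wire = frame("hello".getBytes(StandardCharsets.UTF_8));
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(wire));
        byte[] body = readFrame(channel, 1024);
        System.out.println(wire.length + " bytes on the wire, body = " + new String(body, StandardCharsets.UTF_8));
    }
}
```

The `maxSize` check plays the role that maxReceiveSize plays in the fields shown earlier: a corrupt or hostile size prefix should not be able to trigger an arbitrarily large allocation.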


public class KafkaChannel {
    // Mute States for KafkaChannel: ...
    private final String id;
    // Different subclasses (PlaintextTransportLayer, SslTransportLayer) supply the implementation while
    // KafkaChannel sees one uniform interface; a good application of the strategy pattern.
    private final TransportLayer transportLayer;
    private final Authenticator authenticator;
    // Tracks accumulated network thread time. This is updated on the network thread.
    // The values are read and reset after each response is sent.
    private long networkThreadTimeNanos;
    private final int maxReceiveSize;
    private final MemoryPool memoryPool;
    private NetworkReceive receive;
    private Send send;
    // Track connection and mute state of channels to enable outstanding requests on channels to be
    // processed after the channel is disconnected.
    private boolean disconnected;
    private ChannelMuteState muteState;
    private ChannelState state;
    private SocketAddress remoteAddress;
    // ...
}



3 Send flow



3.1 Selector.connect()



    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        // Create a SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        try { // configure the socket parameters
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            boolean connected = doConnect(socketChannel, address);
            // Register for the CONNECT event
            SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
            if (connected) {
                // OP_CONNECT won't trigger for immediately connected channels
                log.debug("Immediately connected to node {}", id);
                // Add the key to the set of immediately connected keys
                immediatelyConnectedKeys.add(key);
                key.interestOps(0); // stop listening for the CONNECT event on this connection
            }
        } catch (IOException | RuntimeException e) {
            socketChannel.close();
            throw e;
        }
    }

    private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
            throws IOException {
        // Non-blocking mode, so the channel can be driven by the selector
        socketChannel.configureBlocking(false);
        // Get the socket and set its properties (this is the client side)
        Socket socket = socketChannel.socket();
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
    }

    // Visible to allow test cases to override. In particular, we use this to implement a blocking connect
    // in order to simulate "immediately connected" sockets.
    protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
        try {
            // Call SocketChannel.connect(), which sends a TCP connection request to the remote end.
            // Because the channel is non-blocking, the connection is not necessarily established (the
            // three-way handshake may not have completed) when the method returns. It returns true if
            // the connection is already established and false otherwise.
            // Typically, when server and client run on the same machine, this method may return true.
            // Later, finishConnect() is called on the channel to confirm the connection really is established.
            return channel.connect(address);
        } catch (UnresolvedAddressException e) {
            throw new IOException("Can't resolve address: " + address, e);
        }
    }

    private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
        SelectionKey key = socketChannel.register(nioSelector, interestedOps);
        // Build a KafkaChannel and attach it to the key
        KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
        this.channels.put(id, channel);
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), time.nanoseconds());
        return key;
    }

    private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
        try {
            // Create the KafkaChannel
            KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
            key.attach(channel);
            return channel;
        } catch (Exception e) {
            throw new IOException("Channel could not be created for socket " + socketChannel, e);
        }
    }

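Note: because the channel is non-blocking, socketChannel.connect() only initiates a connection and may return before the connection is actually established. To determine whether the connection has been established, finishConnect() must be called afterwards.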

    public boolean finishConnect() throws IOException {
        //we need to grab remoteAddr before finishConnect() is called otherwise
        //it becomes inaccessible if the connection was refused.
        SocketChannel socketChannel = transportLayer.socketChannel();
        if (socketChannel != null) {
            remoteAddress = socketChannel.getRemoteAddress();
        }
        boolean connected = transportLayer.finishConnect();
        if (connected) { // connection established, update the state
            if (ready()) {
                state = ChannelState.READY;
            } else if (remoteAddress != null) {
                state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
            } else {
                state = ChannelState.AUTHENTICATE;
            }
        }
        return connected;
    }
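
The connect-then-finishConnect handshake described above can be reproduced with the raw JDK classes. The sketch below is an illustration only: `NonBlockingConnectSketch` and `connectToLoopback` are hypothetical names, the server is a throwaway channel bound to a free loopback port, and the retry count and timeout are arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectSketch {
    // Connects to a throwaway loopback server; returns true once the connection is established.
    static boolean connectToLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // port 0 = any free port
            client.configureBlocking(false);

            // In non-blocking mode connect() may return false: the handshake is still in progress.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
                for (int attempt = 0; attempt < 10 && !connected; attempt++) {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                        // OP_CONNECT fired: finishConnect() completes the handshake,
                        // or throws IOException if the connection was refused.
                        connected = client.finishConnect();
                    }
                }
                key.interestOps(0); // CONNECT is no longer of interest
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected = " + connectToLoopback());
    }
}
```

Both branches matter: when connect() returns true right away, OP_CONNECT never fires, which is the case Kafka tracks with immediatelyConnectedKeys.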

3.2 Selector.send()


    /**
     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
     * @param send The request to send
     */
    public void send(Send send) {
        String connectionId = send.destination();
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId);
        } else {
            try { // stage the send on this channel; nothing is sent yet, and a channel holds only one send at a time
                channel.setSend(send);
            } catch (Exception e) {
                // update the state for consistency, the channel will be discarded after `close`
                channel.state(ChannelState.FAILED_SEND);
                // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                this.failedSends.add(connectionId);
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                            connectionId, e);
                    throw e;
                }
            }
        }
    }

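The client's Send is set on the KafkaChannel, and the KafkaChannel's TransportLayer registers the WRITE event on the SelectionKey. Once the Channel's SelectionKey has the CONNECT and WRITE events registered, the Selector performs the actual operations when it detects these events during polling. The basic flow is: start sending a Send request -> register OP_WRITE -> send the request ... -> Send request fully sent -> deregister OP_WRITE.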
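
The register, write, deregister cycle from the flow above can be tried out without a network by using a java.nio.channels.Pipe, whose sink end is a selectable, writable channel. This is a minimal sketch and not Kafka's implementation: `WriteInterestSketch` and `sendThroughPipe` are hypothetical names, and the message is assumed to be small enough to fit in the pipe's buffer, since nothing reads from the pipe until the send has completed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class WriteInterestSketch {
    // Sends a small message through an in-process pipe and returns what arrived at the other end.
    static String sendThroughPipe(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            Pipe.SinkChannel sink = pipe.sink();
            Pipe.SourceChannel source = pipe.source();
            sink.configureBlocking(false);

            // Step 1: stage the pending data and register interest in OP_WRITE.
            ByteBuffer pending = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            SelectionKey key = sink.register(selector, SelectionKey.OP_WRITE);

            // Step 2: in the poll loop, write whenever the channel is reported writable.
            while (pending.hasRemaining()) {
                if (selector.select(1000) == 0)
                    continue; // nothing ready yet
                selector.selectedKeys().clear();
                if (key.isWritable())
                    sink.write(pending);
            }

            // Step 3: the send is complete, so stop listening for OP_WRITE.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            if (key.interestOps() != 0)
                throw new IllegalStateException("OP_WRITE should have been removed");
            sink.close();

            // Read everything back from the (blocking) source end of the pipe.
            ByteBuffer received = ByteBuffer.allocate(pending.capacity());
            while (received.hasRemaining() && source.read(received) >= 0) {
                // keep reading until the buffer is full or the pipe reports end-of-stream
            }
            source.close();
            return new String(received.array(), 0, received.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendThroughPipe("produce request"));
    }
}
```

Removing OP_WRITE after the send matters because a socket is writable almost all the time: leaving the interest registered would make every select() call return immediately and turn the poll loop into a busy loop.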

