Kafka NIO

Posted by 九师兄


1. Overview

Reposted from: 《kafka producer学习笔记8》-NIO (Kafka producer study notes 8: NIO)

2. Part 1: Preface

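First, thanks to 王昊@二代 for his book; he is leaving for Alibaba, and I wish him every success. Back to the topic. I spent several days, on and off, reading Kafka's NIO code and still found it hard going, mainly because I had forgotten the low-level NIO details. That is exactly why most projects use a framework such as Netty to hide those details, so that the business layer only has to listen on a port, establish connections, accept requests, process them, and write responses. So why did Kafka wrap NIO itself instead of using Netty? An explanation I found online reads: "Performance and no dependency-hell is great!" In short: performance, and a reluctance to pull in too many dependencies. It also says something about the skill of Kafka's authors.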

The basic building blocks of Java NIO are Selector, Channel, and ByteBuffer. Kafka's corresponding NIO wrappers are as follows.


If you have forgotten NIO, I recommend reading these first: https://www.jianshu.com/p/0d497fe5484a

https://segmentfault.com/a/1190000012316621

If you already know NIO, feel free to skip them.
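As a quick refresher on how the three primitives fit together, here is a minimal sketch that is not taken from Kafka. It uses a `java.nio.channels.Pipe` in place of a socket so it runs without a network; the class name `NioRefresher` and the method `roundTrip` are made up for illustration, and it assumes the message is small enough to fit in the pipe's internal buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical refresher class: Selector + Channel + ByteBuffer in one place.
final class NioRefresher {

    /** Writes a small message into a pipe and reads it back once the selector reports OP_READ. */
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                // A channel must be non-blocking before it can be registered with a selector.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // ByteBuffer.wrap leaves the buffer ready for reading (position 0, limit = length).
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                int length = out.remaining();
                while (out.hasRemaining()) {
                    sink.write(out); // the sink stays in blocking mode in this sketch
                }

                ByteBuffer in = ByteBuffer.allocate(length);
                while (in.hasRemaining()) {
                    selector.select(); // blocks until at least one registered channel is ready
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            source.read(in); // may read fewer bytes than requested
                        }
                    }
                    selector.selectedKeys().clear(); // the selected-key set is never cleared for us
                }
                in.flip(); // switch the buffer from filling to draining
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```

The same three steps (register interest, `select()`, handle ready keys) are what Kafka's wrappers organize around.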

Part 2: Network-related classes

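Selectable is the interface for network operations, and Selector is its concrete implementation (covered in detail later). Together they cover sending requests, receiving responses, establishing connections, and closing connections.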

/**
 * An interface for asynchronous, multi-channel network I/O
 * Selectable is the network-operation interface of Kafka's NIO layer.
 * Selector is the concrete implementation; it covers sending requests, receiving responses,
 * establishing connections, and closing connections.
 */
public interface Selectable {
    ...
}

/*
 * Kafka's wrapper around the Java NIO interfaces. It implements Selectable and is responsible
 * for the actual connect, write, and read operations.
 */
public class Selector implements Selectable, AutoCloseable {

    public static final long NO_IDLE_TIMEOUT_MS = -1;
    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;

    private enum CloseMode {
        GRACEFUL(true),            // process outstanding staged receives, notify disconnect
        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification

        boolean notifyDisconnect;

        CloseMode(boolean notifyDisconnect) {
            this.notifyDisconnect = notifyDisconnect;
        }
    }

    private final Logger log;
    // the Java NIO selector used to listen for network I/O events
    private final java.nio.channels.Selector nioSelector;
    // maps each node id to its KafkaChannel
    private final Map<String, KafkaChannel> channels;
    private final Set<KafkaChannel> explicitlyMutedChannels;
    // out-of-memory flag
    private boolean outOfMemory;
    // sends that have completed
    private final List<Send> completedSends;
    // receives that have completed
    private final List<NetworkReceive> completedReceives;
    // stages every receive read while handling a single OP_READ event
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    // as a client: keys whose SocketChannel#connect call returned true when connecting to the remote end
    private final Set<SelectionKey> immediatelyConnectedKeys;
    // channels that are in the process of closing
    private final Map<String, KafkaChannel> closingChannels;

    private Set<SelectionKey> keysWithBufferedRead;
    // connections found to be disconnected during one poll
    private final Map<String, ChannelState> disconnected;
    // connections newly established during one poll
    private final List<String> connected;
    // ids of the nodes for which a send failed
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    // builder used to create KafkaChannel instances
    private final ChannelBuilder channelBuilder;
    // maximum size of a single receive
    private final int maxReceiveSize;
    private final boolean recordTimePerConnection;
    private final IdleExpiryManager idleExpiryManager;
    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    private final MemoryPool memoryPool;
    private final long lowMemThreshold;
    private final int failedAuthenticationDelayMs;

    //indicates if the previous call to poll was able to make progress in reading already-buffered data.
    //this is used to prevent tight loops when memory is not available to read any more data
    private boolean madeReadProgressLastPoll = true;
    ...
}
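Send is the interface for data that is to be sent. Implementations provide completed() to report whether the data has been fully sent, and writeTo() to write it to a Channel.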

/**
 * This interface models the in-progress sending of data to a specific destination
 * The interface for data being sent.
 */
public interface Send {

    /**
     * The id for the destination of this send
     */
    String destination();

    /**
     * Is this send complete?
     */
    boolean completed();

    /**
     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
     * to be completely written
     * @param channel The Channel to write to
     * @return The number of bytes written
     * @throws IOException If the write fails
     */
    long writeTo(GatheringByteChannel channel) throws IOException;

    /**
     * Size of the send
     */
    long size();
}

The Send interface has several implementations, such as RecordsSend, MultiRecordsSend, and ByteBufferSend. Take ByteBufferSend as an example:

/**
 * A send backed by an array of byte buffers
 */
public class ByteBufferSend implements Send {

    // id of the destination this send goes to
    private final String destination;
    // total size of the buffers, in bytes
    private final int size;
    // the content to send (which implies that the number of bytes Kafka transfers in one send is bounded)
    protected final ByteBuffer[] buffers;
    private int remaining;
    private boolean pending = false;

    // constructor
    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        // size is the total number of bytes remaining across the buffers
        this.size = remaining;
    }

    @Override
    public String destination() {
        return destination;
    }

    // nothing left to send
    @Override
    public boolean completed() {
        return remaining <= 0 && !pending;
    }

    @Override
    public long size() {
        return this.size;
    }

    // write the bytes to the channel
    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        // the write itself is done by channel.write, which decouples Kafka's Send from the channel
        // (the concrete implementations on either side do not affect each other)
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        // after every write, check whether the channel still has pending writes
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
}

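NetworkSend extends ByteBufferSend and prepends a 4-byte field holding the content size (the 4 bytes themselves are not counted). See the comments for details.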

/**
 * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
 * Extends ByteBufferSend, prepending 4 bytes that hold the content size (excluding those 4 bytes).
 */
public class NetworkSend extends ByteBufferSend {

    public NetworkSend(String destination, ByteBuffer buffer) {
        // destination identifies the target broker
        // buffer is the message to transmit
        super(destination, sizeDelimit(buffer));
    }

    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        // build a new buffer whose content is the size of the buffer argument
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }

    // The byte stream can be thought of as: buffer = buffer1 + buffer2,
    // where buffer2 is the message we send to the broker and buffer1 is a 4-byte integer giving the size of buffer2.
    // When the broker reads the stream, it first takes the leading four bytes to learn how many bytes follow, then reads them.
    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        // write the 4 bytes of size at the buffer's current position, in the buffer's current byte order
        // (big-endian by default), then advance the position by 4
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }
}

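The counterpart of Send is Receive, which represents data read from a Channel. NetworkReceive implements the Receive interface and wraps a ByteBuffer that holds the packet for one request. When NetworkReceive reads from a connection, it first reads the 4-byte header, which carries the message length, then allocates a ByteBuffer of that size, and finally reads the message body.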

/**
 * This interface models the in-progress reading of data from a channel to a source identified by an integer id
 * Data read from a Channel.
 */
public interface Receive extends Closeable {
    ...
}

/**
 * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
 */
public class NetworkReceive implements Receive {
    ...
}
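To make the size-delimited format concrete, here is a minimal sketch of both directions: building the header plus payload on the sending side, and reading the header before the body on the receiving side. It is a simplification, not Kafka's code. The class `SizeDelimitedFraming` and its methods are hypothetical names, and unlike NetworkReceive (which reads incrementally from a channel) it assumes the whole frame is already in one buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the "4-byte size N, then N bytes of content" wire format.
final class SizeDelimitedFraming {

    /** Sender side: returns {size header, payload}; the header does not count itself. */
    static ByteBuffer[] frame(ByteBuffer payload) {
        ByteBuffer size = ByteBuffer.allocate(4); // big-endian by default, i.e. network order
        size.putInt(payload.remaining());
        size.rewind();
        return new ByteBuffer[] {size, payload};
    }

    /** Receiver side: reads the 4-byte size, then copies exactly that many bytes into a new buffer. */
    static ByteBuffer parse(ByteBuffer wire) {
        int size = wire.getInt();
        if (size < 0 || size > wire.remaining()) {
            throw new IllegalStateException("Invalid frame size " + size);
        }
        ByteBuffer body = ByteBuffer.allocate(size); // sized from the header, as the text describes
        wire.get(body.array(), 0, size);             // advances wire past the body
        return body;                                 // position 0, limit size: ready to read
    }

    /** Frames a string, concatenates header and payload as they would appear on the wire, and parses it back. */
    static String roundTrip(String message) {
        ByteBuffer[] framed = frame(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
        ByteBuffer wire = ByteBuffer.allocate(framed[0].remaining() + framed[1].remaining());
        for (ByteBuffer part : framed) {
            wire.put(part);
        }
        wire.flip();
        return StandardCharsets.UTF_8.decode(parse(wire)).toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Returning the header and payload as two buffers, instead of copying them into one, is what lets the sender hand both to a single gathering write without an extra copy of the payload.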

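As noted above, Kafka's Selector is a wrapper around the NIO selector, while the work of creating a connection is done by the channel. KafkaChannel wraps not only the SocketChannel but also Kafka's own Authenticator and the read/write objects NetworkReceive and Send. There is one extra layer of indirection in between: TransportLayer. It hides whether the channel is plain or encrypted (TransportLayer has a different implementation for each network protocol) and gives KafkaChannel a uniform interface. This is a good application of the strategy pattern.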

public class KafkaChannel {
    /**
     * Mute States for KafkaChannel:
     */
   ....


    private final String id;
    // wraps the underlying SocketChannel and SelectionKey; TransportLayer has a different implementation
    // per network protocol (PlaintextTransportLayer, SslTransportLayer) while giving KafkaChannel
    // a uniform interface, a good application of the strategy pattern
    private final TransportLayer transportLayer;
    private final Authenticator authenticator;
    // Tracks accumulated network thread time. This is updated on the network thread.
    // The values are read and reset after each response is sent.
    private long networkThreadTimeNanos;
    private final int maxReceiveSize;
    private final MemoryPool memoryPool;
    private NetworkReceive receive;
    private Send send;
    // Track connection and mute state of channels to enable outstanding requests on channels to be
    // processed after the channel is disconnected.
    private boolean disconnected;
    private ChannelMuteState muteState;
    private ChannelState state;
    private SocketAddress remoteAddress;
    ...
}
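The strategy-pattern idea can be shown with a toy sketch. Everything in it is hypothetical: `Transport` is a far smaller stand-in for Kafka's TransportLayer, and `ToyXor` only pretends to be an encrypting layer (it is not TLS and offers no security). The point is just that the channel class depends on the interface and never on a concrete transport.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a channel that is agnostic to its transport strategy.
final class TransportStrategySketch {

    /** Much-reduced stand-in for a transport layer: turns application bytes into wire bytes. */
    interface Transport {
        ByteBuffer wrap(ByteBuffer plain);
    }

    /** Plain transport: wire bytes are the application bytes. */
    static final class Plaintext implements Transport {
        @Override
        public ByteBuffer wrap(ByteBuffer plain) {
            return plain.duplicate();
        }
    }

    /** Toy "encrypting" transport (XOR with a constant); a placeholder only, NOT real encryption. */
    static final class ToyXor implements Transport {
        @Override
        public ByteBuffer wrap(ByteBuffer plain) {
            ByteBuffer src = plain.duplicate();
            ByteBuffer out = ByteBuffer.allocate(src.remaining());
            while (src.hasRemaining()) {
                out.put((byte) (src.get() ^ 0x5A));
            }
            out.flip();
            return out;
        }
    }

    /** The channel only knows the Transport interface, so swapping strategies needs no change here. */
    static final class Channel {
        private final Transport transport;

        Channel(Transport transport) {
            this.transport = transport;
        }

        /** Returns the bytes that would be put on the wire for this message. */
        byte[] send(String message) {
            ByteBuffer wire = transport.wrap(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
            byte[] bytes = new byte[wire.remaining()];
            wire.get(bytes);
            return bytes;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Channel(new Plaintext()).send("hi").length);
        System.out.println(new Channel(new ToyXor()).send("hi").length);
    }
}
```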

Those are the main classes of the network layer. Image from: https://blog.csdn.net/chunlongyu/article/details/52651960

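With this diagram things become much clearer. When I first read the code I wondered why there were so many classes, such elaborate wrapping, and so many levels of calls. It gets easier after a few read-throughs. Thanks to the original author.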

Part 3: The send flow

KafkaProducer.doSend() -> Sender.run() -> NetworkClient.poll() (NetworkClient.doSend()) -> Selector.poll()

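From the point of view of sending a message, the main flow is the one shown above. Earlier notes covered RecordAccumulator (3) and Sender (1), so by rights this one should start from NetworkClient; for reasons of space it focuses on Selector.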

3.1 Selector.connect()

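NetworkClient generally hands its requests to Selector. Selector uses NIO in asynchronous, non-blocking mode and is responsible for the actual connect, write, and read operations.

Start with the connection process. When the client connects to a node, it creates a SocketChannel to the server. Selector maintains the KafkaChannel that corresponds to each target node.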

// Selector.java
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        ensureNotRegistered(id);
        // create a SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        try {
            // set the socket options
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            boolean connected = doConnect(socketChannel, address);
            // register interest in the CONNECT event
            SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
            if (connected) {
                // OP_CONNECT won't trigger for immediately connected channels
                log.debug("Immediately connected to node {}", id);
                // add the key to the set of immediately connected keys
                immediatelyConnectedKeys.add(key);
                key.interestOps(0); // stop listening for the CONNECT event on this connection
            }
        } catch (IOException | RuntimeException e) {
            socketChannel.close();
            throw e;
        }
    }

    private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
            throws IOException {
        // switch to non-blocking mode
        socketChannel.configureBlocking(false);
        // get the underlying socket and set its options (this is the client side)
        Socket socket = socketChannel.socket();
        // enable keep-alive for a long-lived connection
        socket.setKeepAlive(true);
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize); // set SO_SNDBUF
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize); // set SO_RCVBUF
        socket.setTcpNoDelay(true);
    }

    // Visible to allow test cases to override. In particular, we use this to implement a blocking connect
    // in order to simulate "immediately connected" sockets.
    protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
        try {
            // Call SocketChannel.connect, which sends a TCP connection request to the remote end.
            // Because the channel is non-blocking, the connection is not necessarily established
            // (three-way handshake completed) when the call returns: it returns true if it is, false otherwise.
            // When server and client run on the same machine, the call may well return true.
            // KafkaChannel.finishConnect() is used later to confirm that the connection really is established.
            return channel.connect(address);
        } catch (UnresolvedAddressException e) {
            throw new IOException("Can't resolve address: " + address, e);
        }
    }

    private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
        // register this socketChannel with nioSelector, with interest in the given ops (OP_CONNECT here)
        SelectionKey key = socketChannel.register(nioSelector, interestedOps);
        // build a KafkaChannel and attach it to the key
        KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
        // bind the node id to the KafkaChannel and keep it in channels
        this.channels.put(id, channel);
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), time.nanoseconds());
        return key;
    }

    private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
        try {
            // create the KafkaChannel
            KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
            key.attach(channel); // attach the KafkaChannel to the key
            return channel;
        } catch (Exception e) {
            try {
                socketChannel.close();
            } finally {
                key.cancel();
            }
            throw new IOException("Channel could not be created for socket " + socketChannel, e);
        }
    }

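Note: because the channel is non-blocking, socketChannel.connect() only initiates the connection and may return before the connection is actually established. To find out whether it has been established, finishConnect() must be called afterwards to confirm that the connection is complete.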
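The connect-then-finishConnect sequence can be tried in isolation with plain Java NIO. The following is a minimal sketch, not Kafka's code: the class `NonBlockingConnectSketch` is hypothetical, it connects to a throwaway server bound to an ephemeral loopback port, and it gives up after ten one-second selects. Unlike Kafka's Selector, it registers OP_CONNECT only when the connect call did not complete immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of a non-blocking connect completed via OP_CONNECT and finishConnect().
final class NonBlockingConnectSketch {

    /** Returns true once the client is connected to a local throwaway server. */
    static boolean connectToLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 asks the OS for any free port; the listen backlog completes the handshake.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // In non-blocking mode connect() may return false: the handshake is still in flight.
            boolean immediatelyConnected = client.connect(server.getLocalAddress());
            if (immediatelyConnected) {
                return client.isConnected(); // OP_CONNECT would never fire for this channel
            }

            client.register(selector, SelectionKey.OP_CONNECT);
            for (int attempt = 0; attempt < 10 && !client.isConnected(); attempt++) {
                selector.select(1000);
                for (SelectionKey key : selector.selectedKeys()) {
                    // finishConnect() returns true when the connection is established
                    // and throws IOException if it failed.
                    if (key.isConnectable() && client.finishConnect()) {
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                    }
                }
                selector.selectedKeys().clear();
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLoopback());
    }
}
```

Clearing OP_CONNECT after a successful finishConnect() matters: if the interest is left set, the selector keeps reporting the key as ready and the loop spins.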

// KafkaChannel.java
    // finishConnect is called when key.isConnectable() is true
    public boolean finishConnect() throws IOException {
        //we need to grab remoteAddr before finishConnect() is called otherwise
        //it becomes inaccessible if the connection was refused.
        SocketChannel socketChannel = transportLayer.socketChannel();
        if (socketChannel != null) {
            remoteAddress = socketChannel.getRemoteAddress();
        }
        // has the connection been established?
        boolean connected = transportLayer.finishConnect();
        if (connected) { // established: update the state
            if (ready()) {
                state = ChannelState.READY;
            } else if (remoteAddress != null) {
                state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
            } else {
                state = ChannelState.AUTHENTICATE;
            }
        }
        return connected;
    }

3.2 Selector.send()

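Selector.send() caches the previously created RequestSend object (the Send argument in the code below) in the send field of the KafkaChannel and starts listening for the OP_WRITE event on that connection. No network I/O happens at this point. The object is only sent during the next call to Selector.poll(). If the send field of the KafkaChannel still holds a request that has not been completely sent, an exception is thrown to prevent it from being overwritten. Each KafkaChannel can send only one Send per poll.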

// Selector.java
    /**
     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
     * @param send The request to send
     */
    public void send(Send send) {
        String connectionId = send.destination();
        // find the channel that corresponds to this send
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        // if the connection is closing, add it to the failedSends collection
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId); // failed
        } else {
            try {
                // stash the send in the channel; nothing is sent yet, and only one send can be held at a time
                channel.setSend(send);
            } catch (Exception e) {
                // update the state for consistency, the channel will be discarded after `close`
                channel.state(ChannelState.FAILED_SEND);
                // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                this.failedSends.add(connectionId);
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                            connectionId, e);
                    throw e;
                }
            }
        }
    }

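The client's Send is set on the KafkaChannel, and the KafkaChannel's TransportLayer registers the WRITE event on the SelectionKey. Once the channel's SelectionKey has the CONNECT and WRITE events registered, the Selector's polling loop performs the real work when it sees those events arrive. The basic flow is: begin a Send -> register OP_WRITE -> send the request ... -> the Send completes -> cancel OP_WRITE.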
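That lifecycle (one in-flight send, OP_WRITE switched on when a send is queued and off when it completes) can be sketched with plain NIO. This is an illustration under assumptions, not Kafka's implementation: `WriteInterestSketch` is a hypothetical class, a `Pipe` sink stands in for the socket, a bare ByteBuffer stands in for Send, and the exception message is my own wording.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical channel wrapper: holds at most one pending send and toggles OP_WRITE around it.
final class WriteInterestSketch {
    private final Pipe.SinkChannel sink;
    private final SelectionKey key;
    private ByteBuffer send; // null when nothing is in flight

    WriteInterestSketch(Selector selector, Pipe.SinkChannel sink) throws IOException {
        sink.configureBlocking(false);
        this.sink = sink;
        this.key = sink.register(selector, 0); // registered, but interested in nothing yet
    }

    /** Queues a send without doing any I/O; rejects it if a previous send is unfinished. */
    void setSend(ByteBuffer buffer) {
        if (send != null) {
            throw new IllegalStateException("A previous send is still in progress");
        }
        send = buffer;
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    /** Writes as much as the channel accepts; returns true when the send is fully written. */
    boolean write() throws IOException {
        sink.write(send); // may be a partial write
        if (send.hasRemaining()) {
            return false; // keep OP_WRITE so the next poll continues
        }
        send = null;
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        return true;
    }

    boolean wantsWrite() {
        return (key.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

    /** Traces OP_WRITE interest: before a send, after queuing, on a second send, after completion. */
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                WriteInterestSketch channel = new WriteInterestSketch(selector, sink);
                StringBuilder trace = new StringBuilder().append(channel.wantsWrite());
                channel.setSend(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                trace.append(',').append(channel.wantsWrite());
                try {
                    channel.setSend(ByteBuffer.allocate(1));
                    trace.append(",accepted");
                } catch (IllegalStateException e) {
                    trace.append(",rejected");
                }
                boolean done = false;
                while (!done) { // the "poll" loop: the real write happens here
                    selector.select();
                    for (SelectionKey ready : selector.selectedKeys()) {
                        if (ready.isWritable()) {
                            done = channel.write();
                        }
                    }
                    selector.selectedKeys().clear();
                }
                return trace.append(',').append(channel.wantsWrite()).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Leaving OP_WRITE registered while nothing is queued would make every `select()` return immediately, because a socket is writable almost all the time; that is why the interest is removed as soon as the send completes.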
