Kafka Source - 理解 Selector.poll() 的语义

Posted

技术标签:

【中文标题】Kafka Source - 理解 Selector.poll() 的语义【英文标题】:Kafka Source - Understanding the semantics of Selector.poll() 【发布时间】:2017-09-12 21:15:51 【问题描述】:

我正在研究 Kafka 的网络层代码,有几个关于 Selector 的问题 类,特别是 poll() 方法是如何实现的。 poll() 方法是这样的:

void poll(int timeout)
....
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) 
        pollSelectionKeys(this.nioselector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    

...

是否有特定要求,因此我们首先调用pollSelectionKeys() 方法 对于select() 方法返回的键,然后在立即连接的键上?是 只是为了清楚起见,我们分别执行这些操作,或者是否有一些特定的 涉及的要求?

其次,在pollSelectionKeys() 方法中,我们有:

void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                       boolean isImmediatelyConnected,
                                       long currentTimeNanos)
...
    /* if channel is ready write to any sockets that have space in their buffer and for which
    we have data */
    if (channel.ready() && key.isWritable()) 
        Send send = channel.write();
        if (send != null) 
            this.completedSends.add(send);
            this.sensors.recordBytesSent(channel.id(), send.size());
        
    
...

据我了解,我们只会在 KafkaChannel 属于任何一个时写信给它 我们从之前调用select() 方法获得的keySet,或者如果KafkaChannelimmediatelyConnectedKeys 之一相关联。我的问题是,我们为什么要这样做 以这种方式写信给KafkaChannels 的业务?更具体地说,我们不只是迭代 遍历所有已连接的KafkaChannels,如果他们有Send,请写信给他们 与他们关联的对象?这样,我们尽快写信给KafkaChannel, 无需等待它属于immediatelyConnectedKeysreadyKeys

【问题讨论】:

【参考方案1】:

答案就在Selector类的connect方法中(相关部分如下)

 connected = socketChannel.connect(address);
..............................
................................

 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);

根据 NIO SocketChannelconnect 的文档解释

如果此通道处于非阻塞模式,则调用此 方法启动非阻塞连接操作。如果 立即建立连接,这可能发生在本地 连接,则此方法返回 true。否则这个方法 返回 false 并且连接操作必须稍后由 调用 finishConnect 方法。

所以一个典型的交互工作流程如下(解释的很好here)

如果您在非阻塞模式下连接,您应该:

为 OP_CONNECT 注册通道 当它触发调用 finishConnect() 时 如果返回 true,请取消注册 OP_CONNECT 并注册 OP_READ 或 OP_WRITE,具体取决于您接下来要执行的操作 如果返回 false,什么也不做,继续选择 如果 connect() 或 finishConnect() 抛出异常,请关闭通道并重试或忘记它或告诉用户或 任何合适的。

如果您在频道连接之前不想做任何事情,请执行 以阻塞模式连接并进入非阻塞模式时 连接成功。

此连接方法可能立即连接,就像本地连接一样,并且可能不会触发为此连接 socketChannel 注册的 OP_CONNECT 事件(连接调用后的几行),所以当使用典型的 java NIO 时注册码,我们可能会错过它。我们最终需要在此类通道上调用 finishConnect(请参阅工作流程中的第二个要点)。所以我们将这样的频道密钥添加到另一个 Set immediatelyConnectedKeys 以便它们可以被处理得太晚,否则我们会完全错过它们。

 if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) 
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        

稍后在pollSelectionKeys 方法中(注意finishConnect 的使用,这是对底层finishConnect 的调用SocketChannel

 /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) 
                    if (channel.finishConnect()) 
.........................
.........................

总而言之,Kafka 代码看起来像标准的 NIO 代码,除非 Kafka 团队可以解释更多。关于这个主题的更多好读物可以找到here。与此相关的一个有趣的误解(bug 提交并最终被 JDK 团队拒绝)可以找到here

对于问题的第二部分,您可能会询问以下代码。为什么要两次单独调用键

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) 
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        

看到我们现在维护了两组键。虽然selector.keys() 提供了一个整体键视图,但键集不能直接修改,所以它是一种只读视图。此密钥集中的密钥只有在它被取消并且它的通道已被注销后才会被删除。所以通常selector.selectedKeys() 用于访问就绪通道。同样selector.selectedKeys() 显然不会从immediatelyConnectedKeys 返回密钥。处理从 selector.selectedKeys() 获得的这些键的通常模式是迭代 set ,测试由 钥匙准备好了,做你的东西,然后从集合中删除它。这个删除部分是非常必要的。Selector 不会从选定的键集本身中删除 SelectionKey 实例。当您处理完频道后,您必须这样做。 下次通道变为“就绪”时,选择器将再次将其添加到选定的键集中。所以这就是两者都被处理的原因,pollSelectionKeys 方法旨在同时处理这两者。

【讨论】:

这更有意义。您对上述第二个问题有什么回应吗? 我补充了我对第二个问题的看法。【参考方案2】:

在 I/O 连接完成之前,TCP 连接不可用

【讨论】:

以上是关于Kafka Source - 理解 Selector.poll() 的语义的主要内容,如果未能解决你的问题,请参考以下文章

Kafka详细配置

10.Connectors JDBC Kafka Consumer/Source Kafka Producer/Sink Redis

10.Connectors JDBC Kafka Consumer/Source Kafka Producer/Sink Redis

kafka source type

Failed to find data source: kafka

004- Flume Source之Kafka