Kafka Connect Out of Java heap space after enabling SSL

Posted: 2019-03-14

I recently enabled SSL and tried to start Kafka Connect in distributed mode. When running

connect-distributed connect-distributed.properties

I get the following error:

[2018-10-09 16:50:57,190] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:106)
[2018-10-09 16:50:55,471] ERROR WorkerSinkTask{id=sink-mariadb-test} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:694)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I also tried to increase the maximum and initial heap sizes by setting the KAFKA_HEAP_OPTS environment variable and running

KAFKA_HEAP_OPTS="-Xms4g -Xmx6g" connect-distributed connect-distributed.properties

but it still fails.
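Note that an inline assignment like the one above only applies to that single command; for the setting to reach processes started another way (e.g. via the Confluent CLI), it has to be exported, as the comments below discuss. A minimal sketch, reusing the same sizes:

# e.g. in ~/.bashrc, so that child processes inherit the setting
export KAFKA_HEAP_OPTS="-Xms4g -Xmx6g"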

My questions are:

Does SSL authentication affect memory usage?
How can I fix this issue?

EDIT: I tried disabling SSL and everything works without any issues.
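A quick way to sanity-check which protocol a broker listener actually speaks is openssl s_client; the host and port below are placeholders for the broker's SSL listener:

# prints the broker's certificate chain if the port really speaks TLS;
# on a plaintext port the handshake simply fails
openssl s_client -connect broker:9093 </dev/null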

Comments:

I'm running Connect with 10g, but I'm not actively monitoring the heap, so I don't know what to tell you to set it to.

@cricket_007 I'm running the Confluent CLI in my development environment with about 12 GB of memory in total, so setting Xmx to 10g is not an option for me.

If you're using the Confluent CLI, then you'd be running confluent start and confluent load directly rather than connect-distributed, wouldn't you? In that case I don't think KAFKA_HEAP_OPTS gets applied to those scripts in the same way, unless you actually add the export to your bashrc, for example.

@cricket_007 That's right. Although I'm using the Confluent CLI, I'm running Kafka Connect in distributed mode. I also exported KAFKA_HEAP_OPTS in my bashrc file, but that didn't work either.

As I mentioned, it would be useful if you could enable JMX monitoring and take a heap dump when an error like this occurs. Only then can we really be sure what's causing the problem.

Answer 1:

I ran into this problem when enabling SASL_SSL in Kafka Connect:

[2018-10-12 12:33:36,426] ERROR WorkerSinkTask{id=test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space

Checking the ConsumerConfig values showed that my configuration was not being applied:

[2018-10-12 12:33:35,573] INFO ConsumerConfig values: 
...
security.protocol = PLAINTEXT

I found out that you have to prefix the configs with consumer. and producer. in the worker properties file. Without the prefix, the worker's internal consumer was still connecting with PLAINTEXT to a TLS listener; in that situation the client misreads the first bytes of the broker's TLS handshake as a record length and tries to allocate an enormous buffer, which is exactly what surfaces as the OutOfMemoryError above. So:

consumer.security.protocol=SASL_SSL
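For illustration, here is a minimal sketch of the relevant worker properties; the broker address, paths, and passwords are placeholders, assuming a SASL_SSL listener on the broker (SASL mechanism/JAAS settings omitted):

# connect-distributed.properties (sketch; all values are placeholders)
bootstrap.servers=broker:9093

# the worker's own internal clients (config/offset/status topics)
security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=changeit

# consumers created for sink connectors need their own prefixed copies
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/path/to/truststore.jks
consumer.ssl.truststore.password=changeit

# producers created for source connectors likewise
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/path/to/truststore.jks
producer.ssl.truststore.password=changeit

After restarting the worker, the INFO ConsumerConfig values block should report security.protocol = SASL_SSL instead of PLAINTEXT.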

Comments:

Also see my answer here: ***.com/a/61362427/1446479
