Kafka Connect Out of Java heap space after enabling SSL
Posted: 2019-03-14 08:20:42
Question:
I recently enabled SSL and tried to start Kafka Connect in distributed mode. When I run
connect-distributed connect-distributed.properties
I get the following errors:
[2018-10-09 16:50:57,190] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:106)
[2018-10-09 16:50:55,471] ERROR WorkerSinkTask{id=sink-mariadb-test} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
and
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I also tried increasing the maximum and initial heap size by setting the KAFKA_HEAP_OPTS environment variable:
KAFKA_HEAP_OPTS="-Xms4g -Xmx6g" connect-distributed connect-distributed.properties
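But it still does not work.
My questions are:
- Does SSL authentication affect memory usage?
- How can I fix this issue?
Edit: I tried disabling SSL, and everything works without any problems.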
Comments:
I'm running Connect with 10g, but I'm not actively monitoring the heap, so I don't know what to tell you to set it to.
@cricket_007 I'm running the Confluent CLI in my dev environment, which has about 12 GB of memory in total, so setting Xmx to 10g is not an option for me.
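If you're using the Confluent CLI, then you would be running confluent start and confluent load directly rather than connect-distributed, wouldn't you? In that case, I don't think KAFKA_HEAP_OPTS is applied to those scripts in the same way, unless you actually add the export to your bashrc, for example.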
@cricket_007 That's right. Although I'm running the Confluent CLI, I'm running Kafka Connect in distributed mode. I also exported KAFKA_HEAP_OPTS in my bashrc file, but that didn't work either.
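As I mentioned, it would be useful if you could enable JMX monitoring and capture a heap dump when such an error occurs. Only then can we really determine what is causing the problem.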
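A sketch of how the suggestions in this thread could be combined when the worker is started with connect-distributed from the same shell. The heap sizes are the ones from the question; the heap dump path and the JMX port are placeholder assumptions, and whether the Confluent CLI passes these variables through to the worker is not confirmed here.

```shell
# Heap sizes taken from the question.
export KAFKA_HEAP_OPTS="-Xms4g -Xmx6g"

# Ask the JVM to write a heap dump when it runs out of heap.
# The dump path is a placeholder; pick a location with enough free disk space.
export KAFKA_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/connect-oom.hprof"

# Expose JMX on a local port (placeholder value) so heap usage can be watched.
export JMX_PORT=9999

# Start the worker from this same shell so that it inherits the variables:
#   connect-distributed connect-distributed.properties
# Then check which flags the running JVM actually received (replace <pid>):
#   jcmd <pid> VM.flags
```

If the reported maximum heap does not match the -Xmx value above, the variables probably did not reach the worker process.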
Answer 1:
I ran into this problem when enabling SASL_SSL in Kafka Connect:
[2018-10-12 12:33:36,426] ERROR WorkerSinkTask{id=test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
Checking the ConsumerConfig values showed that my configuration had not been applied:
[2018-10-12 12:33:35,573] INFO ConsumerConfig values:
...
security.protocol = PLAINTEXT
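One plausible explanation for why a PLAINTEXT client ends in an out-of-memory error instead of a clear handshake failure: the stack traces in the question show NetworkReceive.readFrom allocating a buffer, and the Kafka client sizes that buffer from the first four bytes of the response. If the listener speaks TLS, those bytes belong to a TLS record rather than a Kafka size field. The sketch below only shows the arithmetic. The byte values (the start of a TLS 1.2 alert record) are an assumption about what the listener sends back, and the function name is made up for illustration.

```shell
# Interpret four byte values as one big-endian 32-bit integer, the way a
# length-prefixed protocol reads its size field.
tls_prefix_as_size() {
  printf '%d\n' "$(( ($1 << 24) | ($2 << 16) | ($3 << 8) | $4 ))"
}

# Assumed bytes: 0x15 = TLS alert record, 0x03 0x03 = TLS 1.2,
# 0x00 = high byte of the record length.
tls_prefix_as_size 0x15 0x03 0x03 0x00   # prints 352518912
```

Under that assumption, a single response would ask for roughly 336 MiB, which may be why the failure surfaces as an allocation error rather than as a protocol error.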
I found out that you have to add the producer. or consumer. prefix to these settings in the properties file:
consumer.security.protocol=SASL_SSL
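To check which settings a worker file actually passes on to its internal clients, you can list the prefixed keys. A minimal sketch, assuming a properties file with one key=value pair per line; list_client_overrides and the sample file are hypothetical and not part of the Kafka or Confluent tooling.

```shell
# Print the settings that carry the given client prefix, with the prefix removed.
# Usage: list_client_overrides <consumer|producer> <properties-file>
list_client_overrides() {
  sed -n "s/^$1\.//p" "$2"
}

# Sample worker file (assumed content): the unprefixed key configures the worker
# itself, while the prefixed keys configure the consumers and producers that
# connector tasks use.
props=$(mktemp)
cat > "$props" <<'EOF'
security.protocol=SASL_SSL
consumer.security.protocol=SASL_SSL
producer.security.protocol=SASL_SSL
EOF

list_client_overrides consumer "$props"   # prints: security.protocol=SASL_SSL
rm -f "$props"
```

A sink connector such as the JDBC sink in the question reads through a consumer, so the consumer. entries are the ones that matter there. The other security settings (truststore, SASL) would need the same prefix; this sketch leaves them out.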
Comments:
You can also check my answer here: ***.com/a/61362427/1446479