Apache Beam pipeline running on Dataflow failed to read from KafkaIO: SSL handshake failed

Posted: 2019-04-08 13:38:28

【Problem description】:

I am building an Apache Beam pipeline that reads from Kafka as an unbounded source.

I can run it locally with the Direct Runner.

However, when running it in the cloud with the Google Cloud Dataflow runner, the pipeline fails with the stack trace appended below.

It looks like it is ultimately the Conscrypt Java library that throws javax.net.ssl.SSLException: Unable to parse TLS packet header. I am not sure how to fix this.

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@33b5ff70
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:126)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        java.util.concurrent.FutureTask.report(FutureTask.java:122)
        java.util.concurrent.FutureTask.get(FutureTask.java:206)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:112)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLException: Unable to parse TLS packet header
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:782)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:723)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:688)
        org.conscrypt.Java8EngineWrapper.unwrap(Java8EngineWrapper.java:236)
        org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:464)
        org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328)
        org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255)
        org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79)
        org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:460)
        org.apache.kafka.common.network.Selector.poll(Selector.java:398)
        org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:468)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:450)
        org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1772)
        org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1411)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(KafkaUnboundedReader.java:641)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.lambda$start$0(KafkaUnboundedReader.java:106)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
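
For context, a minimal sketch of the kind of KafkaIO read involved (the broker address, topic name, and truststore path/password below are placeholders, not the actual configuration; the SSL entries are consumer properties handed through to the underlying KafkaConsumer):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaReadSketch {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // SSL settings are passed straight through to the KafkaConsumer created on the workers,
        // so the truststore file must be readable on every worker, not just on the launcher machine.
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("security.protocol", "SSL");
        consumerConfig.put("ssl.truststore.location", "/tmp/kafka.client.truststore.jks"); // placeholder path
        consumerConfig.put("ssl.truststore.password", "changeit");                         // placeholder secret

        p.apply("ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker-1.example.com:9093") // placeholder broker
                .withTopic("events")                               // placeholder topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(consumerConfig)          // withConsumerConfigUpdates(...) in newer SDKs
                .withoutMetadata());

        p.run();
      }
    }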

【Comments】:

This looks more like a network issue. Have you set any non-default network parameters?

@LefterisS Thanks! I believe I have already configured the pipeline to run in the same network/subnetwork as my Kafka servers, though.

I work on Google Cloud Platform support, and I can tell you these are usually transient issues of relatively short duration and limited impact. Are you still seeing this problem?

@LefterisS Just tried again; unfortunately the problem persists.

Are you building with Java 8 rather than a newer version?
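
Regarding the network/subnetwork point in the comments above: on Dataflow this is typically set through the worker pool options. A minimal sketch, assuming the Dataflow runner dependency is on the classpath and using placeholder network names (equivalent to the --network and --subnetwork command-line flags):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class WorkerNetworkSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // Placeholder names: point these at the VPC network/subnetwork the Kafka brokers live in.
        options.setNetwork("kafka-vpc");
        options.setSubnetwork("regions/us-central1/subnetworks/kafka-subnet");
        // ... build the Pipeline with these options and run it as usual ...
      }
    }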

【Answer 1】:

It looks like Conscrypt causes SSL errors in many contexts like this. Dataflow workers in Beam 2.9.0 have an option to disable it; please try --experiments=disable_conscrypt_security_provider. Alternatively, you can try Beam 2.4.x, which does not enable Conscrypt.
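
A minimal sketch of setting that experiment programmatically, assuming the Dataflow runner dependency (beam-runners-google-cloud-dataflow-java) is on the classpath; passing --experiments=disable_conscrypt_security_provider on the command line when launching the job has the same effect:

    import java.util.Collections;

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DisableConscryptSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // The experiments list is a Dataflow pipeline option; this entry turns off
        // the Conscrypt security provider on the workers.
        options.setExperiments(Collections.singletonList("disable_conscrypt_security_provider"));
        // ... build the Pipeline with these options and run it as usual ...
      }
    }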

【Discussion】:

Thanks Raghu! I was able to fix this by upgrading to Beam SDK 2.9.0. The release notes mention: "The Dataflow runner has been updated to use Conscrypt as the default security provider."

For 2.9.0, the final release disables Conscrypt by default. There is a flag to enable it: --experiments=enable_conscrypt_security_provider

So what is the default security provider in 2.9.0, then?
