将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时

Posted

技术标签:

【中文标题】将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时【英文标题】:Writing large DataFrame from PySpark to Kafka runs into timeout 【发布时间】:2019-05-14 20:43:06 【问题描述】:

我正在尝试将包含大约 2.3 亿条记录的数据帧写入 Kafka。更具体地说是Kafka-enable Azure Event Hub,但我不确定这是否真的是我的问题的根源。

EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

这可以很好地启动并成功(并且非常快)将大约 3-4 百万条记录写入队列。但是几分钟后工作就停止了,并显示如下消息:

org.apache.spark.SparkException:作业因阶段故障而中止:阶段 7.0 中的任务 6 失败 4 次,最近一次失败:阶段 7.0 中丢失任务 6.3(TID 248、10.139.64.5、执行程序 1):kafkashaded .org.apache.kafka.common.errors.TimeoutException:mytopic-18 的 61 条记录到期:自上次追加以来已过去 32839 毫秒

org.apache.spark.SparkException:作业因阶段故障而中止:阶段 8.0 中的任务 13 失败 4 次,最近一次失败:阶段 8.0 中丢失任务 13.3(TID 348、10.139.64.5、执行程序 1):kafkashaded .org.apache.kafka.common.errors.TimeoutException: 请求超时。

另外,我从来没有看到正在创建/写入检查点文件。 我也玩过.option("kafka.delivery.timeout.ms", 30000) 和不同的值,但这似乎没有任何效果。

我在 Azure Databricks 集群 5.0 版(包括 Apache Spark 2.4.0、Scala 2.11)中运行它

我没有在我的 Event Hub 上看到任何错误,例如限制,所以应该没问题。

【问题讨论】:

你能分享一些你的 Spark UI 的截图(特别是执行者) 你是一一发消息到kafka还是批量发消息到kafka...尝试批量发消息到kafka 我分批发帖。只是用减少的批量再次测试。默认值约。 16000 可能太高了 谢谢大家,现在想通了。请参阅下面的答案。 【参考方案1】:

终于想通了(大部分):

原来大约 16000 条消息的默认批量大小对于端点来说太大了。在我将 batch.size 参数设置为 5000 后,它工作并以每分钟大约 700k 条消息写入事件中心。此外,上面的超时参数是错误的,只是被忽略了。是kafka.request.timeout.ms

唯一的问题是它仍然随机运行超时并且显然又从头开始,所以我最终得到了重复。将为此打开another question。

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

【讨论】:

以上是关于将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时的主要内容,如果未能解决你的问题,请参考以下文章

pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()

在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)

将 pyspark groupedData 转换为 pandas DataFrame

如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告

PySpark - 从 Numpy 矩阵创建 DataFrame

将 Pyspark Dataframe 列从数组转换为新列