如何捕获 pyspark foreachPartition 的日志输出?

Posted

技术标签:

【中文标题】如何捕获 pyspark foreachPartition 的日志输出?【英文标题】:How can I catch the log output of pyspark foreachPartition? 【发布时间】:2016-12-20 06:59:10 【问题描述】:

pyspark

当我在 foreachRdd 方法中使用 print() 时,它起作用了!

def echo(data):
print data
....
lines = MQTTUtils.createStream(ssc, brokerUrl, topics)

topic_rdd = lines.map(lambda x: get_topic_rdd(x)).filter(lambda x: x[0]!= None)

topic_rdd.foreachRDD(lambda x: echo(x))

我可以使用 spark-on-yarn 查看登录控制台

但是如果我使用 foreachPartition 的方法,它看不到任何 print() 的日志

topic_rdd = lines.map(lambda x: get_topic_rdd(x)).filter(lambda x: x[0]!= None)

topic_rdd.foreachRDD(lambda x: x.foreachPartition(lambda y: echo(y)))

如果我想看日志,我需要进入不同的分区看日志吗?我可以在 Single 控制台中看到日志吗?顺便说一下,我可以在使用 scala 但 python 的 Single 控制台中看到日志。

【问题讨论】:

【参考方案1】:

rdd.foreachRDD 在 driver 节点上运行,该节点向您的终端发送消息

rdd.foreachPartition 在 worker 节点上运行,该节点向 worker 的终端发送消息,您看不到

如果您想查看日志,只需将它们保存为文件

【讨论】:

以上是关于如何捕获 pyspark foreachPartition 的日志输出?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark Delta Lake 捕获表不是 delta 表异常

在pyspark数据框的列中使用正则表达式捕获两个字符串之间的第一次出现的字符串

如何在pyspark中将rdd行转换为带有json结构的数据框?

Pyspark 结构化流处理

将 DataFrame show() 的结果保存到 pyspark 中的字符串

在 PySpark 中提取多个正则表达式匹配项