在 RDD PySpark 上执行操作

Posted

技术标签:

【中文标题】在 RDD PySpark 上执行操作【英文标题】:Perofrming the operations on RDD PySpark 【发布时间】:2016-07-26 14:02:51 【问题描述】:

我在获取文件名时使用 Python Spark API 时遇到问题。例如

recordDataFrame=originalDataFrame \
                    .withColumn('file_name',input_file_name()) 

将file_name列添加到dataFrame。该列添加到dataFrame

recordDataFrame.take(1)

上面显示了具有值的列 但是当我将 dataFrame 转换为 RDD 或循环 RDD file_name 列时没有任何价值。

例如,

rdd_data=recordDataFrame.rdd
print(rdd_data.take(1))

这将显示带有空白值的文件名列 或者,如果我直接循环数据帧,那么文件名也没有任何价值

recordDataFrame.foreach(process_data)

但是,如果我在添加列时将静态值传递给 file_name 而不是使用 input_file_name(),那么一切正常

【问题讨论】:

这是一个错误。当数据传递给 Python 执行器时,用于获取文件名的所需上下文丢失。应该在 2.0.0 中修复 谢谢@zero323。你知道我们如何使用当前的spark 1.6解决这个问题吗?基本上需要dataFrame中每条记录的文件名。有没有办法使用 pySpark 实现这一点? 是的,但我怀疑你会喜欢它,而且它是一个严重的黑客攻击。一会儿我会发布一些想法。 【参考方案1】:

这是一个已在 2.0.0 中解决的错误。

免责声明

这些都是严重的黑客行为,除非你绝望,否则应该避免。这些也没有经过适当的测试。如果可以的话最好更新一下。

    加载数据后触发随机播放,例如:

    recordDataFrame.repartition("file_name")
    

    recordDataFrame.orderBy("file_name")
    

    截断血统,如high-performance-spark/high-performance-spark-examples 所示(代码已获得 GPL 许可,因此无法在此处复制,但主要思想是访问内部 Java RDD,对其进行缓存并重新创建 DataFrame):

    cutLineage(recordDataFrame)
    

【讨论】:

感谢 zero323 的回答。您建议的 hack 可以正常工作。到目前为止,将使用上述 hack,一旦 2.0 发布,就会更新到那个。 @Yogesh It has been released :) 但它仍处于测试阶段。还不是稳定版本,对吧? 昨天刚刚检查了它的发布,只有 Spark 2.0.0(2016 年 7 月 26 日)感谢您提供的信息。

以上是关于在 RDD PySpark 上执行操作的主要内容,如果未能解决你的问题,请参考以下文章

如何更改pyspark中的列元数据?

PySpark|RDD编程基础

在 pyspark RDD 上显示分区

Spark(PySpark)如何同步多个worker节点更新RDD

PySpark 重新分区 RDD 元素

Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame