在 RDD PySpark 上执行操作
Posted
技术标签:
【中文标题】在 RDD PySpark 上执行操作【英文标题】:Perofrming the operations on RDD PySpark 【发布时间】:2016-07-26 14:02:51 【问题描述】:我在获取文件名时使用 Python Spark API 时遇到问题。例如
recordDataFrame=originalDataFrame \
.withColumn('file_name',input_file_name())
将file_name列添加到dataFrame。该列添加到dataFrame
recordDataFrame.take(1)
上面显示了具有值的列 但是当我将 dataFrame 转换为 RDD 或循环 RDD file_name 列时没有任何价值。
例如,
rdd_data=recordDataFrame.rdd
print(rdd_data.take(1))
这将显示带有空白值的文件名列 或者,如果我直接循环数据帧,那么文件名也没有任何价值
recordDataFrame.foreach(process_data)
但是,如果我在添加列时将静态值传递给 file_name 而不是使用 input_file_name(),那么一切正常
【问题讨论】:
这是一个错误。当数据传递给 Python 执行器时,用于获取文件名的所需上下文丢失。应该在 2.0.0 中修复 谢谢@zero323。你知道我们如何使用当前的spark 1.6解决这个问题吗?基本上需要dataFrame中每条记录的文件名。有没有办法使用 pySpark 实现这一点? 是的,但我怀疑你会喜欢它,而且它是一个严重的黑客攻击。一会儿我会发布一些想法。 【参考方案1】:这是一个已在 2.0.0 中解决的错误。
免责声明:
这些都是严重的黑客行为,除非你绝望,否则应该避免。这些也没有经过适当的测试。如果可以的话最好更新一下。
加载数据后触发随机播放,例如:
recordDataFrame.repartition("file_name")
或
recordDataFrame.orderBy("file_name")
截断血统,如high-performance-spark/high-performance-spark-examples 所示(代码已获得 GPL 许可,因此无法在此处复制,但主要思想是访问内部 Java RDD,对其进行缓存并重新创建 DataFrame):
cutLineage(recordDataFrame)
【讨论】:
感谢 zero323 的回答。您建议的 hack 可以正常工作。到目前为止,将使用上述 hack,一旦 2.0 发布,就会更新到那个。 @Yogesh It has been released :) 但它仍处于测试阶段。还不是稳定版本,对吧? 昨天刚刚检查了它的发布,只有 Spark 2.0.0(2016 年 7 月 26 日)感谢您提供的信息。以上是关于在 RDD PySpark 上执行操作的主要内容,如果未能解决你的问题,请参考以下文章