Pyspark 使用 ArrayWritable
Posted
技术标签:
【中文标题】Pyspark 使用 ArrayWritable【英文标题】:Pyspark usage of ArrayWritable 【发布时间】:2015-12-01 10:03:28 【问题描述】:我尝试在 pyspark 上保存一个键值 RDD。 RDD 的每个单元格都有类型,用伪代码编写:
((str,str),(int,[(int,int),...]))`
我想把它保存在 hadoop 文件系统上。为此,我将列表转换为元组并调用.saveAsSequenceFile
。但是使用sc.sequenceFile
重新加载rdd 失败并出现java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.io.ArrayWritable.<init>()
。
这是一个尝试保存 (int,int)
的 RDD 的最小示例。
import pyspark as spark, math
scConf = spark.SparkConf().setAppName('minimal_example')
sc = spark.SparkContext( conf = scConf )
def divs( x ):
for n in xrange(1, int(math.sqrt(x))+1 ):
if x % n == 0: yield n
def constructor( i ):
return ( i, tuple(divs(i)) )
rdd = sc.parallelize(map(constructor,range(2,61)))
rdd.saveAsSequenceFile("min.seq")
当我使用 sc.sequenceFile 通过交互式 pyspark
加载它时,它也失败了。怎么了?为什么它在 python 中尝试保存数组,而我实际上有元组。以及如何在 pyspark 中扩展 ArrayWritable 以获得默认构造函数?
【问题讨论】:
你能发布 sc.parallelize(map(constructor,range(2,61))).take(2) 吗? 【参考方案1】:你真的需要序列文件吗?您可以保存AsTextFile、加载文本文件和映射以恢复值。
rdd.saveAsTextFile('test')
sc.textFile('test').map(lambda row: ast.literal_eval(row))
【讨论】:
以上是关于Pyspark 使用 ArrayWritable的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark-SQL 与 Pyspark 使用 Delta 格式的查询表有啥区别?
Pyspark - 使用 python 或 pyspark 转换 excel 文件的行和列