Pyspark 使用 ArrayWritable

Posted

技术标签:

【中文标题】Pyspark 使用 ArrayWritable【英文标题】:Pyspark usage of ArrayWritable 【发布时间】:2015-12-01 10:03:28 【问题描述】:

我尝试在 pyspark 上保存一个键值 RDD。 RDD 的每个单元格都有类型,用伪代码编写:

((str,str),(int,[(int,int),...]))`

我想把它保存在 hadoop 文件系统上。为此,我将列表转换为元组并调用.saveAsSequenceFile。但是使用sc.sequenceFile 重新加载rdd 失败并出现java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.io.ArrayWritable.<init>()

这是一个尝试保存 (int,int) 的 RDD 的最小示例。

import pyspark as spark, math

scConf = spark.SparkConf().setAppName('minimal_example')
sc = spark.SparkContext( conf = scConf )

def divs( x ):
    for n in xrange(1, int(math.sqrt(x))+1 ):
        if x % n == 0: yield n
def constructor( i ):
    return ( i, tuple(divs(i)) )

rdd = sc.parallelize(map(constructor,range(2,61)))
rdd.saveAsSequenceFile("min.seq")

当我使用 sc.sequenceFile 通过交互式 pyspark 加载它时,它也失败了。怎么了?为什么它在 python 中尝试保存数组,而我实际上有元组。以及如何在 pyspark 中扩展 ArrayWritable 以获得默认构造函数?

【问题讨论】:

你能发布 sc.parallelize(map(constructor,range(2,61))).take(2) 吗? 【参考方案1】:

你真的需要序列文件吗?您可以保存AsTextFile、加载文本文件和映射以恢复值。

rdd.saveAsTextFile('test') 

sc.textFile('test').map(lambda row: ast.literal_eval(row))

【讨论】:

以上是关于Pyspark 使用 ArrayWritable的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark-SQL 与 Pyspark 使用 Delta 格式的查询表有啥区别?

Pyspark - 使用 python 或 pyspark 转换 excel 文件的行和列

避免在 pyspark 代码中使用 collect() 函数的最佳方法是啥?编写优化pyspark代码的最佳方法?

pyspark使用ipython

无法使用 pyspark 写入 hdfs

使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧