如何将行rdd转换为键入的rdd

Posted

技术标签:

【中文标题】如何将行rdd转换为键入的rdd【英文标题】:How to convert row rdd to typed rdd 【发布时间】:2016-10-15 18:35:30 【问题描述】:

是否可以将 Row RDD 转换为 Typed RDD。在下面的代码中,我可以将行 JavaRDD 转换为 Counter 类型的 JavaRDD

代码:

JavaRDD<Counter> rdd = sc.parallelize(counters);
Dataset<Counter> ds = sqlContext.createDataset(rdd.rdd(), encoder);

DataFrame df = ds.toDF();
df.show()

df.write().parquet(path);
DataFrame newDataDF = sqlContext.read().parquet(path);

newDataDF.toJavaRDD(); // This gives a row type rdd

在 Scala 中:

case class A(countId: Long, bytes: Array[Byte], blist: List[B])
case class B(id: String, count: Long)

val b1 = B("a", 1L)
val b2 = B("b", 2L)

val a1 = A(1L, Array(1.toByte,2.toByte), List(a1, a2))
val rdd = sc.parallelize(List(a1))

val dataSet: Dataset[A] = sqlContext.createDataset(rdd)
val df = dataSet.toDF()

 // this shows, so this last entry is for List[B] in which it is storing string as null
 |1|[01 02]| [[null,3984726108...|]
 df.show

df.write.parquet(path)
val roundTripRDD = sqlContext.read.parquet(path).as[A].rdd

//throws error here when run show on df
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java',
Line 300, Column 68: 
No applicable constructor/method found for actual parameters 
"long, byte[], scala.collection.Seq"; candidates are:
"test.data.A(long, byte[], scala.collection.immutable.List)"


roundTripRDD.toDF.show

assertEquals(roundTripRDD, rdd)

是否需要为案例类提供某种构造函数?

【问题讨论】:

你必须在Dataset中使用toJavaRDD,而不是在Dataframe中。 我没听懂,什么意思? DataFrame to toJavaRDD() 你会得到 JavaRDD 但是如果你有 Dataset toJavaRDD 你会得到 JavaRDD 哦,好的。我更改了我的代码以反映这一点,并且我能够让它在 scala 中工作。我也试图让它在 scala 中工作,但它不工作。知道我在那里想念什么吗?我提到的那一行是抛出错误 sqlContext.read.parquet(path).as[A].show 【参考方案1】:

试试:

sqlContext.read().parquet(path).as(encoder).rdd().toJavaRDD();

【讨论】:

sqlContext.read().parquet(path).as(encoder).rdd().toJavaRDD(); LostInOverflow:在 scala 中编写类似代码是否需要包含其他内容,我已更新问题以反映 scala 代码? 我试过了,我认为问题出在案例类上,当我尝试在控制台中打印它时它显示了这个模式更新了问题

以上是关于如何将行rdd转换为键入的rdd的主要内容,如果未能解决你的问题,请参考以下文章

如何将列表的 RDD 转换为压缩列表的 RDD?

如何将 RDD[Row] 转换为 RDD[Vector]

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

如何在 ipython 中将 Spark RDD 转换为 pandas 数据帧?

如何在 ipython 中将 Spark RDD 转换为 pandas 数据帧?

如何在Scala中将rdd对象转换为数据框