如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row
Posted
技术标签:
【中文标题】如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row【英文标题】:How to add columns into org.apache.spark.sql.Row inside of mapPartitions 【发布时间】:2015-11-23 16:42:45 【问题描述】:我是 scala 和 spark 的新手,请记住这一点 :)
其实我有三个问题
-
我应该如何定义函数以将其传递到 df.rdd.mapPartitions,如果我想创建具有少量附加列的新行
如何在 Row 对象中添加几列(或新建一个)
如何从创建的 RDD 创建 DataFrame
提前谢谢你
【问题讨论】:
请问您为什么需要这个?也许一些示例代码/输入/输出。这是可能的,但通常有更好的方法。 当然,我有两组不同的元素,一组很大(以数据框的形式),另一组非常小,我在这两组之间找到了一些最小值。我的想法是我将较小的集合放入一些非常优化的结构中,将其传递给 mapPartitions,为每个项目计算一些值并将它们“靠近”到其他值。 应该不需要mapPartitions
。
【参考方案1】:
通常不需要这样做,最好使用 UDF,但您可以:
我应该如何定义函数以将其传递到 df.rdd.mapPartitions,如果我想创建带有少量附加列的新行
它应该采用 Iterator[Row]
并返回 Iterator[T]
所以在你的情况下你应该使用这样的东西
import org.apache.spark.sql.Row
def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
如何在 Row 对象中添加几列(或创建一个新的)
有多种方法可以访问Row
值,包括Row.get*
方法、Row.toSeq
等。可以使用Row.apply
、Row.fromSeq
、Row.fromTuple
或RowFactory
创建新的Row
。例如:
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
如何从创建的 RDD 中创建 DataFrame
如果您有RDD[Row]
,您可以使用SQLContext.createDataFrame
并提供架构。
把这一切放在一起:
import org.apache.spark.sql.types.IntegerType, StructField, StructType
val df = sc.parallelize(Seq(
(1.0, 2.0), (0.0, -1.0),
(3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
val newSchema = StructType(df.schema.fields ++ Array(
StructField("z", IntegerType, false), StructField("v", IntegerType, false)))
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
// +---+----+---+---+
// | x| y| z| v|
// +---+----+---+---+
// |1.0| 2.0| -1| 1|
// |0.0|-1.0| -1| 1|
// |3.0| 4.0| -1| 1|
// |6.0|-2.3| -1| 1|
// +---+----+---+---+
【讨论】:
我喜欢 scala 代码的编写方式 :) 谢谢! @AzatFazulzyanov 更准确地说:你喜欢 zero323 编写 scala (/spark) 代码的方式! v 使用newSchema
做得很好。节省我去重新设计构造
这怎么能用 Java 写?
@zero323 您能否提供一个使用 Pyspark 执行此操作的示例?我需要一个示例来向 mapPartitions 中的 Row 添加新列。以上是关于如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row的主要内容,如果未能解决你的问题,请参考以下文章