如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row

Posted

技术标签:

【中文标题】如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row【英文标题】:How to add columns into org.apache.spark.sql.Row inside of mapPartitions 【发布时间】:2015-11-23 16:42:45 【问题描述】:

我是 scala 和 spark 的新手,请记住这一点 :)

其实我有三个问题

    我应该如何定义函数以将其传递到 df.rdd.mapPartitions,如果我想创建具有少量附加列的新行 如何在 Row 对象中添加几列(或新建一个) 如何从创建的 RDD 创建 DataFrame

提前谢谢你

【问题讨论】:

请问您为什么需要这个?也许一些示例代码/输入/输出。这是可能的,但通常有更好的方法。 当然,我有两组不同的元素,一组很大(以数据框的形式),另一组非常小,我在这两组之间找到了一些最小值。我的想法是我将较小的集合放入一些非常优化的结构中,将其传递给 mapPartitions,为每个项目计算一些值并将它们“靠近”到其他值。 应该不需要mapPartitions 【参考方案1】:

通常不需要这样做,最好使用 UDF,但您可以:

我应该如何定义函数以将其传递到 df.rdd.mapPartitions,如果我想创建带有少量附加列的新行

它应该采用 Iterator[Row] 并返回 Iterator[T] 所以在你的情况下你应该使用这样的东西

import org.apache.spark.sql.Row

def transformRows(iter: Iterator[Row]): Iterator[Row] = ???

如何在 Row 对象中添加几列(或创建一个新的)

有多种方法可以访问Row 值,包括Row.get* 方法、Row.toSeq 等。可以使用Row.applyRow.fromSeqRow.fromTupleRowFactory 创建新的Row。例如:

def transformRow(row: Row): Row =  Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))

如何从创建的 RDD 中创建 DataFrame

如果您有RDD[Row],您可以使用SQLContext.createDataFrame 并提供架构。

把这一切放在一起:

import org.apache.spark.sql.types.IntegerType, StructField, StructType

val  df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0),
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+

【讨论】:

我喜欢 scala 代码的编写方式 :) 谢谢! @AzatFazulzyanov 更准确地说:你喜欢 zero323 编写 scala (/spark) 代码的方式! v 使用newSchema 做得很好。节省我去重新设计构造 这怎么能用 Java 写? @zero323 您能否提供一个使用 Pyspark 执行此操作的示例?我需要一个示例来向 mapPartitions 中的 Row 添加新列。

以上是关于如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row的主要内容,如果未能解决你的问题,请参考以下文章

如何动态地将列添加到 DataFrame?

如何将列添加到预先存在的 uipickerview

如何使用 DAO 将列添加到 FoxPro 文件

熊猫(python):如何将列添加到数据框以进行索引?

如何将列添加到已经存在的超网格?

如何在 BigQuery 上展开数组以将列添加到现有表