使用 Spark DataFrame 中的多列更改行的值

Posted

技术标签:

【中文标题】使用 Spark DataFrame 中的多列更改行的值【英文标题】:Change value of a row using multiple columns in Spark DataFrame 【发布时间】:2019-02-05 05:47:04 【问题描述】:

我得到了这种格式的数据框(df)。

df.show()
********************
X1 | x2  | X3 | ..... | Xn   | id_1 | id_2 | .... id_23
1  |  ok |good|         john | null | null |     |null
2  |rick |good|       | ryan | null | null |     |null
....

我有一个数据框,其中有很多列,数据框名为 df。我需要编辑此数据框(df)中的列。我有 2 个映射,m1 (Integer->Integer) 和 m2 (Integer->String) 映射。

我需要查看每一行并获取列 X1 的值,并查看 m1 中 X1 的映射值,该值将在 [1,23] 范围内,让它为 5,并在其中找到 X1 的映射值m2 类似于 X8。我需要将 X8 列的值添加到 id_5。我有以下代码,但我无法让它工作。

val dfEdited = df.map( (row) => 
  val mapValue = row.getAs("X1")
  row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue)
)

【问题讨论】:

【参考方案1】:

您在row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue) 中所做的事情没有意义。

首先,您正在为操作getAs("id_"+m1.get(mapValue)) 的结果分配一个值,这为您提供了一个不可变的值。其次,您没有正确使用方法getAs,因为您需要指定该方法返回的数据类型。

我不确定我是否正确理解了你想要做什么,我猜你错过了一些细节。无论如何,这是我得到的,它工作正常。

当然,我对每一行代码都做了注释,方便大家理解。

// First of all we need to create a case class to wrap the content of each row.
case class Schema(X1: Int, X2: String, X3: String, X4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String])


val dfEdited = ds.map( row => 
  // We use the getInt method to get the value of a field which is expected to be Int
  val mapValue = row.getInt(row.fieldIndex("X1"))

  // fieldIndex gives you the position inside the row fo the field you are looking for. 
  // Regarding m1(mapValue), NullPointer might be thrown if mapValue is not in that Map. 
  // You need to implement mechanisms to deal with it (for example, an if...else clause, or using the method getOrElse)
  val indexToModify = row.fieldIndex("id_" + m1(mapValue)) 

  // We convert the row to a sequence, and pair each element with its index.
  // Then, with the map method we generate a new sequence.
  // We replace the element situated in the position indexToModify.
  // In addition, if there are null values, we have to convert it to an object of type Option.
  // It is necessary for the next step.
  val seq = row.toSeq.zipWithIndex.map(x => if (x._2 == indexToModify) Some(m2(mapValue)) else if(x._1 == null) None else x._1)


  // Finally, you have to create the Schema object by using pattern matching
  seq match 
    case Seq(x1: Int, x2: String, x3: String, x4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String]) => Schema(x1, x2,x3,x4, id_1, id_2, id_3)
  
)

一些cmets:

ds 对象是一个数据集。数据集必须具有结构。您无法修改 map 方法中的行并返回它们,因为 Spark 将不知道数据集的结构是否已更改。出于这个原因,我返回了一个案例类对象,因为它为 Dataset 对象提供了一个结构。

请记住,您可能会遇到空值问题。如果您不建立机制来处理例如 X1 的值不在 m1 中的情况,则此代码可能会向您抛出空指针。

希望它有效。

【讨论】:

以上是关于使用 Spark DataFrame 中的多列更改行的值的主要内容,如果未能解决你的问题,请参考以下文章

从 Spark DataFrame 中的单列派生多列

使用多列作为存储在 Apache Spark 中的数组中的键来连接两个 Dataframe

在分区 Spark DataFrame 中使用多列是不是会使读取速度变慢?

使用 Scala 将多列转换为 Spark Dataframe 上的一列地图

将 Spark Dataframe 字符串列拆分为多列

将 Spark Dataframe 字符串列拆分为多列