如何将新列和相应的行特定值添加到火花数据帧?

Posted

技术标签:

【中文标题】如何将新列和相应的行特定值添加到火花数据帧?【英文标题】:How to add new columns and the corresponding row specific values to a spark dataframe? 【发布时间】:2020-02-27 12:46:25 【问题描述】:

我是 Scala/Spark 世界的新手。

我有一个名为 person 的 spark 数据集(带有案例类的 df)。

scala> val person_with_contact = person.map(r => (
     | r.id,
     | r.name,
     | r.age
     | )).toDF()

现在,我想向该数据集的每条记录添加一个地址属性列表(如 apt_no、street、city、zip)。获取地址属性列表,我有一个函数,它将人员的 id 作为输入并返回一个包含所有地址属性及其对应值的映射。

我尝试了以下方法和其他一些 Stack Overflow 建议的方法,但我还不能解决它。 (参考 - 静态 col ex - Spark, add new Column with the same value in Scala)

scala> val person_with_contact = person.map(r => (
    | r.id,
    | r.name,
    | r.age,
    | getAddress(r.id) 
    | )).toDF()

最终的数据框应具有以下列。

id, name, age, apt_no, street, city, zip

【问题讨论】:

这能回答你的问题吗? Spark Build Custom Column Function, user defined function @Shaido,感谢您的回复。我已经有一个 UDF 函数。我不确定如何从这个 UDF 返回地址属性列表,以便将它们作为单独的列添加到新数据帧中。 @HristoIliev,感谢您的回复。每个人只有一个地址,由 4 个属性表示。我有一个 UDF 函数,它将一个人的 id 作为输入并将 4 个属性作为地图返回。我想用地址字段加入id, name, age,即。 apt_no, street, city, zip。最后,它应该是一个包含所有 7 个属性的单个数据框。 @ManasMukherjee,在第二次阅读您的问题时,我得知您正在添加 属性列表,这就是我删除评论的原因。 person 是 DataFrame 还是 RDD? person 是使用 case class 创建的数据集,其中 id、name 和 age 作为属性。 【参考方案1】:

使用 udf

package yourpackage

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object MainDemo 

  def getAddress(id: Int): String = 
    //do your things
    "address id:" + id
  

  def getCity(id: String): String = 
    //do your things
    "your city :" + id
  

  def getZip(id: String): String = 
    //do your things
    "your zip :" + id
  

  def main(args: Array[String]): Unit = 
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[3]").enableHiveSupport().getOrCreate()
    val person = Seq(Person(1, "name_m", 21), Person(2, "name_w", 40))
    import spark.implicits._
    val person_with_contact = person.map(r => (r.id, r.name, r.age, getAddress(r.id))).toDF("id", "name", "age", "street")
    person_with_contact.printSchema()
    //root
    // |-- id: integer (nullable = false)
    // |-- name: string (nullable = true)
    // |-- age: integer (nullable = false)
    // |-- street: string (nullable = true)
    val result = person_with_contact.select(
      col("id"),
      col("name"),
      col("age"),
      col("street"),
      udf  id: String =>
        val city = getCity(id)
        city
      .apply(col("id")).as("city"),
      udf  id: String =>
        val city = getZip(id)
        city
      .apply(col("id")).as("zip")
    )
    result.printSchema()
    //root
    // |-- id: integer (nullable = false)
    // |-- name: string (nullable = true)
    // |-- age: integer (nullable = false)
    // |-- street: string (nullable = true)
    // |-- city: string (nullable = true)
    // |-- zip: string (nullable = true)
    result.show()
    //+---+------+---+------------+------------+-----------+
    //| id|  name|age|      street|        city|        zip|
    //+---+------+---+------------+------------+-----------+
    //|  1|name_m| 21|address id:1|your city :1|your zip :1|
    //|  2|name_w| 40|address id:2|your city :2|your zip :2|
    //+---+------+---+------------+------------+-----------+
  


【讨论】:

感谢您撰写分享自定义代码和解决方案。与其为地址的每个组件维护单独的 UDF,我更愿意像 Hristo 共享的那样拥有一个更统一的 UDF。我今天学了些新东西。再次感谢。【参考方案2】:

鉴于您已经有一个将地址作为地图返回的函数,您可以创建一个 UDF,将该地图转换为结构,然后选择所有地图字段:

import org.apache.spark.sql.functions.*

// For demo only
def getAddress(id: Int): Option[Map[String, String]] = 
  id match 
    case 1 => Some(Map("apt_no" -> "12", "street" -> "Main Street", "city" -> "NY", "zip" -> "1234"))
    case 2 => Some(Map("apt_no" -> "1", "street" -> "Back Street", "city" -> "Gotham", "zip" -> "G123"))
    case _ => None
  


case class Address(apt_no: String, street: String, city: String, zip: String)

def getAddressUdf = udf((id: Int) => 
  getAddress(id) map (m =>
    Address(m("apt_no"), m("street"), m("city"), m("zip"))
  )
)

udf() 将返回案例类实例的函数转换为返回具有相应架构的结构列的 UDF。 Option[_] 返回类型会自动解析为 null-able 数据类型。然后可以通过$"struct_col_name.*" 选择将结构列的字段扩展为多个列:

scala> val df = Seq(Person(1, "John", 32), Person(2, "Cloe", 27), Person(3, "Pete", 55)).toDS()
df: org.apache.spark.sql.Dataset[Person] = [id: int, name: string ... 1 more field]

scala> df.show()
+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 32|
|  2|Cloe| 27|
|  3|Pete| 55|
+---+----+---+

scala> df
     | .withColumn("addr", getAddressUdf($"id"))
     | .select($"id", $"name", $"age", $"addr.*")
     | .show()
+---+----+---+------+------------+------+-----+
| id|name|age|apt_no|      street|  city|  zip|
+---+----+---+------+------------+------+-----+
|  1|John| 32|    12| Main Street|    NY| 1234|
|  2|Cloe| 27|     1| Back Street|Gotham| G123|
|  3|Pete| 55|  null|        null|  null| null|
+---+----+---+------+------------+------+-----+

【讨论】:

一个相关的qts。有没有办法在不知道地址结构的情况下生成 7 列?就我而言,我不控制地址的元素。客户端可以从地址中添加/删除属性。我只是想在不知道地址结构的情况下取地址组件的***元素。 太棒了。谢谢你。非常感谢您的帮助 @ManasMukherjee,可以返回 org.apache.spark.sql.Row 而不是案例类,但是您必须构建一个与行模式匹配的 org.apache.spark.sql.types.StructType 实例并将其作为第二个参数提供给 @ 987654330@。您可能应该使用有效的id 调用一次getAddress,检查返回的映射的键并将它们存储在一个数组中。然后以完全相同的顺序使用键来构造StructType 的字段并为Row 构造函数构建参数列表。 谢谢阿根,***.com/users/1374437/hristo-iliev

以上是关于如何将新列和相应的行特定值添加到火花数据帧?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Java UDF将新列添加到Spark数据帧

Pandas:使用 apply 将特定列中的行值复制到新列中

如何将具有值的新列添加到现有数据表?

如何将新列添加到按 groupby 分组的分层数据框中

如何将新列添加到 Android SQLite 数据库?

如何将特定的行和列值乘以常数来创建新列?