如何将新列和相应的行特定值添加到火花数据帧?
Posted
技术标签:
【中文标题】如何将新列和相应的行特定值添加到火花数据帧?【英文标题】:How to add new columns and the corresponding row specific values to a spark dataframe? 【发布时间】:2020-02-27 12:46:25 【问题描述】:我是 Scala/Spark 世界的新手。
我有一个名为 person
的 spark 数据集(带有案例类的 df)。
scala> val person_with_contact = person.map(r => (
| r.id,
| r.name,
| r.age
| )).toDF()
现在,我想向该数据集的每条记录添加一个地址属性列表(如 apt_no、street、city、zip)。获取地址属性列表,我有一个函数,它将人员的 id 作为输入并返回一个包含所有地址属性及其对应值的映射。
我尝试了以下方法和其他一些 Stack Overflow 建议的方法,但我还不能解决它。 (参考 - 静态 col ex - Spark, add new Column with the same value in Scala)
scala> val person_with_contact = person.map(r => (
| r.id,
| r.name,
| r.age,
| getAddress(r.id)
| )).toDF()
最终的数据框应具有以下列。
id, name, age, apt_no, street, city, zip
【问题讨论】:
这能回答你的问题吗? Spark Build Custom Column Function, user defined function @Shaido,感谢您的回复。我已经有一个 UDF 函数。我不确定如何从这个 UDF 返回地址属性列表,以便将它们作为单独的列添加到新数据帧中。 @HristoIliev,感谢您的回复。每个人只有一个地址,由 4 个属性表示。我有一个 UDF 函数,它将一个人的 id 作为输入并将 4 个属性作为地图返回。我想用地址字段加入id, name, age
,即。 apt_no, street, city, zip
。最后,它应该是一个包含所有 7 个属性的单个数据框。
@ManasMukherjee,在第二次阅读您的问题时,我得知您正在添加 属性列表,这就是我删除评论的原因。 person
是 DataFrame 还是 RDD?
person
是使用 case class
创建的数据集,其中 id、name 和 age 作为属性。
【参考方案1】:
使用 udf
package yourpackage
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object MainDemo
def getAddress(id: Int): String =
//do your things
"address id:" + id
def getCity(id: String): String =
//do your things
"your city :" + id
def getZip(id: String): String =
//do your things
"your zip :" + id
def main(args: Array[String]): Unit =
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[3]").enableHiveSupport().getOrCreate()
val person = Seq(Person(1, "name_m", 21), Person(2, "name_w", 40))
import spark.implicits._
val person_with_contact = person.map(r => (r.id, r.name, r.age, getAddress(r.id))).toDF("id", "name", "age", "street")
person_with_contact.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
// |-- street: string (nullable = true)
val result = person_with_contact.select(
col("id"),
col("name"),
col("age"),
col("street"),
udf id: String =>
val city = getCity(id)
city
.apply(col("id")).as("city"),
udf id: String =>
val city = getZip(id)
city
.apply(col("id")).as("zip")
)
result.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
// |-- street: string (nullable = true)
// |-- city: string (nullable = true)
// |-- zip: string (nullable = true)
result.show()
//+---+------+---+------------+------------+-----------+
//| id| name|age| street| city| zip|
//+---+------+---+------------+------------+-----------+
//| 1|name_m| 21|address id:1|your city :1|your zip :1|
//| 2|name_w| 40|address id:2|your city :2|your zip :2|
//+---+------+---+------------+------------+-----------+
【讨论】:
感谢您撰写分享自定义代码和解决方案。与其为地址的每个组件维护单独的 UDF,我更愿意像 Hristo 共享的那样拥有一个更统一的 UDF。我今天学了些新东西。再次感谢。【参考方案2】:鉴于您已经有一个将地址作为地图返回的函数,您可以创建一个 UDF,将该地图转换为结构,然后选择所有地图字段:
import org.apache.spark.sql.functions.*
// For demo only
def getAddress(id: Int): Option[Map[String, String]] =
id match
case 1 => Some(Map("apt_no" -> "12", "street" -> "Main Street", "city" -> "NY", "zip" -> "1234"))
case 2 => Some(Map("apt_no" -> "1", "street" -> "Back Street", "city" -> "Gotham", "zip" -> "G123"))
case _ => None
case class Address(apt_no: String, street: String, city: String, zip: String)
def getAddressUdf = udf((id: Int) =>
getAddress(id) map (m =>
Address(m("apt_no"), m("street"), m("city"), m("zip"))
)
)
udf()
将返回案例类实例的函数转换为返回具有相应架构的结构列的 UDF。 Option[_]
返回类型会自动解析为 null
-able 数据类型。然后可以通过$"struct_col_name.*"
选择将结构列的字段扩展为多个列:
scala> val df = Seq(Person(1, "John", 32), Person(2, "Cloe", 27), Person(3, "Pete", 55)).toDS()
df: org.apache.spark.sql.Dataset[Person] = [id: int, name: string ... 1 more field]
scala> df.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 1|John| 32|
| 2|Cloe| 27|
| 3|Pete| 55|
+---+----+---+
scala> df
| .withColumn("addr", getAddressUdf($"id"))
| .select($"id", $"name", $"age", $"addr.*")
| .show()
+---+----+---+------+------------+------+-----+
| id|name|age|apt_no| street| city| zip|
+---+----+---+------+------------+------+-----+
| 1|John| 32| 12| Main Street| NY| 1234|
| 2|Cloe| 27| 1| Back Street|Gotham| G123|
| 3|Pete| 55| null| null| null| null|
+---+----+---+------+------------+------+-----+
【讨论】:
一个相关的qts。有没有办法在不知道地址结构的情况下生成 7 列?就我而言,我不控制地址的元素。客户端可以从地址中添加/删除属性。我只是想在不知道地址结构的情况下取地址组件的***元素。 太棒了。谢谢你。非常感谢您的帮助 @ManasMukherjee,可以返回org.apache.spark.sql.Row
而不是案例类,但是您必须构建一个与行模式匹配的 org.apache.spark.sql.types.StructType
实例并将其作为第二个参数提供给 @ 987654330@。您可能应该使用有效的id
调用一次getAddress
,检查返回的映射的键并将它们存储在一个数组中。然后以完全相同的顺序使用键来构造StructType
的字段并为Row
构造函数构建参数列表。
谢谢阿根,***.com/users/1374437/hristo-iliev以上是关于如何将新列和相应的行特定值添加到火花数据帧?的主要内容,如果未能解决你的问题,请参考以下文章