Spark 数据框 Scala 棘手

Posted

技术标签:

【中文标题】Spark 数据框 Scala 棘手【英文标题】:Spark dataframe Scala tricky 【发布时间】:2019-02-07 16:43:52 【问题描述】:

数据框 1

Person  Work_order  email  
P1  111 123@gmail.com  
P2  222 123@gmail.com   
P3  111 123@gmail.com   
P4  -1  123@gmail.com   
P5  444 999@gmail.com   

val person = Seq(
 ("P1", "111", "123@gmail.com"),
("P2", "222", "123@gmail.com"),
("P3", "111", "123@gmail.com"),
("P4", "-1", "123@gmail.com"),
("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")

数据框 2

Work_order  email   
111 123@gmail.com   
222 123@gmail.com    
444 999@gmail.com 

val workOrder = Seq(
("111", "123@gmail.com"),
("222", "123@gmail.com"),
("444", "999@gmail.com")).toDF("work_order", "email")

输出

Work_order email Count_excluding_the_self_work_order_id111 123@gmail.com 2222 123@gmail.com 3444 999@gmail.com 0

我想创建一个与上面相同的输出。例如,对于第一行:计数应不包括转储 [111] 中存在的自我工作订单 ID。此外,我们不需要计算工单 id 444,因为它有不同的电子邮件地址。提前感谢您的帮助。

【问题讨论】:

想解释一下输出的逻辑吗?我不太明白 @mrbolichi 我已经更新了问题中的更多解释。 【参考方案1】:

使用 Spark 2.4 filter() 函数。由于应该排除 444,我在 SQL 中已经提到过。希望,步骤是解释性的

scala> val person = Seq(
     |  ("P1", "111", "123@gmail.com"),
     | ("P2", "222", "123@gmail.com"),
     | ("P3", "111", "123@gmail.com"),
     | ("P4", "-1", "123@gmail.com"),
     | ("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")
person: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 1 more field]

scala> val workOrder = Seq(
     | ("111", "123@gmail.com"),
     | ("222", "123@gmail.com"),
     | ("444", "999@gmail.com")).toDF("work_order", "email")
workOrder: org.apache.spark.sql.DataFrame = [work_order: string, email: string]

scala> val person_grp = person.groupBy().agg( collect_list('work_order_person) as "wo_group" )
person_grp: org.apache.spark.sql.DataFrame = [wo_group: array<string>]

scala> person.crossJoin(person_grp).show(false)
+------+-----------------+-------------+------------------------+
|person|work_order_person|email_person |wo_group                |
+------+-----------------+-------------+------------------------+
|P1    |111              |123@gmail.com|[111, 222, 111, -1, 444]|
|P2    |222              |123@gmail.com|[111, 222, 111, -1, 444]|
|P3    |111              |123@gmail.com|[111, 222, 111, -1, 444]|
|P4    |-1               |123@gmail.com|[111, 222, 111, -1, 444]|
|P5    |444              |999@gmail.com|[111, 222, 111, -1, 444]|
+------+-----------------+-------------+------------------------+


scala> val df = person.crossJoin(person_grp)
df: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 2 more fields]

scala> df.createOrReplaceTempView("ansip")

scala> spark.sql(" select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip ").show(false)
+------+-----------------+-------------------+
|person|work_order_person|res1               |
+------+-----------------+-------------------+
|P1    |111              |[222, -1]          |
|P2    |222              |[111, 111, -1]     |
|P3    |111              |[222, -1]          |
|P4    |-1               |[111, 222, 111]    |
|P5    |444              |[111, 222, 111, -1]|
+------+-----------------+-------------------+


scala> workOrder.createOrReplaceTempView("wo_tab")

scala> val df2 = spark.sql(" with t1 (select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip) select work_order_person
,res1 from t1 where work_order_person!=444 group by work_order_person, res1  ")
df2: org.apache.spark.sql.DataFrame = [work_order_person: string, res1: array<string>]

scala> df2.show(false)
+-----------------+---------------+
|work_order_person|res1           |
+-----------------+---------------+
|111              |[222, -1]      |
|222              |[111, 111, -1] |
|-1               |[111, 222, 111]|
+-----------------+---------------+

scala> df2.createOrReplaceTempView("ansib2")

scala> spark.sql(" select work_order, email, case when size(res1)>0 then size(res1) else 0 end res2 from wo_tab left join ansib2 on work_order=work_order_person ").show
(false)
+----------+-------------+----+
|work_order|email        |res2|
+----------+-------------+----+
|111       |123@gmail.com|2   |
|222       |123@gmail.com|3   |
|444       |999@gmail.com|0   |
+----------+-------------+----+


scala>

【讨论】:

感谢您的回答。万分感激。我可以看到 x!=444 在这里是硬编码的。这可能不适用于不同的数据集。寻找需要与 spark 2.2 一起使用的通用代码。但绝对感谢您的努力。 person.withColumn("wo_group", collect_list("work_order_person").over(Window.partitionBy("email_person"))).orderBy($"person") show(false)。有了这个,我可以对工作订单进行分组。现在我必须从收集的组中删除自我工作订单 ID。例如:111 工作订单 ID 需要从此集中删除。 [111、222、111、-1]。请帮忙。 您的问题没有提供以通用方式排除 444 的清晰度,所以我已经对其进行了硬编码。即使我们用 spark 2.2 编写它,它也将是 udf 并且必须进行硬编码。 . 请添加更多信息

以上是关于Spark 数据框 Scala 棘手的主要内容,如果未能解决你的问题,请参考以下文章

使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]

如何在 Zeppelin/Spark/Scala 中漂亮地打印数据框?

使用 spark scala 向空数据框添加一行

Scala(Spark)连接数据框中的列[重复]

在 scala spark 数据框中提取时间间隔

数据框 Spark scala 爆炸 json 数组