Spark 数据框 Scala 棘手
Posted
技术标签:
【中文标题】Spark 数据框 Scala 棘手【英文标题】:Spark dataframe Scala tricky 【发布时间】:2019-02-07 16:43:52 【问题描述】:数据框 1
Person Work_order email
P1 111 123@gmail.com
P2 222 123@gmail.com
P3 111 123@gmail.com
P4 -1 123@gmail.com
P5 444 999@gmail.com
val person = Seq(
("P1", "111", "123@gmail.com"),
("P2", "222", "123@gmail.com"),
("P3", "111", "123@gmail.com"),
("P4", "-1", "123@gmail.com"),
("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")
数据框 2
Work_order email
111 123@gmail.com
222 123@gmail.com
444 999@gmail.com
val workOrder = Seq(
("111", "123@gmail.com"),
("222", "123@gmail.com"),
("444", "999@gmail.com")).toDF("work_order", "email")
输出
Work_order email Count_excluding_the_self_work_order_id
111 123@gmail.com 2
222 123@gmail.com 3
444 999@gmail.com 0
我想创建一个与上面相同的输出。例如,对于第一行:计数应不包括转储 [111] 中存在的自我工作订单 ID。此外,我们不需要计算工单 id 444,因为它有不同的电子邮件地址。提前感谢您的帮助。
【问题讨论】:
想解释一下输出的逻辑吗?我不太明白 @mrbolichi 我已经更新了问题中的更多解释。 【参考方案1】:使用 Spark 2.4 filter() 函数。由于应该排除 444,我在 SQL 中已经提到过。希望,步骤是解释性的
scala> val person = Seq(
| ("P1", "111", "123@gmail.com"),
| ("P2", "222", "123@gmail.com"),
| ("P3", "111", "123@gmail.com"),
| ("P4", "-1", "123@gmail.com"),
| ("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")
person: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 1 more field]
scala> val workOrder = Seq(
| ("111", "123@gmail.com"),
| ("222", "123@gmail.com"),
| ("444", "999@gmail.com")).toDF("work_order", "email")
workOrder: org.apache.spark.sql.DataFrame = [work_order: string, email: string]
scala> val person_grp = person.groupBy().agg( collect_list('work_order_person) as "wo_group" )
person_grp: org.apache.spark.sql.DataFrame = [wo_group: array<string>]
scala> person.crossJoin(person_grp).show(false)
+------+-----------------+-------------+------------------------+
|person|work_order_person|email_person |wo_group |
+------+-----------------+-------------+------------------------+
|P1 |111 |123@gmail.com|[111, 222, 111, -1, 444]|
|P2 |222 |123@gmail.com|[111, 222, 111, -1, 444]|
|P3 |111 |123@gmail.com|[111, 222, 111, -1, 444]|
|P4 |-1 |123@gmail.com|[111, 222, 111, -1, 444]|
|P5 |444 |999@gmail.com|[111, 222, 111, -1, 444]|
+------+-----------------+-------------+------------------------+
scala> val df = person.crossJoin(person_grp)
df: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 2 more fields]
scala> df.createOrReplaceTempView("ansip")
scala> spark.sql(" select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip ").show(false)
+------+-----------------+-------------------+
|person|work_order_person|res1 |
+------+-----------------+-------------------+
|P1 |111 |[222, -1] |
|P2 |222 |[111, 111, -1] |
|P3 |111 |[222, -1] |
|P4 |-1 |[111, 222, 111] |
|P5 |444 |[111, 222, 111, -1]|
+------+-----------------+-------------------+
scala> workOrder.createOrReplaceTempView("wo_tab")
scala> val df2 = spark.sql(" with t1 (select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip) select work_order_person
,res1 from t1 where work_order_person!=444 group by work_order_person, res1 ")
df2: org.apache.spark.sql.DataFrame = [work_order_person: string, res1: array<string>]
scala> df2.show(false)
+-----------------+---------------+
|work_order_person|res1 |
+-----------------+---------------+
|111 |[222, -1] |
|222 |[111, 111, -1] |
|-1 |[111, 222, 111]|
+-----------------+---------------+
scala> df2.createOrReplaceTempView("ansib2")
scala> spark.sql(" select work_order, email, case when size(res1)>0 then size(res1) else 0 end res2 from wo_tab left join ansib2 on work_order=work_order_person ").show
(false)
+----------+-------------+----+
|work_order|email |res2|
+----------+-------------+----+
|111 |123@gmail.com|2 |
|222 |123@gmail.com|3 |
|444 |999@gmail.com|0 |
+----------+-------------+----+
scala>
【讨论】:
感谢您的回答。万分感激。我可以看到 x!=444 在这里是硬编码的。这可能不适用于不同的数据集。寻找需要与 spark 2.2 一起使用的通用代码。但绝对感谢您的努力。 person.withColumn("wo_group", collect_list("work_order_person").over(Window.partitionBy("email_person"))).orderBy($"person") show(false)。有了这个,我可以对工作订单进行分组。现在我必须从收集的组中删除自我工作订单 ID。例如:111 工作订单 ID 需要从此集中删除。 [111、222、111、-1]。请帮忙。 您的问题没有提供以通用方式排除 444 的清晰度,所以我已经对其进行了硬编码。即使我们用 spark 2.2 编写它,它也将是 udf 并且必须进行硬编码。 . 请添加更多信息以上是关于Spark 数据框 Scala 棘手的主要内容,如果未能解决你的问题,请参考以下文章
使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]