火花数据框将每个ID的多条记录减少到最常见的值[重复]

Posted

技术标签:

【中文标题】火花数据框将每个ID的多条记录减少到最常见的值[重复]【英文标题】:spark dataframe reducing multiple records per id to just one by most frequent value [duplicate] 【发布时间】:2020-07-06 02:04:46 【问题描述】:

给定如下表格:

+--+------------------+-----------+
|id|     diagnosis_age|  diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 1|  2.80033330216659| 315.320000|
| 1|   2.8222365762732| 315.320000|
| 1|  5.64822705794013| 325.320000|
| 1| 5.686557787521759| 335.320000|
| 2|  5.70572315231258| 315.320000|
| 2| 5.724888517103389| 315.320000|
| 3| 5.744053881894209| 315.320000|
| 3|5.7604813374292005| 315.320000|
| 3|  5.77993740687426| 315.320000|
+--+------------------+-----------+

我正在尝试通过对该 id 进行最频繁的诊断来将每个 id 的记录减少到一个。

如果它是一个 rdd,类似的东西会这样做:

rdd.map(lambda x: (x["id"], [(x["diagnosis_age"], x["diagnosis"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

在 sql 中:

select id, diagnosis, diagnosis_age
from (select id, diagnosis, diagnosis_age, count(*) as cnt,
             row_number() over (partition by id order by count(*) desc) as seqnum
      from t
      group by id, diagnosis, age
     ) da
where seqnum = 1;

想要的输出:

+--+------------------+-----------+
|id|     diagnosis_age|  diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 2|  5.70572315231258| 315.320000|
| 3| 5.744053881894209| 315.320000|
+--+------------------+-----------+

如果可能,我如何仅使用 spark 数据帧操作来实现相同的效果?特别是不使用任何 rdd 操作/sql。

谢谢

【问题讨论】:

如果我错了请纠正我,你想要每个 id 的诊断年龄的最小值和每个 id 的最常见的诊断年龄? @Mohammad Murtaza Hashmi 我只想要每个 id 最频繁的诊断,无论诊断年龄如何,我只是假设在示例表中也会返回最短诊断年龄记录。 这能回答你的问题吗? How to select the first row of each group? 【参考方案1】:

Python:这是我的scala代码的转换。

from pyspark.sql.functions import col, first, count, desc, row_number
from pyspark.sql import Window

df.groupBy("id", "diagnosis").agg(first(col("diagnosis_age")).alias("diagnosis_age"), count(col("diagnosis_age")).alias("cnt")) \
  .withColumn("seqnum", row_number().over(Window.partitionBy("id").orderBy(col("cnt").desc()))) \
  .where("seqnum = 1") \
  .select("id", "diagnosis_age", "diagnosis", "cnt") \
  .orderBy("id") \
  .show(10, False)

Scala:您的查询对我来说没有意义。 groupBy 条件导致记录的计数始终为 1。我在数据框表达式中做了一些修改,例如

import org.apache.spark.sql.expressions.Window

df.groupBy("id", "diagnosis").agg(first(col("diagnosis_age")).as("diagnosis_age"), count(col("diagnosis_age")).as("cnt"))
  .withColumn("seqnum", row_number.over(Window.partitionBy("id").orderBy(col("cnt").desc)))
  .where("seqnum = 1")
  .select("id", "diagnosis_age", "diagnosis", "cnt")
  .orderBy("id")
  .show(false)

结果在哪里:

+---+------------------+---------+---+
|id |diagnosis_age     |diagnosis|cnt|
+---+------------------+---------+---+
|1  |2.1843037179180302|315.32   |3  |
|2  |5.70572315231258  |315.32   |2  |
|3  |5.744053881894209 |315.32   |3  |
+---+------------------+---------+---+

【讨论】:

我实际上无法运行您的代码,我将 .as 更改为 .alias,并在将代码添加到新行的位置添加了 \,但出现与 row_number 相关的错误:NameError: name 'row_number' 未定义,当我将 row_number 修改为 F.row_number 时,由于 import pyspark.sql.functions as FI get: AttributeError: 'function' object has no attribute 'over'。这是否与不同版本有关,因为我使用的是 1.6? @mad-a,对不起,这是一个scala代码,我会更新python代码。【参考方案2】:

您可以将countmaxfirst窗口函数一起使用并过滤count=max

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("id","diagnosis").orderBy("diagnosis_age")
w2=Window().partitionBy("id")
df.withColumn("count", F.count("diagnosis").over(w))\
  .withColumn("max", F.max("count").over(w2))\
  .filter("count=max")\
  .groupBy("id").agg(F.first("diagnosis_age").alias("diagnosis_age"),F.first("diagnosis").alias("diagnosis"))\
  .orderBy("id").show()

+---+------------------+---------+
| id|     diagnosis_age|diagnosis|
+---+------------------+---------+
|  1|2.1843037179180302|   315.32|
|  2|  5.70572315231258|   315.32|
|  3| 5.744053881894209|   315.32|
+---+------------------+---------+

【讨论】:

虽然您的代码可以运行,但我认为它不会将每个 id 的记录减少到 1 条,从而使每个 id 都不同。当我运行时:df.select("id").distinct().count() 我得到 154957,当我在你的输出上运行 count() 时我得到 240438。 @mad-a 我明白了,我根据您的反馈更新了解决方案。如果你试试,请告诉我。

以上是关于火花数据框将每个ID的多条记录减少到最常见的值[重复]的主要内容,如果未能解决你的问题,请参考以下文章

根据 id 列表为多条记录插入相同的值

PHP 批量修改多条记录的Sql语句写法

火花数据框用其他行的值替换值

限制火花上下文中的记录数量

如何让组合框将正确的值传递给表?

从自定义数据格式创建火花数据框