合并多个 PySpark DataFrame 行以将基于事件的数据转换为基于人员的数据

Posted

技术标签:

【中文标题】合并多个 PySpark DataFrame 行以将基于事件的数据转换为基于人员的数据【英文标题】:Merging Multiple PySpark DataFrame rows to convert from event based to person based data 【发布时间】:2017-05-22 14:23:28 【问题描述】:

假设我有一个基于事件的顺序的 DataFrame。基本上每次发生事情时,我都会收到一个新事件,说有人改变了位置或工作。以下是示例输入的样子:

+--------+----+----------------+---------------+
|event_id|name|             job|       location|
+--------+----+----------------+---------------+
|      10| Bob|         Manager|               |
|       9| Joe|                |             HQ|
|       8| Tim|                |New York Office|
|       7| Joe|                |New York Office|
|       6| Joe| Head Programmer|               |
|       5| Bob|                |      LA Office|
|       4| Tim|         Manager|             HQ|
|       3| Bob|                |New York Office|
|       2| Bob|DB Administrator|             HQ|
|       1| Joe|      Programmer|             HQ|
+--------+----+----------------+---------------+

在本例中,10 是最新事件,1 是最旧事件。现在我想获得关于每个人的最新信息。这是我想要的输出:

+----+---------------+---------------+
|name|            job|       location|
+----+---------------+---------------+
| Bob|        Manager|      LA Office|
| Joe|Head Programmer|             HQ|
| Tim|        Manager|New York Office|
+----+---------------+---------------+

我目前进行这种重组的方式是收集数据,然后循环浏览事件,从最新到最旧,以便找到每个人的信息。这种方法的问题在于,对于大型 DataFrame 来说速度非常慢,并且最终无法全部放入一台计算机的内存中。使用 spark 执行此操作的正确方法是什么?

【问题讨论】:

类似this 【参考方案1】:

根据你的问题,我认为这就是你想要的

 val spark =
    SparkSession.builder().master("local").appName("test").getOrCreate()

  import spark.implicits._

  val data = spark.sparkContext.parallelize(
    Seq(
      (10, "Bob", "Manager", ""),
      (9, "Joe", "", "HQ"),
      (8, "Tim", "", "New York Office"),
      (7, "Joe", "", "New York Office"),
      (6, "Joe", "Head Programmer", ""),
      (5, "Bob", "", "LA Office"),
      (4, "Tim", "Manager", "HQ"),
      (3, "Bob", "", "New York Office"),
      (2, "Bob", "DB Administrator", "HQ"),
      (1, "Joe", "Programmer", "HQ")
    )).toDF("event_id", "name", "job", "location")

  val latest = data.groupBy("name").agg(max(data("event_id")).alias("event_id"))

  latest.join(data, "event_id").drop("event_id").show

这是一个scala代码,希望你能用Python转换它

【讨论】:

这并不能解决问题。这将为我提供最新的参考,但它不会获得所有更新的字段。我想要每个人的最新工作和位置,而不是最新记录。

以上是关于合并多个 PySpark DataFrame 行以将基于事件的数据转换为基于人员的数据的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 如何将转换后的列与原始 DataFrame 合并?

Pyspark Dataframe 通过消除空值来合并行

pyspark列合并为一行

Pandas - 合并 DataFrame 中的行 [重复]

在 PySpark DataFrame 中添加多个空列

通过列 [PySpark] 连接两个 DataFrame