PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值

Posted

技术标签:

【中文标题】PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值【英文标题】:PySpark adding values to one DataFrame based on columns of 2nd DataFrame 【发布时间】:2017-08-15 23:34:34 【问题描述】:

我有两个 PySpark DataFrame,如下所示:

数据帧 A:

+-----+------+
|nodes|counts|
+-----+------+
|  [0]|     1|
|  [1]|     0|
|  [2]|     1|
|  [3]|     0|
|  [4]|     0|
|  [5]|     0|
|  [6]|     1|
|  [7]|     0|
|  [8]|     0|
|  [9]|     0|
| [10]|     0|

和 DataFrame B:

+----+------+
|nodes|counts|
+----+------+
|[0] |     1|
|[1] |     0|
|[2] |     3|
|[6] |     0|
|[8] |     2|
+----+------+

我想创建一个新的 DataFrame C,这样 DataFrame A 中的“counts”列中的值与 DataFrame B 的“counts”列中的值相加,其中“nodes”列相等,这样 DataFrame C 看起来喜欢:

+-----+------+
|nodes|counts|
+-----+------+
|  [0]|     2|
|  [1]|     0|
|  [2]|     4|
|  [3]|     0|
|  [4]|     0|
|  [5]|     0|
|  [6]|     1|
|  [7]|     0|
|  [8]|     2|
|  [9]|     0|
| [10]|     0|

感谢您的帮助!我已经使用 lambda 函数和 sql 语句尝试了一些不同的技巧,但我没有找到解决方案。

【问题讨论】:

【参考方案1】:

可能有一种更有效的方法,但这应该可行:

import pyspark.sql.functions as func

dfA = spark.createDataFrame([([0], 1),([1], 0),([2], 1),([3], 0), ([4], 0),([5], 0),([6], 1),([7], 0), ([8], 0),([9], 0),([10], 0)], ["nodes", "counts"])
dfB = spark.createDataFrame([([0], 1),([1], 0),([2], 3),([6], 0), ([8], 2)], ["nodes", "counts"])

dfC = dfA.join(dfB, dfA.nodes == dfB.nodes, "left")\
    .withColumn("sum",func.when(dfB.nodes.isNull(), dfA.counts).otherwise(dfA.counts+ dfB.counts))\
    .select(dfA.nodes.alias("nodes"), func.col("sum").alias("counts"))

dfC.orderBy("nodes").show()
+-----+------+
|nodes|counts|
+-----+------+
|  [0]|     2|
|  [1]|     0|
|  [2]|     4|
|  [3]|     0|
|  [4]|     0|
|  [5]|     0|
|  [6]|     1|
|  [7]|     0|
|  [8]|     2|
|  [9]|     0|
| [10]|     0|
+-----+------+

【讨论】:

【参考方案2】:

您可以join这两个数据框如下,并将null替换为0并添加两列以获得sum

A.join(B.withColumnRenamed("count", "countB"), Seq("nodes"), "left")
  .na.fill(0)
  .withColumn("count", $"count" + $"countB")
  .drop("countB")
  .show(false)

您还可以使用 union 将这些数据框合并为单个,然后使用 groupBy 节点并计算 sum 如下

A.union(B).groupBy("nodes").agg(sum($"count").alias("count"))
  .orderBy("nodes")
  .show(false)

这是在 scala 中,希望你可以在 pyspark 中编写。

希望这会有所帮助!

【讨论】:

以上是关于PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值的主要内容,如果未能解决你的问题,请参考以下文章

为每组 pyspark RDD/dataframe 选择随机列

Pyspark Dataframe - 如何过滤掉另一个数据框中匹配的记录?

PySpark:是不是有可能根据非 Null 值创建动态数量的 DataFrame

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?

如何根据pyspark中的条件组合dataFrame中的行

如何在PySpark中调用python函数?