PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值
Posted
技术标签:
【中文标题】PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值【英文标题】:PySpark adding values to one DataFrame based on columns of 2nd DataFrame 【发布时间】:2017-08-15 23:34:34 【问题描述】:我有两个 PySpark DataFrame,如下所示:
数据帧 A:
+-----+------+
|nodes|counts|
+-----+------+
| [0]| 1|
| [1]| 0|
| [2]| 1|
| [3]| 0|
| [4]| 0|
| [5]| 0|
| [6]| 1|
| [7]| 0|
| [8]| 0|
| [9]| 0|
| [10]| 0|
和 DataFrame B:
+----+------+
|nodes|counts|
+----+------+
|[0] | 1|
|[1] | 0|
|[2] | 3|
|[6] | 0|
|[8] | 2|
+----+------+
我想创建一个新的 DataFrame C,这样 DataFrame A 中的“counts”列中的值与 DataFrame B 的“counts”列中的值相加,其中“nodes”列相等,这样 DataFrame C 看起来喜欢:
+-----+------+
|nodes|counts|
+-----+------+
| [0]| 2|
| [1]| 0|
| [2]| 4|
| [3]| 0|
| [4]| 0|
| [5]| 0|
| [6]| 1|
| [7]| 0|
| [8]| 2|
| [9]| 0|
| [10]| 0|
感谢您的帮助!我已经使用 lambda 函数和 sql 语句尝试了一些不同的技巧,但我没有找到解决方案。
【问题讨论】:
【参考方案1】:可能有一种更有效的方法,但这应该可行:
import pyspark.sql.functions as func
dfA = spark.createDataFrame([([0], 1),([1], 0),([2], 1),([3], 0), ([4], 0),([5], 0),([6], 1),([7], 0), ([8], 0),([9], 0),([10], 0)], ["nodes", "counts"])
dfB = spark.createDataFrame([([0], 1),([1], 0),([2], 3),([6], 0), ([8], 2)], ["nodes", "counts"])
dfC = dfA.join(dfB, dfA.nodes == dfB.nodes, "left")\
.withColumn("sum",func.when(dfB.nodes.isNull(), dfA.counts).otherwise(dfA.counts+ dfB.counts))\
.select(dfA.nodes.alias("nodes"), func.col("sum").alias("counts"))
dfC.orderBy("nodes").show()
+-----+------+
|nodes|counts|
+-----+------+
| [0]| 2|
| [1]| 0|
| [2]| 4|
| [3]| 0|
| [4]| 0|
| [5]| 0|
| [6]| 1|
| [7]| 0|
| [8]| 2|
| [9]| 0|
| [10]| 0|
+-----+------+
【讨论】:
【参考方案2】:您可以join
这两个数据框如下,并将null
替换为0
并添加两列以获得sum
A.join(B.withColumnRenamed("count", "countB"), Seq("nodes"), "left")
.na.fill(0)
.withColumn("count", $"count" + $"countB")
.drop("countB")
.show(false)
您还可以使用 union
将这些数据框合并为单个,然后使用 groupBy 节点并计算 sum
如下
A.union(B).groupBy("nodes").agg(sum($"count").alias("count"))
.orderBy("nodes")
.show(false)
这是在 scala 中,希望你可以在 pyspark 中编写。
希望这会有所帮助!
【讨论】:
以上是关于PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值的主要内容,如果未能解决你的问题,请参考以下文章
为每组 pyspark RDD/dataframe 选择随机列
Pyspark Dataframe - 如何过滤掉另一个数据框中匹配的记录?
PySpark:是不是有可能根据非 Null 值创建动态数量的 DataFrame