使用 pyspark 进行多次连续连接

Posted

技术标签:

【中文标题】使用 pyspark 进行多次连续连接【英文标题】:Multiple consecutive join with pyspark 【发布时间】:2016-07-19 21:08:51 【问题描述】:

我正在尝试将多个 DF 连接在一起。因为 join 是如何工作的,所以我得到了相同的列名。

在 (K, V) 和 (K, W) 类型的数据集上调用时,返回一个数据集 (K, (V, W)) 对与每个键的所有元素对。

# Join Min and Max to S1
joinned_s1 = (minTime.join(maxTime, minTime["UserId"] == maxTime["UserId"]))

# Join S1 and sum to s2
joinned_s2 = (joinned_s1.join(sumTime, joinned_s1["UserId"] == sumTime["UserId"]))

我收到此错误:“”引用'UserId'不明确,可能是:UserId#1578,UserId#3014.;”

一旦成功加入,从我的数据集中删除 W 的正确方法是什么?

【问题讨论】:

【参考方案1】:

您可以使用等值连接:

 minTime.join(maxTime, ["UserId"]).join(sumTime, ["UserId"])

别名:

minTime.alias("minTime").join(
    maxTime.alias("maxTime"), 
    col("minTime.UserId") == col("maxTime.UserId")
)

或参考父表:

(minTime
  .join(maxTime, minTime["UserId"] == maxTime["UserId"])
  .join(sumTime, minTime["UserId"] == sumTime["UserId"]))

作为旁注,您引用的是 RDD 文档,而不是 DataFrame 文档。这些是不同的数据结构,操作方式不同。

而且看起来你在这里做了一些奇怪的事情。假设您有一个单独的父表 minmaxsum 可以计算为没有 join 的简单聚合。

【讨论】:

谢谢!那行得通。 @ zero323 - 我试过类似 dfg = df.groupBy(df.UserId).agg("Timer": "sum","Timer": "min","Timer": "max","Timer" : "avg", "Actions": "count") 但只有 AVG 出现......因此使用了连接。有什么建议吗? 创建了一个新线程来解决上述问题:***.com/questions/38488817/…【参考方案2】:

如果您在列上连接两个数据框,则列将被复制。所以尝试使用数组或字符串来连接两个或多个数据帧。

例如,如果加入列:

df = left.join(right, left.name == right.name)

输出将包含带有“名称”的两列。

现在如果你使用:

df = left.join(right, "name") OR df=left.join(right,["name"])

那么输出不会有重复的列。

【讨论】:

以上是关于使用 pyspark 进行多次连续连接的主要内容,如果未能解决你的问题,请参考以下文章

带有点“。”的数据框的 pyspark 访问列

Pyspark:使用 udf 多次加载模型

如何在pyspark中将GUID转换为整数

如何更改pyspark中的列元数据?

Pyspark 有条件的累积和

我们如何在 pyspark 的不同模块中使用相同的连接数据框用法