使用 pyspark 进行多次连续连接
Posted
技术标签:
【中文标题】使用 pyspark 进行多次连续连接【英文标题】:Multiple consecutive join with pyspark 【发布时间】:2016-07-19 21:08:51 【问题描述】:我正在尝试将多个 DF 连接在一起。因为 join 是如何工作的,所以我得到了相同的列名。
在 (K, V) 和 (K, W) 类型的数据集上调用时,返回一个数据集 (K, (V, W)) 对与每个键的所有元素对。
# Join Min and Max to S1
joinned_s1 = (minTime.join(maxTime, minTime["UserId"] == maxTime["UserId"]))
# Join S1 and sum to s2
joinned_s2 = (joinned_s1.join(sumTime, joinned_s1["UserId"] == sumTime["UserId"]))
我收到此错误:“”引用'UserId'不明确,可能是:UserId#1578,UserId#3014.;”
一旦成功加入,从我的数据集中删除 W 的正确方法是什么?
【问题讨论】:
【参考方案1】:您可以使用等值连接:
minTime.join(maxTime, ["UserId"]).join(sumTime, ["UserId"])
别名:
minTime.alias("minTime").join(
maxTime.alias("maxTime"),
col("minTime.UserId") == col("maxTime.UserId")
)
或参考父表:
(minTime
.join(maxTime, minTime["UserId"] == maxTime["UserId"])
.join(sumTime, minTime["UserId"] == sumTime["UserId"]))
作为旁注,您引用的是 RDD
文档,而不是 DataFrame
文档。这些是不同的数据结构,操作方式不同。
而且看起来你在这里做了一些奇怪的事情。假设您有一个单独的父表 min
、max
和 sum
可以计算为没有 join
的简单聚合。
【讨论】:
谢谢!那行得通。 @ zero323 - 我试过类似 dfg = df.groupBy(df.UserId).agg("Timer": "sum","Timer": "min","Timer": "max","Timer" : "avg", "Actions": "count") 但只有 AVG 出现......因此使用了连接。有什么建议吗? 创建了一个新线程来解决上述问题:***.com/questions/38488817/…【参考方案2】:如果您在列上连接两个数据框,则列将被复制。所以尝试使用数组或字符串来连接两个或多个数据帧。
例如,如果加入列:
df = left.join(right, left.name == right.name)
输出将包含带有“名称”的两列。
现在如果你使用:
df = left.join(right, "name") OR df=left.join(right,["name"])
那么输出不会有重复的列。
【讨论】:
以上是关于使用 pyspark 进行多次连续连接的主要内容,如果未能解决你的问题,请参考以下文章
SpringSecurity3无法拦截连续多次点击出起的请求
使用电脑登陆CMCC,输入账号和密码后总是出现‘您已达到多终端上限限额’是啥原因连续登陆好多次都失败,