依赖于公共列的两个数据框之间的交叉连接

Posted

技术标签:

【中文标题】依赖于公共列的两个数据框之间的交叉连接【英文标题】:Crossjoin between two dataframes that is dependent on a common column 【发布时间】:2019-04-19 12:43:17 【问题描述】:

可以按如下方式进行交叉连接:

df1 = pd.DataFrame('subgroup':['A','B','C','D'])
df2 = pd.DataFrame('dates':pd.date_range(date_today, date_today + timedelta(3), freq='D'))
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)

sdf1.crossJoin(sdf2).toPandas()

在此示例中,有两个数据框,每个数据框包含 4 行,最后我得到 16 行。

但是,对于我的问题,我想对每个用户进行交叉连接,并且用户是两个数据框中的另一列,例如:

df1 = pd.DataFrame('user':[1,1,1,1,2,2,2,2],'subgroup':['A','B','C','D','A','B','D','E'])
df2 = pd.DataFrame('user':[1,1,1,1,2,2,2,2],'dates':np.hstack([np.array(pd.date_range(date_today, date_today + timedelta(3), freq='D')),np.array(pd.date_range(date_today+timedelta(1), date_today + timedelta(4), freq='D'))]))

应用 per-user crossJoin 的结果应该是一个 32 行的数据框。这在 pyspark 中是否可行?如何做到这一点?

【问题讨论】:

将每个数据帧过滤成两个,每个用户一个,交叉连接对应的子数据帧(得到2个数据帧,每个16行)然后合并两个交叉连接的数据帧 @sramalingam24 ,我应该提到这应该推广到 n 个用户(即超过 2 个) 那么你将不得不编写一个函数,在用户 ID 的范围内执行此操作,最好是在 map (filter&join) - reduce (union) 方式中 在一系列用户 ID 上执行此操作确实是一个解决方案,如果这可以通过 map reduce 方式完成的话。如何做到这一点? 如果您可以将您的问题减少到最小的实例并提供示例数据和示例输出,以便我们准确了解您想要实现的目标,我会非常有帮助。 【参考方案1】:

交叉连接是一种生成行相乘的连接,因为连接键不能唯一标识行(在我们的例子中,连接键是微不足道的,或者根本没有连接键)

让我们从示例数据框开始:

import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
    [[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())], 
    schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
    [[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())], 
    schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))

        +----+------+
        |user|value1|
        +----+------+
        |   0|    76|
        |   1|    59|
        |   0|    14|
        |   1|    71|
        |   0|    66|
        |   1|    61|
        |   0|     2|
        |   1|    22|
        |   0|    16|
        |   1|    83|
        +----+------+

        +----+------+
        |user|value2|
        +----+------+
        |   0|    65|
        |   1|    81|
        |   0|    60|
        |   1|    69|
        |   0|    21|
        |   1|    61|
        |   0|    98|
        |   1|    76|
        |   0|    40|
        |   1|    21|
        +----+------+

让我们尝试在常量列上连接数据框,以查看在常量(平凡)列上交叉连接和常规连接之间的等价性:

df = df1.withColumn('key', psf.lit(1)) \
    .join(df2.withColumn('key', psf.lit(1)), on=['key'])

我们从 spark > 2 得到一个错误,因为它意识到我们正在尝试进行交叉连接(笛卡尔积)

Py4JJavaError:调用 o1865.showString 时出错。 :org.apache.spark.sql.AnalysisException:检测到逻辑计划之间INNER连接的隐式笛卡尔积 逻辑RDD [user#1538, value1#1539], false 和 逻辑RDD [user#1542, value2#1543], false 连接条件缺失或微不足道。 要么:使用 CROSS JOIN 语法来允许这些之间的笛卡尔积 关系,或者:通过设置配置启用隐式笛卡尔积 变量 spark.sql.crossJoin.enabled=true;

如果您的加入键(此处为user)不是唯一标识行的列,您也会得到行的乘法,但在每个user 组内:

df = df1.join(df2, on='user')
print("Number of rows : \tdf1:  \tdf2:  \tdf: ".format(df1.count(), df2.count(), df.count()))

        Number of rows :    df1: 10     df2: 10     df: 50

        +----+------+------+
        |user|value1|value2|
        +----+------+------+
        |   1|    59|    81|
        |   1|    59|    69|
        |   1|    59|    61|
        |   1|    59|    76|
        |   1|    59|    21|
        |   1|    71|    81|
        |   1|    71|    69|
        |   1|    71|    61|
        |   1|    71|    76|
        |   1|    71|    21|
        |   1|    61|    81|
        |   1|    61|    69|
        |   1|    61|    61|
        |   1|    61|    76|
        |   1|    61|    21|
        |   1|    22|    81|
        |   1|    22|    69|
        |   1|    22|    61|
        |   1|    22|    76|
        |   1|    22|    21|
        +----+------+------+

用户 0 5 * 5 行 + 用户 1 5 * 5 行,因此为 50

注意:使用self join 后跟filter 通常意味着您应该使用窗口函数

【讨论】:

以上是关于依赖于公共列的两个数据框之间的交叉连接的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有直接连接列的两个数据框之间找到最接近的匹配行?

熊猫两个数据框交叉连接[重复]

Spark数据框左连接应在右侧添加默认行而不是null的连接

使用公共列映射两个数据框

连接/加入/合并两个缺少一列的数据框

错题纠正