火花联合不按预期工作,添加新行

Posted

技术标签:

【中文标题】火花联合不按预期工作,添加新行【英文标题】:spark union doesn't work as expect, add new rows 【发布时间】:2019-08-06 02:06:06 【问题描述】:

我不能使用简单的测试集来重现这个问题,它只发生在我的数据集上。所以只能说情况。

df 有很多不同的 store_id,product_id 组,每个组有很多行。

df1 有许多不同的 store_id,product_id 组,只有一行或没有。

df 是订单历史表,我需要从中获取历史价格,并从 df1 获取当前价格。联合他们构建一条全程价格变化线。

但奇怪的是


sid = '00fbb2a6-f2de-42f1-a07b-163e3a050ddb'
pid = '66e06f08-dec2-498d-883f-24771da18358'

filtersp = lambda df: df.filter(col('store_id')==sid).filter(col('product_id')==pid)

filtersp(df).show()

+----------------+--------+----------+-----------+---+
|store_product_id|store_id|product_id|price_guide| ds|
+----------------+--------+----------+-----------+---+
+----------------+--------+----------+-----------+---+

filtersp(df1).show()

+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+


filtersp(df1).union(filtersp(df)).show()

+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+


filtersp(df1.union(df)).show()

+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+

filtersp(df.union(df1)).show()

+--------------------+--------------------+--------------------+-----------+-------------------+
|    store_product_id|            store_id|          product_id|price_guide|                 ds|
+--------------------+--------------------+--------------------+-----------+-------------------+
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
+--------------------+--------------------+--------------------+-----------+-------------------+


然后我添加一个新列来跟踪这些行的来源

df = df.withColumn('c', lit('df'))

df1 = df1.withColumn('c', lit('df1'))

filtersp(df.union(df1)).show()

+--------------------+--------------------+--------------------+-----------+-------------------+---+
|    store_product_id|            store_id|          product_id|price_guide|                 ds|  c|
+--------------------+--------------------+--------------------+-----------+-------------------+---+
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
+--------------------+--------------------+--------------------+-----------+-------------------+---+

查找来自 df1 的行。

我不明白filtersp(df.union(df1)).show()在什么情况下会显示结果,这是不可能的。

【问题讨论】:

【参考方案1】:

敲自己。虽然我找到了答案https://***.com/a/55310670/1637673:

def unionByName(other: Dataset[T]): Dataset[T]

这个函数和union的区别在于这个函数是按名称解析列的(不是按位置):

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+

但我不认为我有这个问题,经过一番努力,最终发现列顺序不同。

df 是

+----------------+--------+----------+-----------+---+---+
|store_product_id|store_id|product_id|price_guide| ds|  c|
+----------------+--------+----------+-----------+---+---+
+----------------+--------+----------+-----------+---+---+

df1 是

+----------------+----------+--------+-----------+---+---+
|store_product_id|product_id|store_id|price_guide| ds|  c|
+----------------+----------+--------+-----------+---+---+
+----------------+----------+--------+-----------+---+---+

product_id|store_id的位置不同。

【讨论】:

以上是关于火花联合不按预期工作,添加新行的主要内容,如果未能解决你的问题,请参考以下文章

在火花中联合后再次排序的蜂巢表排序

自动驾驶+物流,会产生怎样的火花?

复制当前行,修改它并在火花中添加一个新行

PHP,PDO,表单,更新用户类不按预期工作

联合初始化程序“错误:‘。’标记之前的预期主表达式”

带有 COUNT() 函数的 SQL 联合未返回预期结果