火花联合不按预期工作,添加新行
Posted
技术标签:
【中文标题】火花联合不按预期工作,添加新行【英文标题】:spark union doesn't work as expect, add new rows 【发布时间】:2019-08-06 02:06:06 【问题描述】:我不能使用简单的测试集来重现这个问题,它只发生在我的数据集上。所以只能说情况。
df
有很多不同的 store_id,product_id 组,每个组有很多行。
df1
有许多不同的 store_id,product_id 组,只有一行或没有。
df 是订单历史表,我需要从中获取历史价格,并从 df1 获取当前价格。联合他们构建一条全程价格变化线。
但奇怪的是
sid = '00fbb2a6-f2de-42f1-a07b-163e3a050ddb'
pid = '66e06f08-dec2-498d-883f-24771da18358'
filtersp = lambda df: df.filter(col('store_id')==sid).filter(col('product_id')==pid)
filtersp(df).show()
+----------------+--------+----------+-----------+---+
|store_product_id|store_id|product_id|price_guide| ds|
+----------------+--------+----------+-----------+---+
+----------------+--------+----------+-----------+---+
filtersp(df1).show()
+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+
filtersp(df1).union(filtersp(df)).show()
+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+
filtersp(df1.union(df)).show()
+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+
filtersp(df.union(df1)).show()
+--------------------+--------------------+--------------------+-----------+-------------------+
| store_product_id| store_id| product_id|price_guide| ds|
+--------------------+--------------------+--------------------+-----------+-------------------+
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|
+--------------------+--------------------+--------------------+-----------+-------------------+
然后我添加一个新列来跟踪这些行的来源
df = df.withColumn('c', lit('df'))
df1 = df1.withColumn('c', lit('df1'))
filtersp(df.union(df1)).show()
+--------------------+--------------------+--------------------+-----------+-------------------+---+
| store_product_id| store_id| product_id|price_guide| ds| c|
+--------------------+--------------------+--------------------+-----------+-------------------+---+
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...| 480|2019-08-06 09:00:00|df1|
+--------------------+--------------------+--------------------+-----------+-------------------+---+
查找来自 df1 的行。
我不明白filtersp(df.union(df1)).show()
在什么情况下会显示结果,这是不可能的。
【问题讨论】:
【参考方案1】:敲自己。虽然我找到了答案https://***.com/a/55310670/1637673:
def unionByName(other: Dataset[T]): Dataset[T]
这个函数和union的区别在于这个函数是按名称解析列的(不是按位置):
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 4| 5| 6|
// +----+----+----+
但我不认为我有这个问题,经过一番努力,最终发现列顺序不同。
df 是
+----------------+--------+----------+-----------+---+---+
|store_product_id|store_id|product_id|price_guide| ds| c|
+----------------+--------+----------+-----------+---+---+
+----------------+--------+----------+-----------+---+---+
df1 是
+----------------+----------+--------+-----------+---+---+
|store_product_id|product_id|store_id|price_guide| ds| c|
+----------------+----------+--------+-----------+---+---+
+----------------+----------+--------+-----------+---+---+
product_id|store_id
的位置不同。
【讨论】:
以上是关于火花联合不按预期工作,添加新行的主要内容,如果未能解决你的问题,请参考以下文章