Spark SQL to join two dataframes with no primary keys

Posted: 2020-06-13 11:40:51

【Question】:

I want to join the following two dataframes to create data with a new schema:

df = sqlContext.createDataFrame([("A011021","15","2020-01-01","2020-12-31","4"),("A011021","15","2020-01-01","2020-12-31","4"),("A011021","15","2020-01-01","2020-12-31","4"),("A011021","15","2020-01-01","2020-12-31","3"),("A011022","6","2020-01-01","2020-12-31","3"),("A011022","6","2020-01-01","2020-12-31","3")], ["rep_id","sales_target","start_date","end_date","st_new"])
df.createOrReplaceTempView('df1')
+--------------+------------+----------+----------+------+
|rep_id        |sales_target|start_date|end_date  |st_new|
+--------------+------------+----------+----------+------+
|A011021       |15          |2020-01-01|2020-12-31|4     |
|A011021       |15          |2020-01-01|2020-12-31|4     |
|A011021       |15          |2020-01-01|2020-12-31|4     |
|A011021       |15          |2020-01-01|2020-12-31|3     |
|A011022       |6           |2020-01-01|2020-12-31|3     |
|A011022       |6           |2020-01-01|2020-12-31|3     |
+--------------+------------+----------+----------+------+

df2 = sqlContext.createDataFrame([("A011021","15","2020-01-01","2020-12-31","2020-01-01","2020-03-31"),("A011021","15","2020-01-01","2020-12-31","2020-04-01","2020-06-30"),("A011021","15","2020-01-01","2020-12-31","2020-07-01","2020-09-30"),("A011021","15","2020-01-01","2020-12-31","2020-10-01","2020-12-31"),("A011022","6","2020-01-01","2020-06-30","2020-01-01","2020-03-31"),("A011022","6","2020-01-01","2020-06-30","2020-04-01","2020-06-30")], ["rep_id","sales_target","start_date","end_date","new_sdt","new_edt"])
df2.createOrReplaceTempView('df2')
+--------------+------------+----------+----------+-----------+----------+
|rep_id        |sales_target|start_date|end_date  |new_sdt    |new_edt   |
+--------------+------------+----------+----------+-----------+----------+
|A011021       |15          |2020-01-01|2020-12-31|2020-01-01 |2020-03-31|
|A011021       |15          |2020-01-01|2020-12-31|2020-04-01 |2020-06-30|
|A011021       |15          |2020-01-01|2020-12-31|2020-07-01 |2020-09-30|
|A011021       |15          |2020-01-01|2020-12-31|2020-10-01 |2020-12-31|
|A011022       |6           |2020-01-01|2020-06-30|2020-01-01 |2020-03-31|
|A011022       |6           |2020-01-01|2020-06-30|2020-04-01 |2020-06-30|
+--------------+------------+----------+----------+-----------+----------+

When I run a query to join the two tables, I get duplicated results like the following:

select ds1.*, ds2.new_sdt, ds2.new_edt from df2 ds2
inner join df1 ds1
on ds2.rep_id = ds1.rep_id
where ds2.rep_id = 'A011021'

+--------------+------------+----------+----------+------+-----------+----------+
|rep_id        |sales_target|start_date|end_date  |st_new|new_sdt    |new_edt   |
+--------------+------------+----------+----------+------+-----------+----------+
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-01-01 |2019-12-31|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-01-01 |2019-12-31|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-01-01 |2019-12-31|
|A011021       |15          |2020-01-01|2020-12-31|3     |2020-01-01 |2020-03-31|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-04-01 |2020-03-31|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-04-01 |2020-03-31|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-04-01 |2020-03-31|
|A011021       |15          |2020-01-01|2020-12-31|3     |2020-04-01 |2020-06-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-07-01 |2020-06-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-07-01 |2020-06-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-07-01 |2020-06-30|
|A011021       |15          |2020-01-01|2020-12-31|3     |2020-07-01 |2020-09-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-10-01 |2020-09-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-10-01 |2020-09-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-10-01 |2020-09-30|
|A011021       |15          |2020-01-01|2020-12-31|3     |2020-10-01 |2020-12-30|
+--------------+------------+----------+----------+------+-----------+----------+

Is there a way to get only the distinct new_sdt, new_edt quarterly data for a given rep_id, using Spark SQL or PySpark functions? Please help.

The expected result is:

select ds1.*, ds2.new_sdt, ds2.new_edt from df2 ds2
inner join df1 ds1
on ds2.rep_id = ds1.rep_id

+--------------+------------+----------+----------+------+-----------+----------+
|rep_id        |sales_target|start_date|end_date  |st_new|new_sdt    |new_edt   |
+--------------+------------+----------+----------+------+-----------+----------+
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-01-01 |2020-03-31|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-04-01 |2020-06-30|
|A011021       |15          |2020-01-01|2020-12-31|4     |2020-07-01 |2020-09-30|
|A011021       |15          |2020-01-01|2020-12-31|3     |2020-10-01 |2020-12-31|
|A011022       |6           |2020-01-01|2020-12-31|3     |2020-01-01 |2020-03-31|
|A011022       |6           |2020-01-01|2020-12-31|3     |2020-04-01 |2020-06-30|
+--------------+------------+----------+----------+------+-----------+----------+

【Comments】:

What is the customer_number column?
Sorry, it should be rep_id; I have updated the query. Thanks.

【Solution 1】:
Assign a unique ID to each row for the inner join, then drop the extra column:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object InnerJoin {

  def main(args: Array[String]): Unit = {
    // create a local SparkSession (the original answer used a Constant.getSparkSess helper)
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    import spark.implicits._

    // add a synthetic row id to each dataframe so the two can be joined positionally
    val df1 = List(
      ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "3"),
      ("A011022", "6" , "2020-01-01", "2020-12-31", "3"),
      ("A011022", "6" , "2020-01-01", "2020-12-31", "3"))
      .toDF("rep_id", "sales_target", "start_date", "end_date", "st_new")
      .withColumn("rowid", monotonically_increasing_id())

    val df2 = List(
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-01-01", "2020-03-31"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-04-01", "2020-06-30"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-07-01", "2020-09-30"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-10-01", "2020-12-31"),
      ("A011022", "6" , "2020-01-01", "2020-06-30", "2020-01-01", "2020-03-31"),
      ("A011022", "6" , "2020-01-01", "2020-06-30", "2020-04-01", "2020-06-30"))
      .toDF("rep_id", "sales_target", "start_date", "end_date", "new_sdt", "new_edt")
      .withColumn("rowid", monotonically_increasing_id())

    // join on the synthetic row id, order the result, and drop the helper column
    df1.as("ds1").join(df2.as("ds2"),
      col("ds1.rowid") === col("ds2.rowid"),
      "inner")
      .orderBy(col("ds1.rep_id"), col("ds1.sales_target"), col("st_new").desc)
      .drop("rowid")
      .show()
  }
}

Output:

+-------+------------+----------+----------+------+-------+------------+----------+----------+----------+----------+
| rep_id|sales_target|start_date|  end_date|st_new| rep_id|sales_target|start_date|  end_date|   new_sdt|   new_edt|
+-------+------------+----------+----------+------+-------+------------+----------+----------+----------+----------+
|A011021|          15|2020-01-01|2020-12-31|     4|A011021|          15|2020-01-01|2020-12-31|2020-04-01|2020-06-30|
|A011021|          15|2020-01-01|2020-12-31|     4|A011021|          15|2020-01-01|2020-12-31|2020-01-01|2020-03-31|
|A011021|          15|2020-01-01|2020-12-31|     4|A011021|          15|2020-01-01|2020-12-31|2020-07-01|2020-09-30|
|A011021|          15|2020-01-01|2020-12-31|     3|A011021|          15|2020-01-01|2020-12-31|2020-10-01|2020-12-31|
|A011022|           6|2020-01-01|2020-12-31|     3|A011022|           6|2020-01-01|2020-06-30|2020-04-01|2020-06-30|
|A011022|           6|2020-01-01|2020-12-31|     3|A011022|           6|2020-01-01|2020-06-30|2020-01-01|2020-03-31|
+-------+------------+----------+----------+------+-------+------------+----------+----------+----------+----------+
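
Since the question asks for a Spark SQL or PySpark solution, here is a rough PySpark sketch of the same row-id join idea (my own translation, not part of the original answer). It reuses the df and df2 dataframes created in the question and assumes both are small, locally created frames so the generated ids line up row for row:

from pyspark.sql.functions import monotonically_increasing_id, col

# add a synthetic row id to both frames (df and df2 are the dataframes built in the question)
ds1 = df.withColumn("rowid", monotonically_increasing_id())
ds2 = df2.withColumn("rowid", monotonically_increasing_id())

# join on the synthetic id, order the result, then drop the helper column;
# note: monotonically_increasing_id is not guaranteed to line up across two
# independently partitioned dataframes, so this is only a sketch for small local data
result = (ds1.alias("ds1")
          .join(ds2.alias("ds2"), col("ds1.rowid") == col("ds2.rowid"), "inner")
          .orderBy(col("ds1.rep_id"), col("ds1.sales_target"), col("ds1.st_new").desc())
          .drop("rowid"))

result.show(truncate=False)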

【Discussion】:

Thanks, could you also print the output? I will check the same on my end.
Added the output as well @Yuva
Thanks. If you look at st_new, it should be in the order 4, 4, 4, 3; that is the part I am finding difficult. Any suggestions to get it right? st_new and new_sdt / new_edt are exploded data. Thanks.
And I cannot add any one-to-one column-level validation, because the record count varies with the number of quarters; see my example for rep_id A011022. Thanks.
@Yuva please verify your input. I cannot see A011022 anywhere.
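
Building on the point above about varying record counts per rep_id, one possible alternative (my own sketch, not from the original answer) is to number rows within each rep_id with a window function and join on rep_id plus that row number, so groups with different numbers of quarters still pair up. The ordering columns chosen here (st_new descending on one side, new_sdt ascending on the other) are assumptions about how the rows should line up:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

# number rows within each rep_id; st_new descending pairs 4, 4, 4, 3 with Q1..Q4 (assumed pairing)
w1 = Window.partitionBy("rep_id").orderBy(col("st_new").desc())
w2 = Window.partitionBy("rep_id").orderBy(col("new_sdt"))

ds1 = df.withColumn("rn", row_number().over(w1))
ds2 = df2.select("rep_id", "new_sdt", "new_edt").withColumn("rn", row_number().over(w2))

# join on rep_id + position, so rep_ids with fewer quarters (e.g. A011022) still match
result = (ds1.join(ds2, on=["rep_id", "rn"], how="inner")
          .drop("rn")
          .orderBy("rep_id", "new_sdt"))

result.show(truncate=False)

With the sample data above this produces one row per quarter per rep_id, which matches the expected result shown in the question.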
