Compare two datasets and get which fields changed

【Posted】: 2019-08-20 11:37:12

【Question】:

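I am working with Spark in Java. I download data from an API and compare it with the data in MongoDB. The downloaded JSON has 15-20 fields, but the database has 300 fields.

My task is to compare the downloaded JSON with the MongoDB data and find every field whose value differs from the stored (past) data.

Sample datasets

Data downloaded from the API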

StudentId,Name,Phone,Email
1,tony,123,a@g.com
2,stark,456,b@g.com
3,spidy,789,c@g.com

MongoDB data

StudentId,Name,Phone,Email,State,City
1,tony,1234,a@g.com,NY,Nowhere
2,stark,456,bg@g.com,NY,Nowhere
3,spidy,789,c@g.com,OH,Nowhere

Because the two datasets have a different number of columns, I cannot use except.

Expected output

StudentId,Name,Phone,Email,Past_Phone,Past_Email
1,tony,1234,a@g.com,1234, // only the phone number changed
2,stark,456,b@g.com,,bg@g.com // only the email changed
3,spidy,789,c@g.com,,

【Comments】:

【Answer 1】:

Assume your data is in two DataFrames. We can create temporary views for them as shown below:

api_df.createOrReplaceTempView("api_data")
mongo_df.createOrReplaceTempView("mongo_data")

Next we can use Spark SQL. Here we join the two views on the StudentId column and then use case expressions on top of the join to compute the past phone number and email.

spark.sql("""
select a.*
, case when a.Phone = b.Phone then '' else b.Phone end as Past_phone
, case when a.Email = b.Email then '' else b.Email end as Past_Email
from api_data a
join mongo_data b
on a.StudentId = b.StudentId
order by a.StudentId""").show()

Output:

+---------+-----+-----+-------+----------+----------+
|StudentId| Name|Phone|  Email|Past_phone|Past_Email|
+---------+-----+-----+-------+----------+----------+
|        1| tony|  123|a@g.com|      1234|          |
|        2|stark|  456|b@g.com|          |  bg@g.com|
|        3|spidy|  789|c@g.com|          |          |
+---------+-----+-----+-------+----------+----------+
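The query above hard-codes Phone and Email, while the question involves 15-20 API fields. As a rough sketch (not part of the original answer), the same query text could be generated for every column the two views share. The class and method names here are hypothetical, the view names api_data and mongo_data come from the answer, and the column lists are assumed to be supplied by the caller (in Spark they would presumably come from the `columns()` of each dataset). The sketch uses Spark SQL's null-safe equality operator `<=>` instead of `=` so that two nulls count as unchanged, and it assumes column names that need no backtick quoting.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class PastColumnQuery {

    // Builds the text of a Spark SQL query that adds one Past_<column>
    // for every column present in both views, except the join key.
    static String build(List<String> apiColumns, List<String> mongoColumns, String key) {
        // Shared columns, kept in API order; the join key is not compared.
        Set<String> shared = new LinkedHashSet<>(apiColumns);
        shared.retainAll(mongoColumns);
        shared.remove(key);

        StringBuilder sql = new StringBuilder("select a.*");
        for (String c : shared) {
            // Empty string when unchanged, otherwise the stored (past) value.
            sql.append("\n, case when a.").append(c).append(" <=> b.").append(c)
               .append(" then '' else b.").append(c).append(" end as Past_").append(c);
        }
        sql.append("\nfrom api_data a")
           .append("\njoin mongo_data b")
           .append("\non a.").append(key).append(" = b.").append(key)
           .append("\norder by a.").append(key);
        return sql.toString();
    }

    public static void main(String[] args) {
        // Column names taken from the sample data in the question.
        String sql = build(
            List.of("StudentId", "Name", "Phone", "Email"),
            List.of("StudentId", "Name", "Phone", "Email", "State", "City"),
            "StudentId");
        System.out.println(sql);
    }
}
```

The resulting string can be passed to spark.sql(...) in place of the hand-written query. Note that this version also emits a Past_Name column, because Name exists on both sides; dropping columns that turn out to be empty for every row would need an additional pass over the result.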

【Comments】:

【Answer 2】:

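Please find the source code below. Here I use only the phone number condition as an example.

// Needed for toDF and the 'column syntax; assumes a SparkSession named spark
import spark.implicits._

val list = List((1,"tony",123,"a@g.com"), (2,"stark",456,"b@g.com"),
       (3,"spidy",789,"c@g.com"))
val df1 = list.toDF("StudentId","Name","Phone","Email")
     .select('StudentId as "StudentId_1", 'Name as "Name_1",'Phone as "Phone_1",
     'Email as "Email_1")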

df1.show()

val list1 = List((1,"tony",1234,"a@g.com","NY","Nowhere"), 
     (2,"stark",456,"bg@g.com", "NY", "Nowhere"),
     (3,"spidy",789,"c@g.com","OH","Nowhere"))

val df2 = list1.toDF("StudentId","Name","Phone","Email","State","City")
      .select('StudentId as "StudentId_2", 'Name as "Name_2", 'Phone as "Phone_2",
         'Email as "Email_2", 'State as "State_2", 'City as "City_2")

df2.show()

val df3 = df1.join(df2, df1("StudentId_1") === 
         df2("StudentId_2")).where(df1("Phone_1") =!= df2("Phone_2"))

df3.withColumnRenamed("Phone_1", "Past_Phone").show()

+-----------+------+-------+-------+
|StudentId_1|Name_1|Phone_1|Email_1|
+-----------+------+-------+-------+
|          1|  tony|    123|a@g.com|
|          2| stark|    456|b@g.com|
|          3| spidy|    789|c@g.com|
+-----------+------+-------+-------+

+-----------+------+-------+--------+-------+-------+
|StudentId_2|Name_2|Phone_2| Email_2|State_2| City_2|
+-----------+------+-------+--------+-------+-------+
|          1|  tony|   1234| a@g.com|     NY|Nowhere|
|          2| stark|    456|bg@g.com|     NY|Nowhere|
|          3| spidy|    789| c@g.com|     OH|Nowhere|
+-----------+------+-------+--------+-------+-------+

+-----------+------+----------+-------+-----------+------+-------+-------+-------+-------+
|StudentId_1|Name_1|Past_Phone|Email_1|StudentId_2|Name_2|Phone_2|Email_2|State_2| City_2|
+-----------+------+----------+-------+-----------+------+-------+-------+-------+-------+
|          1|  tony|       123|a@g.com|          1|  tony|   1234|a@g.com|     NY|Nowhere|
+-----------+------+----------+-------+-----------+------+-------+-------+-------+-------+

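【Comments】:

Thank you for the quick answer. However, my expected table should not contain State and City. It should show only the changed fields, where they are not empty. I do not know in advance which fields in df1 will change, so it should be dynamic.

OK. Are there any specific columns that will always be present?

【Answer 3】:

We have: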

df1.show
+-----------+------+-------+-------+
|StudentId_1|Name_1|Phone_1|Email_1|
+-----------+------+-------+-------+
|          1|  tony|    123|a@g.com|
|          2| stark|    456|b@g.com|
|          3| spidy|    789|c@g.com|
+-----------+------+-------+-------+

df2.show

+-----------+------+-------+--------+-------+-------+
|StudentId_2|Name_2|Phone_2| Email_2|State_2| City_2|
+-----------+------+-------+--------+-------+-------+
|          1|  tony|   1234| a@g.com|     NY|Nowhere|
|          2| stark|    456|bg@g.com|     NY|Nowhere|
|          3| spidy|    789| c@g.com|     OH|Nowhere|
+-----------+------+-------+--------+-------+-------+

After joining:

var jn = df2.join(df1, df1("StudentId_1") === df2("StudentId_2"))

Then:

import org.apache.spark.sql.functions.when

// Keep the df1 value as the "past" value only where it differs from df2
var ans = jn
  .withColumn("Past_Phone", when(jn("Phone_2").notEqual(jn("Phone_1")), jn("Phone_1")).otherwise(""))
  .withColumn("Past_Email", when(jn("Email_2").notEqual(jn("Email_1")), jn("Email_1")).otherwise(""))

Reference: Spark: Add column to dataframe conditionally

Next:

ans.select(ans("StudentId_2") as "StudentId", ans("Name_2") as "Name", ans("Phone_2") as "Phone", ans("Email_2") as "Email", ans("Past_Email"), ans("Past_Phone")).show

+---------+-----+-----+--------+----------+----------+
|StudentId| Name|Phone|   Email|Past_Email|Past_Phone|
+---------+-----+-----+--------+----------+----------+
|        1| tony| 1234| a@g.com|          |       123|
|        2|stark|  456|bg@g.com|   b@g.com|          |
|        3|spidy|  789| c@g.com|          |          |
+---------+-----+-----+--------+----------+----------+
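Note that this answer treats the MongoDB values as current and the API values as past, the reverse of Answer 1. To make the intended semantics explicit without depending on Spark, here is a minimal plain-Java sketch of the per-record comparison. The class name, method name, and the use of string maps to represent a record are all assumptions made for illustration. The caller decides which record is "current" and which is "past", and only fields that exist on both sides and differ are reported, which is what the comment under Answer 2 asks for.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class FieldDiff {

    // Returns Past_<field> -> previous value for every field of `current`
    // that also exists in `past` and holds a different value there.
    static Map<String, String> changedFields(Map<String, String> current,
                                             Map<String, String> past,
                                             String key) {
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : current.entrySet()) {
            String field = e.getKey();
            if (field.equals(key) || !past.containsKey(field)) {
                continue; // skip the join key and fields only one side has
            }
            String old = past.get(field);
            if (!Objects.equals(e.getValue(), old)) { // null-safe comparison
                changed.put("Past_" + field, old);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Student 1 from the sample data: API record as current, MongoDB record as past.
        Map<String, String> api = new LinkedHashMap<>();
        api.put("StudentId", "1");
        api.put("Name", "tony");
        api.put("Phone", "123");
        api.put("Email", "a@g.com");

        Map<String, String> mongo = new LinkedHashMap<>(api);
        mongo.put("Phone", "1234");
        mongo.put("State", "NY");
        mongo.put("City", "Nowhere");

        System.out.println(changedFields(api, mongo, "StudentId")); // prints {Past_Phone=1234}
    }
}
```

Swapping the first two arguments gives the direction used in this answer instead, where the API value is reported as the past one.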

【Comments】:
