如何将数据集拆分为两个具有唯一和重复行的数据集?
Posted
技术标签:
【中文标题】如何将数据集拆分为两个具有唯一和重复行的数据集?【英文标题】:How to split dataset to two datasets with unique and duplicate rows each? 【发布时间】:2018-11-22 04:42:39 【问题描述】:我想在 Spark Scala Dataframe 中获取重复记录。例如,我想根据“id”、“name”、“age”等 3 列获取重复值。条件部分包含任何列(动态输入)。基于列值我想采取重复的记录。
我尝试过的以下代码。我只尝试了一个属性。如果超过一列,我不知道该怎么办。
我的代码:
var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
var columnNames= s.replace('|', ',')
val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
.over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input)
.where($"count">1)
.show()
输入数据帧:(findDuplicateRecordsDF.show())
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------
在这里,我将根据 4 列(id、name、phone、email)获取重复记录。以上是示例数据框。原始数据框不包含任何列。
输出数据框应该是
重复记录输出
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
--------------------------------------------------------
唯一记录数据框输出:
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------
提前致谢。
【问题讨论】:
您可以在partitionBy()
中指定逗号分隔的列列表。
【参考方案1】:
您可以使用窗口函数。看看这个
scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]
scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)
scala> df.createOrReplaceTempView("contact")
scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "
scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]
scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+
scala> df2.createOrReplaceTempView("contact2")
//重复
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+
// 唯一的
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+
EDIT2:
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show
结果:
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+
EDIT3: - 空条件检查
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
结果:
+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+
+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+
空白支票
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^\s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )
仅当所有 3 列均为空白或 null 时过滤
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^\s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
【讨论】:
在 spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from在 org.apache.spark.sql 联系 -------------------------^^^。 org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114) 处的催化剂.parser.ParseException.withCommand(ParseDriver.scala:217) 似乎 dup_cols_qry 是空字符串.. 再次检查 是的,现在它正在工作......谢谢stack0114106......不,这不是一个重复的问题......另一个问题解释是“我正在使用每一列或所需的列来计算唯一的数量和重复记录。有关更多信息,请阅读该问题" 我得到了这个问题..我只是试一试..几乎完成了一半..顺便说一句,你是 QA 团队的..你的问题总是与元编程有关 我有这个问题的答案.. 我想 SO 可能暂时不允许您提问.. 提出一个新问题或提供您的邮件 ID【参考方案2】:您需要提供逗号分隔的列名。
col1 ..col2 should be of string type.
val window= Window.partitionBy(col1,col2,..)
findDuplicateRecordsDF.withColumn("count", count("*")
.over(window)
.where($"count">1)
.show()
【讨论】:
输入包含N个列..它的动态值以上是关于如何将数据集拆分为两个具有唯一和重复行的数据集?的主要内容,如果未能解决你的问题,请参考以下文章