How to use NOT IN clause in filter condition in Spark
Posted: 2017-07-04 07:18:11

[Question]: I want to filter a column of an RDD source:
val source = sql("SELECT * from sample.source").rdd.map(_.mkString(","))
val destination = sql("select * from sample.destination").rdd.map(_.mkString(","))
val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))
val src = source_primary_key.subtract(destination_primary_key) // subtract, not subtractByKey: these are plain RDD[String]s, not pair RDDs
I want to use an IN clause in the filter condition to filter out from the source only the values present in src, something like below (edited):
val source = spark.read.csv(inputPath + "/source").rdd.map(_.mkString(","))
val destination = spark.read.csv(inputPath + "/destination").rdd.map(_.mkString(","))
val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))
val extra_in_source = source_primary_key.filter(rec._1 != destination_primary_key._1) // pseudocode, does not compile as-is
The equivalent SQL would be:
SELECT * FROM SOURCE WHERE ID IN (select ID from src)
Thanks
[Comments]:
What types do your values have?
That's not what I asked. What is the type of 'src' or 'source'? Are you using RDDs or DataFrames?
Please edit your post to add the type of each variable.
Why not just use Spark SQL? sql returns structured data.

[Answer 1]: Since your code is not reproducible, here is a small example using spark-sql of how to select * from t where id in (...):
// create a DataFrame for a range 'id' from 1 to 9.
scala> val df = spark.range(1,10).toDF
df: org.apache.spark.sql.DataFrame = [id: bigint]
// values to exclude
scala> val f = Seq(5,6,7)
f: Seq[Int] = List(5, 6, 7)
// select * from df where id is not in the values to exclude
scala> df.filter(!col("id").isin(f : _*)).show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 8|
| 9|
+---+
// select * from df where id is in the values to exclude
scala> df.filter(col("id").isin(f : _*)).show
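which, for this input, prints only the excluded values:
+---+
| id|
+---+
|  5|
|  6|
|  7|
+---+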
Here is the RDD version of the not isin:
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val f = Seq(5,6,7)
f: Seq[Int] = List(5, 6, 7)
scala> val rdd2 = rdd.filter(x => !f.contains(x))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at <console>:28
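A quick collect verifies the exclusion (the res counter will vary by session):

scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 8, 9, 10)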
Nevertheless, I still think this is overkill, since you are already using spark-sql.
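For instance, a minimal sketch against the asker's original tables, assuming they expose an ID column (the column name is an assumption):

val extra = spark.sql("SELECT * FROM sample.source WHERE ID NOT IN (SELECT ID FROM sample.destination)") // NOT IN subquery; beware: it yields no rows if the subquery contains NULLs

The left-anti join shown below sidesteps that NULL caveat.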
In your case, you are actually dealing with DataFrames, so the RDD solution above won't work.
You can use the left-anti join approach:
scala> val source = spark.read.format("csv").load("source.file")
source: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]
scala> val destination = spark.read.format("csv").load("destination.file")
destination: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]
scala> source.show
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|_c0| _c1| _c2| _c3| _c4|_c5|_c6| _c7| _c8| _c9| _c10|
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
| 1| Ravi kumar| Ravi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 2|Shekhar shudhanshu| Shekhar|shudhanshu| Manulife | 2| M|18-01-1994|76.34| 250000| Alaska |
| 3|Preethi Narasingam| Preethi|Narasingam| Retail | 3| F|19-01-1994|77.45|270000.01| Arizona |
| 4| Abhishek Nair|Abhishek| Nair| Banking | 4| M|20-01-1994|78.65| 345000| Arkansas |
| 5| Ram Sharma| Ram| Sharma|Infrastructure | 5| M|21-01-1994|79.12| 45000| California |
| 6| Chandani Kumari|Chandani| Kumari| BNFS | 6| F|22-01-1994|80.13| 43000.02| Colorado |
| 7| Balaji Kumar| Balaji| Kumar| MSO | 1| M|23-01-1994|81.33| 1234678|Connecticut |
| 8| Naveen Shekrappa| Naveen| Shekrappa| Manulife | 2| M|24-01-1994| 100| 789414| Delaware |
| 9| Milind Chavan| Milind| Chavan| Retail | 3| M|25-01-1994|83.66| 245555| Florida |
| 10| Raghu Rajeev| Raghu| Rajeev| Banking | 4| M|26-01-1994|87.65| 235468| Georgia|
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
scala> destination.show
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|_c0| _c1| _c2| _c3| _c4|_c5|_c6| _c7| _c8| _c9| _c10|
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
| 1| Ravi kumar| Revi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 1| Ravi1 kumar| Revi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 1| Ravi2 kumar| Revi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 2| Shekhar shudhanshu| Shekhar|shudhanshu| Manulife | 2| M|18-01-1994|76.34| 250000| Alaska |
| 3|Preethi Narasingam1| Preethi|Narasingam| Retail | 3| F|19-01-1994|77.45|270000.01| Arizona |
| 4| Abhishek Nair1|Abhishek| Nair| Banking | 4| M|20-01-1994|78.65| 345000| Arkansas |
| 5| Ram Sharma| Ram| Sharma|Infrastructure | 5| M|21-01-1994|79.12| 45000| California |
| 6| Chandani Kumari|Chandani| Kumari| BNFS | 6| F|22-01-1994|80.13| 43000.02| Colorado |
| 7| Balaji Kumar| Balaji| Kumar| MSO | 1| M|23-01-1994|81.33| 1234678|Connecticut |
| 8| Naveen Shekrappa| Naveen| Shekrappa| Manulife | 2| M|24-01-1994| 100| 789414| Delaware |
| 9| Milind Chavan| Milind| Chavan| Retail | 3| M|25-01-1994|83.66| 245555| Florida |
| 10| Raghu Rajeev| Raghu| Rajeev| Banking | 4| M|26-01-1994|87.65| 235468| Georgia|
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
All you need to do is:
scala> val res1 = source.join(destination, Seq("_c0"), "leftanti")
scala> val res2 = destination.join(source, Seq("_c0"), "leftanti")
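For reference, leftanti keeps only the rows of the left side that have no match on the right; the equivalent Spark SQL, as a sketch with assumed view names, would be:

scala> source.createOrReplaceTempView("src_view")
scala> destination.createOrReplaceTempView("dst_view")
scala> val res1 = spark.sql("SELECT * FROM src_view s LEFT ANTI JOIN dst_view d ON s._c0 = d._c0")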
It is the same logic I mentioned in my answer here.
[Comments]:
@eliasha, I am using RDDs, not DataFrames.
I am looking for something like val extra_in_source = source_primary_key.filter(rec != destination_primary_key._1)
I am still not convinced of the reason you can't use DataFrames. Anyway, let me update my answer.
I have updated the question with what I am looking for.
I have (1,2,3,4) in source_primary_key and (1,2,3) in destination_primary_key, and I need a filter that gives me the extras in the source, i.e. (4). Something like val extra_In_source = source_primary_key.filter(NOT IN (destination_primary_key)); the above is just pseudocode for what I am looking for.

[Answer 2]: You can try:
df.filter(~df.Dept.isin("30","20")).show()
# lists all rows of df where Dept is NOT IN 30 or 20
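The same filter in Scala would be a sketch like this (Scala's Column uses ! for negation where the Python API uses ~):

df.filter(!col("Dept").isin("30","20")).show()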
[Comments]:
@BdEngineer the code above (the ~ version) only works in PySpark.

[Answer 3]: You can try something like this in Java:
ds = ds.filter(functions.not(functions.col(COLUMN_NAME).isin(exclusionSet.toArray()))); // isin takes varargs, so the Set is expanded with toArray()
where exclusionSet is a set of objects that need to be removed from the dataset.
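As an aside, since Spark 2.4 a collection can be passed without converting to varargs via isInCollection; a Scala sketch with an assumed id column:

df.filter(!col("id").isInCollection(Seq(5, 6, 7))).show()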