根据 CSV 记录从 Spark 数据帧中过滤一些数据

Posted

技术标签:

【中文标题】根据 CSV 记录从 Spark 数据帧中过滤一些数据【英文标题】:Filtering some data from Spark dataframe based on CSV records 【发布时间】:2019-05-31 10:36:37 【问题描述】:

我有一个包含一些单词的CSV 文件。总数csv 文件中的单词数不会超过 50k 条记录。

我有一个Spark Dataframe,它是从具有keywords 列的JSON 文件创建的。我需要做的是从数据框中过滤掉 keywords 列值与 CSV 文件中存在的值匹配的记录。这里的 Matches 表示 csv 文件中的单词是否出现在 dataframe 列中。

举个例子,假设csv文件中有一个单词"baby toys"spark dataframe看起来像这样

***Keywords***
new baby toys
baby toys for all
costly baby toys price
baby has toys

在上面的前 3 行应该被过滤掉,因为它们都按顺序包含单词baby toys

为了实现,我正在做这样的事情。

1. Reading csv file and creating a dataframe.

2. Collecting all the words as an array of strings from dataframe created above as
val negativeKeywords = csvDF.distinct.map(x => x.getString(0)).collect()

3. Creating a UDF to match the words - one from negative list created above and other from Spark dataframe(created in step 4)
 val udfmatch= udf((x: String) => 
      val loop = new Breaks
      var check = false
      loop.breakable
        for(s <- negativeKeywords)
          if(x.contains(s))
            check = true
            loop.break
          
        
      
      check
  )

4. Created spark dataframe from JSON file. 
5. Filter from the above JSON dataframe using UDF defined above. 
   sparkDf.filter(udfmatch(col("keyword_text")))

在上面,我正在为Spark dataframe 中存在的每个关键字迭代整个csv 单词列表(直到找到它),我认为这是不正确且耗时的。 有人可以建议一个更好的方法。

【问题讨论】:

x.contains(s) 是要求还是可以匹配令牌?例如“BigToysTools”应该匹配“Toys”还是可以标记为“big”、“toys”、“tools”然后匹配完整的标记? @ollik1 我们无法进行分词,因为单词需要按精确顺序匹配 你不能在 json Df 上使用这个原生的 isin 吗?我能想到的方法是将您的 csv 读入 Seq[String] 并使用带有 .isin.. 的 when 函数映射到 Df 上。这不适合您的情况吗? @user2315840 你能不能给我们一些代码 sn-p 让它更容易理解。 在下面查看我的答案 【参考方案1】:

让我们从来自 csv 的关键字序列开始;我们可以使用 fold left 并使用 contains 如下代码:

val conds= seq(("new toys")).toList
val result = conds.foldLeft(yourJsonDf)(newdf,conds)=>
 newdf.filter(!col("yourJsonColumnToFitlerOn").contains(conds))

这将为您提供与您的列表不匹配的行。如果您只想拥有那些过滤的行。你可以简单地加入你的“yourJsonColumntoFilteron”回到你原来的DF并使用leftanti加入 类似于下面的代码

val filteredResults = originalDf.join(result ,Seq("yourJsonColumnToFilterOn"),"leftanti")
filteredResults.show()

根据您的返回空 DF 的问题是由于 foldLeft 的火花处理的性质。它不会破坏列表中项目迭代的循环。或者简单地说 foldleft 不会根据列表中的元素执行 N 次。执行只需 1 次,为您节省大量时间和资源,无需担心 GC 清理。希望以上答复中的更改符合您的要求。 干杯,

【讨论】:

谢谢.. 我会检查一下.. 你提到的评论中只有一件事 isin 但这里你建议包含.. 使用哪一个? 另外,上面一个工作得很好。但一个奇怪的是,如果我删除“!”在过滤器中,然后我得到空的数据框。我虽然会相反。你能帮帮我吗 newdf.filter(col("yourJsonColumnToFitlerOn").contains(conds)) 不起作用 什么不起作用?你能详细说明一下吗?另外,如果我正确理解了您的问题,您需要删除那些与您的 csv 中的单词匹配的行吗?这是正确的假设吗? contains 将在字符串匹配的地方进行通配符搜索。如果我正确阅读了您的问题,您应该使用 contains,因为只要它有单词或列表,它就会忽略前后的单词单词的顺序相同。它会选择它们。

以上是关于根据 CSV 记录从 Spark 数据帧中过滤一些数据的主要内容,如果未能解决你的问题,请参考以下文章

无法过滤存储在 spark 2.2.0 数据框中的 CSV 列

如何根据 Spark Scala 中其他数据帧中的多列匹配过滤数据帧

如何加载包含多行记录的 CSV 文件?

是否可以以相同或不同的顺序将具有相同标题或标题子集的多个 csv 文件读取到 spark 数据帧中?

如何从 Spark 数据帧中的 When 子句将多个列发送到 udf?

当函数在具有自动检测模式的 spark 数据帧中不起作用时