根据 CSV 记录从 Spark 数据帧中过滤一些数据
Posted
技术标签:
【中文标题】根据 CSV 记录从 Spark 数据帧中过滤一些数据【英文标题】:Filtering some data from Spark dataframe based on CSV records 【发布时间】:2019-05-31 10:36:37 【问题描述】:我有一个包含一些单词的CSV
文件。总数csv 文件中的单词数不会超过 50k 条记录。
我有一个Spark Dataframe
,它是从具有keywords
列的JSON
文件创建的。我需要做的是从数据框中过滤掉 keywords
列值与 CSV 文件中存在的值匹配的记录。这里的 Matches 表示 csv 文件中的单词是否出现在 dataframe 列中。
举个例子,假设csv文件中有一个单词"baby toys"
,spark dataframe
看起来像这样
***Keywords***
new baby toys
baby toys for all
costly baby toys price
baby has toys
在上面的前 3 行应该被过滤掉,因为它们都按顺序包含单词baby toys
。
为了实现,我正在做这样的事情。
1. Reading csv file and creating a dataframe.
2. Collecting all the words as an array of strings from dataframe created above as
val negativeKeywords = csvDF.distinct.map(x => x.getString(0)).collect()
3. Creating a UDF to match the words - one from negative list created above and other from Spark dataframe(created in step 4)
val udfmatch= udf((x: String) =>
val loop = new Breaks
var check = false
loop.breakable
for(s <- negativeKeywords)
if(x.contains(s))
check = true
loop.break
check
)
4. Created spark dataframe from JSON file.
5. Filter from the above JSON dataframe using UDF defined above.
sparkDf.filter(udfmatch(col("keyword_text")))
在上面,我正在为Spark dataframe
中存在的每个关键字迭代整个csv
单词列表(直到找到它),我认为这是不正确且耗时的。
有人可以建议一个更好的方法。
【问题讨论】:
x.contains(s)
是要求还是可以匹配令牌?例如“BigToysTools”应该匹配“Toys”还是可以标记为“big”、“toys”、“tools”然后匹配完整的标记?
@ollik1 我们无法进行分词,因为单词需要按精确顺序匹配
你不能在 json Df 上使用这个原生的 isin 吗?我能想到的方法是将您的 csv 读入 Seq[String] 并使用带有 .isin.. 的 when 函数映射到 Df 上。这不适合您的情况吗?
@user2315840 你能不能给我们一些代码 sn-p 让它更容易理解。
在下面查看我的答案
【参考方案1】:
让我们从来自 csv 的关键字序列开始;我们可以使用 fold left 并使用 contains 如下代码:
val conds= seq(("new toys")).toList
val result = conds.foldLeft(yourJsonDf)(newdf,conds)=>
newdf.filter(!col("yourJsonColumnToFitlerOn").contains(conds))
这将为您提供与您的列表不匹配的行。如果您只想拥有那些过滤的行。你可以简单地加入你的“yourJsonColumntoFilteron”回到你原来的DF并使用leftanti加入 类似于下面的代码
val filteredResults = originalDf.join(result ,Seq("yourJsonColumnToFilterOn"),"leftanti")
filteredResults.show()
根据您的返回空 DF 的问题是由于 foldLeft 的火花处理的性质。它不会破坏列表中项目迭代的循环。或者简单地说 foldleft 不会根据列表中的元素执行 N 次。执行只需 1 次,为您节省大量时间和资源,无需担心 GC 清理。希望以上答复中的更改符合您的要求。 干杯,
【讨论】:
谢谢.. 我会检查一下.. 你提到的评论中只有一件事 isin 但这里你建议包含.. 使用哪一个? 另外,上面一个工作得很好。但一个奇怪的是,如果我删除“!”在过滤器中,然后我得到空的数据框。我虽然会相反。你能帮帮我吗 newdf.filter(col("yourJsonColumnToFitlerOn").contains(conds)) 不起作用 什么不起作用?你能详细说明一下吗?另外,如果我正确理解了您的问题,您需要删除那些与您的 csv 中的单词匹配的行吗?这是正确的假设吗? contains 将在字符串匹配的地方进行通配符搜索。如果我正确阅读了您的问题,您应该使用 contains,因为只要它有单词或列表,它就会忽略前后的单词单词的顺序相同。它会选择它们。以上是关于根据 CSV 记录从 Spark 数据帧中过滤一些数据的主要内容,如果未能解决你的问题,请参考以下文章
无法过滤存储在 spark 2.2.0 数据框中的 CSV 列
如何根据 Spark Scala 中其他数据帧中的多列匹配过滤数据帧
是否可以以相同或不同的顺序将具有相同标题或标题子集的多个 csv 文件读取到 spark 数据帧中?