比较两个数据帧以在 spark 中查找子字符串
Posted
技术标签:
【中文标题】比较两个数据帧以在 spark 中查找子字符串【英文标题】:Compare two dataframes to find substring in spark 【发布时间】:2017-03-21 15:50:10 【问题描述】:我有三个数据框,字典、SourceDictionary 和 MappedDictionary。字典和SourceDictionary 只有一列,将单词表示为String。拥有百万条记录的字典是 MappedDictionary 的子集(大约 10M 条记录),MappedDictionary 中的每条记录都是字典的子字符串。所以,我需要将带有 SourceDictionary 的字典映射到 MappedDictionary。 示例:
Records in ditionary : BananaFruit, AppleGreen
Records in SourceDictionary : Banana,grape,orange,lemon,Apple,...
要在 MappedDictionary 中映射的记录(包含两列):
BananaFruit Banana
AppleGreen Apple
我计划在 java 中做两个 for 循环并进行子字符串操作,但问题是 100 万 * 1000 万 = 10 万亿次迭代 此外,我无法获得像 for 循环一样迭代数据帧的正确方法 有人可以为在 Dataframe 中进行迭代并执行子字符串操作的方法提供解决方案吗? 对不起我的英语不好,我是非本地人 提前感谢***社区成员:-)
【问题讨论】:
【参考方案1】:虽然你在 sourceDictionary 中有百万条记录,因为它只有一列将它广播到每个节点不会占用太多内存,并且会加快整体性能。
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.encoders.RowEncoder
//Assuming the schema names
val SourceDictionarySchema = StructType(StructField("value",StringType,false))
val dictionarySchema = StructType(StructField("value",StringType,false))
val MappedDictionary = StructType(StructField("value",StringType,false), StructField("key",StringType,false))
val sourceDictionaryBC = sc.broadcast(
sourceDictionary.map(row =>
row.getAs[String]("value")
).collect.toList
)
val MappedDictionaryN = dictionary.map(row =>
val value = row.getAs[String]("value")
val matchedKey = sourceDictionaryBC.value.find(value.contains)
Seq(value, matchedKey.orNull)
)(RowEncoder(MappedDictionary))
在此之后,您将拥有所有新的映射记录。如果你想将它与现有的 MappedDictionary 结合起来,只需做一个简单的联合。
MappedDictionaryN.union(MappedDictionary)
【讨论】:
感谢您的建议 Swadhin。这正是我想要的。 但是广播可以处理多少记录呢?我的意思是可以广播的最大文件大小? 只要大小小于工作节点的内存就可以广播这些记录。广播主要用于缓存每个节点中的小数据集,以便在执行 DataFarme 时,该节点不必从另一个节点检索该数据集。在您的情况下,因为 sourceDictionary 只有一列,并且几百万条记录广播是最好的方法。 有用的链接:1、2、3以上是关于比较两个数据帧以在 spark 中查找子字符串的主要内容,如果未能解决你的问题,请参考以下文章