加入大数据帧和小数据帧时的广播数据帧和过滤器
Posted
技术标签:
【中文标题】加入大数据帧和小数据帧时的广播数据帧和过滤器【英文标题】:Broadcast DataFrame and Filter when joining a big with a small Dataframe 【发布时间】:2021-04-19 13:13:30 【问题描述】:我有一个大数据框,其中包含 user_id
、user_address
列和多个与用户相关的列(多 10-12 个)和一个小(user_id 和 user_contact
列。每个用户可以有很多电话号码/电子邮件地址。
我正在尝试广播较小的 Dataframe 并将其数据传递给 UDF。但是,我无法获取每个执行程序中存在的所有数据。如果未找到该 user_id 的联系方式,UDF 将返回 No match found
。
我尝试通过user_id
过滤broadcastDF,但似乎这不起作用。有人可以在这里指导我吗?
val DF = createDF("/somePath/")
val broadcastedDF = spark.sparkContext.broadcast(DF)
val DFWithUDF = someDF.select( col("user_id),
UDF(col("user_name"),
typedLit[Seq[String]](broadcastedDF.value.filter(broadcastedDF.value("user_id") === col("user_id")).
select("user_contact").
collect().map(data =>
String.valueOf(data.getAs[String]("user_contact")))
)
).alias(SCHEMA_REQUEST_TARGET)
【问题讨论】:
【参考方案1】:在加入这两个数据帧时,您可以使用加入提示来广播较小的数据帧,无需自己专门广播。您的代码可能如下所示:
val bigDf = ... // has columns user_id, user_address
val smallDf = ... // has columns user_id and user_contact
// define your join expression and join those tables using column user_id
import org.apache.spark.sql.functions.broadcast
val joinExpr = bigDf.col("user_id") === smallDf.col("user_id")
val joinedDF = bigDf.join(broadcast(smallDf), joinExpr)
如果您的小型 Dataframe 小于 10MB,并且您没有更改配置 spark.sql.autoBroadcastJoinThreshold
,默认为 10MB,您甚至不需要在加入时提及广播提示。
配置spark.sql.autoBroadcastJoinThreshold
描述为:
“配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1,可以禁用广播。请注意,当前仅支持 Hive Metastore 表的统计信息,其中命令 ANALYZE TABLE COMPUTE STATISTICS noscan 已运行,以及基于文件的数据源表,其中直接在数据文件上计算统计信息。"
【讨论】:
感谢您的解决方案。但是不能直接广播数据帧吗?使用 SparkContext 中可用的广播方法? 很高兴它有帮助。关于广播DataFrame,可以看看【如何广播DataFrame? ](***.com/questions/59373087/…)。以上是关于加入大数据帧和小数据帧时的广播数据帧和过滤器的主要内容,如果未能解决你的问题,请参考以下文章