如何从 spark sql 调用具有数据帧操作的函数?
Posted
技术标签:
【中文标题】如何从 spark sql 调用具有数据帧操作的函数?【英文标题】:How to call functions with dataframe operations from spark sql? 【发布时间】:2018-12-28 12:23:02 【问题描述】:我有 sql 它基本上加入两个表并得到结果 accomm_sk,如果 accomm_sk 值为 NULL 则 spark UDF 将被调用如果没有,则在第三个表中查找然后得到结果。如何在 spark sql 中使用此函数,因为 Spark 不允许注册为 UDF?
火花 UDF
def GeneratedAccommSk(localHash):
query = 'select accommodation_sk from staging.accomm_dim where accomm_hash=""'.format(localHash)
accommodationSk_Df=spark.sql(query)
accomm_count=accommSk_Df.filter(accommSk_Df.accomm_sk.isNotNull()).count()
if accomm_count != 0:
accomm_sk=accommSk_Df.select('accomm_sk').collect()[0].asDict()['accomm_sk']
else:
func = sc._gateway.jvm.RandomNumberGenerator()
accom_sk=func.generateRandomNumber().encode('ascii', 'ignore')
return accom_sk
Spark SQL:
rate_fact_df=spark.sql("""
*Calling GeneratedAccommSk UDF*
select case when accomm_sk IS NOT NULL THEN accommodation_sk
ELSE GeneratedAccommSk(a.accommhash) END
from
staging.contract_test a
join
dim.accomm_dim b
on (a.accomm_hash)= b.accommodation_hash
""")
【问题讨论】:
你为什么不使用spark数据框而不是sql表? 【参考方案1】:至少有两个原因这行不通:
执行器上没有 Py4j 网关,因此不能调用任何这样的 Java 代码 (Calling Java/Scala function from a task) 您不能在执行程序 (Why does this Spark code make NullPointerException?) 上使用SparkSession
或任何分布式对象 (DataFrame
, RDD
)
根据accommSk_Df
的大小,您应该收集它并使用本地对象 (Lookup in spark dataframes) 或执行另一个连接。
【讨论】:
以上是关于如何从 spark sql 调用具有数据帧操作的函数?的主要内容,如果未能解决你的问题,请参考以下文章
Spark:如何从 Spark 数据帧行解析和转换 json 字符串
具有不匹配模式的 Spark 合并数据帧,无需额外的磁盘 IO
Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧