如何从 spark sql 调用具有数据帧操作的函数?

Posted

技术标签:

【中文标题】如何从 spark sql 调用具有数据帧操作的函数?【英文标题】:How to call functions with dataframe operations from spark sql? 【发布时间】:2018-12-28 12:23:02 【问题描述】:

我有 sql 它基本上加入两个表并得到结果 accomm_sk,如果 accomm_sk 值为 NULL 则 spark UDF 将被调用如果没有,则在第三个表中查找然后得到结果。如何在 spark sql 中使用此函数,因为 Spark 不允许注册为 UDF?

火花 UDF

def GeneratedAccommSk(localHash):
    query = 'select accommodation_sk from staging.accomm_dim where accomm_hash=""'.format(localHash)
    accommodationSk_Df=spark.sql(query)
    accomm_count=accommSk_Df.filter(accommSk_Df.accomm_sk.isNotNull()).count()
    if accomm_count != 0:
        accomm_sk=accommSk_Df.select('accomm_sk').collect()[0].asDict()['accomm_sk']
    else:
        func = sc._gateway.jvm.RandomNumberGenerator()
        accom_sk=func.generateRandomNumber().encode('ascii', 'ignore')
    return accom_sk

Spark SQL:

        rate_fact_df=spark.sql("""
*Calling GeneratedAccommSk UDF*
        select  case when accomm_sk IS NOT NULL THEN accommodation_sk 
    ELSE GeneratedAccommSk(a.accommhash) END 
        from 
        staging.contract_test a 
        join 
        dim.accomm_dim b 
        on (a.accomm_hash)= b.accommodation_hash
        """)

【问题讨论】:

你为什么不使用spark数据框而不是sql表? 【参考方案1】:

至少有两个原因这行不通:

执行器上没有 Py4j 网关,因此不能调用任何这样的 Java 代码 (Calling Java/Scala function from a task) 您不能在执行程序 (Why does this Spark code make NullPointerException?) 上使用 SparkSession 或任何分布式对象 (DataFrame, RDD)

根据accommSk_Df 的大小,您应该收集它并使用本地对象 (Lookup in spark dataframes) 或执行另一个连接。

【讨论】:

以上是关于如何从 spark sql 调用具有数据帧操作的函数?的主要内容,如果未能解决你的问题,请参考以下文章

spark scala比较具有时间戳列的数据帧

Spark:如何从 Spark 数据帧行解析和转换 json 字符串

具有不匹配模式的 Spark 合并数据帧,无需额外的磁盘 IO

Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧

将 spark 数据帧聚合转换为 SQL 查询; window、groupby 的问题,以及如何聚合?

如何在Spark SQL中查询StringType的1个字段具有json值的数据框