在 pyspark 中使用 UDF 和简单数据帧

Posted

技术标签:

【中文标题】在 pyspark 中使用 UDF 和简单数据帧【英文标题】:using UDF's and simpe dataframes in pyspark 【发布时间】:2020-01-23 23:52:04 【问题描述】:

我是 pyspark 的新手,来尝试做类似下面的事情 为每个 cookie 调用函数 PrintDetails,然后将结果写入文件。 spark.sql 查询返回正确的数据,我也可以将其序列化为文件。 有人可以帮助处理每个 cookie 上的 for 语句吗?调用 UDF 的语法应该是什么?如何将输出写入文本文件?

感谢任何帮助。 谢谢

@udf(returnType=StringType())
def PrintDetails(cookie, timestamps,current_day, current_hourly_threshold,current_daily_threshold):
     #DO SOME WORK
     return "%s\t%d\t%d\t%d\t%d\t%s" %(some_data)

def main(argv):
    spark = SparkSession \
        .builder \
        .appName("parquet_test") \
        .config("spark.debug.maxToStringFields", "100") \
        .getOrCreate()

    inputPath = r'D:\Hadoop\Spark\parquet_input_files'
    inputFiles = os.path.join(inputPath, '*.parquet')

    impressionDate =  datetime.strptime("2019_12_31", '%Y_%m_%d')
    current_hourly_threshold = 40
    current_daily_threshold = 200

    parquetFile = spark.read.parquet(inputFiles)
    parquetFile.createOrReplaceTempView("parquetFile")
    cookie_and_time = spark.sql("SELECT cookie, collect_list(date_format(from_unixtime(ts), 'YYYY-mm-dd-H:M:S'))  as imp_times FROM parquetFile group by 1  ")

    for cookie in cookie_and_time :
        PrintDetails(cookie('cookie'), cookie('imp_times'), impressionDate, current_hourly_threshold, current_daily_threshold))

【问题讨论】:

【参考方案1】:

你可以像下面那样做。

cookie_df= cookie_and_time.withColumn("cookies",PrintDetails(cookie('cookie'), cookie('imp_times'), lit(impressionDate), lit(current_hourly_threshold), lit(current_daily_threshold)))

或者您可以在 udf 函数本身中定义所有变量并避免作为参数传递。

【讨论】:

谢谢 Manoj!我将代码更改为 cookie_df = cookie_and_time.withColumn("cookies", PrintDetails(cookie_and_time('cookie'), cookie_and_time('imp_times'), lit(impressionDate), lit(current_hourly_threshold), lit(current_daily_threshold))) 但我明白了以下错误 TypeError: 'DataFrame' object is not callable 请根据您的要求修改现有的数据框架构。

以上是关于在 pyspark 中使用 UDF 和简单数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

在 pyspark 中的数据帧上应用 udf 后出错

在 UDF 之后将新列附加到现有 PySpark 数据帧

Pyspark:UDF 将正则表达式应用于数据帧中的每一行

在 pyspark 中应用 udf 过滤功能