使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas

Posted

技术标签:

【中文标题】使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas【英文标题】:Convert Spark Structured DataFrame to Pandas using pandas_udf 【发布时间】:2019-05-20 08:16:39 【问题描述】:

我需要将 csv 文件作为流读取,然后将其转换为 pandas dataframe

这是我到目前为止所做的事情


    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
    StructField("Count", IntegerType(), True), \
    StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("ProcessdedData/SurgeAcc")

    mydf = SrgDF.groupby(group_columns).apply(get_pdf)

    qrySrg = SrgDF \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()

我相信从另一个来源 (Convert Spark Structure Streaming DataFrames to Pandas DataFrame) 将结构化流数据帧转换为 pandas 是不可能的,而且似乎 pandas_udf 是正确的方法,但无法弄清楚如何实现这一点。我需要将 pandas 数据框传递给我的函数。

编辑

当我运行代码(将查询更改为 mydf 而不是 SrgDF)时,我收到以下错误:pyspark.sql.utils.StreamingQueryException: 'Writing job aborted.\n=== Streaming Query ===\nIdentifier: [id = 18a15e9e-9762-4464-b6d1-cb2db8d0ac41, runId = e3da131e-00d1-4fed-82fc-65bf377c3f99]\nCurrent Committed Offsets: \nCurrent Available Offsets: FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc]: "logOffset":0\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nFlatMapGroupsInPandas [Count#1], get_pdf(TimeStamp#0L, Count#1, Reading#2), [TimeStamp#10L, Count#11, Reading#12]\n+- Project [Count#1, TimeStamp#0L, Count#1, Reading#2]\n +- StreamingExecutionRelation FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc], [TimeStamp#0L, Count#1, Reading#2]\n' 19/05/20 18:32:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver /usr/local/lib/python3.6/dist-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use "

EDIT-2

这是重现错误的代码

import sys

from pyspark import SparkContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.streaming import StreamingContext

from pyspark.sql.types import *

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyarrow as pa

import glob

#####################################################################################

if __name__ == '__main__' :

    spark = SparkSession \
        .builder \
        .appName("RealTimeIMUAnalysis") \
        .getOrCreate() 

    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    # reduce verbosity
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    ##############################################################################

    # using the saved files to do the Analysis
    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
    StructField("Count", IntegerType(), True), \
    StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("SurgeAcc")

    mydf = SrgDF.groupby('Count').apply(get_pdf)
    #print(mydf)

    qrySrg = mydf \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()

要运行,您需要在代码所在的位置创建一个名为 SurgeAcc 的文件夹,并在其中创建一个格式如下的 csv 文件:

TimeStamp,Count,Reading
1557011317299,45148,-0.015494
1557011317299,45153,-0.015963
1557011319511,45201,-0.015494
1557011319511,45221,-0.015494
1557011315134,45092,-0.015494
1557011315135,45107,-0.014085
1557011317299,45158,-0.015963
1557011317299,45163,-0.015494
1557011317299,45168,-0.015024`

【问题讨论】:

如果您使用 pandas_udf(用于 GROUPED_MAP),函数的输入将是 pandas dataframe,输出也是 pandas dataframe。我在这里没有看到任何问题。你有什么问题吗? 错误信息太长,所以我将其作为编辑添加到我的信息中。 请同时更新示例数据,这有助于其他人进行故障排除。也可能是数据问题。 @RangaVure 我刚刚用可以重现错误的代码和数据编辑了我的问题。我非常感谢您的反馈。进一步补充一下,我使用结构化流(而不是静态读取)的原因是 csv 文件会不断地被另一个流更新 【参考方案1】:

您返回的 pandas_udf 数据框与指定的架构不匹配。

请注意,pandas_udf 的输入将是 pandas 数据帧,并且还会返回 pandas 数据帧。

您可以使用 pandas_udf 中的所有 pandas 函数。唯一需要确保的是 ReturnDataShema 应该与函数的实际输出相匹配。

ReturnDataShema = StructType([StructField("TimeStamp", LongType(), True), \
                            StructField("Count", IntegerType(), True), \
                            StructField("Reading", FloatType(), True), \
                            StructField("TotalCount", FloatType(), True)])

@pandas_udf(ReturnDataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        # This following stmt is causing schema mismatch
        # return pd.DataFrame([pdf[group_columns]],columns=[group_columns])
        # If you want to return all the rows of pandas dataframe
        # you can simply
        # return pdf
        # If you want to do any aggregations, you can do like the below, or use pandas query
        # but make sure the return pandas dataframe complies with ReturnDataShema
        total_count = pdf['Count'].sum()
        return pd.DataFrame([(pdf.TimeStamp[0],pdf.Count[0],pdf.Reading[0],total_count)])

【讨论】:

以上是关于使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas的主要内容,如果未能解决你的问题,请参考以下文章

Spark pandas_udf 并不快

结构化流是如何执行 pandas_udf 的?

将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误

在 pyspark 中使用 pandas_udf 过滤数据框

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException

PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列