Spark:如何通过 mapInPandas 正确转换数据帧

Posted

技术标签:

【中文标题】Spark:如何通过 mapInPandas 正确转换数据帧【英文标题】:Spark: How to correctly transform dataframe by mapInPandas 【发布时间】:2020-12-27 11:38:42 【问题描述】:

我正在尝试通过最新的 spark 3.0.1 函数 mapInPandas 转换具有 10k 行的 spark 数据帧。

预期输出:映射的 pandas_function() 将一行转换为三行,因此输出的 transform_df 应该有 30k 行

当前输出:我得到 3 行 1 核和 24 行 8 核。

输入:respond_sdf 有 10k 行

+-----+-------------------------------------------------------------------+     
|url  |content                                                            |
+-----+-------------------------------------------------------------------+
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
|api_1|'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]    |
|api_2|'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] |
+-----+-------------------------------------------------------------------+
only showing top 20 rows
Input respond_sdf has 10000 rows

OUTPUT A) 3 行 - 1 个核心 - .master('local [1]')

'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]     (0 + 1) / 1]
+-----+---+---+                                                                 
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
+-----+---+---+

'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
 Output transformed_df has 3 rows

OUTPUT B) 24 行 - 8 核 - .master('local[8]')

'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]     (0 + 1) / 1]
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]                 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
+-----+---+---+
only showing top 20 rows

'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6]     (3 + 5) / 8]
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] 
 Output transformed_df has 24 rows    

示例代码:

#### IMPORT PYSPARK ###
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[1]') \
    .getOrCreate()
sc = spark.sparkContext

####### INPUT DATAFRAME WITH LIST OF JSONS ########################

# Create list with 10k nested tuples(url,content)
rdd_list = [('api_1',"'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] "),
            (' api_2', "'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] ")]*5000

schema = StructType([
  StructField('url', StringType(), True),
  StructField('content', StringType(), True)
  ])

#Create input dataframe with 10k rows
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)

print(f'Input respond_sdf has respond_sdf.count() rows')

####### TRANSFORMATION DATAFRAME ########################

# Pandas transformation function returning pandas dataframe
def pandas_function(iter):
    for df in iter:
        print(df['content'][0])
        yield pd.DataFrame(eval(df['content'][0]))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()
print(f' Output transformed_df has transformed_df.count() rows')
print(f'Expected output dataframe should has 30k rows')

相关讨论链接: How to yield pandas dataframe rows to spark dataframe

【问题讨论】:

【参考方案1】:

抱歉,在我对您上一个问题的回答中,使用 mapInPandas 的部分不正确。我认为下面这个函数是编写 pandas 函数的正确方法。上次我犯了一个错误,因为我之前认为iter 是一个可迭代的行,但它实际上是一个可迭代的数据帧。

def pandas_function(iter):
    for df in iter:
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))

(PS 感谢here的回答。)

【讨论】:

你是明星! @mck 我尽可能地投票。还有你的 PS 问题,因为它有助于更​​多地了解它是如何工作的。最后,pandas UDF 调试起来还是很简单的,spark df 可以转成pandas df,然后直接调用udf 函数中yield 后面的pandas 表达式,方便调试。 @Dan 不错,那你就不用担心在 vscode 中调试 spark udfs 了 :) 当我知道它是如何工作的时候,不再使用这个 udf,但是如果我能在 udf 中停下来并在 vscode 中调试,它将节省很多时间,非常感谢您的帮助!跨度> 【参考方案2】:

其实有一个工具可以让你在 UDF 中停止并在 VSCode 中调试,查看 pyspark_xray 库,它的 demo app 演示了如何使用 pyspark_xray 的 wrapper_sdf_mapinpandas 函数来步入传递给 @ 的 Pandas UDF 987654324@函数。

【讨论】:

以上是关于Spark:如何通过 mapInPandas 正确转换数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 pyspark 正确使用 rdd.map 中的模块

如何正确使用Hadoop YARN Restful api提交spark应用

如何使用 dotnet spark 正确实例化 spark 会话?

如何正确处理 spark.sql.AnalysisException

如何使用用户提供的 Hadoop 正确配置 Spark 2.4

如何使用 spark 在 Hive 中正确加载数据?