如何在保留现有架构的同时从行中创建 DataFrame?
Posted
技术标签:
【中文标题】如何在保留现有架构的同时从行中创建 DataFrame?【英文标题】:How to create a DataFrame out of rows while retaining existing schema? 【发布时间】:2015-12-23 15:26:18 【问题描述】:如果我调用 map 或 mapPartition
并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?
目前我正在做类似的事情:
def combine(partition):
rows = [x for x in partition]
dfpart = pd.DataFrame(rows,columns=rows[0].keys())
pandafunc(dfpart)
mydf.mapPartition(combine)
【问题讨论】:
【参考方案1】:Spark >= 2.3.0
从 Spark 2.3.0 开始,可以按分区或组使用 Pandas Series
或 DataFrame
。例如:
火花
创建本地 PySpark 的自然方式是什么
没有这样的事情。 Spark 分布式数据结构不能嵌套,或者您更喜欢不能嵌套动作或转换的另一个视角。
或 Pandas 数据帧
这相对容易,但你至少要记住几件事:
Pandas 和 Spark DataFrames 甚至不完全等同。它们是不同的结构,具有不同的属性,通常不能用另一种替代。 分区可以为空。 看起来您正在传递字典。请记住,基本 Python 字典是无序的(例如,与collections.OrderedDict
不同)。因此,传递列可能无法按预期工作。
import pandas as pd
rdd = sc.parallelize([
"x": 1, "y": -1,
"x": -3, "y": 0,
"x": -0, "y": 4
])
def combine(iter):
rows = list(iter)
return [pd.DataFrame(rows)] if rows else []
rdd.mapPartitions(combine).first()
## x y
## 0 1 -1
【讨论】:
谢谢你的解释帮助。该方法类似于我现在使用的方法,但是除了列名之外,是否有一种自然的方式来传递行模式? 我不确定我是否理解这个问题。 Pandas DataFrames 使用可以在闭包中传递的列和 dtype 参数,但 Spark 无法识别这些参数。如果你想要一个 Spark DataFrame,你应该传递这个createDataFrame
并在那里传递架构(它不同于 Pandas dtypes)。
我想维护模式,就像我想象的正常 toPandas 一样。如果我知道如何调用 createDataFrame 并维护行模式,那么调用 createDataFrame 然后调用 toPandas 就可以了。虽然我猜可能会降低效率?
toPandas
简单地收集并创建与 Spark 数据帧具有相同 columns
名称的本地数据结构。不多也不少。 Row
(如pyspark.sql.Row
)没有架构——它只是一个tuple
,添加了一些方法和__fields__
存储名称的属性。
有趣,toPandas 也不强制列类型?并且类型没有内置到行中?【参考方案2】:
你可以使用toPandas()
,
pandasdf = mydf.toPandas()
【讨论】:
这不能回答我的问题,我需要它在分区的 map 调用中运行。如果有一张地图可以传递数据框,那也很好。 对不起,我听不懂map that passes dataframe
。火花数据帧的输出预期是什么?您想为每个分区创建数据框吗?
mapPartition 为每个分区传递一个 Row 迭代器,所以我不能使用数据框函数【参考方案3】:
实际上可以在执行器中将 Spark 行转换为 Pandas,并最终使用 mapPartitions
从这些输出中创建 Spark DataFrame。 See my gist in Github
# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
# convert rows to dict
rows = (row_.asDict() for row_ in rdd_)
# create pandas dataframe
pdf = pd.DataFrame(rows)
# Rows/Pandas DF can be empty depending on patiition logic.
# Make sure to check it here, otherwise it will throw untrackable error
if len(pdf) > 0:
#
# Do something with pandas DataFrame
#
pass
return pdf.to_dict(orient='records')
# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))
【讨论】:
【参考方案4】:为了创建 Spark SQL 数据框,您需要一个配置单元上下文:
hc = HiveContext(sparkContext)
使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:
sparkSQLdataframe = hc.inferSchema(rows)
【讨论】:
好点。这仅适用于 RDD。因此,您可以在调用 mapPartition 之前为变量“combine”调用它。 另外,如果您立即将数据作为数据框读取会更好。您可以对 JSON、Hive 表等多个输入源执行此操作 我有一个数据框,但是当我调用 mapPartition 时,每个从节点都会看到一个行迭代器,为方便起见,我想合并这些行 啊,现在我明白了。不幸的是,我想不出一个好的解决方案。如果您将行组合成一个数据框,您希望应用哪个数据框操作? 最终到Pandas,然后是pandas函数。上面概述的方法可行,但感觉很hacky以上是关于如何在保留现有架构的同时从行中创建 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章
Oracle Apex 19.2:无法解决“架构被保留或受限”问题
使用 _PARTITIONTIME 从现有表在 BigQuery 中创建聚簇表