如何在保留现有架构的同时从行中创建 DataFrame?

Posted

技术标签:

【中文标题】如何在保留现有架构的同时从行中创建 DataFrame?【英文标题】:How to create a DataFrame out of rows while retaining existing schema? 【发布时间】:2015-12-23 15:26:18 【问题描述】:

如果我调用 map 或 mapPartition 并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?

目前我正在做类似的事情:

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows,columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartition(combine)

【问题讨论】:

【参考方案1】:

Spark >= 2.3.0

从 Spark 2.3.0 开始,可以按分区或组使用 Pandas SeriesDataFrame。例如:

Applying UDFs on GroupedData in PySpark (with functioning python example) Efficient string suffix detection

火花

创建本地 PySpark 的自然方式是什么

没有这样的事情。 Spark 分布式数据结构不能嵌套,或者您更喜欢不能嵌套动作或转换的另一个视角。

或 Pandas 数据帧

这相对容易,但你至少要记住几件事:

Pandas 和 Spark DataFrames 甚至不完全等同。它们是不同的结构,具有不同的属性,通常不能用另一种替代。 分区可以为空。 看起来您正在传递字典。请记住,基本 Python 字典是无序的(例如,与 collections.OrderedDict 不同)。因此,传递列可能无法按预期工作。
import pandas as pd

rdd = sc.parallelize([
    "x": 1, "y": -1, 
    "x": -3, "y": 0,
    "x": -0, "y": 4
])

def combine(iter):
    rows = list(iter)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1

【讨论】:

谢谢你的解释帮助。该方法类似于我现在使用的方法,但是除了列名之外,是否有一种自然的方式来传递行模式? 我不确定我是否理解这个问题。 Pandas DataFrames 使用可以在闭包中传递的列和 dtype 参数,但 Spark 无法识别这些参数。如果你想要一个 Spark DataFrame,你应该传递这个 createDataFrame 并在那里传递架构(它不同于 Pandas dtypes)。 我想维护模式,就像我想象的正常 toPandas 一样。如果我知道如何调用 createDataFrame 并维护行模式,那么调用 createDataFrame 然后调用 toPandas 就可以了。虽然我猜可能会降低效率? toPandas 简单地收集并创建与 Spark 数据帧具有相同 columns 名称的本地数据结构。不多也不少。 Row(如pyspark.sql.Row)没有架构——它只是一个tuple,添加了一些方法和__fields__存储名称的属性。 有趣,toPandas 也不强制列类型?并且类型没有内置到行中?【参考方案2】:

你可以使用toPandas()

pandasdf = mydf.toPandas()

【讨论】:

这不能回答我的问题,我需要它在分区的 map 调用中运行。如果有一张地图可以传递数据框,那也很好。 对不起,我听不懂map that passes dataframe。火花数据帧的输出预期是什么?您想为每个分区创建数据框吗? mapPartition 为每个分区传递一个 Row 迭代器,所以我不能使用数据框函数【参考方案3】:

实际上可以在执行器中将 Spark 行转换为 Pandas,并最终使用 mapPartitions 从这些输出中创建 Spark DataFrame。 See my gist in Github

# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
    # convert rows to dict
    rows = (row_.asDict() for row_ in rdd_)
    # create pandas dataframe
    pdf = pd.DataFrame(rows)

    # Rows/Pandas DF can be empty depending on patiition logic.
    # Make sure to check it here, otherwise it will throw untrackable error
    if len(pdf) > 0:
        #
        # Do something with pandas DataFrame 
        #
        pass

    return pdf.to_dict(orient='records')

# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))

【讨论】:

【参考方案4】:

为了创建 Spark SQL 数据框,您需要一个配置单元上下文:

hc = HiveContext(sparkContext)

使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:

sparkSQLdataframe = hc.inferSchema(rows)  

【讨论】:

好点。这仅适用于 RDD。因此,您可以在调用 mapPartition 之前为变量“combine”调用它。 另外,如果您立即将数据作为数据框读取会更好。您可以对 JSON、Hive 表等多个输入源执行此操作 我有一个数据框,但是当我调用 mapPartition 时,每个从节点都会看到一个行迭代器,为方便起见,我想合并这些行 啊,现在我明白了。不幸的是,我想不出一个好的解决方案。如果您将行组合成一个数据框,您希望应用哪个数据框操作? 最终到Pandas,然后是pandas函数。上面概述的方法可行,但感觉很hacky

以上是关于如何在保留现有架构的同时从行中创建 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

Oracle Apex 19.2:无法解决“架构被保留或受限”问题

如何在现有对象中创建javascript对象[重复]

如何在 JSF 中创建现有组件的组合?

使用 _PARTITIONTIME 从现有表在 BigQuery 中创建聚簇表

如何在具有自动布局的 Interface Builder 中创建的现有视图中添加视图?

如何在现有数据库中创建 ASP.Net Identity 表?