Creating a DataFrame from Row results in 'infer schema issue'

Posted: 2017-07-06 12:08:06

Question:

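When I started learning PySpark, I used a list to create a DataFrame. Now that inferring the schema from a list has been deprecated, I get a warning suggesting that I use pyspark.sql.Row instead. However, when I try to create a DataFrame from a Row, I run into an infer schema issue. This is my code: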

>>> row = Row(name='Severin', age=33)
>>> df = spark.createDataFrame(row)

This results in the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark2-client/python/pyspark/sql/session.py", line 526, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/spark2-client/python/pyspark/sql/session.py", line 390, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/spark2-client/python/pyspark/sql/session.py", line 322, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/spark2-client/python/pyspark/sql/types.py", line 992, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <type 'int'>

So I created a schema:

>>> schema = StructType([StructField('name', StringType()), 
...                      StructField('age',IntegerType())])
>>> df = spark.createDataFrame(row, schema)

Then this error is thrown:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark2-client/python/pyspark/sql/session.py", line 526, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/spark2-client/python/pyspark/sql/session.py", line 387, in _createFromLocal
    data = list(data)
  File "/spark2-client/python/pyspark/sql/session.py", line 509, in prepare
    verify_func(obj, schema)
  File "/spark2-client/python/pyspark/sql/types.py", line 1366, in _verify_type
    raise TypeError("StructType can not accept object %r in type %s" % (obj, type(obj)))
TypeError: StructType can not accept object 33 in type <type 'int'>

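Comments:

Here is an example from the documentation: EXAMPLE

Possible duplicate of How to create dataframe from list in Spark SQL?

@Jeremy Did you actually read the question? I clearly said that I know how to create a DataFrame from a list, but that I get an error when I create one using pyspark.sql.Row. I got the answer to my question from @Daniel De Paula below. Please check the question at least once before marking something as a duplicate.

Answer 1: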

The createDataFrame function takes a list of Rows (among other options) plus a schema, so the correct code would be something like:

from pyspark.sql.types import *
from pyspark.sql import Row

schema = StructType([StructField('name', StringType()), StructField('age',IntegerType())])
rows = [Row(name='Severin', age=33), Row(name='John', age=48)]
df = spark.createDataFrame(rows, schema)

df.printSchema()
df.show()

Output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Severin| 33|
|   John| 48|
+-------+---+

The pyspark documentation has more details on the createDataFrame function.
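A rough illustration of why the error messages mention `int` and the value 33 rather than a Row: Row is a tuple subclass, and the tracebacks in the question show createDataFrame mapping over its input (`map(prepare, data)`, `map(_infer_schema, data)`). When a bare Row is passed, each field value is therefore treated as if it were a row. The sketch below uses a `namedtuple` called `Person` as a stand-in for Row (an assumption, so that it runs without Spark); it only mimics the iteration and is not how createDataFrame is implemented.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (an assumption for illustration): like Row,
# a namedtuple is a tuple subclass, so iterating it yields its field values.
Person = namedtuple("Person", ["name", "age"])
row = Person(name="Severin", age=33)

# Iterating the bare record: each element is a field value, not a record.
bare_items = [type(item).__name__ for item in row]

# Iterating a one-element list: the single element is the whole record.
wrapped_items = [type(item).__name__ for item in [row]]

print(bare_items)     # ['str', 'int']
print(wrapped_items)  # ['Person']
```

Wrapping the record in a list is what makes each iterated element a complete row, which is why passing a list of Rows works.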

Answer 2:

You need to create a list of Row objects and pass that list, together with the schema, to the createDataFrame() method. Example:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

department1 = Row(id='AAAAAAAAAAAAAA', type='XXXXX',cost='2')
department2 = Row(id='AAAAAAAAAAAAAA', type='YYYYY',cost='32')
department3 = Row(id='BBBBBBBBBBBBBB', type='XXXXX',cost='42')
department4 = Row(id='BBBBBBBBBBBBBB', type='YYYYY',cost='142')
department5 = Row(id='BBBBBBBBBBBBBB', type='ZZZZZ',cost='149')
department6 = Row(id='CCCCCCCCCCCCCC', type='XXXXX',cost='15')
department7 = Row(id='CCCCCCCCCCCCCC', type='YYYYY',cost='23')
department8 = Row(id='CCCCCCCCCCCCCC', type='ZZZZZ',cost='10')

schema = StructType([StructField('id', StringType()), StructField('type',StringType()),StructField('cost', StringType())])
rows = [department1,department2,department3,department4,department5,department6,department7,department8 ]
df = spark.createDataFrame(rows, schema)

Answer 3:

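If you are only building a pandas DataFrame, you can convert each Row to a dict and then rely on pandas' type inference, if that is good enough for your needs. This worked for me: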

import pandas as pd

# output: an existing Spark DataFrame (assumed); head(5) returns a list of Row objects
sample = output.head(5)

df = pd.DataFrame([x.asDict() for x in sample])

Answer 4:

I recently ran into a similar problem, and the answers here helped me understand it better.

My code:

row = Row(name="Alice", age=11)  
spark.createDataFrame(row).show()

resulted in a very similar error:

An error was encountered:  
Can not infer schema for type: <class 'int'>  
Traceback ... 

Cause of the problem: createDataFrame expects a list of rows. So if you have only one row and don't want to invent more, just wrap it in a list: [row]

row = Row(name="Alice", age=11)
spark.createDataFrame([row]).show()
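If the same code sometimes receives one row and sometimes many, the wrapping can be done in one place. The following is a minimal sketch of a hypothetical helper, `as_row_list` (not part of PySpark), which treats any tuple or dict as a single record. As above, a `namedtuple` stands in for Row so that the sketch runs without Spark.

```python
from collections import namedtuple


def as_row_list(data):
    """Wrap a single record in a list; return other iterables as a list.

    Hypothetical helper, not part of PySpark. Any tuple (which includes
    pyspark.sql.Row, a tuple subclass) or dict is treated as ONE record,
    so a plain tuple of several rows would be wrapped incorrectly.
    """
    if isinstance(data, (tuple, dict)):
        return [data]
    return list(data)


# Stand-in for pyspark.sql.Row so the sketch runs without Spark.
Person = namedtuple("Person", ["name", "age"])
alice = Person(name="Alice", age=11)
bob = Person(name="Bob", age=12)

single = as_row_list(alice)       # one record becomes a one-element list
many = as_row_list([alice, bob])  # a list of records is left as a list
```

With a real Spark session, the call would then look like spark.createDataFrame(as_row_list(row)).show(), assuming row is either a single Row or a list of Rows.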
