pyspark createdataframe:字符串解释为时间戳,模式混合列

Posted

技术标签:

【中文标题】pyspark createdataframe:字符串解释为时间戳,模式混合列【英文标题】:pyspark createdataframe: string interpreted as timestamp, schema mixes up columns 【发布时间】:2017-06-20 22:24:22 【问题描述】:

我有一个非常奇怪的火花数据帧错误,导致字符串被评估为时间戳

这是我的设置代码:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

new_schema = StructType([StructField("item_id", StringType(), True),
                         StructField("date", TimestampType(), True),
                         StructField("description", StringType(), True)
                        ])

df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)

这给了我以下错误:

AttributeError Traceback(最近调用 最后)在() ----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)

/home/florian/spark/python/pyspark/sql/context.pyc 在 createDataFrame(self, data, schema, samplingRatio, verifySchema) 307 Py4JJava错误:... 第308章 --> 309 return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema) 310 第311章

/home/florian/spark/python/pyspark/sql/session.pyc 在 createDataFrame(self, data, schema, samplingRatio, verifySchema) 522 rdd,架构 = self._createFromRDD(data.map(准备),架构,采样率) 523 其他: --> 524 rdd, schema = self._createFromLocal(map(prepare, data), schema) 第525章 526 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(),schema.json())

/home/florian/spark/python/pyspark/sql/session.pyc 在 _createFromLocal(自我,数据,模式) 397 398#将python对象转换为sql数据 --> 399 data = [schema.toInternal(row) for row in data] 400返回self._sc.parallelize(数据),架构 401

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, 对象) 第574章 第575章 --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 第577章 578 d = obj.dict

/home/florian/spark/python/pyspark/sql/types.pyc in ((f, v)) 第574章 第575章 --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 第577章 578 d = obj.dict

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, 对象) 434 第435章 --> 436 返回 self.dataType.toInternal(obj) 437 第438章

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt) 第188章 189 如果 dt 不是无: --> 190 秒 = (calendar.timegm(dt.utctimetuple()) 如果 dt.tzinfo 第191章 192 返回整数(秒 * 1e6 + dt.微秒)

AttributeError: 'str' 对象没有属性 'tzinfo'

这看起来好像一个字符串被传递给 TimestampType.toInternal()

真正奇怪的是这个数据框产生了同样的错误:

df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema)

虽然这个有效:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema)

这个也可以:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)

对我来说,这意味着 pyspark 以某种方式将“item_id”中的值放入“date”列,因此会产生此错误。 我做错什么了吗?这是数据帧中的错误吗?

信息: 我正在使用 pyspark 2.0.1

编辑:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
df.first()

Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset =0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH= 3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', 日期=无,描述=无)

【问题讨论】:

【参考方案1】:

当您创建 Row 对象时,字段按字母顺序排序 (http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row),因此当您创建 Row(description, date, item_id) 对象时,它将按 (date, description, item_id) 排序。

由于您的架构按StringType, TimestampType, StringType 排序,因此在使用此行和架构创建 DataFrame 时,Spark 会将date 中的内容映射到StringType,将description 中的内容映射到TimestampTypeitem_idStringType

将时间戳(datetime 格式)传递给 StringType 不会导致错误,但将字符串传递给 TimestampType 会导致错误,因为它要求 tzinfo 属性,作为错误声明,String 对象没有它。

此外,对您有效的数据帧实际有效的原因是 None 被传递给架构中的 TimestampType,这是一个可接受的值。

【讨论】:

我遇到了同样的问题。如上所述,只需将架构定义字段重新排序以按字母顺序列出即可解决我的问题。 谢谢!这让我发疯了【参考方案2】:

基于上述@rafael-zanetti 的答案。您可以执行以下操作来对列进行排序:

new_schema = [StructField("item_id", StringType(), True),
                     StructField("date", TimestampType(), True),
                     StructField("description", StringType(), True)]
new_schema = StructType(sorted(new_schema, key=lambda f: f.name))

【讨论】:

以上是关于pyspark createdataframe:字符串解释为时间戳,模式混合列的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:从数据框列表创建 RDD

使用带有熊猫数据的 CreateDataFrame 时将 NaN 替换为 null

撤消规模数据pyspark

GraphFrames 的 PySpark 异常

拆分pyspark中的列

Pyspark:过滤多列上的行