pyspark createdataframe:字符串解释为时间戳,模式混合列
Posted
技术标签:
【中文标题】pyspark createdataframe:字符串解释为时间戳,模式混合列【英文标题】:pyspark createdataframe: string interpreted as timestamp, schema mixes up columns 【发布时间】:2017-06-20 22:24:22 【问题描述】:我有一个非常奇怪的火花数据帧错误,导致字符串被评估为时间戳。
这是我的设置代码:
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
new_schema = StructType([StructField("item_id", StringType(), True),
StructField("date", TimestampType(), True),
StructField("description", StringType(), True)
])
df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)
这给了我以下错误:
AttributeError Traceback(最近调用 最后)在() ----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)
/home/florian/spark/python/pyspark/sql/context.pyc 在 createDataFrame(self, data, schema, samplingRatio, verifySchema) 307 Py4JJava错误:... 第308章 --> 309 return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema) 310 第311章
/home/florian/spark/python/pyspark/sql/session.pyc 在 createDataFrame(self, data, schema, samplingRatio, verifySchema) 522 rdd,架构 = self._createFromRDD(data.map(准备),架构,采样率) 523 其他: --> 524 rdd, schema = self._createFromLocal(map(prepare, data), schema) 第525章 526 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(),schema.json())
/home/florian/spark/python/pyspark/sql/session.pyc 在 _createFromLocal(自我,数据,模式) 397 398#将python对象转换为sql数据 --> 399 data = [schema.toInternal(row) for row in data] 400返回self._sc.parallelize(数据),架构 401
/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, 对象) 第574章 第575章 --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 第577章 578 d = obj.dict
/home/florian/spark/python/pyspark/sql/types.pyc in ((f, v)) 第574章 第575章 --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 第577章 578 d = obj.dict
/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, 对象) 434 第435章 --> 436 返回 self.dataType.toInternal(obj) 437 第438章
/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt) 第188章 189 如果 dt 不是无: --> 190 秒 = (calendar.timegm(dt.utctimetuple()) 如果 dt.tzinfo 第191章 192 返回整数(秒 * 1e6 + dt.微秒)
AttributeError: 'str' 对象没有属性 'tzinfo'
这看起来好像一个字符串被传递给 TimestampType.toInternal()
真正奇怪的是这个数据框产生了同样的错误:
df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema)
虽然这个有效:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema)
这个也可以:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
对我来说,这意味着 pyspark 以某种方式将“item_id”中的值放入“date”列,因此会产生此错误。 我做错什么了吗?这是数据帧中的错误吗?
信息: 我正在使用 pyspark 2.0.1
编辑:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
df.first()
Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset =0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH= 3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', 日期=无,描述=无)
【问题讨论】:
【参考方案1】:当您创建 Row 对象时,字段按字母顺序排序 (http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row),因此当您创建 Row(description, date, item_id)
对象时,它将按 (date, description, item_id)
排序。
由于您的架构按StringType, TimestampType, StringType
排序,因此在使用此行和架构创建 DataFrame 时,Spark 会将date
中的内容映射到StringType
,将description
中的内容映射到TimestampType
和item_id
到 StringType
。
将时间戳(datetime
格式)传递给 StringType
不会导致错误,但将字符串传递给 TimestampType
会导致错误,因为它要求 tzinfo
属性,作为错误声明,String 对象没有它。
此外,对您有效的数据帧实际有效的原因是 None
被传递给架构中的 TimestampType
,这是一个可接受的值。
【讨论】:
我遇到了同样的问题。如上所述,只需将架构定义字段重新排序以按字母顺序列出即可解决我的问题。 谢谢!这让我发疯了【参考方案2】:基于上述@rafael-zanetti 的答案。您可以执行以下操作来对列进行排序:
new_schema = [StructField("item_id", StringType(), True),
StructField("date", TimestampType(), True),
StructField("description", StringType(), True)]
new_schema = StructType(sorted(new_schema, key=lambda f: f.name))
【讨论】:
以上是关于pyspark createdataframe:字符串解释为时间戳,模式混合列的主要内容,如果未能解决你的问题,请参考以下文章