PySpark:不能使用日期时间年 = 0001 进行列操作
Posted
技术标签:
【中文标题】PySpark:不能使用日期时间年 = 0001 进行列操作【英文标题】:PySpark: Can't do column operations with datetime years = 0001 【发布时间】:2018-06-16 07:03:03 【问题描述】:我有一些时间戳的格式为“0001-mm-dd HH:MM:SS”的数据。我正在努力争取最少的时间。为了获得最短时间,我需要先转换为 DoubleType,因为 PySpark 数据帧的最小函数显然不适用于时间戳。但是,出于某种原因,日期时间讨厌 0001 年。无论我做什么,我都无法让它工作。下面,我尝试使用 UDF 手动将年份增加 1,但由于某种原因,它没有注册。但是,我可以使用没有 0001 年的另一列数据,并将函数中的 if 语句更改为数据中包含的年份,我可以观察到年份的变化。
我做错了什么?
from pyspark.sql import SQLContext
import pyspark.sql.functions as sfunc
import pyspark.sql.types as tp
from pyspark import SparkConf
from dateutil.relativedelta import relativedelta
columnname='x'
#columnname='y'
tmpdf.select(columnname).show(5)
def timeyearonecheck(date):
'''Datetimes breaks down at year = 0001, so bump up the year to 0002'''
if date.year == 1:
newdate=date+relativedelta(years=1)
return newdate
else:
return date
def timeConverter(timestamp):
'''Takes either a TimestampType() or a DateType() and converts it into a
float'''
timetuple=timestamp.timetuple()
if type(timestamp) == datetime.date:
timevalue=time.mktime(timetuple)
return int(timevalue)
else:
timevalue=time.mktime(timetuple)+timestamp.microsecond/1000000
return timevalue
tmptimedf1colname='tmpyeartime'
yearoneudf=sfunc.udf(timeyearonecheck,tp.TimestampType())
tmptimedf1=tmpdf.select(yearoneudf(sfunc.col(columnname)).alias(tmptimedf1colname))
tmptimedf2colname='numbertime'
timeudf=sfunc.udf(timeConverter,tp.DoubleType())
tmptimedf2=tmptimedf1.select(timeudf(sfunc.col(tmptimedf1colname)).alias(tmptimedf2colname))
minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]
+-------------------+
| x|
+-------------------+
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
+-------------------+
only showing top 5 rows
Py4JJavaError Traceback (most recent call last)
<ipython-input-42-b5725bf01860> in <module>()
17 timeudf=sfunc.udf(timeConverter,tp.DoubleType())
18
tmptimedf2=tmpdf.select(timeudf(sfunc.col(columnname)).
alias(tmptimedf2colname))
---> 19 minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]
20 print(minimum)
...
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 43.0 failed 4 times, most recent failure: Lost task 3.3 in stage
43.0 (TID 7829, 10.10.12.41, executor 39):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
ValueError: year 0 is out of range
即使我只是尝试查看第一个 UDF 的输出,也会出现错误,但只是在查看输出时,而不是在实际计算时。
tmptimedf1.select(tmptimedf1colname).show(5)
Py4JJavaError Traceback (most recent call last)
<ipython-input-44-5fc942678065> in <module>()
----> 1 tmptimedf1.select(tmptimedf1colname).show(5)
...
Py4JJavaError: An error occurred while calling o2215.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage
44.0 (TID 7984, 10.10.12.36, executor 4):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
...
ValueError: year 0 is out of range
此外,如果我这样做,我会在谈论第 0 年时得到相同的 ValueError:
tmpdf.select(columnname).first()
但前提是我使用年份为 0001 的列,而不是没有 0001 年份的“y”列。 'y' 列工作正常。
我不明白为什么我可以为 tmpdf 显示 5 个值,其中包括 0001,但我不能选择第一个值,因为它有 0001。
编辑:如下所述,我真的很想将 0001 年转换为 0002 年,因为 PySpark 的 approxQuantile 不适用于时间戳,而且一般来说,我对数据集了解得不够透彻,无法知道几年是可以接受的。 0001 年绝对是填充年,但 1970 年可能是我数据中的真实年份(在我的工作中一般情况下)。
到目前为止我已经得到了这个:
def tmpfunc(timestamp):
time=datetime.datetime.strptime(timestamp,'%Y-%m-%d %H:%M:%S')
return time
adf=datadf.select(sfunc.col(columnname).cast("string").alias('a'))
newdf = adf.withColumn('b',sfunc.regexp_replace('a', '0001-', '0002-'))
newdf.show(10)
print(newdf.first())
tmpudf=sfunc.udf(tmpfunc,tp.TimestampType())
newnewdf=newdf.select(tmpudf(sfunc.col('b')).alias('c'))
newnewdf.show(10)
print(newnewdf.first())
+-------------------+-------------------+
| a| b|
+-------------------+-------------------+
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2015-10-13 09:56:09|2015-10-13 09:56:09|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2013-11-05 21:28:09|2013-11-05 21:28:09|
|1993-12-24 03:52:47|1993-12-24 03:52:47|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
+-------------------+-------------------+
only showing top 10 rows
Row(a='0001-01-02 00:00:00', b='0002-01-02 00:00:00')
+-------------------+
| c|
+-------------------+
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|2015-10-13 09:56:09|
|0002-01-03 23:56:02|
|2013-11-05 21:28:09|
|1993-12-24 03:52:47|
|0002-01-03 23:56:02|
+-------------------+
only showing top 10 rows
Row(c=datetime.datetime(2, 1, 2, 0, 0))
正如用户在下面评论的那样,“节目”中的天数为 1 天 23 小时 56 分钟 2 秒。为什么,我该如何摆脱它?那么为什么我的“第一次”调用是正确的,但在 (2,1,2,0,0,0) 中也缺少一个 0?
【问题讨论】:
【参考方案1】:为了获得最短时间,我需要先转换为 DoubleType,因为 PySpark 数据帧的最小函数显然不适用于时间戳。
确实如此
df = spark.createDataFrame(
["0001-01-02 00:00:00", "0001-01-03 00:00:00"], "string"
).selectExpr("to_timestamp(value) AS x")
min_max_df = df.select(sfunc.min("x"), sfunc.max("x"))
min_max_df.show()
# +-------------------+-------------------+
# | min(x)| max(x)|
# +-------------------+-------------------+
# |0001-01-02 00:00:00|0001-01-03 00:00:00|
# +-------------------+-------------------+
失败的部分实际上是转换为本地值:
>>> min_max_df.first()
Traceback (most recent call last):
...
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
ValueError: year 0 is out of range
Epoch 时间戳的最小值是
>>> df.select(sfunc.col("x").cast("long")).first().x
-62135683200
当转换回日期时,这似乎被移回了 2 天(Scala 代码):
scala> java.time.Instant.ofEpochSecond(-62135683200L)
res0: java.time.Instant = 0000-12-31T00:00:00Z
因此在 Python 中不再有效。
假设0001
只是一个占位符,您可以在解析时忽略它:
df.select(sfunc.to_timestamp(
sfunc.col("x").cast("string"),
"0001-MM-dd HH:mm:ss").alias("x")
)).select(
sfunc.min("x"),
sfunc.max("x")
).first()
# Row(min(x)=datetime.datetime(1970, 1, 2, 1, 0), max(x)=datetime.datetime(1970, 1, 3, 1, 0))
您也可以直接将结果转换为字符串:
df.select(sfunc.min("x").cast("string"), sfunc.max("x").cast("string")).first()
# Row(CAST(min(x) AS STRING)='0001-01-02 00:00:00', CAST(max(x) AS STRING)='0001-01-03 00:00:00')
【讨论】:
啊,是的。日期时间的问题出现在我尝试使用 approxQuantile 并收到此错误的代码的后面部分中:Py4JJavaError:调用 o3334.approxQuantile 时发生错误。 :java.lang.IllegalArgumentException:要求失败:不支持数据类型 TimestampType 的列 x 的分位数计算。所以我还是需要把0001转换成别的东西,最好是0002,因为我对数据不熟悉,不想改太多。您的倒数第二个代码块似乎很有希望,但它不会转换为 0002 并且会将年份归零!= 0001。以上是关于PySpark:不能使用日期时间年 = 0001 进行列操作的主要内容,如果未能解决你的问题,请参考以下文章