udf 创建的时间序列无法写入镶木地板
Posted
技术标签:
【中文标题】udf 创建的时间序列无法写入镶木地板【英文标题】:udf created time series having trouble writing to parquet 【发布时间】:2020-03-18 07:33:27 【问题描述】:我有下面的 pyspark 代码。在其中,我将数据框 tz_inventory_aud_df2 中缺少的 end_date 值填充为远在未来的日期。我从同一个数据框中获得了最小的 start_date。然后我为从最小 start_date 到当前日期的每个日期创建一个时间序列。我使用 udf 创建具有这些日期的数据帧,然后从该数据帧离开连接到 tz_inventory_aud_df 以获得由我创建的数据帧中的每个日期过滤的字段总和。当我尝试最终将数据帧写为镶木地板文件时,我的驱动程序日志中出现以下错误。有谁知道是什么导致了这个错误,你能建议如何解决它吗?
代码:
tz_inventory_aud_df2=tz_inventory_aud_df.fillna('end_date':'3018-01-01 00:00:00')
bs_df=tz_inventory_aud_df.agg('start_date':'min')\
.withColumn('min_date',to_date(col('min(start_date)')))
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
bs_df = bs_df.withColumn('current_date',to_date(unix_timestamp(lit(timestamp),'yyyy-MM-dd').cast("timestamp")))
# creating time-series dataframe
# UDF
def generate_date_series(start, stop):
return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]
# Register UDF for later usage
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()) )
# mydf is a DataFrame with columns `start` and `stop` of type DateType()
bs_df.createOrReplaceTempView("mydf")
filldate_df=spark.sql("SELECT explode(generate_date_series(min_date, current_date)) as dates FROM mydf")
daily_af_units=filldate_df.alias('a').join(tz_inventory_aud_df2.alias('b'),
(col('b.current_flag')==1)
&(col('a.dates')>=col('b.start_date'))
&(col('a.dates')<col('b.end_date')),
how='inner'
)\
.select(col('b.product_id'),
col('a.dates'),
(col('b.available_units')+col('b.reserved_units')+col('b.packed_and_ready_units')).alias('daily_product_remaining')
)\
.alias('c')\
.groupby(['product_id','dates']).sum()
daily_af_units=daily_af_units.withColumn("daily_product_remaining",daily_af_units["sum(daily_product_remaining)"])
daily_af_units=daily_af_units[['product_id', 'dates', 'daily_product_remaining']]
daily_af_units.write.mode("overwrite").parquet(bckt_pth1+'daily_units_remaining')
错误:
2020-03-17 08:03:05,437 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(66)) - Lost task 0.1 in stage 12651.0 (TID 479153, ip-10-100-7-60.glue.dnsmasq, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/root/appcache/application_1584428038308_0005/container_1584428038308_0005_01_000013/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/root/appcache/application_1584428038308_0005/container_1584428038308_0005_01_000013/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/root/appcache/application_1584428038308_0005/container_1584428038308_0005_01_000013/pyspark.zip/pyspark/worker.py", line 248, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/mnt/yarn/usercache/root/appcache/application_1584428038308_0005/container_1584428038308_0005_01_000013/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
return lambda *a: toInternal(f(*a))
File "/mnt/yarn/usercache/root/appcache/application_1584428038308_0005/container_1584428038308_0005_01_000013/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "script_2020-03-17-06-55-38.py", line 1839, in generate_date_series
return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]
TypeError: unsupported operand type(s) for -: 'datetime.date' and 'NoneType'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
更新:
tz_inventory_aud_df2=tz_inventory_aud_df[tz_inventory_aud_df['current_flag']==1]\
.fillna('end_date':'3018-01-01 00:00:00',
'start_date':'1990-01-01 00:00:00')
bs_df=tz_inventory_aud_df2.agg('start_date':'min')\
.withColumn('min_date',to_date(col('min(start_date)')))
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
bs_df = bs_df.withColumn('current_date',to_date(unix_timestamp(lit(timestamp),'yyyy-MM-dd').cast("timestamp")))
# creating time-series dataframe
# UDF
def generate_date_series(start, stop):
return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]
# Register UDF for later usage
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()) )
# mydf is a DataFrame with columns `start` and `stop` of type DateType()
bs_df.createOrReplaceTempView("mydf")
filldate_df=spark.sql("SELECT explode(generate_date_series(min_date, current_date)) as dates FROM mydf")
daily_af_units=filldate_df.alias('a').join(tz_inventory_aud_df2.alias('b'),
(col('b.current_flag')==1)
&(col('a.dates')>=col('b.start_date'))
&(col('a.dates')<col('b.end_date')),
how='inner'
)\
.select(col('b.product_id'),
col('a.dates'),
(col('b.available_units')+col('b.reserved_units')+col('b.packed_and_ready_units')).alias('daily_product_remaining')
)\
.alias('c')\
.groupby(['product_id','dates']).sum()
daily_af_units=daily_af_units.withColumn("daily_product_remaining",daily_af_units["sum(daily_product_remaining)"])
daily_af_units=daily_af_units[['product_id', 'dates', 'daily_product_remaining']]
【问题讨论】:
【参考方案1】:问题不是由于写操作引起的(请记住,spark 是基于惰性计算的),而是由于此操作:
from datetime import date
import time
date.fromtimestamp(time.time()) - None
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-6-059d3edeb205> in <module>
1 from datetime import date
2 import time
----> 3 date.fromtimestamp(time.time()) - None
TypeError: unsupported operand type(s) for -: 'datetime.date' and 'NoneType'
在执行您的 udf 时会发生这种情况,这肯定由空值引起在某些时候出现在您的数据框中。
查看您的代码是您尝试但在第一行删除空值。但是在第二行 您没有使用生成的数据框...我不确定这是预期的行为。所以改变你的第一行:
tz_inventory_aud_df2=tz_inventory_aud_df.fillna('end_date':'3018-01-01 00:00:00')
bs_df=tz_inventory_aud_df2.agg('start_date':'min')\
.withColumn('min_date',to_date(col('min(start_date)')))
【讨论】:
感谢您指出这一点。我在上面添加了更新,稍后我在代码中使用了正确版本的数据框。我还尝试填写任何 null 'start_date',这样 udf 就不会出现问题。但是我在尝试写作时仍然遇到同样的错误。有什么我不明白的吗?错误是由 start_date 中的空值引起的,对吧?以上是关于udf 创建的时间序列无法写入镶木地板的主要内容,如果未能解决你的问题,请参考以下文章
将带有 timedeltas 的 pandas 数据帧写入镶木地板
如何在 python 的 S3 中从 Pandas 数据帧写入镶木地板文件