Pyspark:pyarrow.lib.ArrowTypeError:需要一个整数(获取类型时间戳)
Posted
技术标签:
【中文标题】Pyspark:pyarrow.lib.ArrowTypeError:需要一个整数(获取类型时间戳)【英文标题】:Pyspark: pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp) 【发布时间】:2020-01-21 16:18:17 【问题描述】:我正在将 spark 数据框写入 bigquery 表。这是可行的,但现在我在将数据写入 bigquery 之前调用了 pandas udf。出于某种原因,当我在将 spark 数据帧写入 bigquery 之前调用 pandas udf 时,我现在看到以下错误:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 240, in create_array
return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp)
从下面的执行程序日志中,哪一个看起来像是由错误的 parquet 架构引起的,其中时间戳列被推断为整数?
20/01/20 22:45:38 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
"type" : "struct",
"fields" : [
"name" : "id",
"type" : "string",
"nullable" : true,
"metadata" :
,
"name" : "firstname",
"type" : "string",
"nullable" : true,
"metadata" :
,
"name" : "status",
"type" : "string",
"nullable" : true,
"metadata" :
,
"name" : "entry_date",
"type" : "timestamp",
"nullable" : true,
"metadata" :
,
"name" : "last_status_date",
"type" : "timestamp",
"nullable" : true,
"metadata" :
]
and corresponding Parquet message type:
message spark_schema
optional binary id (UTF8);
optional binary firstname (UTF8);
optional binary status (UTF8);
optional int96 entry_date;
optional int96 last_status_date;
这很令人困惑,因为当我在不应用 pandas_udf 的情况下运行我的代码时不会发生这种情况。 udf 没有以任何方式操作日期列...
def main():
# apply pandas udf
df.groupBy('firstname').apply(my_pandas_udf)
# drop some columns
cols_to_drop = ['firstname']
# save to bigquery
df \
.drop(*cols_to_drop) \
.write \
.format("bigquery") \
.option("temporaryGcsBucket", "<TEMP_BUCKET_NAME>") \
.option("project", "PROJECT_ID") \
.option("credentialsFile","/path/to/my/credentials.json") \
.option("parentProject", "PROJECT_ID") \
.option("table", "PROJECT_ID:dataset.table") \
.mode("overwrite") \
.save()
def udf_schema():
return StructType([
StructField('id', StringType(), True),
StructField('firstname', StringType(), True),
StructField('status', StringType(), True),
StructField('entry_date', TimestampType(), True),
StructField('last_status_date', TimestampType(), True),
])
@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
df = df.sort_values('entry_date', ascending=False)
oldest_date = df['entry_date'].iloc[0]
df = df[df['entry_date'] >= oldest_date]
df = df.copy()
return df
我做错了什么?这个*** post 似乎有类似的问题,但截至 2020 年 1 月 21 日尚未得到答复。
编辑(1):pandas_udf 前后的数据框数据类型 从 pandas_udf 返回时会发生错误,但这里是 spark 数据帧在传递给 pandas_udf 之前的数据类型
==> BEFORE
id string
firstname string
status string
entry_date timestamp
date_status_change timestamp
last_status_date timestamp
【问题讨论】:
能否在运行 my_pandas_udf() 前后打印数据帧模式? @DavidRabinowitz 在编辑 (1) 中添加。当 pyarrow 在从my_pandas_udf
返回时尝试将 pandas df 转换回 spark df 时发生错误,因此我只能在 udf 调用之前打印数据类型。
更奇怪的是,这适用于本地较小的数据子集,但是当我在 AWS EMR 上运行超过 10,000 行时,我看到了这个错误?
通常如果事情在本地工作但不是以分布式方式工作,则意味着涉及序列化问题。
你有什么发现吗?我在 AWS Glue 上遇到了同样的问题……奇怪的是它运行良好,而我今天才第一次遇到这个问题。
【参考方案1】:
我最近遇到了类似的问题,我相信这个错误是由于pandas在从spark读取时将每一列都转换为object
。
我解决问题的方法是在创建 pandas 数据框后显式转换时间戳列。所以在你的情况下,类似:
@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
df['entry_date'] = pd.to_datetime(df['entry_date'])
df['last_status_date'] = pd.to_datetime(df['last_status_date'])
df = df.sort_values('entry_date', ascending=False)
oldest_date = df['entry_date'].iloc[0]
df = df[df['entry_date'] >= oldest_date]
df = df.copy()
return df
【讨论】:
您可能还需要将任何pd.NaT
设置为实际日期。以上是关于Pyspark:pyarrow.lib.ArrowTypeError:需要一个整数(获取类型时间戳)的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark - ImportError:无法从“pyspark”导入名称“SparkContext”
Pyspark:基于所有列减去/差异 pyspark 数据帧
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe