Pyspark:pyarrow.lib.ArrowTypeError:需要一个整数(获取类型时间戳)

Posted

技术标签:

【中文标题】Pyspark:pyarrow.lib.ArrowTypeError:需要一个整数(获取类型时间戳)【英文标题】:Pyspark: pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp) 【发布时间】:2020-01-21 16:18:17 【问题描述】:

我正在将 spark 数据框写入 bigquery 表。这是可行的,但现在我在将数据写入 bigquery 之前调用了 pandas udf。出于某种原因,当我在将 spark 数据帧写入 bigquery 之前调用 pandas udf 时,我现在看到以下错误:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 240, in create_array
    return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
  File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp)

从下面的执行程序日志中,哪一个看起来像是由错误的 parquet 架构引起的,其中时间戳列被推断为整数?

20/01/20 22:45:38 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

  "type" : "struct",
  "fields" : [ 
    "name" : "id",
    "type" : "string",
    "nullable" : true,
    "metadata" :  
  , 
    "name" : "firstname",
    "type" : "string",
    "nullable" : true,
    "metadata" :  
  , 
    "name" : "status",
    "type" : "string",
    "nullable" : true,
    "metadata" :  
  , 
    "name" : "entry_date",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" :  
  , 
    "name" : "last_status_date",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" :  
   ]

and corresponding Parquet message type:
message spark_schema 
  optional binary id (UTF8);
  optional binary firstname (UTF8);
  optional binary status (UTF8);
  optional int96 entry_date;
  optional int96 last_status_date;

这很令人困惑,因为当我在不应用 pandas_udf 的情况下运行我的代码时不会发生这种情况。 udf 没有以任何方式操作日期列...

def main():
    # apply pandas udf 
    df.groupBy('firstname').apply(my_pandas_udf)

    # drop some columns
    cols_to_drop = ['firstname']

    # save to bigquery
    df \
        .drop(*cols_to_drop) \
        .write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "<TEMP_BUCKET_NAME>") \
        .option("project", "PROJECT_ID") \
        .option("credentialsFile","/path/to/my/credentials.json") \
        .option("parentProject", "PROJECT_ID") \
        .option("table", "PROJECT_ID:dataset.table") \
        .mode("overwrite") \
        .save()

def udf_schema():
    return StructType([
        StructField('id', StringType(), True),
        StructField('firstname', StringType(), True),
        StructField('status', StringType(), True),
        StructField('entry_date', TimestampType(), True),
        StructField('last_status_date', TimestampType(), True),
    ])

@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df

我做错了什么?这个*** post 似乎有类似的问题,但截至 2020 年 1 月 21 日尚未得到答复。

编辑(1):pandas_udf 前后的数据框数据类型 从 pandas_udf 返回时会发生错误,但这里是 spark 数据帧在传递给 pandas_udf 之前的数据类型

==> BEFORE 

id string
firstname string
status string
entry_date timestamp
date_status_change timestamp
last_status_date timestamp

【问题讨论】:

能否在运行 my_pandas_udf() 前后打印数据帧模式? @DavidRabinowitz 在编辑 (1) 中添加。当 pyarrow 在从my_pandas_udf 返回时尝试将 pandas df 转换回 spark df 时发生错误,因此我只能在 udf 调用之前打印数据类型。 更奇怪的是,这适用于本地较小的数据子集,但是当我在 AWS EMR 上运行超过 10,000 行时,我看到了这个错误? 通常如果事情在本地工作但不是以分布式方式工作,则意味着涉及序列化问题。 你有什么发现吗?我在 AWS Glue 上遇到了同样的问题……奇怪的是它运行良好,而我今天才第一次遇到这个问题。 【参考方案1】:

我最近遇到了类似的问题,我相信这个错误是由于pandas在从spark读取时将每一列都转换为object

我解决问题的方法是在创建 pandas 数据框后显式转换时间戳列。所以在你的情况下,类似:

@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    df['entry_date'] = pd.to_datetime(df['entry_date'])
    df['last_status_date'] = pd.to_datetime(df['last_status_date'])
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df

【讨论】:

您可能还需要将任何pd.NaT 设置为实际日期。

以上是关于Pyspark:pyarrow.lib.ArrowTypeError:需要一个整数(获取类型时间戳)的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 安装错误:没有名为“pyspark”的模块

Pyspark:将 sql 查询转换为 pyspark?

Pyspark - ImportError:无法从“pyspark”导入名称“SparkContext”

Pyspark:基于所有列减去/差异 pyspark 数据帧

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

pyspark:在日期和时间上重新采样 pyspark 数据帧