spark 2.4 Parquet 列无法在文件中转换,列:[Impressions],预期:bigint,发现:BINARY

Posted

技术标签:

【中文标题】spark 2.4 Parquet 列无法在文件中转换,列:[Impressions],预期:bigint,发现:BINARY【英文标题】:spark 2.4 Parquet column cannot be converted in file, Column: [Impressions], Expected: bigint, Found: BINARY 【发布时间】:2019-11-28 21:05:06 【问题描述】:

我遇到了一个我无法理解的奇怪问题。

我的源数据有一列“Impressions”,有时是 bigint / 有时是字符串(当我手动浏览数据时)。

为此列注册的 HIVE 架构为 Long。

因此,在加载数据时:

spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW adwords_ads_agg_Yxz AS

SELECT
    a.customer_id
    , a.Campaign
    , ...
    , SUM(BIGINT(a.Impressions)) as Impressions
    , SUM(BIGINT(a.Cost))/1000000 as Cost
FROM adwords_ad a
LEFT JOIN ds_ad_mapping m ON BIGINT(a.Ad_ID) = BIGINT(m.adEngineId) AND a.customer_id = m.reportAccountId
WHERE a.customer_id in (...)
AND a.day >= DATE('2019-02-01')
GROUP BY
    a.customer_id
    , ...
""")

我正在确保所有内容都转换为 BIGINT。 错误发生在稍后的步骤:

spark.sql("CACHE TABLE adwords_ads_agg_Yxz")

看到此错误后,我在笔记本中运行相同的代码并尝试进行更多调试,首先确保转换发生在 BIGINT / long 列:

from pyspark.sql.types import LongType

df = df.withColumn("Impressions", f.col("Impressions").cast(LongType()))
df.createOrReplaceTempView('adwords_ads_agg_Yxz')

然后从这个新转换的 df 中打印模式:

root
 |-- customer_id: long (nullable = true)
 |-- Campaign: string (nullable = true)
 |-- MatchType: string (nullable = true)
 |-- League: string (nullable = false)
 |-- Ad_Group: string (nullable = true)
 |-- Impressions: long (nullable = true) <- Here!
 |-- Cost: double (nullable = true)

然后进行缓存,但错误仍然存​​在:

激发工作进度 调用 o84.sql 时出错。 :org.apache.spark.SparkException:作业因阶段故障而中止:阶段 47.0 中的任务 9 失败 4 次,最近一次失败:阶段 47.0 中丢失任务 9.3(TID 2256,ip-172-31-00-00.eu -west-1.compute.internal,执行器 10):org.apache.spark.sql.execution.QueryExecutionException: Parquet 列无法在文件 s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/theparquetfile.snappy.parquet 中转换。列:[展示次数],预期:bigint,发现:BINARY

有没有人遇到过这个问题和/或知道是什么原因造成的?

如果我删除缓存,尝试将数据写入 parquet 时会发生错误。 我也不知道为什么在我尝试刷新/写入临时表时提到 adwords_ad 表

【问题讨论】:

有趣的是得到 -1 没有解释 不能直接将二进制转换为bigint,先尝试转换为StringType:BIGINT(string(a.Impressions)) @jxc 不幸的是,这并没有改变.. 奇怪的是,我有 2 个使用 Impressions / Cost cols 的表(以 BINARY 表示),并且此转换仅适用于两者之一。不管我对第二个做什么,它仍然说它是一个二进制列。 spark 转换被延迟评估。错误仅在您执行操作(即缓存、写入)时显示。我想这就是你投反对票的原因。我认为您应该关注源数据中的 SQL 和/或问题。 【参考方案1】:

在镶木地板上使用蜂巢表时, 然后使用 SPARK 读取它, SPARK 采用 parquet 的架构,而不是 hive 表定义的架构。

在你的 parquet 文件中 schema Impressions 是一个 BINARY 是有道理的,而在 hive 表中它的 Long 并不重要,因为 spark 从 parquet 文件中获取 schema。

【讨论】:

如果转换为字符串不能解决问题,您应该编写一个自定义 UDF,将 BinaryType(实际上它只是 python 中 bytearray 的别名)转换为字符串。 有意思,谢谢你的帮助,我试试

以上是关于spark 2.4 Parquet 列无法在文件中转换,列:[Impressions],预期:bigint,发现:BINARY的主要内容,如果未能解决你的问题,请参考以下文章

无法使用Spark Structured Streaming在Parquet文件中写入数据

Spark parquet 模式演变

Spark:从镶木地板中读取一个 int 列,只要

如果 csv 列标题包含空格,则在 spark 中将 csv 转换为 parquet 会出错

Apache Spark 无法读取使用流式作业编写的 parquet 文件夹

Spark Parquet统计(最小/最大)集成