spark 2.4 Parquet 列无法在文件中转换,列:[Impressions],预期:bigint,发现:BINARY
Posted
技术标签:
【中文标题】spark 2.4 Parquet 列无法在文件中转换,列:[Impressions],预期:bigint,发现:BINARY【英文标题】:spark 2.4 Parquet column cannot be converted in file, Column: [Impressions], Expected: bigint, Found: BINARY 【发布时间】:2019-11-28 21:05:06 【问题描述】:我遇到了一个我无法理解的奇怪问题。
我的源数据有一列“Impressions”,有时是 bigint / 有时是字符串(当我手动浏览数据时)。
为此列注册的 HIVE 架构为 Long。
因此,在加载数据时:
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW adwords_ads_agg_Yxz AS
SELECT
a.customer_id
, a.Campaign
, ...
, SUM(BIGINT(a.Impressions)) as Impressions
, SUM(BIGINT(a.Cost))/1000000 as Cost
FROM adwords_ad a
LEFT JOIN ds_ad_mapping m ON BIGINT(a.Ad_ID) = BIGINT(m.adEngineId) AND a.customer_id = m.reportAccountId
WHERE a.customer_id in (...)
AND a.day >= DATE('2019-02-01')
GROUP BY
a.customer_id
, ...
""")
我正在确保所有内容都转换为 BIGINT。 错误发生在稍后的步骤:
spark.sql("CACHE TABLE adwords_ads_agg_Yxz")
看到此错误后,我在笔记本中运行相同的代码并尝试进行更多调试,首先确保转换发生在 BIGINT / long 列:
from pyspark.sql.types import LongType
df = df.withColumn("Impressions", f.col("Impressions").cast(LongType()))
df.createOrReplaceTempView('adwords_ads_agg_Yxz')
然后从这个新转换的 df 中打印模式:
root
|-- customer_id: long (nullable = true)
|-- Campaign: string (nullable = true)
|-- MatchType: string (nullable = true)
|-- League: string (nullable = false)
|-- Ad_Group: string (nullable = true)
|-- Impressions: long (nullable = true) <- Here!
|-- Cost: double (nullable = true)
然后进行缓存,但错误仍然存在:
激发工作进度 调用 o84.sql 时出错。 :org.apache.spark.SparkException:作业因阶段故障而中止:阶段 47.0 中的任务 9 失败 4 次,最近一次失败:阶段 47.0 中丢失任务 9.3(TID 2256,ip-172-31-00-00.eu -west-1.compute.internal,执行器 10):org.apache.spark.sql.execution.QueryExecutionException: Parquet 列无法在文件 s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/theparquetfile.snappy.parquet 中转换。列:[展示次数],预期:bigint,发现:BINARY
有没有人遇到过这个问题和/或知道是什么原因造成的?
如果我删除缓存,尝试将数据写入 parquet 时会发生错误。 我也不知道为什么在我尝试刷新/写入临时表时提到 adwords_ad 表
【问题讨论】:
有趣的是得到 -1 没有解释 不能直接将二进制转换为bigint,先尝试转换为StringType:BIGINT(string(a.Impressions))
。
@jxc 不幸的是,这并没有改变..
奇怪的是,我有 2 个使用 Impressions / Cost cols 的表(以 BINARY 表示),并且此转换仅适用于两者之一。不管我对第二个做什么,它仍然说它是一个二进制列。
spark 转换被延迟评估。错误仅在您执行操作(即缓存、写入)时显示。我想这就是你投反对票的原因。我认为您应该关注源数据中的 SQL 和/或问题。
【参考方案1】:
在镶木地板上使用蜂巢表时, 然后使用 SPARK 读取它, SPARK 采用 parquet 的架构,而不是 hive 表定义的架构。
在你的 parquet 文件中 schema Impressions 是一个 BINARY 是有道理的,而在 hive 表中它的 Long 并不重要,因为 spark 从 parquet 文件中获取 schema。
【讨论】:
如果转换为字符串不能解决问题,您应该编写一个自定义 UDF,将 BinaryType(实际上它只是 python 中 bytearray 的别名)转换为字符串。 有意思,谢谢你的帮助,我试试以上是关于spark 2.4 Parquet 列无法在文件中转换,列:[Impressions],预期:bigint,发现:BINARY的主要内容,如果未能解决你的问题,请参考以下文章
无法使用Spark Structured Streaming在Parquet文件中写入数据
如果 csv 列标题包含空格,则在 spark 中将 csv 转换为 parquet 会出错