写入和回读时火花缺失列
Posted
技术标签:
【中文标题】写入和回读时火花缺失列【英文标题】:Spark missing column when write and read back 【发布时间】:2017-11-16 17:43:54 【问题描述】:我正在创建一个数据集并将其以 parquet 格式写入目录结构 $BasePath/y=2107/m=11/d=16/
中的 s3。在我创建当天数据并将其写入 s3 后,我需要读回过去 10 天的数据。
我能做到的有两种方法,
第一种方法:将当天数据写入 s3,读取其余 9 天的数据并进行联合,例如
dataset.write.mode(SaveMode.Overwrite)
.format(sourceConfig.format).save(getWriteBasePath(sourceConfig.sourcePath
, replaceDate))
newDf = spark.read.parquet("path1",...,"path9").union(dataset)
第二种方法:将当天数据写入s3,并再次读取所有10天数据。类似的东西:
dataset.write.mode(SaveMode.Overwrite)
.format(sourceConfig.format).save(getWriteBasePath(sourceConfig.sourcePath
, replaceDate))
newDf = spark.read.parquet("path1",...,"path10")
第一种方法运行没有任何问题但与第二种方法相比非常慢。但是使用第二种方法,当我将完整数据读回 spark 后,当天的一些列会得到空值。我验证了当天写入的数据是正确的。
我无法弄清楚为什么会这样。我正在使用以下属性创建火花上下文:
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sparkSession.sparkContext.getConf.set("spark.hadoop.parquet.enable.summary-metadata", "false")
sparkSession.sparkContext.getConf.set("spark.sql.parquet.mergeSchema", "false")
sparkSession.sparkContext.getConf.set("spark.sql.parquet.filterPushdown", "true")
sparkSession.sparkContext.getConf.set("spark.sql.hive.metastorePartitionPruning", "true")
【问题讨论】:
“一些列...”??哪些列得到空值? @GlennieHellesSindholt 在大约 20 列中,第 15 列和第 16 列的值都为空。所有其他列都有正确的值 从逻辑上讲,第一种方法应该工作得更快,因为您从磁盘读取的数据更少。可能是第二种方法工作得更快,因为只为这 2 个列加载空值。您正在处理的数据的大小是多少。如果这 2 列的数据很大,则您的第二种方法运行速度很快,因为它没有加载这些数据。 @Phoenix 采用快速压缩的镶木地板总数据大小约为 70 Gb。这 2 列只是文本列,最多 30-40 个字符。 可能对有类似问题的人有所帮助:如果用于对数据框进行分区的列具有空值(如“day”或“month”列),请尝试添加 option("basePath", basePath) . 【参考方案1】:我希望将当前数据缓存在一个数据帧中,并将过去 9 天的所有数据读入另一个数据帧。现在您已经将所有数据都保存在内存中了。您的执行速度必须更快。我们写入 s3 的那一刻(作为一个动作),创建的整个 DAG 将被刷新,这就是您的第一种方法非常慢的原因。
【讨论】:
我实际上正在缓存它.. 但仍然比第二种方法多花 5 分钟以上是关于写入和回读时火花缺失列的主要内容,如果未能解决你的问题,请参考以下文章