Spark - 如果已经存在,则更新记录(在镶木地板文件中)
Posted
技术标签:
【中文标题】Spark - 如果已经存在,则更新记录(在镶木地板文件中)【英文标题】:Spark - Update the record (in parquet file) if already exists 【发布时间】:2016-11-25 10:37:50 【问题描述】:我正在编写一个 Spark 作业来从 json 文件中读取数据并将其写入 parquet 文件,下面是示例代码:
DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile);
dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>");
Json 文件包含许多 json 记录,如果记录已经存在,我希望记录在 parquet 中更新。我已经尝试过Append
模式,但它似乎在文件级别而不是记录级别上工作(即,如果文件已经存在,它最后会写入)。因此,为同一文件运行此作业会复制记录。
有什么方法可以将数据框行 id 指定为唯一键,并要求 spark 更新记录(如果它已经存在)?所有的保存模式似乎都在检查文件而不是记录。
【问题讨论】:
【参考方案1】:Parquet 是一种文件格式而不是数据库,为了实现通过 id 更新,您需要读取文件,更新内存中的值,而不是将数据重新写入新文件(或覆盖现有文件)。
如果这是一个经常发生的用例,那么使用数据库可能会更好地为您服务。
【讨论】:
所以,在这种情况下,我需要在另一个文件中写入一个数据帧,然后再次读取数据帧以覆盖模式写入以替换原始文件。有没有更新火花记录的最佳方法。【参考方案2】:您可以查看 Apache ORC 文件格式,请参阅:
https://orc.apache.org/docs/acid.html
根据您的用例或 HBase,如果您想保持在 HDFS 之上。
但请记住,HDFS 是一次写入文件系统,如果这不符合您的需要,请使用其他东西(可能是 elasticsearch、mongodb)。
否则,在 HDFS 中,您必须每次都创建新文件,您必须设置增量过程来构建“增量”文件,然后合并 OLD + DELTA = NEW_DATA。
【讨论】:
这正是我正在做的事情,但我无法使用覆盖模式在旧数据上写入新数据。这是主要问题。 请发布一个包含所有详细信息的新问题以上是关于Spark - 如果已经存在,则更新记录(在镶木地板文件中)的主要内容,如果未能解决你的问题,请参考以下文章