Spark - 如果已经存在,则更新记录(在镶木地板文件中)

Posted

技术标签:

【中文标题】Spark - 如果已经存在,则更新记录(在镶木地板文件中)【英文标题】:Spark - Update the record (in parquet file) if already exists 【发布时间】:2016-11-25 10:37:50 【问题描述】:

我正在编写一个 Spark 作业来从 json 文件中读取数据并将其写入 parquet 文件,下面是示例代码:

    DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile);
    dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
    dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
    dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>");

Json 文件包含许多 json 记录,如果记录已经存在,我希望记录在 parquet 中更新。我已经尝试过Append 模式,但它似乎在文件级别而不是记录级别上工作(即,如果文件已经存在,它最后会写入)。因此,为同一文件运行此作业会复制记录。

有什么方法可以将数据框行 id 指定为唯一键,并要求 spark 更新记录(如果它已经存在)?所有的保存模式似乎都在检查文件而不是记录。

【问题讨论】:

【参考方案1】:

Parquet 是一种文件格式而不是数据库,为了实现通过 id 更新,您需要读取文件,更新内存中的值,而不是将数据重新写入新文件(或覆盖现有文件)。

如果这是一个经常发生的用例,那么使用数据库可能会更好地为您服务。

【讨论】:

所以,在这种情况下,我需要在另一个文件中写入一个数据帧,然后再次读取数据帧以覆盖模式写入以替换原始文件。有没有更新火花记录的最佳方法。【参考方案2】:

您可以查看 Apache ORC 文件格式,请参阅:

https://orc.apache.org/docs/acid.html

根据您的用例或 HBase,如果您想保持在 HDFS 之上。

但请记住,HDFS 是一次写入文件系统,如果这不符合您的需要,请使用其他东西(可能是 elasticsearch、mongodb)。

否则,在 HDFS 中,您必须每次都创建新文件,您必须设置增量过程来构建“增量”文件,然后合并 OLD + DELTA = NEW_DATA。

【讨论】:

这正是我正在做的事情,但我无法使用覆盖模式在旧数据上写入新数据。这是主要问题。 请发布一个包含所有详细信息的新问题

以上是关于Spark - 如果已经存在,则更新记录(在镶木地板文件中)的主要内容,如果未能解决你的问题,请参考以下文章

如何在镶木地板文件中使用 K-means

Apache Arrow 使用 C++ 在镶木地板中编写嵌套类型

如果不存在则插入 Spark SQL 中的其他更新

火花数据框密封特征类型

Mysql插入数据:不存在则插入,存在则跳过或更新

如果存在且不是空白字段,则更新记录