如何刷新 HDFS 路径?

Posted

技术标签:

【中文标题】如何刷新 HDFS 路径?【英文标题】:How do I refresh a HDFS path? 【发布时间】:2020-07-01 03:06:49 【问题描述】:

我正在 jupyter notebook 中运行 sparksession。

当该路径下的文件发生更改时,即使我缓存了数据帧,我也会在spark.read.parquet(some_path) 初始的数据帧上出现错误。

例如

读码是

sp = spark.read.parquet(TB.STORE_PRODUCT)
sp.cache()

有时,sp 不能再访问了,抱怨:

Py4JJavaError: An error occurred while calling o3274.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 326.0 failed 4 times, most recent failure: Lost task 10.3 in stage 326.0 (TID 111818, dc38, executor 7): java.io.FileNotFoundException: File does not exist: hdfs://xxxx/data/dm/sales/store_product/part-00000-169428df-a9ee-431e-918b-75477c073d71-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

问题

'REFRESH TABLE tableName' 不起作用,因为

我没有 hive 表,它只是一个 hdfs 路径

重启sparksession并再次读取该路径可以解决这个问题,但是

我不想重新启动 sparksession,这会浪费很多时间

还有一个

再次执行sp = spark.read.parquet(TB.STORE_PRODUCT) 不起作用,我可以理解为什么,spark 应该再次扫描路径,或者必须有一个选项/设置来强制它扫描。将整个路径位置保存在内存中并不明智。

spark.read.parquet 没有强制扫描选项

Signature: spark.read.parquet(*paths)
Docstring:
Loads Parquet files, returning the result as a :class:`DataFrame`.

You can set the following Parquet-specific option(s) for reading Parquet files:
    * ``mergeSchema``: sets whether we should merge schemas collected from all                 Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.                 The default value is specified in ``spark.sql.parquet.mergeSchema``.

>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

.. versionadded:: 1.4
Source:   
    @since(1.4)
    def parquet(self, *paths):
        """Loads Parquet files, returning the result as a :class:`DataFrame`.

        You can set the following Parquet-specific option(s) for reading Parquet files:
            * ``mergeSchema``: sets whether we should merge schemas collected from all \
                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
                The default value is specified in ``spark.sql.parquet.mergeSchema``.

        >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
        >>> df.dtypes
        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
        """
        return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File:      /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/readwriter.py
Type:      method

有没有合适的方法来解决我的问题?

【问题讨论】:

【参考方案1】:

问题是由Dataframe.cache引起的。

我需要先清除缓存,然后再读取将解决问题

代码:

try:
    sp.unpersist()
except:
    pass
sp = spark.read.parquet(TB.STORE_PRODUCT)
sp.cache()

【讨论】:

【参考方案2】:

您可以尝试两种解决方案

按照@Mithril 的建议,在每次读取之前取消持久化数据帧

或者只是创建一个临时视图并触发刷新命令

sp.createOrReplaceTempView('sp_table')
spark.sql('''REFRESH TABLE sp_table''')
df=spark.sql('''select * from sp_table''')

【讨论】:

以上是关于如何刷新 HDFS 路径?的主要内容,如果未能解决你的问题,请参考以下文章

如何判断hdfs(hadoop)上的路径是文件还是目录。

我应该如何在spark文本文件中表达hdfs路径?

如何将hdfs里某一目录下的所有文件的文件名读取出来

如何使用 Flume 按年和月对 txt/csv 文件中的数据进行分区?是不是可以使 HDFS 路径动态化?

使用java API如何获取给定路径的HDF文件结构

如何使用 HDFS 在 Hadoop 上运行 TensorFlow