通过在 Spark 中列出该位置下的文件来避免“for 循环”

Posted

技术标签:

【中文标题】通过在 Spark 中列出该位置下的文件来避免“for 循环”【英文标题】:Avoid 'for loop' by listing the files under the location in Spark 【发布时间】:2021-03-11 11:02:48 【问题描述】:

我正在尝试通过 Spark 将多个文件从 datalake 读取到数据块中。目前使用 for 循环,它非常慢并且修改了代码以使用过滤器,但我遇到了错误。这个问题还有其他解决方案吗?

文件位置

 abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=1/*.csv
 abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=2/*.csv
......

.csv。每小时生成一次

工作代码

 from pyspark.sql.utils import AnalysisException


df_list = []
for day in range(1,int(getArgument("NUMBER_OF_DAYS"))+1,1):
  for hour in range(0,24,1):

file_location"**dummy**/year="+getArgument("YEAR")+"/month="+getArgument("MONTH")+"/dayofmonth="+str(day)+"/hour="+str(hour)+"/*.csv"

    print(file_location)

    try:
        batch_df= spark.read.format("csv").option("header", "true").load(file_location)

        pandas_df = batch_df.toPandas()

        print(pandas_df.shape)

        df_list.append(pandas_df)

    except AnalysisException as e:
        print(e)

final_batch_pandas_df = pd.concat(df_list)

print(final_batch_pandas_df.shape)

已修改(Hadoop 文件系统 API)

data_path = sc._gateway.jvm.org.apache.hadoop.fs.Path(
    "**dummy**/year=" + getArgument("YEAR") + "/month=" + getArgument("MONTH")
)
files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)

filtered_files = []

# filter files that have dayofmonth in [1, NUMBER_OF_DAYS]
while files.hasNext():
    file_path = files.next().getPath().toString()
    dayofmonth = int(re.search(r".*/dayofmonth=(\d+)/.*", file_path).group(1))
    if dayofmonth <= getArgument("NUMBER_OF_DAYS"):
        filtered_files.append(file_path)

batch_df = spark.read.format("csv").option("header", "true").load(*filtered_files)
final_pandas_df = batch_df.toPandas()

错误

Failure to initialize configuration

   ----> 2 files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)
      3 
      4 filtered_files = []
      5 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling 012.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

【问题讨论】:

【参考方案1】:

如果你定义你的路径如下:

file_location"xxxxx/year="+getArgument("YEAR")+"/month="+getArgument("MONTH")+"/dayofmonth="+days+"/hour="+hours+"/*.csv"

如果天和小时是逗号分隔的值,它应该加载与该正则表达式匹配的每个文件。

所以你可以在 foo 循环内进行路径计算,但在循环外进行 spark 读取和处理。作为一般解决方案:您可以使用正则表达式作为 spark 文件阅读器的输入。

【讨论】:

以上是关于通过在 Spark 中列出该位置下的文件来避免“for 循环”的主要内容,如果未能解决你的问题,请参考以下文章

使用File类列出指定位置下的文件及目录信息

列出/etc目录下的所有文件命令

避免指定模式两次(Spark/scala)

php列出目录下所有文件(包括子目录)

使用 Spark 列出 Hadoop HDFS 目录中的所有文件?

Spark History Server 配置部署