通过在 Spark 中列出该位置下的文件来避免“for 循环”
Posted
技术标签:
【中文标题】通过在 Spark 中列出该位置下的文件来避免“for 循环”【英文标题】:Avoid 'for loop' by listing the files under the location in Spark 【发布时间】:2021-03-11 11:02:48 【问题描述】:我正在尝试通过 Spark 将多个文件从 datalake 读取到数据块中。目前使用 for 循环,它非常慢并且修改了代码以使用过滤器,但我遇到了错误。这个问题还有其他解决方案吗?
文件位置
abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=1/*.csv
abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=2/*.csv
......
.csv。每小时生成一次
工作代码
from pyspark.sql.utils import AnalysisException
df_list = []
for day in range(1,int(getArgument("NUMBER_OF_DAYS"))+1,1):
for hour in range(0,24,1):
file_location"**dummy**/year="+getArgument("YEAR")+"/month="+getArgument("MONTH")+"/dayofmonth="+str(day)+"/hour="+str(hour)+"/*.csv"
print(file_location)
try:
batch_df= spark.read.format("csv").option("header", "true").load(file_location)
pandas_df = batch_df.toPandas()
print(pandas_df.shape)
df_list.append(pandas_df)
except AnalysisException as e:
print(e)
final_batch_pandas_df = pd.concat(df_list)
print(final_batch_pandas_df.shape)
已修改(Hadoop 文件系统 API)
data_path = sc._gateway.jvm.org.apache.hadoop.fs.Path(
"**dummy**/year=" + getArgument("YEAR") + "/month=" + getArgument("MONTH")
)
files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)
filtered_files = []
# filter files that have dayofmonth in [1, NUMBER_OF_DAYS]
while files.hasNext():
file_path = files.next().getPath().toString()
dayofmonth = int(re.search(r".*/dayofmonth=(\d+)/.*", file_path).group(1))
if dayofmonth <= getArgument("NUMBER_OF_DAYS"):
filtered_files.append(file_path)
batch_df = spark.read.format("csv").option("header", "true").load(*filtered_files)
final_pandas_df = batch_df.toPandas()
错误
Failure to initialize configuration
----> 2 files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)
3
4 filtered_files = []
5
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling 012.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
【问题讨论】:
【参考方案1】:如果你定义你的路径如下:
file_location"xxxxx/year="+getArgument("YEAR")+"/month="+getArgument("MONTH")+"/dayofmonth="+days+"/hour="+hours+"/*.csv"
如果天和小时是逗号分隔的值,它应该加载与该正则表达式匹配的每个文件。
所以你可以在 foo 循环内进行路径计算,但在循环外进行 spark 读取和处理。作为一般解决方案:您可以使用正则表达式作为 spark 文件阅读器的输入。
【讨论】:
以上是关于通过在 Spark 中列出该位置下的文件来避免“for 循环”的主要内容,如果未能解决你的问题,请参考以下文章