Pyspark 无法从 pathlib 对象加载

Posted

技术标签:

【中文标题】Pyspark 无法从 pathlib 对象加载【英文标题】:Pyspark cannot load from pathlib object 【发布时间】:2020-10-20 14:16:55 【问题描述】:
Python Version 3.7.5
Spark Version 3.0
Databricks Runtime 7.3

我目前正在处理我的数据湖文件系统中的路径。

这是

p = dbutils.fs.ls('dbfs:/databricks-datasets/nyctaxi')
print(p)
 [FileInfo(path='dbfs:/databricks-datasets/nyctaxi/readme_nyctaxi.txt', name='readme_nyctaxi.txt', size=916),
 FileInfo(path='dbfs:/databricks-datasets/nyctaxi/reference/', name='reference/', size=0),
 FileInfo(path='dbfs:/databricks-datasets/nyctaxi/taxizone/', name='taxizone/', size=0),
 FileInfo(path='dbfs:/databricks-datasets/nyctaxi/tripdata/', name='tripdata/', size=0)]

现在,为了将其转换为有效的 Pathlib Posix 对象,我将其传递给一个函数

def create_valid_path(paths):
    return Path('/dbfs').joinpath(*[part for part in Path(paths).parts[1:]])

tripdata 的输出是

PosixPath('/dbfs/databricks-datasets/nyctaxi/tripdata')

现在,如果我想在将 csv 的子集收集到列表中之后将其读入 sparkdata 帧。

from pyspark.sql.functions import * 
df = spark.read.format('csv').load(paths)

返回

AttributeError: 'PosixPath' object has no attribute '_get_object_id'

现在,我可以让它工作的唯一方法是手动添加路径 dbfs:/.. 并将每个项目返回到一个字符串,但是有必要使用 Pathlib 来执行一些基本的 I/O 操作。我错过了一些简单的东西还是 Pyspark 根本无法读取 pathlib 对象?

例如

trip_paths_str = [str(Path('dbfs:').joinpath(*part.parts[2:])) for part in trip_paths]

print(trip_paths_str)

['dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-01.csv.gz',
 'dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-02.csv.gz'...]

【问题讨论】:

pathlib 对象中的dbfs 之后是否需要冒号(例如dbfs:/part1/parts2)? @PaulH 这仅在我将其强制转换为字符串时才有效,但与我自己的工作类似,在此之前将dbfs:/ 预先挂起(抱歉在我的帖子中错过了/)并返回一个字符串。这仍然要求我首先创建一个有效的 pathlib 对象,然后创建一个有效的 pyspark 文件路径以读入 spark 数据帧。 听起来您需要将代码分解成块 1) 使用 pathlib 对象来执行基本 IO,通过您需要编写的自定义函数将这些 pathlib 对象转换为 pyspark 路径,3) 执行您的pyspark 的东西 @PaulH 谢谢 - 只是想确保我没有因为经验不足而遗漏一些东西 - 我会留下这个问题,也许微软团队的某个人可以提供更清楚的说明。跨度> 【参考方案1】:

那么不如这样做呢?

from pyspark.sql.functions import * 
import os

def db_list_files(file_path):
  file_list = [file.path for file in dbutils.fs.ls(file_path) if os.path.basename(file.path)]
  return file_list

files = db_list_files('dbfs:/FileStore/tables/')
 
df = spark.read.format('text').load(files)
df.show()

【讨论】:

是的,我明白了,但有时你需要解决这个问题,这就是为什么我说那怎么办?也许我应该添加。 在大数据处理方面,速度可以忽略不计。在数据块方面有很多细微差别。

以上是关于Pyspark 无法从 pathlib 对象加载的主要内容,如果未能解决你的问题,请参考以下文章

pyspark GBTRegressor 对象在加载模型后没有属性“转换”

使用 pathlib 递归遍历所有子目录

[PY3]——IO——pathlib

使用 pathlib 模块从 rglob() 方法的输出中解压缩所有项目 [关闭]

如何从 Azure blob 数据存储中获取 Python pathlib 路径?

在 pyspark 地图逻辑中使用 sparksql 不起作用