读取镶木地板文件时,有没有办法在 basePath 中使用通配符?

Posted

技术标签:

【中文标题】读取镶木地板文件时,有没有办法在 basePath 中使用通配符?【英文标题】:Is there a way to have wildcards in the basePath when reading parquet files? 【发布时间】:2020-07-30 19:40:43 【问题描述】:

有没有一种方法可以在使用带有 spark 读取的 basePath 选项时使用通配符 (*) 一次性读取具有不同 basePath 的多个分区镶木地板文件?例如:

spark.read.option("basePath","s3://latest/data/*/").parquet(*dir)

得到错误:

error:   pyspark.sql.utils.IllegalArgumentException: u"Option 'basePath' must be a directory"

【问题讨论】:

我很确定我回答了你的问题 请问您为什么不能接受这个答案?我仔细检查了 @thebluephantom 我已经接受了。没有办法做我期望的事情 投了反对票的人,我想说这个问题是我自己检查后才发布的。 网站有时很难,有时不合理。 【参考方案1】:

没有。您可以将多个 paths 与单个基本路径结合使用以获取 DF 架构中的分区列,但您不能指定多个 base paths 或使用通配符作为该基本 path.string 的一部分。

【讨论】:

【参考方案2】:

你可以简单地给出根路径,

spark.read.parquet("s3://latest/data/")

带有选项。

spark.hive.mapred.supports.subdirectories    true
spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive    true

然后,spark 会从 /data/ 文件夹到子目录递归查找 parquet 文件。

下面的代码是示例。

import org.apache.spark.SparkContext, SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("test")
    .set("spark.hive.mapred.supports.subdirectories","true")
    .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")

val spark = SparkSession.builder.config(conf).getOrCreate()

val df = spark.read.parquet("s3a://bucket/path/to/base/")

SCALA:我已经用我的多个 CSV 文件进行了测试。目录的树形结构是

.
|-- test=1
|   `-- test1.csv
`-- test=2
    `-- test2.csv

基本路径是s3://bucket/test/。对于每个 CSV hs 的内容

test1.csv

x,y,z
tes,45,34
tes,43,67
tes,56,43
raj,45,43
raj,44,67

test2.csv

x,y,z
shd,43,34
adf,2,67

和命令

val df = spark.read.option("header","true").csv("s3a://bucket/test/")

df.show(false)

给出如下结果:

+---+---+---+----+
|x  |y  |z  |test|
+---+---+---+----+
|tes|45 |34 |1   |
|tes|43 |67 |1   |
|tes|56 |43 |1   |
|raj|45 |43 |1   |
|raj|44 |67 |1   |
|shd|43 |34 |2   |
|adf|2  |67 |2   |
+---+---+---+----+

PYSPARK

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("yarn") \
  .appName("test") \
  .config("spark.hive.mapred.supports.subdirectories","true") \
  .config("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true") \
  .getOrCreate()

df = spark.read.option("header","true").csv("s3a://bucket/test/")
df.show(10, False)

+---+---+---+----+
|x  |y  |z  |test|
+---+---+---+----+
|tes|45 |34 |1   |
|tes|43 |67 |1   |
|tes|56 |43 |1   |
|raj|45 |43 |1   |
|raj|44 |67 |1   |
|shd|43 |34 |2   |
|adf|2  |67 |2   |
+---+---+---+----+

当我测试 pyspark 代码时,我没有换行。所以,请检查它是否正确。好吧,我把test=x之类的路径放上去,它被识别为一个分区结构,所以结果就是把它作为一个列。

【讨论】:

你能解释得更好吗? 是的。需要对此进行一些解释 不相信回答手头的问题 它与原始问题有关吗?

以上是关于读取镶木地板文件时,有没有办法在 basePath 中使用通配符?的主要内容,如果未能解决你的问题,请参考以下文章

如何在读取镶木地板文件时检查损坏的文件?

在读取镶木地板文件时刷新 Dataframe 的元数据

流式镶木地板文件python并且仅下采样

无法从镶木地板中读取零件文件

从目录读取镶木地板文件时,pyspark不保存

无法读取镶木地板文件