将增量文件写入 S3 (MinIO) - PySpark 2.4.3

Posted

技术标签:

【中文标题】将增量文件写入 S3 (MinIO) - PySpark 2.4.3【英文标题】:Write delta file to S3 (MinIO) - PySpark 2.4.3 【发布时间】:2019-09-08 19:36:59 【问题描述】:

我目前正在尝试将 delta-lake parquet 文件写入 S3,我在本地将其替换为 MinIO。

我可以完美地将标准parquet 文件读/写到S3

但是,当我使用delta lake example

配置delta to s3

看来我无法将delta_log/ 写到我的MinIO

所以我尝试设置:fs.AbstractFileSystem.s3a.implfs.s3a.impl

我正在使用pyspark[sql]==2.4.3,我在当前的venv 中使用它。

src/.env

# pyspark packages
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.3
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.3
PYSPARK_SUBMIT_ARGS = $HADOOP_AWS,$HADOOP_COMMON,$DELTA

src/spark_session.py:

# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
# hadoop_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  #  when using hadoop 2.8.5
# hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  #  alternative to above hadoop 2.8.5
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("spark.history.fs.logDirectory", 's3a://spark-logs-test/')

src/apps/raw_to_parquet.py

# Trying to write pyspark dataframe to MinIO (S3)

raw_df.coalesce(1).write.format("delta").save(s3_url)


bash:

# RUN CODE
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) src/run_onlineretailer.py

hadoop-common: 2.7.3hadoop-aws: 2.7.3 出错:java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.fs.s3a.S3AFileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)

所以有了这个错误,我随后更新为hadoop-common: 2.8.5hadoop-aws: 2.8.5,以修复NoSuchMethodException。因为delta需要:S3AFileSystem

py4j.protocol.Py4JJavaError: An error occurred while calling o89.save. : java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders(Lorg/apache/hadoop/conf/Configuration;Ljava/lang/Class;)Lorg/apache/hadoop/conf/Configuration

所以对我来说,parquet 文件似乎可以毫无问题地写入,但是,delta 创建了这些无法识别的delta_log 文件夹(我认为?)。

当前source code。

阅读了几个不同的类似问题,但似乎没有人尝试使用 delta lake 文件。

更新

目前可以使用以下设置:

#pyspark packages
DELTA_LOGSTORE = spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.7
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.7
PYSPARK_SUBMIT_ARGS = $HADOOP_AWS,$HADOOP_COMMON,$DELTA
PYSPARK_CONF_ARGS = $DELTA_LOGSTORE
# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) --conf $(PYSPARK_CONF_ARGS) src/run_onlineretailer.py

奇怪的是它只会这样工作。

如果我尝试使用sc.confhadoop_conf 设置它不起作用,请参阅未注释的代码:

def spark_init(self) -> SparkSession:

    sc: SparkSession = SparkSession \
        .builder \
        .appName(self.app_name) \
        .config("spark.sql.warehouse.dir", self.warehouse_location) \
        .getOrCreate()

    # set log level
    sc.sparkContext.setLogLevel("WARN")

    # Enable Arrow-based columnar data transfers
    sc.conf.set("spark.sql.execution.arrow.enabled", "true")

    # sc.conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    # configure s3 connection for read/write operation (native spark)
    hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
    hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
    hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
    #hadoop_conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    return sc

如果有人能解释一下,那就太好了。是因为.getOrCreate()吗?没有这个电话似乎不可能设置conf?运行应用程序时在命令行中除外。

【问题讨论】:

您可以尝试使用包含 Hadoop 的 Spark 包吗?此外,您需要在 SparkSession 创建之前放置所有 conf 调用,尤其是 logStore。见docs.delta.io/latest/delta-storage.html 【参考方案1】:

你正在混合 hadoop-* jars;就像火花一样,它们只有在它们都来自同一个版本时才能工作

【讨论】:

好的,但我不明白我是如何混合它们的?因为我对两者都使用相同的版本?是因为当我导入pyspark 时它有自己的hadoop 版本,而当我给它packages 时它不一样?因为如果我将它作为conf 参数提供它现在可以工作,并且我将fs.* 端点设置为sc.sparkContext._jsc.hadoopConfiguration() 指出混合版本发生在哪里会很有帮助。 ..我不知道混合版本发生在哪里。我确实认识到不一致的 JAR 版本。这是一个部署/配置问题,所以很遗憾,看到问题的人都可以解决。我:我会运行 storageiag 或其他方式来定位托管冲突类的 JAR github.com/steveloughran/cloudstore

以上是关于将增量文件写入 S3 (MinIO) - PySpark 2.4.3的主要内容,如果未能解决你的问题,请参考以下文章

Docker Swarm 使用NFS 搭建 S3 (minio)多副本

drill 集成开源s3 存储minio

Dremio: 将 Minio 配置为分布式存储

s3fs 挂载minio为本地文件系统

通过火花数据框读取 S3 文件时,胶水书签不起作用

增量加载 s3 文件夹文件