无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse
Posted
技术标签:
【中文标题】无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse【英文标题】:Can't write to Azure Sql DataWarehouse from databricks pyspark workers 【发布时间】:2019-07-17 23:09:19 【问题描述】:我正在尝试将数据简单地写入 azure sql DataWarehouse,同时使用 azure blob 存储进行暂存。
在 azure databricks 文档azure/sql-data-warehouse 中有一个非常简单的教程,如果您逐步遵循它,它会起作用。
但是在我的场景中,我必须从正在执行 foreach 的工作人员那里进行写入。
这里有一些与问题相关的链接:
error-using-pyspark-with-wasb-connecting-pyspark-with-azure-blob
github.com/Azure/mmlspark/issues/456
pyspark-java-io-ioexception-no-filesystem-for-scheme-https
所以,下面的代码WORKS:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("fs.azure.account.key.<storageAccountName>.blob.core.windows.net", "myKey")
df = spark.createDataFrame([(1, 2, 3, 4), (5, 6, 7, 8)], ('a', 'b', 'c', 'd'))
(df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver:...")
.option("user", "user@server")
.option("password", "pass")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "dbo.table_teste")
.option("tempDir", "wasbs://<container>@<storageAccountName>.blob.core.windows.net/")
.mode("append")
.save())
但是,当我将上面的代码插入到 foreach 中时,它会失败,如下所示:
from pyspark.sql.session import SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.getOrCreate()
def iterate(row):
# The code above
dfIter = spark.createDataFrame([(1, 2, 3, 4)], ('a', 'b', 'c', 'd'))
dfIter.rdd.foreach(iterate)
执行它会产生这个异常:
py4j.protocol.Py4JJavaError:调用 o54.save 时出错。 :com.databricks.spark.sqldw.SqlDWConnectorException:异常 在 SQL DW 连接器代码中遇到。
原因:java.io.IOException: No FileSystem for scheme: wasbs
我在保存增量表时遇到了同样的问题:pyspark-saving-is-not-working-when-called-from-inside-a-foreach
但在这种情况下,我只需要在增量表位置的开头设置“/dbfs/”,这样工作人员就可以将其保存在正确的位置。
基于此,我认为 worker 中缺少某些东西,这就是它没有正确执行此保存的原因。也许我应该在 spark config 中设置一个库。
我还查看了 databricks 社区:save-the-results-of-a-query-to-azure-blo,他们通过设置此配置设法解决了这个问题:
sc.hadoopConfiguration.set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")
PySpark:
spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")
但它不起作用,我得到了这个异常:
引起:java.lang.RuntimeException: java.lang.ClassNotFoundException:类 org.apache.hadoop.fs.azure.NativeAzureFileSystem 未找到
org.apache.hadoop:hadoop-azure:3.2.0 已安装。
嗯,有什么帮助吗?
【问题讨论】:
很确定你还需要 azure-storage jars 我也想了一会儿。我尝试从工作人员内部将其添加到 spark 会话中,如下所示: ...config('spark.jars.packages', 'com.microsoft.azure:azure-storage:5.2.0')... Databricks 5.4使用天蓝色存储 5.2。但它失败了。 【参考方案1】:我相信你的主要问题是你试图从一个 foreach 循环中写入,这基本上会呈现任何类型的批处理/缩放没有意义——这就是 SQL DW 连接器的设计目的。如果你真的需要从循环中写出并且数据量不是太大,你可以使用简单的 JDBC 连接器来实现这一点:https://docs.databricks.com/spark/latest/data-sources/sql-databases.html
但仍要注意,SQL DW 确实针对大规模写入进行了优化,而不是针对单行摄取。
【讨论】:
这里有一个在foreach中写入DW的例子:docs.azuredatabricks.net/spark/latest/structured-streaming/… 区别在于流式上下文和foreachBatch。就我而言,我正在迭代 ID,每个 ID 都会为我提供读取正确文件并将其保存到 DW 所需的信息。 是的,这个例子也指流式传输。据我了解,foreachBatch 与您的 foreach 关系不大。 这很奇怪,我真的不明白为什么这只适用于流媒体场景。我会尽快尝试,然后我会提供反馈。 再一次,foreachBatch() 函数是特定于流编写器的,它不是像您的 rdd.foreach() 那样的循环以上是关于无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse的主要内容,如果未能解决你的问题,请参考以下文章
无法在 Databricks 中使用 pyspark 读取 json 文件
将 AWS S3 连接到 Databricks PySpark
如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合