无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse

Posted

技术标签:

【中文标题】无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse【英文标题】:Can't write to Azure Sql DataWarehouse from databricks pyspark workers 【发布时间】:2019-07-17 23:09:19 【问题描述】:

我正在尝试将数据简单地写入 azure sql DataWarehouse,同时使用 azure blob 存储进行暂存。

在 azure databricks 文档azure/sql-data-warehouse 中有一个非常简单的教程,如果您逐步遵循它,它会起作用。

但是在我的场景中,我必须从正在执行 foreach 的工作人员那里进行写入。

这里有一些与问题相关的链接:

error-using-pyspark-with-wasb-connecting-pyspark-with-azure-blob

github.com/Azure/mmlspark/issues/456

pyspark-java-io-ioexception-no-filesystem-for-scheme-https

所以,下面的代码WORKS

  spark = SparkSession.builder.getOrCreate()      
  spark.conf.set("fs.azure.account.key.<storageAccountName>.blob.core.windows.net", "myKey")  
  df = spark.createDataFrame([(1, 2, 3, 4), (5, 6, 7, 8)], ('a', 'b', 'c', 'd'))  

  (df.write 
  .format("com.databricks.spark.sqldw") 
  .option("url", "jdbc:sqlserver:...") 
  .option("user", "user@server") 
  .option("password", "pass") 
  .option("forwardSparkAzureStorageCredentials", "true") 
  .option("dbTable", "dbo.table_teste") 
  .option("tempDir", "wasbs://<container>@<storageAccountName>.blob.core.windows.net/") 
  .mode("append")
  .save())

但是,当我将上面的代码插入到 foreach 中时,它会失败,如下所示:

from pyspark.sql.session import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.getOrCreate()          

def iterate(row):
   # The code above

dfIter = spark.createDataFrame([(1, 2, 3, 4)], ('a', 'b', 'c', 'd'))
dfIter.rdd.foreach(iterate)

执行它会产生这个异常:

py4j.protocol.Py4JJavaError:调用 o54.save 时出错。 :com.databricks.spark.sqldw.SqlDWConnectorException:异常 在 SQL DW 连接器代码中遇到。

原因:java.io.IOException: No FileSystem for scheme: wasbs

我在保存增量表时遇到了同样的问题:pyspark-saving-is-not-working-when-called-from-inside-a-foreach

但在这种情况下,我只需要在增量表位置的开头设置“/dbfs/”,这样工作人员就可以将其保存在正确的位置。

基于此,我认为 worker 中缺少某些东西,这就是它没有正确执行此保存的原因。也许我应该在 spark config 中设置一个库。

我还查看了 databricks 社区:save-the-results-of-a-query-to-azure-blo,他们通过设置此配置设法解决了这个问题:

sc.hadoopConfiguration.set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")

PySpark:

spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")

但它不起作用,我得到了这个异常:

引起:java.lang.RuntimeException: java.lang.ClassNotFoundException:类 org.apache.hadoop.fs.azure.NativeAzureFileSystem 未找到

org.apache.hadoop:hadoop-azure:3.2.0 已安装。

嗯,有什么帮助吗?

【问题讨论】:

很确定你还需要 azure-storage jars 我也想了一会儿。我尝试从工作人员内部将其添加到 spark 会话中,如下所示: ...config('spark.jars.packages', 'com.microsoft.azure:azure-storage:5.2.0')... Databricks 5.4使用天蓝色存储 5.2。但它失败了。 【参考方案1】:

我相信你的主要问题是你试图从一个 foreach 循环中写入,这基本上会呈现任何类型的批处理/缩放没有意义——这就是 SQL DW 连接器的设计目的。如果你真的需要从循环中写出并且数据量不是太大,你可以使用简单的 JDBC 连接器来实​​现这一点:https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

但仍要注意,SQL DW 确实针对大规模写入进行了优化,而不是针对单行摄取。

【讨论】:

这里有一个在foreach中写入DW的例子:docs.azuredatabricks.net/spark/latest/structured-streaming/… 区别在于流式上下文和foreachBatch。就我而言,我正在迭代 ID,每个 ID 都会为我提供读取正确文件并将其保存到 DW 所需的信息。 是的,这个例子也指流式传输。据我了解,foreachBatch 与您的 foreach 关系不大。 这很奇怪,我真的不明白为什么这只适用于流媒体场景。我会尽快尝试,然后我会提供反馈。 再一次,fo​​reachBatch() 函数是特定于流编写器的,它不是像您的 rdd.foreach() 那样的循环

以上是关于无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse的主要内容,如果未能解决你的问题,请参考以下文章

无法在 Databricks 中使用 pyspark 读取 json 文件

将 AWS S3 连接到 Databricks PySpark

如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合

Pyspark 无法从 pathlib 对象加载

使用 Pyspark 将 SQL 查询从 DataBricks 发送到 SQL Server [重复]

如何在 Databricks pyspark 中导入 Excel 文件