COSMOS DB write issue from Databricks Notebook

Posted: 2020-07-06 00:08:35

Question:

Following the Databricks documentation (https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html), I downloaded the latest azure-cosmosdb-spark library (azure-cosmosdb-spark_2.4.0_2.11-2.1.2-uber.jar) and uploaded it to a library location on DBFS.

When I try to write data from a DataFrame to a Cosmos DB container, I get the error below. Any help would be appreciated.

My Databricks Runtime version is 7.0 (includes Apache Spark 3.0.0, Scala 2.12).

Imports in the notebook:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.sql.functions._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config._

val dtcCANWrite = Config(Map(
  "Endpoint" -> "NOT DISPLAYED",
  "Masterkey" -> "NOT DISPLAYED",
  "Database" -> "NOT DISPLAYED",
  "Collection" -> "NOT DISPLAYED",
  "preferredRegions" -> "NOT DISPLAYED",
  "Upsert" -> "true"
))

distinctCANDF.write.mode(SaveMode.Append).cosmosDB(dtcCANWrite)

Error:

    at com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfigBuilder.<init>(CosmosDBConfigBuilder.scala:31)
    at com.microsoft.azure.cosmosdb.spark.config.Config$.apply(Config.scala:259)
    at com.microsoft.azure.cosmosdb.spark.config.Config$.apply(Config.scala:240)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:7)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:69)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:71)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:73)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:75)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:77)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:79)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:81)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:83)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:85)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:87)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:89)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:91)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw.<init>(command-3649834446724317:93)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw.<init>(command-3649834446724317:95)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw.<init>(command-3649834446724317:97)
    at line6d80624d7a774601af6eb962eb59453253.$read.<init>(command-3649834446724317:99)
    at line6d80624d7a774601af6eb962eb59453253.$read$.<init>(command-3649834446724317:103)
    at line6d80624d7a774601af6eb962eb59453253.$read$.<clinit>(command-3649834446724317)
    at line6d80624d7a774601af6eb962eb59453253.$eval$.$print$lzycompute(<notebook>:7)
    at line6d80624d7a774601af6eb962eb59453253.$eval$.$print(<notebook>:6)
    at line6d80624d7a774601af6eb962eb59453253.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:202)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:396)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:230)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:268)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfigBuilder.<init>(CosmosDBConfigBuilder.scala:31)
    at com.microsoft.azure.cosmosdb.spark.config.Config$.apply(Config.scala:259)
    at com.microsoft.azure.cosmosdb.spark.config.Config$.apply(Config.scala:240)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:7)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:69)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:71)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:73)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:75)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:77)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:79)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:81)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:83)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:85)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:87)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:89)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw$$iw.<init>(command-3649834446724317:91)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw$$iw.<init>(command-3649834446724317:93)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw$$iw.<init>(command-3649834446724317:95)
    at line6d80624d7a774601af6eb962eb59453253.$read$$iw.<init>(command-3649834446724317:97)
    at line6d80624d7a774601af6eb962eb59453253.$read.<init>(command-3649834446724317:99)
    at line6d80624d7a774601af6eb962eb59453253.$read$.<init>(command-3649834446724317:103)
    at line6d80624d7a774601af6eb962eb59453253.$read$.<clinit>(command-3649834446724317)
    at line6d80624d7a774601af6eb962eb59453253.$eval$.$print$lzycompute(<notebook>:7)
    at line6d80624d7a774601af6eb962eb59453253.$eval$.$print(<notebook>:6)
    at line6d80624d7a774601af6eb962eb59453253.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:202)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:396)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:230)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:268)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)

Comments:

- The documentation page you link to states explicitly: "You cannot access this data source from a cluster running Databricks Runtime 7.0 or above, because an Azure Cosmos DB connector that supports Apache Spark 3.0 is not available." In your question you say you are on Runtime 7.0. That would appear to be your problem, no?
- Also: the code you wrote differs from the example in the documentation you point to: you never call CosmosDBSpark.save(). Are you sure your syntax is correct? I don't see an equivalent syntax anywhere in those docs.
- You have a version conflict here: azure-cosmosdb-spark_2.4.0_2.11-2.1.2-uber.jar is built for Scala 2.11, but your Databricks cluster runs Scala 2.12. You need to either update the library version or downgrade the Spark/Scala version on Databricks.
- Hi @DavidMakogon, Rayan, thanks for the suggestions. I downgraded the cluster from Spark 3.0.0 / Scala 2.12 to Spark 2.4.4 with Scala 2.11, and everything works now.
- Hi, I've posted the solution to this question as an answer. Could you mark it as accepted to close out the question? It may help others who hit a similar issue. Alternatively, you can post your own answer and accept that; if so, let me know and I'll delete mine. :)

Answer 1:

Thanks to Rayan Ral for the suggestion. The problem was indeed a version conflict.

The solution is to downgrade versions: in this case, from Spark 3.0.0 / Scala 2.12 to Spark 2.4.4 / Scala 2.11.
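The `Caused by: java.lang.ClassNotFoundException: scala.Product$class` in the trace is the telltale sign of this mismatch: `Product$class` is an artifact of the Scala 2.11 trait encoding and does not exist in Scala 2.12 binaries. A mismatch like this can be caught up front by comparing the Scala binary version encoded in the jar's name (the `_2.11` suffix in `azure-cosmosdb-spark_2.4.0_2.11-2.1.2-uber.jar`) against the version the cluster is running. A minimal sketch (the `ScalaVersionCheck` helper is illustrative, not part of the connector):

```scala
// Sketch: detect a Scala binary-version mismatch between a library jar
// and the running cluster before attaching the library.
object ScalaVersionCheck {
  // Matches the Scala suffix in artifact names like foo_2.4.0_2.11-1.0.jar
  private val Suffix = """.*_(\d+\.\d+)-.*""".r

  // Scala binary version baked into the jar name, if one is present.
  def jarScalaVersion(jarName: String): Option[String] = jarName match {
    case Suffix(v) => Some(v)
    case _         => None
  }

  // Binary version of the running Scala, e.g. "2.12" on Databricks Runtime 7.0.
  def runtimeScalaVersion: String =
    util.Properties.versionNumberString.split('.').take(2).mkString(".")

  // Treats a jar with no recognizable suffix as compatible (unknown).
  def isCompatible(jarName: String): Boolean =
    jarScalaVersion(jarName).forall(_ == runtimeScalaVersion)
}

// The jar from the question targets Scala 2.11, so on a Runtime 7.0
// cluster (Scala 2.12) this warns before any ClassNotFoundException.
val jar = "azure-cosmosdb-spark_2.4.0_2.11-2.1.2-uber.jar"
if (!ScalaVersionCheck.isCompatible(jar))
  println(s"$jar targets Scala ${ScalaVersionCheck.jarScalaVersion(jar).get}, " +
          s"but this cluster runs Scala ${ScalaVersionCheck.runtimeScalaVersion}")
```

Either direction resolves the check: downgrade the cluster to a Scala 2.11 runtime (as above), or attach a connector build whose suffix matches the cluster's Scala version.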

Comments:
