从 Spark 错误更新到 CosmosDB

Posted

技术标签:

【中文标题】从 Spark 错误更新到 CosmosDB【英文标题】:Upsert to CosmosDB from Spark error 【发布时间】:2018-04-05 09:50:21 【问题描述】:

我对 Spark/CosmosDB/Python 非常陌生,所以我正在浏览 MS 网站和 GitHub 上的代码示例,同时尝试自己创建一些东西。经过与 Spark-CosmosDB 连接器的长期斗争,我能够从 CosmosDB 集合中读取数据。现在我想做相反的事情(upsert),但发现了另一个障碍。这是我要引用的示例: Writing to Cosmos DB section.

我能够从 Cosmos 读取数据,并对数据进行处理,但我无法插入回 Cosmos。以下是我稍作修改的代码:

%%configure
 "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": 
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   


# Read Configuration
readConfig = 
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
  "Database" : "DepartureDelays",
  "preferredRegions" : "Central US;East US2",
  "Collection" : "flights_pcoll", 
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "1000",
  "query_pagesize" : "2147483647",
  "query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"


# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

# Write configuration
writeConfig = 
 "Endpoint" : "https://doctorwho.documents.azure.com:443/",
 "Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
 "Database" : "DepartureDelays",
 "Collection" : "flights_pcoll",
 "Upsert" : "true"


# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

所以,当我尝试运行它时,我得到:

An error occurred while calling o90.save.
: java.lang.UnsupportedOperationException: Writing in a non-empty collection.

快速谷歌搜索后,我尝试在最后一行添加 mode("append"):

flights.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

不幸的是,这给我留下了一个我无法理解的错误:

An error occurred while calling o127.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 4 times, most recent failure: Lost task 2.3 in stage 4.0 (TID 90, wn2-MDMstr.zxmmgisclg5udfemnv0v3qva3e.ax.internal.cloudapp.net, executor 2): java.lang.NoClassDefFoundError: com/microsoft/azure/documentdb/bulkexecutor/DocumentBulkExecutor

这里是完整的堆栈跟踪:error in pastebin

有人可以帮我解决这个错误吗?在使用我自己的 cosmosDB 时,我也收到了完全相同的错误,而不是文档中的示例。

我正在使用带有 PySpark3 内核的 Jupyter 笔记本。 Spark 2.2 版,HDInsight 群集 3.6。

编辑 我不想只是坐等回复,所以我用 Scala 尝试了同样的事情。你猜怎么了?相同的错误(或至少非常相似):Scala error

这是我的 Scala 代码:

%%configure
 "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": 
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   


import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/",
  "Masterkey" -> "$my_key",
  "Database" -> "test",
  "PreferredRegions" -> "West Europe",
  "Collection" -> "$my_collection", 
  "SamplingRatio" -> "1.0"
))
val docs = spark.read.cosmosDB(readConfig)

docs.show()

val writeConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/",
  "Masterkey" -> "$my_key",
  "Database" -> "test",
  "PreferredRegions" -> "West Europe",
  "Collection" -> "$my_collection", 
  "WritingBatchSize" -> "100"
))




val someData = Seq(
    Row(8, "bat"),
    Row(64, "mouse"),
    Row(-27, "test_name")
)

val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("name", StringType, true)
)

val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

someDF.show()

someDF.write.mode(SaveMode.Append).cosmosDB(writeConfig)

也许这有助于排除故障。

谢谢!

【问题讨论】:

问题解决了吗?如果是的话,你能发布一个解决方案吗? @DavidGreenshtein - 是和否 :) 我已经添加了我的答案,请检查是否有任何提示对您有帮助。 感谢@Jangcy 的详细回复,使用带有适当 HDI 和 Spark 版本的 uber jar 方法为我使用 Jupyther Notebook 完成了工作。我也在答案中发布了我的工作配置 【参考方案1】:

对于使用python 时的第一个问题,请注意您使用的是doctorwho Azure Cosmos DB 集合。这是一个演示集合,我们提供了只读密钥但不提供写入密钥。因此,您收到的错误是缺少对集合的写访问权限。

对于第二个问题,来自 pastebin 的错误看起来是一样的。说到这里,一些快速的观察:

您使用的是 HDI 3.6,如果您使用的是 Spark 2.1,并且使用的 JAR 用于 Spark 2.2。如果您使用的是 HDI 3.7,那么它在 Spark 2.2 上,然后您使用的是正确的 jar。 您可能希望使用 maven 坐标来获取最新版本的 JAR。请注意azure-cosmosdb-spark > Using Jupyter Notebooks 了解更多信息。

【讨论】:

嗨,丹尼。关于第一个问题(使用python 时),不幸的是,当我尝试更新插入到我自己的 Cosmos 数据库时,我收到了同样的错误。我确保我正在使用该数据库的读写主键。关于第二个,我的集群参数:Cluster type, HDI version Spark 2.2 on Linux (HDI 3.6)Head Nodes, Worker nodes D3 v2 (x2), D12 v2 (x2) 基于此 - 我应该使用 2.2 的连接器,对吗?我将尝试检查您的最新链接以查看一些详细信息。如果你想出了别的东西,请告诉我。谢谢 不幸的是,经过一整天的尝试,我无法插入数据 - 我尝试了不同的 jar,包括 uber,但仍然没有运气。我现在正在考虑尝试使用较旧的集群(3.5)来检查我是否会成功插入 - 也许连接器本身有问题? 抱歉,您遇到了问题 - 您能否直接在 denny [dot] lee [at] microsoft.com 上联系我,或许我可以提供帮助? Denny,非常感谢您在这个问题上花费的时间和帮助。我知道我们已经找到了一些解决方法。如果我在某些时候错了,请检查我的答案并纠正我。对于新版本的连接器,我一直为 MS 团队祈祷,我希望它能在 Zeppelin、Jupyter 和通过 SSH 中正常工作:)【参考方案2】:

在与微软工程师沟通后,我自己进行了一些测试,发现 Spark-CosmosDB 连接器存在一些问题。基本上,要使用的连接器的最佳版本是 1.0.0,日期为 2017 年 11 月 15 日(适用于 Spark 2.1 和 2.2)。 Link to the repository 以下是一些对我有用的解决方案/解决方法。您可以尝试与它们进行试验以找到最适合您的解决方案。

1) 如果您使用 Spark 2.1 或 2.2,请使用版本 1.0.0 中的连接器(上面的链接)。在我写这个答案(2018 年 5 月 18 日)时,最新版本的连接器是 2018 年 3 月 23 日的 1.1.1 - 当需要将数据帧写入 Cosmos DB 或尝试计算从 Cosmos 读取的超过 50k 文档数据帧(什么是 50k 文档对于非 SQL 数据库?)。

2) 如果您使用 Spark 2.1 -> Jupter 将与 1.0.0 连接器一起使用。如果您使用 Spark 2.2 -> 不要使用 Jupyter notebook - 使用外部包会出现一些问题,尤其是在 Spark 2.2 安装中。 请改用 Zeppelin 笔记本(带有 1.0.0 连接器)。打开 Zeppelin 后,在右上角单击用户,然后单击解释器。进入 Livy 解释器设置点击编辑并添加包坐标:com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.0.0

保存并重新启动解释器。然后使用 livy2 解释器创建一个新笔记本。需要注意的是,在 Zeppelin 的每个单元格中,您必须在第一行添加 %pyspark 魔术命令。由于启动整个应用程序,第一个单元格的运行将持续 1-2 分钟。

3) 您可以直接使用集群,而不是使用笔记本。使用 putty SSH 到您的集群,使用创建集群时提供的 sshuser 和密码:

然后启动 pyspark 附加 uber-jar 文件(您必须从存储库下载 uber-jar 文件,然后将其上传到连接到集群的 blob 存储。在我的情况下,文件位于名为 example 的文件夹中(从根目录的第一级容器)。这里我也使用了 1.0.0 连接器。 这是命令:

pyspark --master yarn --jars wasb:///example/azure-cosmosdb-spark_2.2.0_2.11-1.0.0-uber.jar

当 spark 准备好后,您可以粘贴并运行您的命令,一切都会正常运行。

如果您有任何问题或不清楚的地方,请告诉我。

【讨论】:

【参考方案3】:

由于我找不到该问题的正确答案解决方案,我想分享我的工作配置。我的配置适用于带有 Spark 2.1 的 HDI 3.6。使用 Jupyther Notebook 的 PySpark 脚本成功地从 Cosmos Document DB 读取和写入数据。

%%configure

 "name":"Spark-to-Cosmos_DB_Connector", 
 "jars": ["wasb:///cosmos-libs/azure-cosmosdb-spark_2.1.0_2.11-1.0.0-uber.jar"],
 "conf": "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"

读取和写入配置,读取和保存命令与问题中描述的完全相同。写入配置有附加参数WritingBatchSize 描述here。我从这个location下载的Uber jar。

【讨论】:

没错,HDI 3.6 和 Spark 2.1 与 Jupyter 和 1.0.0 连接器配合得很好(重要)。不幸的是 - 你的方法在 Spark 2.2 中不起作用:((现在,我希望他们能修复它)。如果你需要 Spark 2.2,你也可以查看我的答案。

以上是关于从 Spark 错误更新到 CosmosDB的主要内容,如果未能解决你的问题,请参考以下文章

CosmosDB - Mongodb IsUpsert 不适用于批量更新

Cosmos DB,如果我在使用 SkipToken 查询时更新某些项目会发生啥?

无法使用microsoft.azure.documentdb.core更新Azure CosmosDB中的文档

(AZURE cosmosDB/mongoDB) 更新数组中对象的特定元素的字段

Spark 失败,因为 S3 文件已更新。如何消除这个错误?

spark常见错误持续更新