如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合
Posted
技术标签:
【中文标题】如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合【英文标题】:How to overwrite/update a collection in Azure Cosmos DB from Databrick/PySpark 【发布时间】:2020-02-02 14:44:50 【问题描述】:我在 Databricks Notebook 上编写了以下 PySpark 代码,该代码成功地将结果从 sparkSQL 保存到 Azure Cosmos DB,并使用以下代码行:
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
完整代码如下:
test = spark.sql("""SELECT
Sales.CustomerID AS pattersonID1
,Sales.InvoiceNumber AS myinvoicenr1
FROM Sales
limit 4""")
## my personal cosmos DB
writeConfig3 =
"Endpoint": "https://<cosmosdb-account>.documents.azure.com:443/",
"Masterkey": "<key>==",
"Database": "mydatabase",
"Collection": "mycontainer",
"Upsert": "true"
df = test.coalesce(1)
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
使用上面的代码,我已成功写入我的 Cosmos DB 数据库 (mydatabase) 和集合 (mycontainer)
当我尝试通过更改 SparkSQL 来覆盖容器时(只需将 patersonID1 更改为 patersonID2,并将 myinvoicenr1 更改为 myinvoicenr2
test = spark.sql("""SELECT
Sales.CustomerID AS pattersonID2
,Sales.InvoiceNumber AS myinvoicenr2
FROM Sales
limit 4""")
使用新查询覆盖/更新集合,Cosmos DB 会按如下方式附加容器:
并且仍然将原始查询留在集合中:
有没有办法完全覆盖或更新 cosmos DB?
【问题讨论】:
哎呀——你刚刚嵌入了你的 Cosmos DB 密钥并将它分享给了全世界。请尽快重新生成您的密钥。我编辑了帐户名称和他们的问题,但任何有足够代表的人都可以看到修订历史记录。 感谢大卫,密钥已重新生成。谢谢 【参考方案1】:您的问题是文档有一个唯一的id
(您从未指定过,因此会自动为您生成作为 guid)。当您编写新文档时,您刚刚将一个非id
、非唯一属性pattersonID1
重命名为pattersonID2
,它只是按预期创建了一个新文档。不可能知道这个新文档是否与原始文档相关,因为它是一个全新的文档,有自己的一组属性。
您可以通过查询(或阅读)、修改和替换现有文档来更新现有文档。或者,您可以选择查询旧文档并删除它们(一个接一个,或作为分区内的一批删除,通过存储过程以事务方式进行)。最后,您可以删除并重新创建一个容器,这将删除当前存储在其中的所有文档。
【讨论】:
哦,我明白了。我从没想过 id - 很好。是否有显示如何更新现有文档/集合的链接?【参考方案2】:您可以使用适用于 Python 的 Azure Cosmos DB SQL API SDK 来管理数据库及其包含在此 NoSQL 数据库服务中的 JSON 文档,而不是使用 Spark 到 Cosmos DB 连接器:
创建 Cosmos DB 数据库并修改其设置
创建和修改容器以存储 JSON 文档集合
创建、读取、更新和删除容器中的项目(JSON 文档)
使用类似 SQL 的语法查询数据库中的文档。
Azure Cosmos DB SQL API client library for Python
【讨论】:
以上是关于如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合的主要内容,如果未能解决你的问题,请参考以下文章