从pyspark到cosmosdb插入多行

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从pyspark到cosmosdb插入多行相关的知识,希望对你有一定的参考价值。

我试图在pyspark中向数据框插入多行。这是我的代码:

首先我导入包:

import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents

然后,我定义connectionPolicy:

connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = {"Western Europe"}

凭证:

masterKey = 'yourmasterkey'
host = 'https://testcosmosdbasdada.documents.azure.com:443/'
client = document_client.DocumentClient(host,{'masterKey': masterKey}, connectionPolicy)

然后我定义数据库和集合的名称:

databaseId = 'pruebadb'
collectionId = 'collection1'

dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

注意:我应该在Azure套件中创建一个具有此名称的数据库和集合。然后我可以使用或CreateDocument或UpsertDocument。我将使用UpsertDocument。

client.UpsertDocument(collLink,{'attribute1': 4}, options=None)

这有效!正如您在文档中看到的那样:https://docs.microsoft.com/en-us/python/api/pydocumentdb/pydocumentdb.document_client.documentclient?view=azure-python#upsertdocument

但是我不知道如何一次插入一些行。这些证明不起作用:

1)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

'list'对象没有属性'get'

2)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

'list'对象没有属性'get'

3)

df = spark.read.csv('/FileStore/tables/points.csv', sep=';', header=True)
client.UpsertDocument(collLink, df, options=None)

'list'对象没有属性'get'

这些证明不起作用,因为我需要一个dict作为UpsertDocument()的第二个参数。

为了做到这一点,有pydocumentdb或其他python库的任何功能吗?

使用pyspark将数据从数据框插入CosmosDB的最佳性能方法如何?

答案

您可以使用Spark MongoDB连接器提供的DataFrameWriter API,而不是依赖于CosmosDB API。

以下代码应该有效:

df.write.format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "<CosmosDB URI>")
        .option("database","CosmosDB Database Name")
        .option("collection","CosmosDB Collection Name")
        .mode("append").save()

您需要通过在spark-submit命令中使用--jars参数或--packages参数将Spark-MongoDB连接器添加到类路径中。

例如:spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 <YOUR_SRC_FILE>.py

有关DataFrameWriter API的更多信息,请访问:http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

另一答案

感谢Sivaprasanna Sethuraman,我一直在调查。没有必要使用MongoDB。最后我发现:https://github.com/Azure/azure-cosmosdb-spark

如果需要在非空数据框上插入,请注意使用mode append:

writeConfig = {
 "Endpoint" : "yourhostcosmosdb",
 "Masterkey" : "yourmasterkey",
 "Database" : "pruebadb",
 "Collection" : "collection1",
}
df.write.format("com.microsoft.azure.cosmosdb.spark").mode('append').options(**writeConfig).save()

以上是关于从pyspark到cosmosdb插入多行的主要内容,如果未能解决你的问题,请参考以下文章

从 Spark 错误更新到 CosmosDB

如何在 pyspark 中启用 csv 文件的多行读取

从文本框表单中将多行插入到单个表中 - Access 2010

从 pyspark 中的多行文件中读取 JSON 文件

如何从pyspark中的文件中匹配/提取多行模式

无法使用 pySpark 从 Databricks 在 Cosmos DB/documentDB 中写入数据帧