将 spark 数据帧行写入 dynamoDB 表中的项目

Posted

技术标签:

【中文标题】将 spark 数据帧行写入 dynamoDB 表中的项目【英文标题】:write spark dataframe rows as items in dynamoDB table 【发布时间】:2016-06-27 15:29:50 【问题描述】:

有没有办法将我的spark 数据框的每一行写为dynamoDB 表中的新项目? (在pySpark

我将此代码与boto3 库一起使用,但我想知道是否有另一种方法,避免pandasfor loop 步骤:

sparkDF_dict = sparkDF.toPandas().to_dict('records')
for item in sparkDF_dict :
    table.put_item(Item = item)

【问题讨论】:

这个问题有解决方案吗?关于这些东西的文档很少。似乎应该可以将 EMR 输出发送到 DynamoDB。 我有完全相同的要求,但需要写入超过 500 万行。对此我们是否有更强大和并行的解决方案? 这也是我正在遵循的方法。但是,大多数时候我在转换过程中收到数百万行的内存错误toPandas。我将 DF 拆分为多个较小的块,但在这种情况下,这项工作需要花费很多时间。也在寻找另一种方式。 【参考方案1】:

DynamoDB 提供BatchWriteItem API。它是 available in boto3,因此您可以在创建 sparkDF_dict 长 25 个元素的切片后调用它。请注意,BatchWriteItem API 仅支持writing 25 items at a time,并且并非所有写入一开始都可能成功(因为它们可能会在服务端受到限制并在响应的UnprocessedItems 部分返回给您)。您的应用程序需要查看响应中的 UnprocessedItems 并根据需要重试。

【讨论】:

以上是关于将 spark 数据帧行写入 dynamoDB 表中的项目的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何从 Spark 数据帧行解析和转换 json 字符串

Apache Spark:迭代数据帧行并通过 MutableList (Scala) 创建新数据帧

Spark中具有固定向量的数据帧行的点积

Amazon DynamoDB:警告“配置的 dynamodb 表 JobDetails 的写入吞吐量小于集群映射容量”

Pyspark 将 json 数组转换为数据帧行

DynamoDB 乐观锁