使用 PySpark 从 MongoDB 到 Elasticsearch
Posted
技术标签:
【中文标题】使用 PySpark 从 MongoDB 到 Elasticsearch【英文标题】:MongoDB to Elasticsearch using PySpark 【发布时间】:2021-12-22 23:50:00 【问题描述】:我想使用 PySpark 将我的 MongoDB 集合集成到 Elasticsearch。我有我的 MongoDB 的连接字符串,但我不知道如何构造我的代码或指定一些参数。有人可以给我创建此任务的代码示例吗?虽然我还是这个领域的新手,但我尝试阅读一些文档,但我发现自己卡在了一些参数上,仍然没有清楚地了解这项任务的流程。谢谢你帮助我。
【问题讨论】:
请澄清您的具体问题或提供其他详细信息以准确突出您的需求。正如目前所写的那样,很难准确地说出你在问什么。 【参考方案1】:在 Spark 中,您可以使用 RDD 或 DataFrame。
RDD 不是结构化的,是较旧的 Spark API。
要使用 RDD,您需要使用 MapPartition
、ForEachPartition
等。
DataFrames 使用类似于 ANSI SQL 的 Spark SQL 函数来构建和启用。
MongoDB 启用了每个文档的模式。 Spark DataFrames 要求每个 DataFrame 有一个统一的架构。
如果您的集合对每个集合的所有文档都有统一的架构(取决于插入的数据),您可以手动(通过代码)创建 Spark 架构定义(PySpark 中的pyspark.sql.types.StructType
)。例如,请查看 StructType、DataType、StructDef、FieldType、ArrayDef、DictDef、StringType 等的 Spark 文档。
如果您的 MongoDB 集合没有统一架构,您将需要使用 RDD,然后通过 MapPartition
将其转换为统一架构,以便您将此统一架构用于 Elasticsearch 索引。
您可以使用 Spark SQL 通过 pyspark 从 Mongo 读取 DataFrames:spark_session.read.format('mongo').option('uri', 'mongodb://uri.to.mongo.goes.here').schema(schema=spark_schema_goes_here)
并类似地使用 df.write.format('mongo').option('uri', uri).mode(write_mode_eg_append).save()
写入
我不记得 Elastic 语法,但是,它是相似的,并且定义是每个索引的架构。尝试在 MongoDB 站点和 Elasticsearch 站点中搜索 Spark 以查找更多详细信息。
可以从 Spark 架构定义中自动创建 Elastic 索引定义,我曾经这样做过,但是没有代码。
【讨论】:
感谢您的解释!我找到了这个网站simplernerd.com/migrate-mongo-elasticsearch 然后我认为这可能是可能的解决方案。但是,我仍然对如何配置 Elasticsearch 配置感到困惑。你或其他人能帮我理解这段代码吗? @sam.marhaendra 您可以在该站点中提问,您可以尝试自己调试它,并在此处的新问题中询问您不了解的部分。包括代码本身,但没有链接。以上是关于使用 PySpark 从 MongoDB 到 Elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章
无法读取 pyspark 中的 mongodb 数据(json)
PySpark 读取 MongoDB 报错 Cursor not found / no longer available
Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢
使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧