卡夫卡 -> Flink 数据流 -> MongoDB

Posted

技术标签:

【中文标题】卡夫卡 -> Flink 数据流 -> MongoDB【英文标题】:Kafka -> Flink DataStream -> MongoDB 【发布时间】:2016-05-11 13:34:51 【问题描述】:

我想设置 Flink,以便它将数据流从 Apache Kafka 转换并重定向到 MongoDB。出于测试目的,我在 flink-streaming-connectors.kafka 示例 (https://github.com/apache/flink) 之上构建。

Kafka 流被 Flink 正确设置为红色,我可以映射它们等,但是当我想将每个接收和转换的消息保存到 MongoDB 时就会出现问题。我发现的关于 MongoDB 集成的唯一示例是来自 github 的 flink-mongodb-test。不幸的是,它使用静态数据源(数据库),而不是数据流。

我相信 MongoDB 应该有一些 DataStream.addSink 实现,但显然没有。

实现它的最佳方法是什么?我是否需要编写自定义接收器功能,或者我可能遗漏了什么?也许应该以不同的方式完成?

我不依赖任何解决方案,所以任何建议都将不胜感激。

下面有一个例子,我得到了什么作为输入以及我需要存储什么作为输出。

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>

Flink: DataStream.map(
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
)
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection

正如你在这个例子中看到的,我使用 Flink 主要是为了 Kafka 的消息流缓冲和一些基本的解析。

【问题讨论】:

【参考方案1】:

Flink 目前没有可用的 Streaming MongoDB sink。

但是,将数据写入 MongoDB 有两种方式:

使用 Flink 的 DataStream.write() 调用。它允许您将任何 OutputFormat(来自 Batch API)与流式传输一起使用。使用 Flink 的 HadoopOutputFormatWrapper,可以使用官方的 MongoDB Hadoop 连接器

自己实现 Sink。使用 Streaming API 实现接收器非常容易,而且我确信 MongoDB 有一个很好的 Java 客户端库。

这两种方法都不提供任何复杂的处理保证。但是,当您将 Flink 与 Kafka(并启用检查点)一起使用时,您将拥有至少一次语义:在错误情况下,数据会再次流式传输到 MongoDB 接收器。 如果您正在进行幂等更新,重做这些更新不会导致任何不一致。

如果您确实需要 MongoDB 的一次性语义,您可能应该提交JIRA in Flink 并与社区讨论如何实现它。

【讨论】:

【参考方案2】:

作为 Robert Metzger 答案的替代方案,您可以将结果再次写入 Kafka,然后使用维护的 kafka 连接器之一将主题的内容放入 MongoDB 数据库中。

Kafka -> Flink -> Kafka -> Mongo/Anything

通过这种方法,您可以保持“至少一次语义”行为。

【讨论】:

以上是关于卡夫卡 -> Flink 数据流 -> MongoDB的主要内容,如果未能解决你的问题,请参考以下文章

处理卡夫卡中具有依赖关系的数据时的最佳实践?

卡夫卡快速入门

雪花卡夫卡连接器疑惑和疑问

卡夫卡:如何在卡夫卡实现循环分区

卡夫卡主题——我应该多点还是少点?

卡夫卡领导人多党选举与仲裁员/证人/观察员一起选举