使用来自 Google Pubsub 的消息并将其发布到 Kafka
Posted
技术标签:
【中文标题】使用来自 Google Pubsub 的消息并将其发布到 Kafka【英文标题】:Consuming messages from Google Pubsub and publishing it to Kafka 【发布时间】:2021-02-20 21:57:12 【问题描述】:我正在尝试使用同步 PULL API 来使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。 我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们在 GCP 之外运行这个应用程序。
我面临的问题是消费的消息没有在 GCP PubSub 中得到确认。我已经确认本地 Kafka 实例具有从 GCP PubSub 消耗的消息。 GCP DataFlow 中的文档表明,当管道使用数据接收器(在我的例子中是 Kafka)终止时,数据包得到最终确定。
但由于代码是在 Apache Flink 而不是 GCP DataFlow 中运行的,我认为某种回调不会被触发与确认提交的消息相关。 我在这里做错了什么?
pipeline
.apply("Read GCP PubSub Messages", PubsubIO.readStrings()
.fromSubscription(subscription)
)
.apply(ParseJsons.of(User.class))
.setCoder(SerializableCoder.of(User.class))
.apply("Filter-1", ParDo.of(new FilterTextFn()))
.apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
.apply("Write to Local Kafka",
KafkaIO.<Void,String>write()
.withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
.withTopic("test-topic")
.withValueSerializer((StringSerializer.class))
.values()
);
【问题讨论】:
你是如何验证消息没有被确认的? 在 google pubsub 控制台中,它显示了该订阅的未确认消息的图表。 您是否以流模式处理消息? 是的。我们使用 Google GCP pubsub Java 客户端使用同步 Pull API 来消费数据,该 API 具有内置的轮询机制,每个请求以 1000 条消息为单位批量消费数据,然后通过构建器管道顺序处理这些消息。 如果某处有显式标志来区分批处理/拉取 API。我不知道。 【参考方案1】:我修复此解决方案的方法是使用 Guillaume Blaquiere (https://***.com/users/11372593/guillaume-blaquiere) 建议查看检查点。即使在管道中添加了 Window.into() 函数,源 PubSub 订阅端点也没有收到 ACK。 问题出在 Flink 服务器配置中,我没有提到检查点配置。如果没有这些参数,检查点将被禁用。
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-1.9.3/state/checkpoints/
这些配置应该放在 flink_home/conf/flink-conf.yaml 中。 添加这些条目并重新启动 flink 后。在 GCP pubsub 监控图表中,所有积压(未确认的消息)都变为 0。
【讨论】:
【参考方案2】:在 Beam documentation on the PubSub IO class 中提到了这一点:
检查点既用于向 Pubsub 确认收到的消息(以便它们可能在 Pubsub 端停用),也用于在需要恢复检查点时对已使用的消息进行 NACK(以便 Pubsub 及时重新发送这些消息) .
ACK 未链接到数据流,您应该在数据流上具有相同的行为。 ack 在检查点上发送。通常,检查点是您在流中设置的窗口。
但是,你没有设置窗口!默认情况下,窗口是全局的,并且仅在最后关闭,如果你优雅地停止你的工作(甚至,我不确定这一点)。无论如何,更好的解决方案是设置固定窗口(例如 5 分钟)来确认每个窗口上的消息。
【讨论】:
谢谢!!我会试试这个并更新这个线程。非常感谢你! 这个解决方案对我不起作用。我确实将此添加到管道.apply( "FixedWindowsLabel", Window.into(FixedWindows.of(Duration.standardSeconds(5L))))
...但没有帮助
什么意思?管道工作,但消息没有确认,对吧?
是的。管道工作但没有 ACK。请注意,当我使用 DirectRunner(嵌入式默认运行程序)时,ACK 起作用。不知何故,ACK 在 FlinkRunner 中不起作用
这很奇怪,可能是一个错误。您可以尝试在Beam JIRA 中打开问题以上是关于使用来自 Google Pubsub 的消息并将其发布到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章
使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出
通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery
如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息
如何从 PubSub 主题读取数据并将其解析到光束管道中并打印
是否可以从 PubSub 读取消息并将其数据分隔在 PCollection<String> 的不同元素中?如果是这样,怎么做?