Google Cloud PubSub 不确认消息
Posted
技术标签:
【中文标题】Google Cloud PubSub 不确认消息【英文标题】:Google Cloud PubSub not ack messages 【发布时间】:2019-07-02 23:38:09 【问题描述】:我们有基于 GCP PubSub 的发布者和订阅者系统。订阅者处理单个消息的时间很长,大约 1 分钟。我们已经将订阅者确认截止时间设置为 600 秒(10 分钟)(最长 1 秒),以确保 pubsub 不会过早开始重新交付,因为基本上我们这里有长时间运行的操作。
我看到了 PubSub 的这种行为。当代码发送 ack 和监视器确认 PubSub 确认请求已被接受并且确认本身已完成并成功状态时,未确认消息的总数仍然相同。
图表上的指标对总和、计数和均值聚合调整器显示相同。上图中的 aligner 是平均值,没有启用减速器。
我正在使用 @google-cloud/pubsub Node.js 库。已经尝试过不同的版本(0.18.1、0.22.2、0.24.1),但我想问题不在其中。
下面的类可以用来检查。
TypeScript 3.1.1,节点 8.x.x - 10.x.x
import exponential, Backoff from "backoff";
const pubsub = require("@google-cloud/pubsub");
export interface IMessageHandler
handle (message): Promise<void>;
export class PubSubSyncListener
private readonly client;
private listener: Backoff;
private runningOperations: Promise<unknown>[] = [];
constructor (
private readonly handler: IMessageHandler,
private readonly options:
/**
* Maximal messages number to be processed simultaniosly.
* Listener will try to keep processing number as close to provided value
* as possible.
*/
maxMessages: number;
/**
* Formatted full subscrption name /projects/projectName/subscriptions/subscriptionName
*/
subscriptionName: string;
/**
* In milliseconds
*/
minimalListenTimeout?: number;
/**
* In milliseconds
*/
maximalListenTimeout?: number;
)
this.client = new pubsub.v1.SubscriberClient();
this.options = Object.assign(
minimalListenTimeout: 300,
maximalListenTimeout: 30000
, this.options);
public async listen ()
this.listener = exponential(
maxDelay: this.options.maximalListenTimeout,
initialDelay: this.options.minimalListenTimeout
);
this.listener.on("ready", async () =>
if (this.runningOperations.length < this.options.maxMessages)
const [response] = await this.client.pull(
subscription: this.options.subscriptionName,
maxMessages: this.options.maxMessages - this.runningOperations.length
);
for (const m of response.receivedMessages)
this.startMessageProcessing(m);
this.listener.reset();
this.listener.backoff();
else
this.listener.backoff();
);
this.listener.backoff();
private startMessageProcessing (message)
const index = this.runningOperations.length;
const removeFromRunning = () =>
this.runningOperations.splice(index, 1);
;
this.runningOperations.push(
this.handler.handle(this.getHandlerMessage(message))
.then(removeFromRunning, removeFromRunning)
);
private getHandlerMessage (message)
message.message.ack = async () =>
const ackRequest =
subscription: this.options.subscriptionName,
ackIds: [message.ackId]
;
await this.client.acknowledge(ackRequest);
;
return message.message;
public async stop ()
this.listener.reset();
this.listener = null;
await Promise.all(
this.runningOperations
);
这基本上是异步拉取消息和立即确认的部分实现。因为建议的解决方案之一是使用同步拉动。
如果我没记错问题的症状,我在 java 存储库中发现了类似的报告问题。
https://github.com/googleapis/google-cloud-java/issues/3567
这里的最后一个细节是,确认似乎适用于少量请求。如果我在 pubsub 中触发单个消息然后立即处理它,未传递的消息数量会减少(下降到 0,因为之前只有一条消息)。
问题本身 - 发生了什么以及为什么未确认的消息数量没有在收到确认后减少?
【问题讨论】:
【参考方案1】:引用the documentation 的话,您使用的订阅/num_undelivered_messages 指标是“订阅中未确认的消息(也称为积压消息)的数量。每 60 秒采样一次。采样后,数据不是最长可见 120 秒。"
您不应期望该指标会在确认消息后立即降低。此外,听起来好像您正在尝试将 pubsub 用于只发送一次的情况,试图确保不会再次发送消息。 Cloud Pub/Sub 不提供这些语义。它至少提供一次语义。换句话说,即使您收到了一个值,确认了它,收到了确认响应,并且看到指标从 1 下降到 0,但同一个工作人员或另一个工作人员仍然有可能并且正确地接收到该消息的完全相同的副本.尽管在实践中这不太可能,但您应该专注于构建一个允许重复的系统,而不是试图确保您的 ack 成功,这样您的消息就不会被重新传递。
【讨论】:
以上是关于Google Cloud PubSub 不确认消息的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Firebase Cloud Functions 中确认 PubSub 消息?
使用 Google Cloud PubSub 不断收到“向 Cloud PubSub 发送测试消息时出错...”