GCP message stays in Pub/Sub after acknowledge

Posted: 2020-02-05 22:40:08

Question:

I wrap the Pub/Sub subscription logic in a subscribe method, which is called once per subscription during service initialization:

    def subscribe(self,
                  callback: typing.Callable,
                  subscription_name: str,
                  topic_name: str,
                  project_name: str = None) -> typing.Optional[SubscriberClient]:
        """Subscribes to Pub/Sub topic and return subscriber client

        :param callback: subscription callback method
        :param subscription_name: name of the subscription
        :param topic_name: name of the topic
        :param project_name: optional project name. Uses default project if not set
        :return: subscriber client or None if testing
        """
        project = project_name if project_name else self.pubsub_project_id
        self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))

        project_path = self.pubsub_subscriber.project_path(project)
        topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
        subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)

        # check if there is an existing subscription, if not, create it
        if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
            self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
            self.pubsub_subscriber.create_subscription(subscription_path, topic_path)

        # subscribe to the topic
        self.pubsub_subscriber.subscribe(
            subscription_path, callback=callback,
            scheduler=self.thread_scheduler
        )
        return self.pubsub_subscriber

The method is called like this:

        self.subscribe_client = self.subscribe(
            callback=self.pubsub_callback,
            subscription_name='subscription_topic',
            topic_name='topic'
        )

The callback method does a lot of things (it sends 2 emails) and then acknowledges the message:

    def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
        self.logger.debug('Processing pub sub message')

        try:
            self.do_something_with_message(data)

            self.logger.debug('Acknowledging the message')
            data.ack()
            self.logger.debug('Acknowledged')
            return

        except Exception:
            # log a structured record plus the traceback, then fall through to ack
            self.logger.warning({
                "message": "Failed to process Pub/Sub message",
                "request_size": data.size,
                "data": data.data
            }, exc_info=True)

        self.logger.debug('Acknowledging the message 2')
        data.ack()

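When I push something to the subscription, the callback runs and prints all the debug messages, including Acknowledged. However, the message stays in Pub/Sub, the callback is called again, and each retry comes after an exponentially longer delay. So the question is: what could cause a message to stay in Pub/Sub even after ack has been called?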

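I have several subscriptions like this, and they all work as expected. The ack deadline is not the cause: the callback finishes almost immediately, and I experimented with the ack deadline anyway, which did not help.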

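When I process these messages from a locally running instance of the app connected to the same Pub/Sub, it completes fine, and the ack takes the message off the queue as expected.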

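So the problem only shows up in the deployed service (running in a Kubernetes pod): the callback executes, but the ack seems to do nothing. Acknowledging the messages from a locally running script (...doing exactly the same thing) or through the GCP UI works as expected.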

Any ideas?

Answer 1:

I did some additional testing and finally found the problem.

TL;DR: I was using the same google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler instance for all subscriptions.

Here are snippets of the code I used to test it. This is the broken version:

server.py

import concurrent.futures.thread
import os
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler


def create_subscription(project_id, topic_name, subscription_name):
    """Create a new pull subscription on the given topic."""
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path)

    print('Subscription created: {}'.format(subscription))


def receive_messages(project_id, subscription_name, t_scheduler):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback, scheduler=t_scheduler)
    print('Listening for messages on {}'.format(subscription_path))


project_id = os.getenv("PUBSUB_PROJECT_ID")

publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project_id)

# Create both topics
try:
    topics = [topic.name.split('/')[-1] for topic in publisher.list_topics(project_path)]
    if 'topic_a' not in topics:
        publisher.create_topic(publisher.topic_path(project_id, 'topic_a'))
    if 'topic_b' not in topics:
        publisher.create_topic(publisher.topic_path(project_id, 'topic_b'))
except AlreadyExists:
    print('Topics already exists')

# Create subscriptions on both topics
sub_client = pubsub_v1.SubscriberClient()
project_path = sub_client.project_path(project_id)

try:
    subs = [sub.name.split('/')[-1] for sub in sub_client.list_subscriptions(project_path)]
    if 'topic_a_sub' not in subs:
        create_subscription(project_id, 'topic_a', 'topic_a_sub')
    if 'topic_b_sub' not in subs:
        create_subscription(project_id, 'topic_b', 'topic_b_sub')
except AlreadyExists:
    print('Subscriptions already exists')

scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))

receive_messages(project_id, 'topic_a_sub', scheduler)
receive_messages(project_id, 'topic_b_sub', scheduler)

while True:
    time.sleep(60)

client.py

import datetime
import os
import random
import sys
from time import sleep

from google.cloud import pubsub_v1


def publish_messages(pid, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(pid, topic_name)

    for n in range(1, 10):
        data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)
        sleep(random.randint(10, 50) / 10.0)


project_id = os.getenv("PUBSUB_PROJECT_ID")
publish_messages(project_id, sys.argv[1])

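I connected to Cloud Pub/Sub, and the server created the topics and subscriptions. Then I ran the client script several times in parallel for both topics. Shortly afterwards, once I changed the server code to instantiate a new thread scheduler inside the receive_messages scope, the server cleared out both topics and behaved as expected.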
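A minimal sketch of that change, where each subscription gets its own scheduler. To keep it runnable without credentials, the subscriber client and a scheduler factory are passed in; the parameter name make_scheduler is hypothetical. In real code the subscriber would be a pubsub_v1.SubscriberClient() and the factory something like lambda: ThreadScheduler(ThreadPoolExecutor(10)), as in server.py.

```python
from typing import Any, Callable


def receive_messages(subscriber: Any,
                     subscription_path: str,
                     callback: Callable[[Any], None],
                     make_scheduler: Callable[[], Any]) -> Any:
    """Start a streaming pull whose scheduler belongs to this subscription only.

    make_scheduler (hypothetical parameter) is called once per subscription,
    so no two subscriptions ever share a scheduler instance.
    """
    scheduler = make_scheduler()
    # Return the future so the caller can cancel() this subscription later.
    return subscriber.subscribe(subscription_path, callback=callback, scheduler=scheduler)


# Usage with the real client (not executed here):
#   import concurrent.futures
#   from google.cloud import pubsub_v1
#   from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
#
#   subscriber = pubsub_v1.SubscriberClient()
#   make = lambda: ThreadScheduler(concurrent.futures.ThreadPoolExecutor(10))
#   future_a = receive_messages(subscriber, path_a, callback, make)
#   future_b = receive_messages(subscriber, path_b, callback, make)
```

Leaving the scheduler argument out of subscribe() should also work, since, as far as I know, the client then creates a fresh default scheduler for each subscription.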

What is confusing is that in both cases the server prints Received message for every message.
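One plausible explanation for acks being logged but not sticking, and this is an assumption about the client library's internals that the answer itself does not state, is that the scheduler also holds the queue through which ack() requests travel to each subscription's stream. The toy model below uses only hypothetical classes (no Pub/Sub involved) to show how a shared queue lets one subscription's dispatcher consume another subscription's acks.

```python
import queue


class ToyScheduler:
    """Hypothetical stand-in: owns the request queue that ack() writes to."""

    def __init__(self):
        self.queue = queue.Queue()


class ToySubscription:
    """Hypothetical stand-in for one streaming pull on one subscription."""

    def __init__(self, name, scheduler):
        self.name = name
        self.scheduler = scheduler
        self.outstanding = set()  # ack IDs delivered but not yet acknowledged

    def deliver(self, ack_id):
        self.outstanding.add(ack_id)

    def ack(self, ack_id):
        # The callback only enqueues the request; a dispatcher sends it later.
        self.scheduler.queue.put(ack_id)

    def dispatch(self):
        """Drain the queue and apply every request to *this* subscription."""
        while True:
            try:
                ack_id = self.scheduler.queue.get_nowait()
            except queue.Empty:
                return
            # An ack ID from another subscription is unknown here, so it is
            # silently lost and that message stays outstanding.
            self.outstanding.discard(ack_id)


def run(shared):
    first = ToyScheduler()
    second = first if shared else ToyScheduler()
    a = ToySubscription("topic_a_sub", first)
    b = ToySubscription("topic_b_sub", second)
    a.deliver("a-1")
    b.deliver("b-1")
    a.ack("a-1")
    b.ack("b-1")
    b.dispatch()  # b's dispatcher happens to drain the queue first
    a.dispatch()
    return a.outstanding, b.outstanding
```

With run(shared=True), a-1 stays outstanding even though its ack() was called, while run(shared=False) clears both, which matches the symptom of every callback logging its ack yet messages coming back.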

I'm going to report this at https://github.com/googleapis/google-cloud-python/issues

Answer 2:

Acknowledgements in Pub/Sub are best-effort, so redelivery of a message is possible, but uncommon.

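If you are consistently receiving duplicates, it may be because the same message contents are being published more than once. As far as Pub/Sub is concerned, these are different messages, and they are assigned different message IDs. Check the message IDs provided by Pub/Sub to make sure you are really receiving the same message multiple times.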
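A minimal sketch of that check, assuming only the message_id and data attributes that messages delivered to the Python client's callback carry. DeliveryTracker is a hypothetical helper: it classifies each delivery as a first delivery, a true redelivery (same message ID) or a republish (same payload, new message ID).

```python
import hashlib
import threading
from typing import Dict, Set


class DeliveryTracker:
    """Hypothetical helper: separates true redeliveries from duplicate publishes.

    Callbacks run on a thread pool, so all state is guarded by a lock.
    Memory grows with every message; bound or expire it in a real service.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._seen_ids: Set[str] = set()
        self._ids_by_payload: Dict[str, Set[str]] = {}

    def classify(self, message) -> str:
        """Return 'first', 'redelivery' (same ID) or 'republished' (same bytes, new ID)."""
        digest = hashlib.sha256(message.data).hexdigest()
        with self._lock:
            ids_for_payload = self._ids_by_payload.setdefault(digest, set())
            if message.message_id in self._seen_ids:
                result = "redelivery"
            elif ids_for_payload:
                result = "republished"
            else:
                result = "first"
            self._seen_ids.add(message.message_id)
            ids_for_payload.add(message.message_id)
        return result
```

In a callback like the one in the question, it would be called and logged before data.ack(); a steady stream of "redelivery" points at acks being lost, while "republished" points at the publisher.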

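There is an edge case in dealing with large backlogs of small messages with streaming pull (which is what the Python client library uses). If you are running multiple clients subscribed to the same subscription, this edge case may be relevant.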

You can also check your subscription's Stackdriver metrics to see:

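- whether its acks are being sent successfully (subscription/ack_message_count)
- whether its backlog is decreasing (subscription/backlog_bytes)
- whether your subscriber is missing the ack deadline (subscription/streaming_pull_ack_message_operation_count, filtered by response_code != "success")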

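If you are not missing ack deadlines and your backlog is holding steady, you should contact Google Cloud support with your project name, your subscription name, and a sample of the duplicated message IDs. They will be able to investigate why these duplicates are happening.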
