Why does a GCP Pub/Sub subscription fail to acknowledge messages?

Posted: 2019-04-10 00:56:15

Question:

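I have a problem with message acknowledgement in a GCP Pub/Sub subscriber. I wrote three classes: PubSubPublisher (publishes to a topic), PubSubSubscriber (receives from a subscription and indexes into Elasticsearch) and ElasticDailyIndexManager (a Thread). This is what I do with them:

    I publish 100k dummy messages to my topic with PubSubPublisher (roughly 10k messages per second).

    I run PubSubSubscriber.receive_and_index() for the first time. While processing messages, it also indexes them into Elasticsearch using 10 ElasticDailyIndexManager threads. Basically, I connect to the subscription, read (and index) for 60 seconds, and then exit. 100k documents are indexed into Elasticsearch (roughly 1.5k messages per second).

    I run PubSubSubscriber.receive_and_index() a second time. I expect nothing to be indexed, but about 40k new documents are indexed (document ids in Elasticsearch are random).

Sometimes it takes 3-4 runs to empty the subscription, so I suspect a problem with acknowledging messages, but there are no errors at runtime. I index the data together with message.message_id, and it is clear that I have multiple rows with the same message_id.

I don't see why the subscriber would fail to acknowledge, since it raises no errors. I set a 30-second acknowledgement deadline on the subscription, but that did not help.

The code for the classes mentioned above: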

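from json import dumps
from queue import Queue
from threading import Thread

import click
from google.cloud import pubsub_v1


class ProcessFutures(Thread):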
    def __init__(self, futures_queue):
        Thread.__init__(self)

        self.queue = futures_queue

        self.counter = 0

        self.results = list()

        self.daemon = True

        self.start()

    def run(self):
        while getattr(self, 'keep_going', True):
            future = self.queue.get()

            self.results.append(future.result())

            self.queue.task_done()


class PubSubPublisher:
    def __init__(self, project_id, topic_name):
        self.client = pubsub_v1.PublisherClient()
        self.project_id = project_id
        self.topic_name = topic_name

        self.keep_going = True
        self.futures_queue = Queue()
        self.future_process = ProcessFutures(self.futures_queue)

    def publish_message(self, message_body):
        """
        Publishes message to a Pub/Sub topic.

        future.result is verified in separate thread to avoid blocking of message publishing.
        """

        topic_path = self.client.topic_path(self.project_id, self.topic_name)

        if isinstance(message_body, dict):
            data = dumps(message_body)
        elif isinstance(message_body, str):
            data = message_body
        else:
            raise TypeError("message_body must be a dict or a str")

        data = data.encode('utf-8')

        future = self.client.publish(topic_path, data=data)

        self.futures_queue.put(future)

    def finish(self):
        self.future_process.queue.join()

        print("Processed results: " + str(len(self.future_process.results)))


@click.command()
@click.option('--project-id', '-p', required=True, type=str, help='Google Cloud Platform Project Id')
@click.option('--topic', '-t', required=True, type=str, help='Pub/Sub Topic to which messages will be published')
@click.option('--message', '-m', required=True, type=str, help='Message body')
@click.option('--amount', '-a', required=True, type=int, help='How many messages to send')
def run(project_id, topic, message, amount):
    from time import time

    psp = PubSubPublisher(project_id, topic)

    time_start = time()

    for i in range(amount):
        message_body = dict(i=i, message=message)
        psp.publish_message(message_body)

    psp.finish()

    time_stop = time()

    seconds = time_stop - time_start

    print("Published {} messages in {:.2f} seconds. That is {:.2f} mps!".format(amount, seconds,
                                                                                  amount / seconds))

from elasticsearch import Elasticsearch, ElasticsearchException, NotFoundError, helpers
from datetime import datetime
from json import load
from threading import Thread
from queue import Queue
from os import getenv

from config import BASE_PATH


class ElasticDailyIndexManager(Thread):
    def __init__(self, index_basename):
        Thread.__init__(self)

        es_port = 9200 if ElasticDailyIndexManager.str_to_bool(getenv("RUNNING_IN_CONTAINER", "False")) else 9201
        es_url = "elastic" if ElasticDailyIndexManager.str_to_bool(getenv("RUNNING_IN_CONTAINER", "False")) else "localhost"

        self.es = Elasticsearch(hosts=[es_url], port=es_port)

        self.index_template_name = index_basename
        self.index_name_mask = index_basename if index_basename.endswith("-") else index_basename + "-"

        while not self._template_exists():
            self._register_index_template()

        self.queue = Queue()
        self.daemon = True

        self.start()

    def run(self):
        def generator():
            while True:
                message_body, message_id = self.queue.get()

                metadata = dict()
                self.queue.task_done()

                yield self._prepare_bulk_doc(message_body, **metadata)

        bulk_load = helpers.streaming_bulk(self.es, generator(), 10, yield_ok=False)

        while True:
            for success, info in bulk_load:
                print(success, info)

    def index_document(self, document_body, id=None):
        document_body['@timestamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')

        try:
            self.es.index(index=self.index_name_mask + datetime.utcnow().strftime('%Y.%m.%d'),
                          doc_type='default',
                          body=document_body,
                          id=id)
        except ElasticsearchException as e:
            print(document_body, id, e.args)

    def _register_index_template(self):
        template_body = self._get_json_file_content("{}/config/templates/{}.json".format(BASE_PATH,
                                                                                             self.index_template_name))

        try:
            if template_body is not None:
                self.es.indices.put_template(name=self.index_template_name,
                                             body=template_body,
                                             master_timeout="60s")

        except ElasticsearchException as e:
            print(e.args)

    def _template_exists(self):
        try:
            self.es.indices.get_template(self.index_template_name)
            return True
        except NotFoundError:
            return False

    @staticmethod
    def _get_json_file_content(file_dir_arg):
        """
        Wrapper on load function. Expects file with JSON inside.

        :param file_dir_arg: Path to file to be read.
        :return: Dictionary (Encoded JSON)
        """
        result = None

        try:
            with open(file_dir_arg, 'r', encoding='UTF-8-SIG') as f:
                result = load(f)
        except Exception as e:
            print(e.args)

        return result

    def _prepare_bulk_doc(self, source_arg, **kwargs):
        """
        Function providing unified document structure for indexing in elasticsearch.
        The structure needs to be compliant with

        :param index_arg: index to which send data
        :param doc_type_arg: document type in index_arg
        :param source_arg: body of document
        :param kwargs: additional meta parameters (like doc _id)
        :return: Reformatted & enhanced source_arg
        """

        metadata = dict(**kwargs).get('metadata', dict())

        source_arg['@timestamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')

        result = {
            '_index': self.index_name_mask + datetime.utcnow().strftime('%Y.%m.%d'),
            '_type': 'default',
            '_op_type': 'index',
            'doc': source_arg,
            'doc_as_upsert': False
        }

        result.update(metadata)

        return result

    @staticmethod
    def str_to_bool(str_arg):
        if str_arg.lower() == 'true':
            return True
        elif str_arg.lower() == 'false':
            return False
        else:
            return None


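from json import JSONDecodeError, loads
from os import environ
from threading import Lock
from time import localtime, strftime, time

import click
from google.cloud import pubsub_v1

# Environment variables are strings, so cast to int before using the value in range().
ELASTIC_MANAGERS = int(environ.get("ElASTIC_MANAGERS", 10))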


class PubSubSubscriber:
    def __init__(self, project_id_arg, topic_name_arg, seconds_arg=None):
        self.elasticsearch_index_managers = list()

        for _ in range(ELASTIC_MANAGERS):
            self.elasticsearch_index_managers.append(ElasticDailyIndexManager(topic_name_arg))

        self.project_id = project_id_arg
        self.topic_name = topic_name_arg

        self.client = pubsub_v1.SubscriberClient()

        self.counter = 0

        self.latencies = list()

        self.seconds = seconds_arg

        self.lock = Lock()

    def receive_and_index(self):
        subscription_path = self.client.subscription_path(self.project_id,
            "{}-subscription-elastic".format(self.topic_name))

        def callback(message):
            latency = message._received_timestamp - message.publish_time.timestamp()

            document = PubSubSubscriber.struct_message(message.data)
            document['message_id'] = message.message_id

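            # Callbacks run on several threads, so guard the shared counter with the lock.
            with self.lock:
                manager_index = self.counter % ELASTIC_MANAGERS
                self.counter += 1

            self.elasticsearch_index_managers[manager_index].queue.put((document, None))

            message.ack()

            if self.seconds:
                self.latencies.append(latency)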

        future = self.client.subscribe(subscription_path, callback=callback)

        try:
            # When timeout is unspecified, the result method waits indefinitely.
            print('Listening for messages on {}'.format(subscription_path))
            print('Running for{}'.format(' ' + str(self.seconds) + ' seconds...' if self.seconds else 'ever'))

            future.result(timeout=self.seconds)
        except Exception as e:
            print('Listening for messages on {} threw an Exception: {}.'.format(subscription_path, e))
        finally:
            time_queue_join_start = time()

            for manager in self.elasticsearch_index_managers:
                manager.queue.join()

            time_queue_join_stop = time()

            self.seconds = self.seconds + time_queue_join_stop - time_queue_join_start

            print("Read {} messages in {:.2f} seconds. That is {:.2f} mps!".format(self.counter, self.seconds,
                                                                                     self.counter / self.seconds))

            if self.latencies:
                avg_latency = float(sum(self.latencies)) / float(len(self.latencies))

                # Both timestamps are in seconds, so the difference is in seconds as well.
                print("Average latency was {:.2f} s.".format(avg_latency))

    @staticmethod
    def struct_message(message_arg, encoding='utf-8'):
        if isinstance(message_arg, dict):
            message = message_arg
        elif isinstance(message_arg, bytes):
            message = PubSubSubscriber.message_to_dict(message_arg.decode(encoding))
        elif isinstance(message_arg, str):
            message = PubSubSubscriber.message_to_dict(message_arg)
        else:
            message = None

        group_topics = message.get("group", dict()).get("group_topics", dict())

        if group_topics:
            message['group']['group_topics'] = [d['topic_name'] for d in message['group']['group_topics']]

        # time handling
        event_time = PubSubSubscriber.epoch_to_strtime(message.get("event", dict()).get("time", None))
        if event_time:
            message['event']['time'] = event_time

        mtime = PubSubSubscriber.epoch_to_strtime(message.get("mtime", None))
        if mtime:
            message['mtime'] = mtime

        # geo handling
        group_geo_lat = message.get("group", dict()).get("group_lat", None)
        group_geo_lon = message.get("group", dict()).get("group_lon", None)

        if group_geo_lon and group_geo_lat:
            message['group']['group_geo'] = PubSubSubscriber.create_geo_object(group_geo_lat, group_geo_lon)

        venue_geo_lat = message.get("venue", dict()).get("lat", None)
        venue_geo_lon = message.get("venue", dict()).get("lon", None)

        if venue_geo_lon and venue_geo_lat:
            message['venue']['venue_geo'] = PubSubSubscriber.create_geo_object(venue_geo_lat, venue_geo_lon)

        return message

    @staticmethod
    def epoch_to_strtime(epoch_time):
        try:
            result = strftime('%Y-%m-%dT%H:%M:%S', localtime(epoch_time / 1000))
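        except (TypeError, ValueError, OverflowError, OSError):
            # Missing or non-numeric timestamps are passed through unchanged.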
            result = epoch_time

        return result

    @staticmethod
    def create_geo_object(lat, lon):
        return "{}, {}".format(str(lat), str(lon))

    @staticmethod
    def message_to_dict(message_arg):
        keep_going = True
        result = message_arg

        while keep_going and (not isinstance(result, dict)):
            try:
                result = loads(result)
            except (JSONDecodeError, TypeError):
                # TypeError covers payloads that decode to a non-dict, non-str value (e.g. a list).
                result = None
                keep_going = False

        return result


@click.command()
@click.option('--project-id', '-p', required=True, type=str, help='Google Cloud Platform Project Id')
@click.option('--topic', '-t', required=True, type=str, help='Pub/Sub Topic from which messages will be read')
@click.option('--seconds', '-s', default=None, required=False, type=int, help='For how long to read messages. If not provided - run forever')
def run(project_id, topic, seconds):
    pss = PubSubSubscriber(project_id, topic, seconds)
    pss.receive_and_index()


if __name__ == '__main__':
    run()

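Comments:

Are you sure your subscriber is powerful enough to pull and process all 100k messages during the 60 seconds you are reading? For example, do your metrics show at least 100k pull operations in a single run? Also keep in mind that Pub/Sub is an "at least once" system, so you need to make your function idempotent if you don't want duplicate entries.

Answer 1: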

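From https://cloud.google.com/pubsub/docs/faq#duplicates:

Why are there too many duplicate messages?

Cloud Pub/Sub guarantees at-least-once message delivery, which means that occasional duplicates are to be expected. However, a high rate of duplicates may indicate that the client is not acknowledging messages within the configured ack_deadline_seconds, and Cloud Pub/Sub is retrying the message delivery. This can be observed in the monitoring metrics pubsub.googleapis.com/subscription/pull_ack_message_operation_count for pull subscriptions, and pubsub.googleapis.com/subscription/push_request_count for push subscriptions. Look for elevated expired or webhook_timeout values in the /response_code. This is particularly likely if there are many small messages, since Cloud Pub/Sub may batch messages internally and a partially acknowledged batch will be fully redelivered.

Another possibility is that the subscriber is not acknowledging some messages because the code path processing those specific messages fails, and the Acknowledge call is never made; or the push endpoint never responds or responds with an error.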
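
The second possibility above (a processing code path that fails before the Acknowledge call) can be guarded against with a small wrapper. The following is only a sketch: make_safe_callback and process are hypothetical names that do not appear in the question, and the only Pub/Sub features assumed are the message.ack(), message.nack() and message.message_id members of the message object passed to the callback.

```python
import logging

logger = logging.getLogger(__name__)


def make_safe_callback(process):
    """Wrap `process` so that every message is explicitly acked or nacked.

    `process` is a hypothetical function that takes the received message
    and raises an exception when it cannot handle it.
    """

    def callback(message):
        try:
            process(message)
        except Exception:
            # Log the failure and ask for redelivery instead of silently
            # leaving the message unacknowledged until its deadline expires.
            logger.exception("Processing failed for message %s", message.message_id)
            message.nack()
        else:
            # Acknowledge only after processing finished without an error.
            message.ack()

    return callback
```

With the code from the question, this could be used as self.client.subscribe(subscription_path, callback=make_safe_callback(handle)), where handle is a hypothetical function containing the body of the original callback without the message.ack() call. Failures then show up in the log instead of surfacing only as redeliveries.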

Discussion:
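
Because delivery is at-least-once, the comment's advice to make indexing idempotent is worth illustrating. One possible approach, sketched below, is to use the Pub/Sub message_id as the Elasticsearch document _id so that a redelivered message overwrites the earlier document instead of creating a new one with a random id. The function name build_bulk_action and its parameters are assumptions made for this illustration; the returned dict uses the _op_type, _index, _id and _source keys understood by the elasticsearch bulk helpers. The index date is taken from the message publish time rather than the current time, so a redelivery after midnight still targets the same daily index.

```python
from datetime import datetime, timezone


def build_bulk_action(document, message_id, index_prefix, publish_time):
    """Build a bulk 'index' action that is stable across redeliveries.

    All arguments are supplied by the caller: `document` is the parsed
    message body, `message_id` the Pub/Sub message id, `index_prefix` the
    daily index name mask (e.g. "mytopic-") and `publish_time` a datetime.
    """
    return {
        "_op_type": "index",
        # Same publish time -> same daily index for every redelivery.
        "_index": index_prefix + publish_time.strftime("%Y.%m.%d"),
        # Same message_id -> same document, so duplicates overwrite.
        "_id": message_id,
        "_source": document,
    }


action = build_bulk_action(
    {"i": 1, "message": "hello"},
    "123456789",
    "mytopic-",
    datetime(2019, 4, 10, 0, 56, 15, tzinfo=timezone.utc),
)
```

In the question's code, the subscriber puts (document, None) on the manager queue, so the message id never reaches the bulk action; passing message.message_id instead of None would be the natural hook for this.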
