Why does my GCP Pub/Sub subscription fail to acknowledge messages?
Asked: 2019-04-10 00:56:15

I have a problem with GCP Pub/Sub subscriber message acknowledgement. I wrote three classes: PubSubPublisher (publishes to a topic), PubSubSubscriber (receives from a subscription and indexes into Elasticsearch), and ElasticDailyIndexManager (a Thread). Here is what I did with them:
1. Using PubSubPublisher, I published 100k dummy messages to my topic (at roughly 10k messages per second).
2. I ran PubSubSubscriber.receive_and_index() for the first time. As it receives messages, it indexes them into Elasticsearch using 10 ElasticDailyIndexManager threads. Essentially, I connect to the subscription, read (and index) for 60 seconds, then quit. All 100k messages were indexed into Elasticsearch (at roughly 1.5k messages per second).
3. I ran PubSubSubscriber.receive_and_index() a second time. I expected nothing to be indexed; instead, roughly 40k new documents were indexed (the document ids in Elasticsearch are random).

Sometimes it takes 3-4 runs to drain the subscription, so I suspect the messages are not being acknowledged, even though no errors are raised at runtime. I index the data together with message.message_id, and I can clearly see multiple rows sharing the same message_id.

I cannot see why the subscriber would fail to acknowledge, since it raises no errors. I set an acknowledgement deadline of 30 seconds on the subscription, but it did not help.
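(For reference, a minimal sketch of how such a 30-second deadline can be set programmatically. This assumes the 0.x google-cloud-pubsub client generation used here, and the project/subscription names are placeholders; gcloud or the Cloud Console achieve the same thing.)

from google.cloud import pubsub_v1
from google.protobuf import field_mask_pb2

subscriber = pubsub_v1.SubscriberClient()
# Placeholder names for illustration only.
subscription_path = subscriber.subscription_path('my-project', 'my-topic-subscription-elastic')

# Give the subscriber 30 seconds to ack before Pub/Sub redelivers.
subscription = pubsub_v1.types.Subscription(name=subscription_path, ack_deadline_seconds=30)
update_mask = field_mask_pb2.FieldMask(paths=['ack_deadline_seconds'])
subscriber.update_subscription(subscription, update_mask)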
The code of the classes mentioned above:
from google.cloud import pubsub_v1
from json import dumps
from queue import Queue
from threading import Thread

import click


class ProcessFutures(Thread):
    def __init__(self, futures_queue):
        Thread.__init__(self)
        self.queue = futures_queue
        self.counter = 0
        self.results = list()
        self.daemon = True
        self.start()

    def run(self):
        while getattr(self, 'keep_going', True):
            future = self.queue.get()
            self.results.append(future.result())
            self.queue.task_done()
class PubSubPublisher:
    def __init__(self, project_id, topic_name):
        self.client = pubsub_v1.PublisherClient()
        self.project_id = project_id
        self.topic_name = topic_name
        self.keep_going = True
        self.futures_queue = Queue()
        self.future_process = ProcessFutures(self.futures_queue)

    def publish_message(self, message_body):
        """
        Publishes a message to a Pub/Sub topic.
        future.result is verified in a separate thread to avoid blocking message publishing.
        """
        topic_path = self.client.topic_path(self.project_id, self.topic_name)
        if isinstance(message_body, dict):
            data = dumps(message_body)
        elif isinstance(message_body, str):
            data = message_body
        else:
            raise BaseException
        data = data.encode('utf-8')
        future = self.client.publish(topic_path, data=data)
        self.futures_queue.put(future)

    def finish(self):
        self.future_process.queue.join()
        print("Processed results: " + str(len(self.future_process.results)))
@click.command()
@click.option('--project-id', '-p', required=True, type=str, help='Google Cloud Platform Project Id')
@click.option('--topic', '-t', required=True, type=str, help='Pub/Sub Topic to which messages will be published')
@click.option('--message', '-m', required=True, type=str, help='Message body')
@click.option('--amount', '-a', required=True, type=int, help='How many messages to send')
def run(project_id, topic, message, amount):
    from time import time

    psp = PubSubPublisher(project_id, topic)
    time_start = time()
    for i in range(amount):
        message_body = dict(i=i, message=message)
        psp.publish_message(message_body)
    psp.finish()
    time_stop = time()
    seconds = time_stop - time_start
    print("Published {} messages in {:.2f} seconds. That is {:.2f} mps!".format(amount, seconds,
                                                                                amount / seconds))
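(As an aside, the futures queue plus ProcessFutures thread can also be replaced by the callback hook that the client's publish futures expose. A rough equivalent, with placeholder names and error handling kept minimal:)

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')  # placeholder names


def on_publish_done(future):
    # result() returns the published message id, or re-raises any publish
    # error; a real handler would log and possibly retry instead of printing.
    try:
        print('Published message id: ' + future.result())
    except Exception as e:
        print('Publish failed: ' + str(e.args))


future = publisher.publish(topic_path, data=b'payload')
future.add_done_callback(on_publish_done)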
from google.cloud import pubsub_v1
from elasticsearch import Elasticsearch, ElasticsearchException, NotFoundError, helpers
from datetime import datetime
from json import load, loads, JSONDecodeError
from threading import Thread, Lock
from queue import Queue
from time import time, strftime, localtime
from os import getenv, environ

import click

from config import BASE_PATH
class ElasticDailyIndexManager(Thread):
    def __init__(self, index_basename):
        Thread.__init__(self)

        es_port = 9200 if ElasticDailyIndexManager.str_to_bool(getenv("RUNNING_IN_CONTAINER", "False")) else 9201
        es_url = "elastic" if ElasticDailyIndexManager.str_to_bool(getenv("RUNNING_IN_CONTAINER", "False")) else "localhost"
        self.es = Elasticsearch(hosts=[es_url], port=es_port)

        self.index_template_name = index_basename
        self.index_name_mask = index_basename if index_basename.endswith("-") else index_basename + "-"

        while not self._template_exists():
            self._register_index_template()

        self.queue = Queue()
        self.daemon = True
        self.start()

    def run(self):
        def generator():
            while True:
                message_body, message_id = self.queue.get()
                metadata = dict()
                self.queue.task_done()
                yield self._prepare_bulk_doc(message_body, **metadata)

        bulk_load = helpers.streaming_bulk(self.es, generator(), 10, yield_ok=False)

        while True:
            for success, info in bulk_load:
                print(success, info)

    def index_document(self, document_body, id=None):
        document_body['@timestamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')
        try:
            self.es.index(index=self.index_name_mask + datetime.utcnow().strftime('%Y.%m.%d'),
                          doc_type='default',
                          body=document_body,
                          id=id)
        except ElasticsearchException as e:
            print(document_body, id, e.args)

    def _register_index_template(self):
        template_body = self._get_json_file_content("{}/config/templates/{}.json".format(BASE_PATH,
                                                                                         self.index_template_name))
        try:
            if template_body is not None:
                self.es.indices.put_template(name=self.index_template_name,
                                             body=template_body,
                                             master_timeout="60s")
        except ElasticsearchException as e:
            print(e.args)

    def _template_exists(self):
        try:
            self.es.indices.get_template(self.index_template_name)
            return True
        except NotFoundError:
            return False

    @staticmethod
    def _get_json_file_content(file_dir_arg):
        """
        Wrapper around json.load. Expects a file containing JSON.
        :param file_dir_arg: Path to the file to be read.
        :return: Dictionary (decoded JSON)
        """
        result = None
        try:
            with open(file_dir_arg, 'r', encoding='UTF-8-SIG') as f:
                result = load(f)
        except Exception as e:
            print(e.args)
        return result

    def _prepare_bulk_doc(self, source_arg, **kwargs):
        """
        Provides a unified document structure for indexing in elasticsearch.
        The structure needs to be compliant with the bulk helpers' action format.
        :param source_arg: body of the document
        :param kwargs: additional meta parameters (like the doc _id)
        :return: Reformatted & enhanced source_arg
        """
        metadata = dict(**kwargs).get('metadata', dict())
        source_arg['@timestamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')
        result = {
            '_index': self.index_name_mask + datetime.utcnow().strftime('%Y.%m.%d'),
            '_type': 'default',
            '_op_type': 'index',
            'doc': source_arg,
            'doc_as_upsert': False
        }
        result.update(metadata)
        return result

    @staticmethod
    def str_to_bool(str_arg):
        if str_arg.lower() == 'true':
            return True
        elif str_arg.lower() == 'false':
            return False
        else:
            return None
# Cast to int: environment variables arrive as strings.
ELASTIC_MANAGERS = int(environ.get("ELASTIC_MANAGERS", 10))


class PubSubSubscriber:
    def __init__(self, project_id_arg, topic_name_arg, seconds_arg=None):
        self.elasticsearch_index_managers = list()
        for _ in range(ELASTIC_MANAGERS):
            self.elasticsearch_index_managers.append(ElasticDailyIndexManager(topic_name_arg))
        self.project_id = project_id_arg
        self.topic_name = topic_name_arg
        self.client = pubsub_v1.SubscriberClient()
        self.counter = 0
        self.latencies = list()
        self.seconds = seconds_arg
        self.lock = Lock()

    def receive_and_index(self):
        subscription_path = self.client.subscription_path(self.project_id,
                                                          "{}-subscription-elastic".format(self.topic_name))

        def callback(message):
            latency = message._received_timestamp - message.publish_time.timestamp()
            document = PubSubSubscriber.struct_message(message.data)
            document['message_id'] = message.message_id
            self.elasticsearch_index_managers[self.counter % ELASTIC_MANAGERS].queue.put((document, None))
            message.ack()
            if self.seconds:
                self.latencies.append(latency)
            self.counter += 1

        future = self.client.subscribe(subscription_path, callback=callback)
        try:
            # When timeout is unspecified, the result method waits indefinitely.
            print('Listening for messages on {}'.format(subscription_path))
            print('Running for{}'.format(' ' + str(self.seconds) + ' seconds...' if self.seconds else 'ever'))
            future.result(timeout=self.seconds)
        except Exception as e:
            print('Listening for messages on {} threw an Exception: {}.'.format(subscription_path, e))
        finally:
            time_queue_join_start = time()
            for manager in self.elasticsearch_index_managers:
                manager.queue.join()
            time_queue_join_stop = time()
            self.seconds = self.seconds + time_queue_join_stop - time_queue_join_start
            print("Read {} messages in {:.2f} seconds. That is {:.2f} mps!".format(self.counter, self.seconds,
                                                                                   self.counter / self.seconds))
            if self.latencies:
                avg_latency = float(sum(self.latencies)) / float(len(self.latencies))
                print("Average latency was {:.2f} ms.".format(avg_latency))

    @staticmethod
    def struct_message(message_arg, encoding='utf-8'):
        if isinstance(message_arg, dict):
            message = message_arg
        elif isinstance(message_arg, bytes):
            message = PubSubSubscriber.message_to_dict(message_arg.decode(encoding))
        elif isinstance(message_arg, str):
            message = PubSubSubscriber.message_to_dict(message_arg)
        else:
            message = None
        group_topics = message.get("group", dict()).get("group_topics", dict())
        if group_topics:
            message['group']['group_topics'] = [d['topic_name'] for d in message['group']['group_topics']]
        # time handling
        event_time = PubSubSubscriber.epoch_to_strtime(message.get("event", dict()).get("time", None))
        if event_time:
            message['event']['time'] = event_time
        mtime = PubSubSubscriber.epoch_to_strtime(message.get("mtime", None))
        if mtime:
            message['mtime'] = mtime
        # geo handling
        group_geo_lat = message.get("group", dict()).get("group_lat", None)
        group_geo_lon = message.get("group", dict()).get("group_lon", None)
        if group_geo_lon and group_geo_lat:
            message['group']['group_geo'] = PubSubSubscriber.create_geo_object(group_geo_lat, group_geo_lon)
        venue_geo_lat = message.get("venue", dict()).get("lat", None)
        venue_geo_lon = message.get("venue", dict()).get("lon", None)
        if venue_geo_lon and venue_geo_lat:
            message['venue']['venue_geo'] = PubSubSubscriber.create_geo_object(venue_geo_lat, venue_geo_lon)
        return message

    @staticmethod
    def epoch_to_strtime(epoch_time):
        try:
            result = strftime('%Y-%m-%dT%H:%M:%S', localtime(epoch_time / 1000))
        except (TypeError, ValueError):
            result = epoch_time
        return result

    @staticmethod
    def create_geo_object(lat, lon):
        return "{}, {}".format(str(lat), str(lon))

    @staticmethod
    def message_to_dict(message_arg):
        keep_going = True
        result = message_arg
        while keep_going and (not isinstance(result, dict)):
            try:
                result = loads(result)
            except JSONDecodeError:
                result = None
                keep_going = False
        return result


@click.command()
@click.option('--project-id', '-p', required=True, type=str, help='Google Cloud Platform Project Id')
@click.option('--topic', '-t', required=True, type=str, help='Pub/Sub Topic from which messages will be read')
@click.option('--seconds', '-s', default=None, required=False, type=int,
              help='For how long to read messages. If not provided - run forever')
def run(project_id, topic, seconds):
    pss = PubSubSubscriber(project_id, topic, seconds)
    pss.receive_and_index()


if __name__ == '__main__':
    run()
Comments:

Are you sure your subscriber is powerful enough to pull and process all 100k messages during the 60 seconds you are reading? For example, do your metrics show at least 100k pull operations in a single run? Also keep in mind that Pub/Sub is an "at-least-once" system: if you do not want duplicate entries, you need to make your processing function idempotent.
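(To illustrate that idempotency suggestion: if each document is indexed under its Pub/Sub message_id instead of a random Elasticsearch id, a redelivered message overwrites the earlier copy rather than duplicating it. A rough sketch adapted from the asker's classes, hypothetical and not drop-in tested:)

# Inside PubSubSubscriber.receive_and_index() -- forward the message_id:
def callback(message):
    document = PubSubSubscriber.struct_message(message.data)
    document['message_id'] = message.message_id
    manager = self.elasticsearch_index_managers[self.counter % ELASTIC_MANAGERS]
    manager.queue.put((document, message.message_id))
    message.ack()

# Inside ElasticDailyIndexManager.run() -- pass it on as the bulk action _id,
# which _prepare_bulk_doc already merges in via its 'metadata' kwarg:
def generator():
    while True:
        message_body, message_id = self.queue.get()
        self.queue.task_done()
        yield self._prepare_bulk_doc(message_body, metadata={'_id': message_id})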
Answer 1 (from https://cloud.google.com/pubsub/docs/faq#duplicates):

Why are there too many duplicate messages?

Cloud Pub/Sub guarantees at-least-once message delivery, which means that occasional duplicates are to be expected. However, a high rate of duplicates may indicate that the client is not acknowledging messages within the configured ack_deadline_seconds, so Cloud Pub/Sub is retrying delivery. This can be observed in the monitoring metrics pubsub.googleapis.com/subscription/pull_ack_message_operation_count for pull subscriptions and pubsub.googleapis.com/subscription/push_request_count for push subscriptions: look for elevated expired or webhook_timeout values in /response_code. This is particularly likely if there are many small messages, since Cloud Pub/Sub may batch messages internally and a partially acknowledged batch will be fully redelivered. Another possibility is that the subscriber is not acknowledging some messages, because the code path processing those specific messages fails and the acknowledge call is never made; or the push endpoint never responds or responds with an error.
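(Applied to the code above, one way to act on this is to keep fewer messages outstanding so each one can be acknowledged within the deadline; the Python client exposes this as flow control on subscribe(). A minimal, untuned sketch with placeholder names:)

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-topic-subscription-elastic')

# Cap how many messages the client holds at once; with slow downstream
# indexing, a smaller window helps keep acks inside ack_deadline_seconds.
flow_control = pubsub_v1.types.FlowControl(max_messages=500)


def callback(message):
    # ... hand the message to the indexing pipeline ...
    message.ack()


future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)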