Google PubSub Subscription cannot recover from StatusCode.UNAVAILABLE [code=8a75] error
【Posted】: 2018-08-30 13:07:12
【Question】:
The exception:
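Exception in thread Thread-ConsumeBidirectionalStream: grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
The above exception was the direct cause of the following exception:
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]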
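I'm trying to build an IoT prototype that loosely follows Google's end-to-end example (docs | code), and I'm getting an error in the subscriber when there are no messages in the queue. It happens in two situations: about a minute after the subscriber is first started against an empty queue, and a minute or so after the queue empties once any number of messages have been processed.
I found a workaround in another question, but I can't get it to work. So my question is how to make this workaround policy work, since all it seems to do is hide the error: my subscriber still hangs and doesn't process any further messages.
The relevant code looks like this: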
import json
import logging
import time

import grpc
from google.api_core.exceptions import ServiceUnavailable
from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.message import Message
from google.cloud.pubsub_v1.subscriber.policy import thread

logger = logging.getLogger(__name__)


class WorkaroundPolicy(thread.Policy):
    def on_exception(self, exception):
        # If we are seeing UNAVAILABLE then we need to retry (so return None)
        unavailable = grpc.StatusCode.UNAVAILABLE
        if isinstance(exception, ServiceUnavailable):
            logger.warning('Ignoring google.api_core.exceptions.ServiceUnavailable exception: {}'.format(exception))
            return
        elif getattr(exception, 'code', lambda: None)() in [unavailable]:
            logger.warning('Ignoring grpc.StatusCode.UNAVAILABLE (Original exception: {})'.format(exception))
            return
        # For anything else fall back on the parent class behaviour
        super(WorkaroundPolicy, self).on_exception(exception)


# Other code omitted for brevity. The functions below are excerpts, so
# `self` refers to the enclosing controller object.
def callback(msg: Message):
    try:
        data = json.loads(msg.data)
    except ValueError as e:
        logger.error('Loading Payload ({}) threw an Exception: {}.'.format(msg.data, e))
        # For the prototype, if we can't read it, then discard it
        msg.ack()
        return
    device_project_id = msg.attributes['projectId']
    device_registry_id = msg.attributes['deviceRegistryId']
    device_id = msg.attributes['deviceId']
    device_region = msg.attributes['deviceRegistryLocation']
    self._update_device_config(
        device_project_id,
        device_region,
        device_registry_id,
        device_id,
        data)
    msg.ack()


def run(self, project_name, subscription_name):
    # Specify a workaround policy to handle StatusCode.UNAVAILABLE [code=8a75] error (but may get CPU issues)
    # subscriber = pubsub.SubscriberClient(policy_class=WorkaroundPolicy)
    # Alternatively, instantiate subscriber without the workaround to see full exception stack
    subscriber = pubsub.SubscriberClient()
    # Build the fully qualified path: projects/<project>/subscriptions/<name>
    subscription_path = subscriber.subscription_path(project_name, subscription_name)
    subscription = subscriber.subscribe(subscription_path, callback)
    subscription.future.result()
    while True:
        time.sleep(60)
The full version is available on GitHub if that helps.
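The callback above decides between "process" and "ack and discard" inline. As a minimal sketch of that decision pulled out into a function that can be tested without a live subscription: `parse_message` and `REQUIRED_ATTRIBUTES` are hypothetical names, and the extra check for missing attributes is an assumption that the original callback does not make (it would raise KeyError instead).

```python
import json
from typing import Any, Mapping, Optional, Tuple

# Attribute keys read by the callback in the question.
REQUIRED_ATTRIBUTES = (
    "projectId",
    "deviceRegistryLocation",
    "deviceRegistryId",
    "deviceId",
)


def parse_message(
    data: bytes, attributes: Mapping[str, str]
) -> Optional[Tuple[Any, dict]]:
    """Return (payload, device_info), or None if the message should be discarded."""
    try:
        payload = json.loads(data)
    except ValueError:
        # Same policy as the prototype: unreadable payloads are dropped.
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return None
    # Assumption (not in the original): also discard messages that lack
    # one of the device attributes.
    if any(key not in attributes for key in REQUIRED_ATTRIBUTES):
        return None
    device_info = {key: attributes[key] for key in REQUIRED_ATTRIBUTES}
    return payload, device_info
```

With a helper like this, the callback would ack and return when the result is None, and otherwise pass the two values on to the config update.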
Stack trace / command-line output (without the policy workaround):
(venv) Freds-MBP:iot-subscriber-issue Fred$ python Controller.py \
--project_id=xyz-tests \
--pubsub_subscription=simple-mqtt-controller \
--service_account_json=/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json
2018-03-21 09:36:20,975 INFO Controller Creating credentials from JSON Key File: "/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json"...
2018-03-21 09:36:20,991 INFO Controller Creating service from discovery URL: "https://cloudiot.googleapis.com/$discovery/rest?version=v1"...
2018-03-21 09:36:20,992 INFO googleapiclient.discovery URL being requested: GET https://cloudiot.googleapis.com/$discovery/rest?version=v1
2018-03-21 09:36:21,508 INFO Controller Creating subscriber for project: "xyz-tests" and subscription: "simple-mqtt-controller"...
2018-03-21 09:36:23,200 INFO Controller Listening for messages on projects/xyz-tests/subscriptions/simple-mqtt-controller...
# This then occurs typically 60 seconds or so (sometimes up to 2 mins) later:
Exception in thread Thread-ConsumeBidirectionalStream:
Traceback (most recent call last):
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 76, in next
return six.next(self._wrapped)
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 347, in __next__
return self._next()
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 341, in _next
raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 349, in _blocking_consume
for response in responses:
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 476, in _pausable_iterator
yield next(iterator)
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 78, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 363, in _blocking_consume
request_generator, response_generator)
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 275, in _stop_request_generator
if not response_generator.done():
AttributeError: '_StreamingResponseIterator' object has no attribute 'done'
^C
Traceback (most recent call last):
File "Controller.py", line 279, in <module>
if __name__ == '__main__':
File "Controller.py", line 274, in main
try:
File "Controller.py", line 196, in run
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/futures.py", line 111, in result
err = self.exception(timeout=timeout)
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/futures.py", line 133, in exception
if not self._completed.wait(timeout=timeout):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
(venv) Freds-MBP:iot-subscriber-issue Fred$
This seems to be an ongoing problem; see the following GitHub issues (all of them now closed):
Pub/Sub Subscriber does not catch & retry UNAVAILABLE errors #4234
Pub/Sub has no way to track errors from the subscriber thread. #3888
PubSub: set_exception can only be called once. can still occur "erroneously" #4463
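I also found the following related questions:
Google PubSub python client returning StatusCode.UNAVAILABLE, posted in October 2017. Its answer is the policy-class workaround I tried in the code above. In my code at least, though, the proposed answer only hides the error and does not allow new messages to be processed.
Google Pubsub Python Client library subscriber crashes randomly appears to have the same cause but a different use case. The answer (provided by the asker) suggests that upgrading to the latest google-cloud fixes the problem, but I am already on the latest versions of google-api-core (1.1.0), google-cloud-pubsub (0.32.1), etc.
Google Pub/Sub Subscriber not receiving messages after a while may be related, but has no conclusive answer.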
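Swallowing the error does not by itself reopen the stream, which would explain why hiding it leaves the subscriber idle. One generic alternative is a supervising loop that reopens the consumer after a retryable failure, with exponential backoff. The sketch below is library-agnostic and uses only the standard library: `StreamUnavailable`, `consume_with_restart` and `open_and_wait` are hypothetical names, and in a real subscriber `open_and_wait` would have to wrap the subscribe call and block until it fails. It only helps if the failure actually reaches the caller, which the stack trace above does not confirm for this library version.

```python
import time
from typing import Callable


class StreamUnavailable(Exception):
    """Stand-in for a retryable error such as ServiceUnavailable (hypothetical)."""


def consume_with_restart(
    open_and_wait: Callable[[], None],
    max_restarts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call open_and_wait() until it returns normally, reopening on retryable errors.

    Returns the number of restarts that were needed. Re-raises the last
    StreamUnavailable once max_restarts is exhausted.
    """
    restarts = 0
    while True:
        try:
            open_and_wait()  # blocks for as long as the stream is healthy
            return restarts
        except StreamUnavailable:
            if restarts >= max_restarts:
                raise
            # Exponential backoff, capped at max_delay seconds.
            sleep(min(max_delay, base_delay * (2 ** restarts)))
            restarts += 1
```

The `sleep` parameter is injectable so the backoff schedule can be checked without waiting; any non-retryable exception propagates unchanged.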
Other information: OS: Mac OS X El Capitan 10.11.6; Python 3.6.2 running in virtualenv 15.1.0
(Partial) pip freeze output:
google-api-core==1.1.0
google-api-python-client==1.6.5
google-auth==1.4.1
google-auth-httplib2==0.0.3
google-cloud-pubsub==0.32.1
googleapis-common-protos==1.5.3
grpc-google-iam-v1==0.11.4
grpcio==1.10.0
httplib2==0.10.3
paho-mqtt==1.3.1
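【Comments】:
Hi @datacentricity, thanks for taking the time to write such a comprehensive question. I really hope you get it solved, because your question deserves a good answer. I wrote one of the questions and "answers" you reference in your post. Contrary to what my thread says, I am still having the same problem: pubsub crashes after about a minute even when there are no messages in the queue!
【Answer 1】:
I'm seeing behaviour similar to yours, except that the process always hangs regardless of the workaround policy, and no exception is thrown.
Stranger still, it runs on a coworker's machine but not on mine. I tried it on two other machines and in a docker container, and it hung on all of them.
Next I'll try to boil everything down to a minimal containerized example that I can take to Google, and hopefully get some feedback from them.
Let me know if you learn anything in the meantime.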
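【Comments】:
Will do. It seems odd that it is so intermittent.
Hey @datacentricity, I figured out my issue. I was running the pubsub client alongside a Flask server, and the Flask server's reload feature was causing the process to hang. My workaround is to initialize the subscription and the async thread only when the process runs, and not on every reload.
As this is a very early prototype, I'm just running it in a virtualenv on my Mac. Containerizing is next on the list, so let's see how that goes.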
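One common way to start a background consumer only once under Flask's reloader is to check the `WERKZEUG_RUN_MAIN` environment variable, which Werkzeug sets to "true" in the child process that serves requests. This is a sketch of that idiom, not necessarily what the commenter did; `should_start_subscriber` is a hypothetical helper name.

```python
import os
from typing import Mapping, Optional


def should_start_subscriber(
    use_reloader: bool, environ: Optional[Mapping[str, str]] = None
) -> bool:
    """Return True only in the process that actually serves requests."""
    env = os.environ if environ is None else environ
    if not use_reloader:
        # Single process: there is no watcher parent, so always start.
        return True
    # With the reloader on, Werkzeug sets WERKZEUG_RUN_MAIN="true" in the
    # child process it spawns; the watching parent does not have it.
    return env.get("WERKZEUG_RUN_MAIN") == "true"
```

The application would call this once at startup and create the subscription and its thread only when it returns True.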