如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输
Posted
技术标签:
【中文标题】如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输【英文标题】:How to yield in subscription call back of Google Pubsub on Flask Server for streaming 【发布时间】:2022-01-24 01:01:36 【问题描述】:我想通过 Flask 端点获得流,但事件是在订阅者的回调中发送到主题(Google Cloud PubSub)的。请看下面的代码:
def a_stream():
topic_name = 'projects/project_id/topics/topic'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
topic="topic",
)
subscription_name = 'projects/project_id/subscriptions/sub'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
sub="topic",
)
def callback(message):
print("print")
yield "yield"
message.ack()
with pubsub_v1.SubscriberClient() as subscriber:
try:
subscriber.create_subscription(
name=subscription_name, topic=topic_name)
except:
#do nothing as topic exists
pass
future = subscriber.subscribe(subscription_name, callback=callback)
future.result()
@orders_bp.route("/aStream", methods=['GET'])
def get_order_status():
return Response(stream_with_context(a_stream()),
mimetype="text/event-stream")
如果我按原样运行代码,则不会发生任何事情。如果我删除回调中的产量,那么我可以在控制台中看到“打印”。似乎问题与屈服有关,有没有简单的方法来解决这个问题?
【问题讨论】:
【参考方案1】:Python 的 yield 语句暂停执行并向调用者返回一个值。在您的情况下,您期望字符串“yield”,但实际上返回了一个生成器。函数中不会创建任何状态,例如 for 或 while 循环(如在生成器中)或多个 yield,因此当函数恢复时,会再次返回包含字符串“yield”的生成器。 message.ack() 行代码永远不会执行。
让我们检查您的代码:
def callback(message):
print("print")
yield "yield"
message.ack()
并翻译成例子:
class message:
def ack():
print('ACK')
def callback(message):
print("print")
yield "yield"
message.ack()
val = callback(message)
print(type(val))
for i in val:
print(i)
如果我们调用 callback 并检查其返回类型:
val = callback(message)
print(type(val))
类型(val)是
要从生成器获取返回值,您必须迭代结果:
for i in val:
print(i)
您将回调用作普通方法并期望返回一个字符串。
val = callback(message)
调用代码不将该方法作为生成器调用。因此,在 callback 中不会保存或恢复任何状态,并且该方法会在每次调用时重新开始。
对于您的用例,将您的代码更改为不使用生成器:
def callback(message):
message.ack()
return "yield"
否则,您必须修改 subscriber.subscribe() 以接受生成器而不是函数回调。
【讨论】:
以上是关于如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输的主要内容,如果未能解决你的问题,请参考以下文章
我们如何使用 Flask 对 SQL Server 搜索结果进行分页
如何在Dart / Flutter中使用Flask Server API将图像作为请求发布?
如何在不上传 favicon 文件的情况下将 favicon 添加到 Google colab 上的 Flask 路由?