如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输

Posted

技术标签:

【中文标题】如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输【英文标题】:How to yield in subscription call back of Google Pubsub on Flask Server for streaming 【发布时间】:2022-01-24 01:01:36 【问题描述】:

我想通过 Flask 端点获得流,但事件是在订阅者的回调中发送到主题(Google Cloud PubSub)的。请看下面的代码:

def a_stream():
    topic_name = 'projects/project_id/topics/topic'.format(
        project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
        topic="topic",  
    )
    subscription_name = 'projects/project_id/subscriptions/sub'.format(
        project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
        sub="topic", 
    )
    def callback(message):
        print("print")
        yield "yield"
        message.ack()

    with pubsub_v1.SubscriberClient() as subscriber:
        try:
            subscriber.create_subscription(
            name=subscription_name, topic=topic_name)
        except:
            #do nothing as topic exists
            pass
        future = subscriber.subscribe(subscription_name, callback=callback)             
        future.result()

@orders_bp.route("/aStream", methods=['GET'])
def get_order_status():
    return Response(stream_with_context(a_stream()),
                          mimetype="text/event-stream")

如果我按原样运行代码,则不会发生任何事情。如果我删除回调中的产量,那么我可以在控制台中看到“打印”。似乎问题与屈服有关,有没有简单的方法来解决这个问题?

【问题讨论】:

【参考方案1】:

Python 的 yield 语句暂停执行并向调用者返回一个值。在您的情况下,您期望字符串“yield”,但实际上返回了一个生成器。函数中不会创建任何状态,例如 for 或 while 循环(如在生成器中)或多个 yield,因此当函数恢复时,会再次返回包含字符串“yield”的生成器。 message.ack() 行代码永远不会执行。

让我们检查您的代码:

def callback(message):
    print("print")
    yield "yield"
    message.ack()

并翻译成例子:

class message:
        def ack():
                print('ACK')

def callback(message):
    print("print")
    yield "yield"
    message.ack()

val = callback(message)
print(type(val))

for i in val:
        print(i)

如果我们调用 callback 并检查其返回类型:

val = callback(message)
print(type(val))

类型(val)是

要从生成器获取返回值,您必须迭代结果:

for i in val:
        print(i)

您将回调用作普通方法并期望返回一个字符串。

val = callback(message)

调用代码将该方法作为生成器调用。因此,在 callback 中不会保存或恢复任何状态,并且该方法会在每次调用时重新开始。

对于您的用例,将您的代码更改为不使用生成器:

def callback(message):
    message.ack()
    return "yield"

否则,您必须修改 subscriber.subscribe() 以接受生成器而不是函数回调。

【讨论】:

以上是关于如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Flask CORS 将域列入白名单

如何使用Http Server服务Flask [重复]

如何将数据发送到 Flask Server? [复制]

我们如何使用 Flask 对 SQL Server 搜索结果进行分页

如何在Dart / Flutter中使用Flask Server API将图像作为请求发布?

如何在不上传 favicon 文件的情况下将 favicon 添加到 Google colab 上的 Flask 路由?