如何在 Flask 框架中实现服务器推送?

Posted

技术标签:

【中文标题】如何在 Flask 框架中实现服务器推送?【英文标题】:How to implement server push in Flask framework? 【发布时间】:2012-08-27 06:32:38 【问题描述】:

我正在尝试在 Flask 微网络框架上构建一个带有服务器推送功能的小站点,但我不知道是否有框架可以直接使用。

我使用了Juggernaut,但在当前版本中它似乎不适用于redis-py,并且最近已弃用剑圣。

有人对我的案子有什么建议吗?

【问题讨论】:

Here is a relevant article from last month by Armin Ronacher, Flask's lead developer. 相关:Streaming data with Python and Flask 【参考方案1】:

看看Server-Sent Events。服务器发送事件是一个 浏览器 API,允许您保持打开服务器的套接字,订阅 更新流。欲了解更多信息,请阅读 Alex MacCaw(作者 Juggernaut) 在why he kills juggernaut 上发帖以及为什么更简单 在许多情况下,服务器发送的事件是比工作更好的工具 网络套接字。

协议非常简单。只需将 mimetype text/event-stream 添加到您的 回复。浏览器将保持连接打开并监听更新。一个事件 从服务器发送的是以data: 开头的一行文本和一个换行符。

data: this is a simple message
<blank line>

如果您想交换结构化数据,只需将数据转储为 json 并通过网络发送 json。

一个优点是您可以在 Flask 中使用 SSE 而无需额外的 服务器。 github上有一个简单的chat application example 使用 redis 作为 pub/sub 后端。

def event_stream():
    pubsub = red.pubsub()
    pubsub.subscribe('chat')
    for message in pubsub.listen():
        print message
        yield 'data: %s\n\n' % message['data']


@app.route('/post', methods=['POST'])
def post():
    message = flask.request.form['message']
    user = flask.session.get('user', 'anonymous')
    now = datetime.datetime.now().replace(microsecond=0).time()
    red.publish('chat', u'[%s] %s: %s' % (now.isoformat(), user, message))


@app.route('/stream')
def stream():
    return flask.Response(event_stream(),
                          mimetype="text/event-stream")

您不需要使用 gunicron 来运行 示例应用程序。只要确保在运行应用程序时使用线程,因为 否则 SSE 连接会阻塞你的开发服务器:

if __name__ == '__main__':
    app.debug = True
    app.run(threaded=True)

在客户端,您只需要一个 javascript 处理函数,当新的 消息从服务器推送。

var source = new EventSource('/stream');
source.onmessage = function (event) 
     alert(event.data);
;

服务器发送的事件是最近的 Firefox、Chrome 和 Safari 浏览器的 supported。 Internet Explorer 尚不支持服务器发送事件,但预计会在 版本 10. 有两个推荐的 Polyfills 来支持旧浏览器

EventSource.js jquery.eventsource

【讨论】:

嗨@PeterSmith,我尝试了这种方法,但是警报(event.data)从未出现。我在端口 8000 中运行我的 Flask 应用程序并在端口 8001 中运行。所以我输入“var source = new EventSource('localhost:8001/push');” Flask 应用程序有一个用户可以发布内容的页面。该帖子由所有其他用户广播和接收。你有什么想法吗? 为什么要在不同的端口上运行推送? SSE 的一个原因是它通过普通的 http 在您的应用程序中运行。你是如何运行你的烧瓶应用程序的?通过开发服务器?你添加了线程=真吗?你用的是什么浏览器? 每个连接的客户端是否有一个线程来处理其事件流?这看起来像一个长轮询。如果是,这将无法扩展。 @little-eyes 的问题对我来说很有意义。 这将在使用 gevent+monkeypatch 时进行扩展 你怎么能确定 Flask 关闭了连接?如果我重新加载页面很多,我会得到一堆陈旧的连接,当我 ctrl-C 烧瓶应用程序时,它仍然服务请求,因为有连接打开:-/【参考方案2】:

Redis 太过分了:使用服务器发送事件 (SSE)

聚会迟到了(像往常一样),但恕我直言,使用 Redis 可能有点矫枉过正。

只要您使用 Python+Flask,请考虑使用 this excellent article by Panisuan Joe Chasinga 中描述的生成器函数。它的要点是:

在您的客户端 index.html

var targetContainer = document.getElementById("target_div");
var eventSource = new EventSource("/stream")
  eventSource.onmessage = function(e) 
  targetContainer.innerHTML = e.data;
;
...
<div id="target_div">Watch this space...</div>

在您的 Flask 服务器中:

def get_message():
    '''this could be any function that blocks until data is ready'''
    time.sleep(1.0)
    s = time.ctime(time.time())
    return s

@app.route('/')
def root():
    return render_template('index.html')

@app.route('/stream')
def stream():
    def eventStream():
        while True:
            # wait for source data to be available, then push it
            yield 'data: \n\n'.format(get_message())
    return Response(eventStream(), mimetype="text/event-stream")

【讨论】:

当我尝试在 get_message 函数中使用睡眠时,流不会到达浏览器。没有睡眠它工作正常,但我不想在 1 秒内收到这么多消息。那么,您知道为什么当函数中有睡眠时它不起作用吗? @DeepakBanka:代码对我有用。您可以在 get_message() 中放置一个 print() 以确保它按预期被调用吗?并使用您的浏览器开发工具来监控服务器和浏览器之间的流量?这可能会提供一些见解... @fearleaa_fool 是否担心 python 的同步限制会阻止其扩展。我知道在 django 中类似的 sse/websockets 集成需要单独的异步服务器。 @JZL003:我只将 Flask 用于客户端数量有限的微服务。我将向在高并发应用程序中使用 Flask 服务器的人让步……【参考方案3】:

作为@peter-hoffmann's answer 的后续行动,我编写了一个专门用于处理服务器发送事件的 Flask 扩展。它叫做Flask-SSE,它是available on PyPI。要安装它,请运行:

$ pip install flask-sse

你可以这样使用它:

from flask import Flask
from flask_sse import sse

app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/send')
def send_message():
    sse.publish("message": "Hello!", type='greeting')
    return "Message sent!"

要从 Javascript 连接到事件流,它的工作原理如下:

var source = new EventSource(" url_for('sse.stream') ");
source.addEventListener('greeting', function(event) 
    var data = JSON.parse(event.data);
    // do what you want with this data
, false);

Documentation is available on ReadTheDocs. 请注意,您需要一个正在运行的Redis 服务器来处理发布/订阅。

【讨论】:

@ManuelGodoy:/stream @ManuelGodoy 我去年放弃了图书馆,因为我想不通。感谢您的提问。 我遇到了 CORS 问题。我希望我的反应服务器访问这个 SSE。 这会阻止 HTTP 请求!有没有办法从后台工作人员运行 sse 连接?【参考方案4】:

作为https://github.com/WolfgangFahl/pyFlaskBootstrap4 的提交者,我遇到了同样的需求,并为服务器发送事件创建了一个烧瓶蓝图,它不依赖于 redis。

此解决方案建立在过去在这里给出的其他答案的基础上。

https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/fb4/sse_bp.py 有源代码(另见下面的 sse_bp.py)。

https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/tests/test_sse.py 有单元测试

这个想法是您可以使用不同的模式来创建您的 SSE 流:

通过提供函数 通过提供生成器 使用 PubSub 帮助程序类 通过使用 PubSub 帮助程序类并同时使用 pydispatch。

截至 2021 年 2 月 12 日,这是我想分享的 alpha 代码。请在此处发表评论或作为项目中的问题发表评论。

http://fb4demo.bitplan.com/events 有一个演示和示例使用说明,例如进度条或时间显示在:http://wiki.bitplan.com/index.php/PyFlaskBootstrap4#Server_Sent_Events

示例客户端 javascript/html 代码

<div id="event_div">Watch this space...</div>
<script>
    function fillContainerFromSSE(id,url) 
        var targetContainer = document.getElementById(id);
        var eventSource = new EventSource(url)
        eventSource.onmessage = function(e) 
            targetContainer.innerHTML = e.data;
        ;
    ;
    fillContainerFromSSE("event_div","/eventfeed");
</script>

示例服务器端代码


def getTimeEvent(self):
        '''
        get the next time stamp
        '''
        time.sleep(1.0)
        s=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        return s   

def eventFeed(self):
        '''
        create a Server Sent Event Feed
        '''
        sse=self.sseBluePrint
        # stream from the given function
        return sse.streamFunc(self.getTimeEvent)

sse_bp.py

'''
Created on 2021-02-06
@author: wf
'''
from flask import Blueprint, Response, request, abort,stream_with_context
from queue import Queue
from pydispatch import dispatcher
import logging

class SSE_BluePrint(object):
    '''
    a blueprint for server side events 
    '''
    def __init__(self,app,name:str,template_folder:str=None,debug=False,withContext=False):
        '''
        Constructor
        '''
        self.name=name
        self.debug=debug
        self.withContext=False
        if template_folder is not None:
            self.template_folder=template_folder
        else:
            self.template_folder='templates'    
        self.blueprint=Blueprint(name,__name__,template_folder=self.template_folder)
        self.app=app
        app.register_blueprint(self.blueprint)
        
        @self.app.route('/sse/<channel>')
        def subscribe(channel):
            def events():
                PubSub.subscribe(channel)
            self.stream(events)
                
    def streamSSE(self,ssegenerator): 
        '''
        stream the Server Sent Events for the given SSE generator
        '''  
        response=None
        if self.withContext:
            if request.headers.get('accept') == 'text/event-stream':
                response=Response(stream_with_context(ssegenerator), content_type='text/event-stream')
            else:
                response=abort(404)    
        else:
            response= Response(ssegenerator, content_type='text/event-stream')
        return response
        
    def streamGen(self,gen):
        '''
        stream the results of the given generator
        '''
        ssegen=self.generateSSE(gen)
        return self.streamSSE(ssegen)   
            
    def streamFunc(self,func,limit=-1):
        '''
        stream a generator based on the given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            an SSE Response stream
        '''
        gen=self.generate(func,limit)
        return self.streamGen(gen)
                
    def generate(self,func,limit=-1):
        '''
        create a SSE generator from a given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            a generator for the function
        '''   
        count=0
        while limit==-1 or count<limit:
            # wait for source data to be available, then push it
            count+=1
            result=func()
            yield result
        
    def generateSSE(self,gen):
        for result in gen:
            yield 'data: \n\n'.format(result)
            
    def enableDebug(self,debug:bool):
        '''
        set my debugging
        
        Args:
            debug(bool): True if debugging should be switched on
        '''
        self.debug=debug
        if self.debug:
            logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03d %(levelname)s:\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
            
    def publish(self, message:str, channel:str='sse', debug=False):
        """
        Publish data as a server-sent event.
        
        Args:
            message(str): the message to send
            channel(str): If you want to direct different events to different
                clients, you may specify a channel for this event to go to.
                Only clients listening to the same channel will receive this event.
                Defaults to "sse".
            debug(bool): if True  enable debugging
        """
        return PubSub.publish(channel=channel, message=message,debug=debug)

    def subscribe(self,channel,limit=-1,debug=False):
        def stream():
            for message in PubSub.subscribe(channel,limit,debug=debug):
                yield str(message)
                
        return self.streamGen(stream)
    
class PubSub:
    '''
    redis pubsub duck replacement
    '''
    pubSubByChannel=
    
    def __init__(self,channel:str='sse',maxsize:int=15, debug=False,dispatch=False):
        '''
        Args:
            channel(string): the channel name
            maxsize(int): the maximum size of the queue
            debug(bool): whether debugging should be switched on
            dispatch(bool): if true use the pydispatch library - otherwise only a queue
        '''
        self.channel=channel
        self.queue=Queue(maxsize=maxsize)
        self.debug=debug
        self.receiveCount=0
        self.dispatch=False
        if dispatch:
            dispatcher.connect(self.receive,signal=channel,sender=dispatcher.Any)
        
    @staticmethod
    def reinit():
        '''
        reinitialize the pubSubByChannel dict
        '''
        PubSub.pubSubByChannel=
        
    @staticmethod
    def forChannel(channel):    
        '''
        return a PubSub for the given channel
        
        Args:
            channel(str): the id of the channel
        Returns:
            PubSub: the PubSub for the given channel
        '''
        if channel in PubSub.pubSubByChannel:
            pubsub=PubSub.pubSubByChannel[channel]
        else:
            pubsub=PubSub(channel)
            PubSub.pubSubByChannel[channel]=pubsub
        return pubsub
    
    @staticmethod    
    def publish(channel:str,message:str,debug=False):
        '''
        publish a message via the given channel
        
        Args:
            channel(str): the id of the channel to use
            message(str): the message to publish/send
        Returns:
            PubSub: the pub sub for the channel
            
        '''
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        pubsub.send(message)
        return pubsub
        
    @staticmethod    
    def subscribe(channel,limit=-1,debug=False): 
        '''
        subscribe to the given channel
        
        Args:
            channel(str): the id of the channel to use
            limit(int): limit the maximum amount of messages to be received        
            debug(bool): if True debugging info is printed
        '''  
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        return pubsub.listen(limit)
    
    def send(self,message):
        '''
        send the given message
        '''
        sender=object();
        if self.dispatch:
            dispatcher.send(signal=self.channel,sender=sender,msg=message)
        else:
            self.receive(sender,message)
        
    def receive(self,sender,message):
        '''
        receive a message
        '''
        if sender is not None:
            self.receiveCount+=1;
            if self.debug:
                logging.debug("received %d:%s" % (self.receiveCount,message))
            self.queue.put(message)
        
    def listen(self,limit=-1):
        '''
        listen to my channel
        
        this is a generator for the queue content of received messages
        
        Args:
            limit(int): limit the maximum amount of messages to be received
        
        Return:
            generator: received messages to be yielded
        '''
        if limit>0 and self.receiveCount>limit:
            return
        yield self.queue.get()
    
    def unsubscribe(self):
        '''
        unsubscribe me
        '''
        if self.dispatch:
            dispatcher.disconnect(self.receive, signal=self.channel)
        pass

【讨论】:

您可以从后台工作人员运行此程序,与您的 gunicorn 网络应用程序分开吗? sse 与您的服务器实现方式无关。

以上是关于如何在 Flask 框架中实现服务器推送?的主要内容,如果未能解决你的问题,请参考以下文章

如何简单地在混合应用中实现推送通知?

如何在iphone中实现Apple推送通知服务[重复]

如何在 Flask 中实现令牌认证?

要从 MobileFirst 服务器推送到移动设备的加密推送通知消息

如何使用php json数组在带有angularjs的cordova中实现推送通知

如何在 PHP 中构建一个 android 推送通知服务器