如何在不使用请求的情况下直接从代码调用烧瓶端点

Posted

技术标签:

【中文标题】如何在不使用请求的情况下直接从代码调用烧瓶端点【英文标题】:How can i invoke a flask endpoint directly from code without using requests 【发布时间】:2021-11-07 15:20:41 【问题描述】:

大家好,几天前我的任务是创建一个 API 风格的应用程序,它通过 TCP 套接字侦听一些命令,然后返回一些响应,主要是成功/失败(我知道这很愚蠢,但它是客户端请求),因为我有一些验证/数据库的东西我直接想到了烧瓶,但我仍然坚持如何直接调用代码中的特定端点。这是一个关于我想象的事情的小sn-p

from flask import Flask                                                         
import threading

data = 'foo'
app = Flask(__name__)

@app.route("/SomeCommand")
def SomeCommand():
    return  'Some' : 'Response'

def flaskThread():
    app.run()


def TcpListenner():
    # logic that listens over tcp socket then invoks the flask app
    # I was thinking about calling app.something() from here
    pass

if __name__ == "__main__":
    flaskApp = threading.Thread(target=flaskThread)
    flaskApp.start()
    listenner = threading.Thread(target=TcpListenner)
    listenner.start()

任何帮助/想法将不胜感激,谢谢

【问题讨论】:

你想实现它就像套接字会监听不同的命令,然后在每个命令上,会有不同的响应吗? 而且您不想向烧瓶应用程序发出任何请求。相反,flask 应用程序会在收到命令时自行发送响应? 我基本上希望 tcp 监听器充当烧瓶应用程序和客户端之间的中间件 所以顺序是客户端 -> tcp 监听器 -> flaskapp -> tcp 监听器 -> 客户端 【参考方案1】:

您可以使用 flask_socketio,flask 应用程序和套接字(即 tcp 侦听器)一起启动... 根据我的理解,你可以这样做:

客户端将首先连接到烧瓶套接字。

然后,为了向烧瓶应用程序发送命令,客户端将向烧瓶套接字发送一条消息,其中包含该命令。

烧瓶套接字将监听消息。因此,当它接收到特定命令的消息时,它会根据该命令向套接字发出响应,然后客户端将接收该响应。

以下是烧瓶套接字应用程序的示例代码:

import eventlet
eventlet.monkey_patch()

from flask import Flask
from flask_socketio import SocketIO, send, emit

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True)

# other flask APIs can come here and can be called by the client...

def someCommandOneResponse(): # function that sends response to client when flask socket gets command 1
    commandOneResponse = 'Success'
    socketio.emit('message', commandOneResponse)

def someCommandTwoResponse(): # function that sends response to client when flask socket gets command 2
    commandTwoResponse = 'Failure'
    socketio.emit('message', commandTwoResponse)

@socketio.on('message') # when any command is received on the socket from the client
def handleMessage(cmd):
    print('\nCommand Received: ' + cmd + '\n')

    if( cmd == 'SomeCommand1' ): # if the client has sent a message for command 1
        print('Got Some Command 1')
        someCommandOneResponse()
        
    elif( cmd == 'SomeCommand2' ): # if the client has sent a message for command 2
        print('Got Some Command 2')
        someCommandTwoResponse
    
    send(cmd, broadcast=True)

if __name__ == '__main__':
    socketio.run(app, port=3000) # starts the flask socket (tcp listener) as well as the flask app

【讨论】:

嘿 Zoha,感谢您的回复,但我并没有尝试与 WebSocket 交互,而是尝试使用纯低级 TCP 套接字。 flask_socketio 库实现了socketIO 协议,不是m case 好的,我明白了

以上是关于如何在不使用请求的情况下直接从代码调用烧瓶端点的主要内容,如果未能解决你的问题,请参考以下文章

如何在不使用 ImageView 的情况下从 API 端点下载图像或位图?

Java WebSocket:如何在不编写客户端的情况下测试服务器 WebSocket 端点 [关闭]

如何在不改变状态的情况下对 redux 操作做出反应

如何在不重新启动我的 Express.js 项目的情况下刷新 API 端点

如何在不使用服务器端缓存的情况下调用 cURL?

如何在不使用 sleep() 的情况下暂停 DispatchQueue 内的程序执行