Flask Socket-IO 服务器客户端不通信(使用 tweepy 和 twilio)

Posted

技术标签:

【中文标题】Flask Socket-IO 服务器客户端不通信(使用 tweepy 和 twilio)【英文标题】:Flask Socket-IO Server Client Not Communicating (with tweepy and twilio) 【发布时间】:2021-12-13 13:53:16 【问题描述】:

我正在尝试拥有一个 Flask 服务器,它允许我启动一个 tweepy 流,并且在流侦听器中收到的每条消息上,它都会将该消息发送到 socketio 客户端。 Flask 服务器同时应该允许 Twilio 向其发布消息,并将该消息路由到客户端,以便客户端同时接收来自 Twilio 和 twitter 的消息。

我一直在尝试让服务器将消息发送到客户端以获取来自 twitter 的数据,Twilio 的代码工作得很好。它在收到消息时将数据发送给客户端。 tweepy 中的主循环也没有锁定程序——我可以测试打印语句并查看推文和传入的短信在handle_message(msg) 函数中异步打印。我觉得这里一定有一些非常简单的东西,因为 SMS 被发送到客户端,但传入的推文不是,即使它们正在传播到 handle_message(msg) 函数。什么给了?

server.py

from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json

PATH = '/path/to/credentials/'
with open(PATH, "r") as file:
    credentials = json.load(file)

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, cors_allowed_origins="*")

auth = tweepy.OAuthHandler(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'])
auth.set_access_token(credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
api = tweepy.API(auth)

class MyListener(tweepy.StreamListener):
    def on_status(self, status):
        print('status')

    def on_data(self, data):
        handle_message(data)

    def on_error(self, status):
        print('error')
        print(status)

stream_listener = MyListener()

# twilio sms route
@app.route('/sms', methods=['POST'])
def sms():
    number = request.form['From']
    message_body = request.form['Body']
    message_data = "number": number, "msg": message_body
    resp = MessagingResponse()
    resp.message('Hello , you said: '.format(number, message_body))
    handle_message(message_data)
    return str(resp)

# flask-socketio stuff
@sio.on('connect')
def connect():
    print('connected')
    sio.emit('client_connected', "you connected")
    search_term = "#mysearchterm"
    stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
    stream.filter(track=[search_term], is_async=True)
    sio.emit('client_connected', "the search term is ".format(search_term))

@sio.on('disconnect')
def disconnect():
    print('Client Diconnected')

@sio.event
def handle_message(message):
    print("This is the message received: ", message)
    sio.emit('handle_message', message)

if __name__ == '__main__':
    sio.run(app)

client.py

import socketio

client = socketio.Client()

@client.on('client_connected')
def on_connect(message):
    print(message)

@client.on('handle_message')
def message(data):
    print(data)

client.connect('http://localhost:5000/')

【问题讨论】:

【参考方案1】:

我解决了我的问题!正如我在comment 中指出的那样,问题在于多线程和线程之间的信息传递。使用 tweepy,参数 is_async=True(在 4.1.0 中为 threading=True)会在流运行后打开一个新线程。

我没有尝试处理传递信息,而是利用using a local redis server as a message queue 现有的flask-socketio 功能(如果您是第一次设置它,请从“使用多个工人”部分开始,也请务必安装redis)。

这是更新后的server.py 代码。 client.py 代码基本保持不变:

import eventlet
eventlet.monkey_patch()
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json

PATH = '/PATH/TO/CREDENTIALS'
with open(PATH, "r") as file:
    credentials = json.load(file)

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, message_queue='redis://', cors_allowed_origins="*")

class MyStream(tweepy.Stream):
    def __init__(self, consumer_key, consumer_secret, access_token, access_secret):
        super(MyStream, self).__init__(consumer_key, consumer_secret, access_token, access_secret)
        self.stream_sio = SocketIO(message_queue='redis://')

    def on_status(self, status):
        print('status')

    def on_data(self, data):
        json_data = json.loads(data)
        self.stream_sio.emit('handle_message', json_data['text'])
        # TODO: Send along all necessary information

@app.route('/sms', methods=['POST'])
def sms():
    number = request.form['From']
    message_body = request.form['Body']
    message_data = "number": number, "msg": message_body
    resp = MessagingResponse()
    resp.message('Hello , you said: '.format(number, message_body))
    handle_message(message_data)
    return str(resp)
 
@sio.on('connect')
def connect():
    print('connected')
    sio.emit('client_connected', "you connected")
    search_term = "#testingtesting123"
    stream = MyStream(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'],
                           credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
    stream.filter(track=[search_term], threaded=True)
    sio.emit('client_connected', "the search term is ".format(search_term))

@sio.on('disconnect')
def disconnect():
    print('Client disconnected')

def handle_message(message):
    sio.emit('handle_message', message)

if __name__ == '__main__':
    sio.run(app)

【讨论】:

不错的解决方案,很高兴你解决了!【参考方案2】:

这里是 Twilio 开发者宣传员。

您已将handle_message 函数修饰为@sio.event 但as far as I can see in the docs,您应该只让handle_message 方法响应名为“handle_message”的套接字上的事件。

我将开始删除 @sio.event 装饰器。

我不是 Python 专家,但我也想知道这里是否存在范围问题。在定义 handle_message 方法之前定义 MyListener 类并创建它的实例。只是为了测试,您可以尝试直接在on_data 方法中发射到套接字:

    def on_data(self, data):
        sio.emit('handle_message', data)

如果可行,请考虑将handle_message 的定义移到MyListener 的定义之上。

【讨论】:

您好菲尔,感谢您的回复。我之前尝试过直接从 on_data 发射,但没有成功。我现在猜测这实际上是一个线程问题,因为我必须将 Tweepy 放入一个带有 is_async=True 位的单独线程中。那里的命名有点混乱——因为它不使用 asyncio。最后我决定改变并更新到更新版本的 tweepy。尽管如此,在功能上我也有同样的问题——但现在是多线程!如果您对如何从位于不同线程中的 tweepy 和烧瓶中传递数据有建议(或链接),我将不胜感激!

以上是关于Flask Socket-IO 服务器客户端不通信(使用 tweepy 和 twilio)的主要内容,如果未能解决你的问题,请参考以下文章

Socket-io客户端不断发送连接请求

如何将backbone.js 与websockets/socket-io/nowjs 一起使用

关于cocos creator 使用socket-io在生成微信小游戏遇到的各种问题

我无法从其他设备连接到Flask服务器(=不是从localhost)

使用flask_socketio实现客户端间即时通信

Python 脚本与将内容传输到客户端的 Python Flask 服务器通信的最佳方式是啥?