flask web——实时通讯聊天服务项目推送通知功能的实现
Posted 胖虎是只mao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flask web——实时通讯聊天服务项目推送通知功能的实现相关的知识,希望对你有一定的参考价值。
一、聊天服务实现
在toutiao-backend/im目录中创建server.py
import socketio
# 创建sio对象
sio = socketio.Server(async_mode='eventlet')
app = socketio.Middleware(sio)
在toutiao-backend/im目录中创建im服务启动程序main.py
运行方式python main.py
[端口],如python main.py 8000
import eventlet
eventlet.monkey_patch()
import eventlet.wsgi
import sys
# 通过sys模块 获取启动命令中的参数 sys.argv
# # python main.py 8001 ...
# sys.argv -> ['main.py', '8001', ...]
# 获取命令行参数,目的是想让im服务运行的端口在启动程序时指定
if len(sys.argv) < 2:
# 表示启动时忘了传递端口号参数
print('Usage: python main.py [port]')
exit(1) # 表示程序异常退出
port = int(sys.argv[1])
# 通过导入事件处理模块的方法,让主程序知道事件处理方法的存在
from server import app
import notify
# socketio服务器运行的地址
SERVER_ADDRESS = ('', port)
# 创建协程服务器 并启动
# SERVER_ADDRESS = ('', 8000)
# 需求 想要将端口不写死在程序代码中,想要在启动的时候执行端口号
# python server.py [port]
# python server.py 8001
# 启动socketio服务器
sock = eventlet.listen(SERVER_ADDRESS)
eventlet.wsgi.server(sock, app)
在toutiao-backend/im目录中创建chat.py
from server import sio
import time
# 跟客户端的约定,
# 对于聊天场景,通讯时使用message事件
# 在聊天的通讯中,传输的聊天数据约定格式
# {
# "msg": "",
# "timestamp": 发送或接受消息的时间戳
# }
@sio.on('connect')
def on_connect(sid, environ):
"""
与客户端建立好连接后被执行
"""
print('sid={}'.format(sid))
print('environ={}'.format(environ))
# 向客户端发送事件消息
msg_data = {
'msg': 'hello',
'timestamp': round(time.time()*1000)
}
sio.emit('message', msg_data, room=sid)
@sio.on('message')
def on_message(sid, data):
"""
接收message事件消息时执行
"""
"""
客户端向服务器发送聊天的事件消息时 被调用
:param sid:
:param data:
:return:
"""
# 获取用户说的信息 data
# TODO 使用rpc 调用聊天机器人子系统 获取回复内容
print('sid={} data={}'.format(sid, data))
msg_data = {
'msg': 'I have received your msg: {}'.format(data),
'timestamp': round(time.time()*1000)
}
sio.send(msg_data, room=sid)
# sio.emit('message', msg_data, room=sid)
使用firecamp.app进行测试
二、在线消息推送实现
需求
在头条的Flask应用中,用户关注后需要推送消息,通过消息队列告知IM服务为用户进行推送
Socker.io 代替了celery,详细可看生产者消费者模式
在Socket.IO 框架中可以选择使用以下两种方式作为消息中间件:
-
使用Redis
mgr = socketio.RedisManager('redis://') sio = socketio.Server(client_manager=mgr)
-
使用RabbitMQ
pip install kombu mgr = socketio.KombuManager('amqp://') sio = socketio.Server(client_manager=mgr)
实现
因为要给指定的用户推送消息,所以需要用到用户的身份,用户在客户端携带JWT连接SocketIO服务器,我们在服务器端对jwt token进行验证,对于验证出用户身份(user_id)的客户端,将其添加到名为用户id的room房间中,方便按照user_id进行推送。
socketio服务端编写
在toutiao-backend/im/main.py中补充添加搜寻包的路径,方便使用utils中的jwt_utils
模块
import eventlet
eventlet.monkey_patch()
import eventlet.wsgi
import sys
import os
# 补充搜索包路径
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR, 'common'))
if len(sys.argv) < 2:
print('Usage: python main.py [port]')
exit(1)
port = int(sys.argv[1])
from server import app
import chat
import notify
SERVER_ADDRESS = ('', port)
sock = eventlet.listen(SERVER_ADDRESS)
eventlet.wsgi.server(sock, app)
在toutiao-backend/im/server.py文件中补充消息队列rabbitmq的配置信息和jwt使用的秘钥
import socketio
RABBITMQ = 'amqp://python:rabbitmqpwd@localhost:5672/toutiao'
JWT_SECRET = 'TPmi4aLWRbyVq8zu9v82dWYW17/z+UvRnYTt4P6fAXA'
mgr = socketio.KombuManager(RABBITMQ)
sio = socketio.Server(async_mode='eventlet', client_manager=mgr)
app = socketio.Middleware(sio)
在toutiao-backend/im目录中新建notify.py
from server import sio, JWT_SECRET
from werkzeug.wrappers import Request
from utils.jwt_util import verify_jwt
def check_jwt_token(environ):
"""
检验jwt token
:param environ:
:return:
借助werkzeug提供的Request类,将environ字典转换为我们熟悉的request对象,从对象中读取属性的方式来获取客户端的请求信息
request = Request(environ) # 等价于flask 的request对象
"""
request = Request(environ)
# 从查询字符串中取出jwt token
token = request.args.get('token')
if token:
payload = verify_jwt(token, JWT_SECRET)
if payload:
user_id = payload.get('user_id')
return user_id
return None
@sio.on('connect')
def on_connect(sid, environ):
"""
与客户端建立连接后执行
当客户连接时被执行
:param sid:
:param environ: dict 解析客户端握手的http数据
:return:
"""
# 检验连接客户端的jwt token
user_id = check_jwt_token(environ)
print('user_id={}'.format(user_id))
# 若检验出user_id,将此客户端添加到user_id的room中
if user_id:
sio.enter_room(sid, str(user_id))
@sio.on('disconnect')
def on_disconnect(sid):
"""
与客户端断开连接时执行
"""
# 客户端离线时将客户端从所有房间中移除,不再推送
rooms = sio.rooms(sid)
for room in rooms:
sio.leave_room(sid, room)
socket.io
主动向客户端推送,不需要写推送方法
flask web服务端编写
在toutiao-backend/toutiao/__init__.py
中 添加sio对象的创建
import socketio
def create_app(config, enable_config_file=False):
"""
创建应用
:param config: 配置信息对象
:param enable_config_file: 是否允许运行环境中的配置文件覆盖已加载的配置信息
:return: 应用
"""
...
# socket.io
# 通过sio mgr对象 可以发布要进行及时消息推送的任务,由socketio服务器从rabbitmq中取出任务,推送消息
app.sio = socketio.KombuManager(app.config['RABBITMQ'], write_only=True)
# write_only=True 仅仅发布,不会从队列中取
...
然后,可以在视图中发布任务
在toutiao-backend/toutiao/resources/user/following.py 用户关注接口视图中添加发送事件消息
class FollowingListResource(Resource):
"""
关注用户
"""
method_decorators = {
'post': [login_required],
'get': [login_required],
}
def post(self):
"""
关注用户
"""
# 关注用户的数据库保存
...
# 发送关注通知
_user = cache_user.UserProfileCache(g.user_id).get()
_data = {
'user_id': g.user_id,
'user_name': _user['name'],
'user_photo': _user['photo'],
'timestamp': int(time.time())
}
'''
通过socketio提供的kombu管理对象 向rabbitmq中写入数据,记录需要由socketio服务器向客户端推送消息的任务
这里不是socket.io通讯,只是向mq中写入数据
'''
current_app.sio.emit('following notify', data=_data, room=str(target))
return {'target': target}, 201
效果:
以上是关于flask web——实时通讯聊天服务项目推送通知功能的实现的主要内容,如果未能解决你的问题,请参考以下文章