逐句回答,流式返回,ChatGPT采用的Server-sent events后端实时推送协议Python3.10实现,基于Tornado6.1

Posted 刘悦的技术分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了逐句回答,流式返回,ChatGPT采用的Server-sent events后端实时推送协议Python3.10实现,基于Tornado6.1相关的知识,希望对你有一定的参考价值。

善于观察的朋友一定会敏锐地发现ChatGPT网页端是逐句给出问题答案的,同样,ChatGPT后台Api接口请求中,如果将Stream参数设置为True后,Api接口也可以实现和ChatGPT网页端一样的流式返回,进而更快地给到前端用户反馈,同时也可以缓解连接超时的问题。

Server-sent events(SSE)是一种用于实现服务器到客户端的单向通信的协议。使用SSE,服务器可以向客户端推送实时数据,而无需客户端发出请求。

SSE建立在HTTP协议上,使用基于文本的数据格式(通常是JSON)进行通信。客户端通过创建一个EventSource对象来与服务器建立连接,然后可以监听服务器发送的事件。服务器端可以随时将事件推送给客户端,客户端通过监听事件来接收这些数据。

ChatGPT的Server-sent events应用

首先打开ChatGPT网页端,随便问一个问题,然后进入网络选单,清空历史请求记录后,进行网络抓包监听:

可以看到,在触发了回答按钮之后,页面会往后端的backend-api/conversation对话接口发起请求,但这个接口的通信方式并非传统的http接口或者Websocket持久化链接协议,而是基于EventSteam的事件流一段一段地返回ChatGPT后端模型的返回数据。

为什么ChatGPT会选择这种方式和后端Server进行通信?ChatGPT网页端使用Server-sent events通信是因为这种通信方式可以实现服务器向客户端推送数据,而无需客户端不断地向服务器发送请求。这种推送模式可以提高应用程序的性能和响应速度,减少了不必要的网络流量。

与其他实时通信协议(如WebSocket)相比,Server-sent events通信是一种轻量级协议,易于实现和部署。此外,它也具有广泛的浏览器兼容性,并且可以在不需要特殊网络配置的情况下使用。

在ChatGPT中,服务器会将新的聊天消息推送到网页端,以便实时显示新的聊天内容。使用Server-sent events通信,可以轻松地实现这种实时更新功能,并确保网页端与服务器之间的通信效率和稳定性。

说白了,降低成本,提高效率,ChatGPT是一个基于深度学习的大型语言模型,处理自然语言文本需要大量的计算资源和时间。因此,返回响应的速度肯定比普通的读数据库要慢的多,Http接口显然并不合适,因为Http是一次性返回,等待时间过长,而Websocket又过重,因为全双工通信并不适合这种单项对话场景,所谓单项对话场景,就是对话双方并不会并发对话,而是串行的一问一答逻辑,同时持久化链接也会占用服务器资源,要知道ChatGPT几乎可以算是日均活跃用户数全球最高的Web应用了。

效率层面,大型语言模型没办法一下子返回所有计算数据,但是可以通过Server-sent events将前面计算出的数据先“推送”到前端,这样用户也不会因为等待时间过长而关闭页面,所以ChatGPT的前端观感就是像打字机一样,一段一段的返回答案,这种“边计算边返回”的生成器模式也提高了ChatGPT的回答效率。

Python3.10实现Server-sent events应用

这里我们使用基于Python3.10的Tornado异步非阻塞框架来实现Server-sent events通信。

首先安装Tornado框架

pip3 install tornado==6.1

随后编写sse_server.py:

import tornado.ioloop  
import tornado.web  
  
  
push_flag = True  
  
from asyncio import sleep  
  
  
class ServerSentEvent(tornado.web.RequestHandler):  
  
    def __init__(self, *args, **kwargs):  
        super(ServerSentEvent, self).__init__(*args, **kwargs)  
        self.set_header('Content-Type', 'text/event-stream')  
        self.set_header('Access-Control-Allow-Origin', "*")  
        self.set_header("Access-Control-Allow-Headers","*")  
        # 请求方式  
        self.set_header("Access-Control-Allow-Methods","*")  
  
    # 断开连接  
    def on_finish(self):  
        print("断开连接")  
        return super().on_finish()  
  
    async def get(self):  
        print("建立链接")  
        while True:  
            if push_flag:  
                print("开始")  
                self.write("event: message\\n");  
                self.write("data:" + "push data" + "\\n\\n");  
                self.flush()  
                await sleep(2)

建立好推送路由类ServerSentEvent,它继承Tornado内置的视图类tornado.web.RequestHandler,首先利用super方法调用父类的初始化方法,设置跨域,如果不使用super,会将父类同名方法重写,随后建立异步的get方法用来链接和推送消息,这里使用Python原生异步的写法,每隔两秒往前端推送一个事件message,内容为push data。

注意,这里只是简单的推送演示,真实场景下如果涉及IO操作,比如数据库读写或者网络请求之类,还需要单独封装异步方法。

另外这里假定前端onmessage处理程序的事件名称为message。如果想使用其他事件名称,可以使用前端addEventListener来订阅事件,最后消息后必须以两个换行为结尾。

随后编写路由和服务实例:

def make_app():  
    return tornado.web.Application([  
        (r"/sse/data/", ServerSentEvent),  
    ])  
  
if __name__ == "__main__":  
    app = make_app()  
    app.listen(8000)  
    print("sse服务启动")  
    tornado.ioloop.IOLoop.current().start()

随后在后台运行命令:

python3 sse_server.py

程序返回:

PS C:\\Users\\liuyue\\www\\videosite> python .\\sse_server.py  
sse服务启动

至此,基于Tornado的Server-sent events服务就搭建好了。

前端Vue.js3链接Server-sent events服务

客户端我们使用目前最流行的Vue.js3框架:

sse_init:function()  
  
  
          var push_data = new EventSource("http://localhost:8000/sse/data/")  
        push_data.onopen = function (event)   
            // open事件  
            console.log("EventSource连接成功");  
        ;  
         
  
        push_data.onmessage = function (event)   
    try   
        console.log(event);  
     catch (error)   
        console.log('EventSource结束消息异常', error);  
      
;  
  
  
       push_data.onerror = function (error)   
    console.log('EventSource连接异常', error);  
;  
  
  
      

这里在前端的初始化方法内建立EventSource实例,通过onmessage方法来监听后端的主动推送:

可以看到,每隔两秒钟就可以订阅到后端的message事件推送的消息,同时,SSE默认支持断线重连,而全双工的WebSocket协议则需要自己在前端实现,高下立判。

结语

不仅仅可以实现ChatGPT的流式返回功能,SSE在Web应用程序中的使用场景非常广泛,例如实时的新闻推送、实时股票报价、在线游戏等等,比起轮询和长轮询,SSE更加高效,因为只有在有新数据到达时才会发送;同时SSE支持自定义事件和数据,具有更高的灵活性和复用性,为流式数据返回保驾护航,ChatGPT的最爱,谁不爱?最后奉上项目地址,与众乡亲同飨:github.com/zcxey2911/sse_tornado6_vuejs3

从chatGPT到语音回答雏形的python实现

从玩玩chatGPT说起

起因

最近,哦,已经不是最近了,挺长一段时间了,chatGPT火了。出于好奇,注册了账号,而且尝试与其文字对话,询问问题,还有在工作中让其帮忙翻译以及进行语言学习,发现----chatGPTj是一位好老师。虽然这个老师有时候会一本正经地胡说八道,但综合下来,百分之九十以上可以给出比较靠谱回答的。而且其回答的方式很亲和,即使出错,也可以引导其给出正确回答,确实是目前为止非常高级的AI了。不满足于只是文字交流,在想是否可以通过语音交流呢?(由于本人一直活在小世界孤陋寡闻应该早已有此实现,见谅!)恰好又刷 到一个视频,大致内容是某位大神通过树莓派实现了自己的语音助手。于是有此次尝试,希望在没有硬件的前提下考虑实现。

想法

想法其实很简单,如视频中大神的想法。语音唤醒——语音识别——发送识别文本到openai的API并取得文字回答内容——文本转语音播报。本身各部分实现都有现成的接口了应该,考虑胶水语言python完成。

先文本转语音播报

第一步想到的先从文本转语音播放开始。为什么从此步开始而不是语音识别呢?主要是我觉得这一步最简单吧。具体实现代码呢,拼拼凑凑,不会的就问chatGPT,再次赞赏一下这个好老师。完成函数方法实现。

# 该函数用于实现语音转文字并播报
def say_txt_as_audio(text):
    directory = "./chat-content"
    if not os.path.exists(directory):
        os.makedirs(directory)

    text = text.strip()
    if len(text) > 0:
        print("AI:" + text)
        # 将参数传送过来的txt内容转换为语音保存为mp3文件
        now = datetime.now()
        save_name = now.strftime("%Y%m%d%H%M%S.mp3")
        tts = gTTS(text, lang='zh-cn')
        tts.save("chat-content/" + save_name)

        # 通过pygame模块播放保存的mp3文件
        pygame.init()
        pygame.mixer.music.load("chat-content/" + save_name)
        pygame.mixer.music.play()
        while pygame.mixer.music.get_busy():
            pygame.time.wait(1000)
        pygame.quit()

这部分代码有使用pygame,所以不能忘了import pygame。另外由于执行的时候会有部分无关的文字输出到控制台,所以可能会像我一样修改环境设定。主要就是在import pygame之前有如下两行。

from os import environ
environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'  # pygame导入前此参数设定用于限制pygame的无关内容输出

经执行确认,可以正常将指定文字内容转成mp3文件,并即时播报。
mp3文件名为年月日进分秒。
对了,还有gtts的导入。from gtts import gTTS。

再来语音转文本

这一步就是把上边功能的文本变量的值的来源变成我们自己说的话,只要把我们说话的语音变成文本赋给这个变量就可以衔接上边功能了。嗯,就是这样。感谢google及chatGPT,不费时的就完成了。

# 该函数用于实现将语音的输入转换为文字
def convert_audio_to_text():
    # 创建麦克风的识别器对象
    r = sr.Recognizer()

    # 打开麦克风并录制音频
    with sr.Microphone() as source:
        print("我在听,你现在可以开始说话啦!~~")
        audio = r.listen(source)

    # 识别语音
    text = ""
    try:
        text = r.recognize_google(audio, language='zh-CN', show_all=False)
        print("你:" + text)
        with open("chatHistory.txt", "a", encoding="utf-8") as file:
            file.write("你:" + text)
    except Exception as e:
        print("没听到,无法识别:" + str(e))
    return text

至于代码中的sr当然是speech_recognition了,所以import部分记得添加

import speech_recognition as sr 

就好了。由于想要将对话内容输出打印到控制台,且保存到文本文档中。所以代码中有print与write动作。

连接AI接口

了解到目前chatGPT并未开放API,所以只能使用opanai的API,貌似连接的是达芬奇3机器人,后来尝试确实不如在线与chagGPT沟通来得舒服。
首先要取得API key,可以在网站上点击获得。代码中有用到。

# 将指定内容发送给AI并取得回答返回
def get_questions_ai_answer(question):
    ai_answer_content = ""
    if len(question) > 0:
        openai.api_key = ""
        with open("OpenAiApiKey", 'r') as f:
            api_key = f.read().strip()
            openai.api_key = api_key

        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=question,
            temperature=0.9,
            max_tokens=2048, #150 OpenAI的API实现中,max_tokens参数的上限值为4096。
            top_p=1,
            frequency_penalty=0.0,
            presence_penalty=0.6,
            stop=[" Human:", " AI:"]
        )
        response_result_text = response.get("choices")[0].get("text")
        ai_answer_content = response_result_text
        with open("chatHistory.txt", "a", encoding="utf-8") as file:
            file.write("\\nAI:" + ai_answer_content.strip() + "\\n")
    return ai_answer_content

此部分用于连接前边的两步实现。即接收语音转文字的结果,发给openai再将回答结果传给文字转语音的实现部分。
开始询问chatGPT给出的示例代码中max_tokens设置为150。这样的情况下,当AI回答你问题的输出内容过多时,会中断,也就是说半句话就戛然而止了,经询问chatGPT,可调引参数,了解到上限是4096,所以改150到了2048。
至于OpenAiApiKey文件中存放的内容,就是在openai网站上取得的key值了。不公开了。

再来添加唤醒

上面三步串起来,基本可以完成从说话到ai语音回答了。但是每次都是单个python文件执行才能开始说一回。所以想到还是得加上语音唤醒才可以。不表经过,选了pocketsphinx,网上说要什么sphinxbase就是这一步折腾了挺长时间,不过后来发现纯粹是语音模型配置的路径不对而已。所以,我猜现在导入pocketsphinx库就可以了,当然swig库也加入到工程了,以防万一。

import os
import openai
import speech_recognition as sr
from gtts import gTTS
from datetime import datetime, timedelta
from os import environ

environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'  # pygame导入前此参数设定用于限制pygame的无关内容输出
import pygame
from pocketsphinx import LiveSpeech, get_model_path

model_path = get_model_path()

# 创建语音识别对象,用于监听说话,以备唤醒
speech = LiveSpeech(
    verbose=False,
    sampling_rate=16000,
    buffer_size=2048,
    no_search=False,
    full_utt=False,
    #hmm=os.path.join(model_path, 'en-us', 'en-us'),
    #lm=os.path.join(model_path, 'en-us', 'en-us.lm.bin'),
    #dic=os.path.join(model_path, 'en-us', 'cmudict-en-us.dict')
    # 此处切用网上下来的中文语音模型
    hmm = os.path.join(model_path, 'cmusphinx-zh-cn-5.2', 'zh_cn.cd_cont_5000'),
    lm = os.path.join(model_path, 'cmusphinx-zh-cn-5.2', 'zh_cn.lm.bin'),
    dic = os.path.join(model_path, 'cmusphinx-zh-cn-5.2', 'zh_cn.dic')
)

上面代码的Import部分,包括前面几步的import方便就一齐粘过来了。主要是构建了speech对象,但是还没使用,所以还要有下面一部分

# 开始语音识别
for phrase in speech:
    print("识别到的语音内容:", phrase)
    # 这里再执行唤醒后的动作

空间考虑

考虑到MP3文件存放大小的问题,又是以年月日时分秒命名的mp3文件,所以干脆每次问答的时候都删除几分种以前的文件好了。所以添加了一个文件删除的方法。

# 为了节省磁盘占用空间,删除指定分钟数以前的mp3文件。在每次执行前开始算起。
def delete_mp3_files_several_minutes_ago(minute):
    now = datetime.now()
    several_minutes_ago = now - timedelta(minutes=minute)
    several_minutes_ago_str = several_minutes_ago.strftime("%Y%m%d%H%M%S")

    for filename in os.listdir("chat-content/"):  # 遍历该路径下的所有文件
        if not filename.endswith('.mp3'):  # 如果不是mp3文件,跳过
            continue
        else:
            file_time = filename[:-4]  # 取出文件名即时间
            if file_time < several_minutes_ago_str:
                # 如果该文件是指定分钟数以前的,删除
                os.remove(os.path.join("chat-content/", filename))

至于保存的txt文档一直在变大的问题,其实也可以判断大小清除就好啦,毕竟是文本内容,暂不考虑,暂不实现。

串成线

前面几个基本可以串起来了,在语音唤醒之后的动作部分把几个函数方法体串起来就可以了。

# 开始语音识别
for phrase in speech:
    print("识别到的语音内容:", phrase)
    word_heared = str(phrase)
    if word_heared == "如果":
        print("我被唤醒了,您请说话!")
        # 语音识别人类说话
        you_said_txt = convert_audio_to_text()
        # 调用AI的API将语音识别内容发送,并将回答取回
        ai_answer_text = get_questions_ai_answer(you_said_txt)
        # 语音播报AI的回答
        say_txt_as_audio(ai_answer_text)
        # 为了不长期占用磁盘空间,只保存当前时间算起几分钟以内的mp3音频文件,其他生成的mp3文件进行删除
        delete_mp3_files_several_minutes_ago(3)

上面把关键词设成了【如果】,至于为什么设成如果,是因为这个模型的错误率实在太高了,试了几个词这个词相对能听清,测试嘛,就用它了。整体执行下来还可以。

问题与想法

整体跑起来,主要有两点问题,一个是响应速度,在询问问题后到播放出语音来间隔时间有点长,再琢磨吧。再有一个问题就是这个唤醒的错误率太高了,我说了好多次【如果】它才有一次识别对了。这个貌似可以通过更换模型来解决。可模型查了查,不好搞,暂先搁置吧。
还有一点想法,可以整出一个应用界面来,就一个钮,点击可以问ai问题,看文字回答并收听语音朗读。或者还可以进行设置,达到实时监听唤醒词。当然可能会考虑更多东西才可以。这两天摸鱼,搞了这么个东西,聊以记录一下。
PS:翻出了几年前获赠的google home mini,原来只当个小音箱,现在忽然想想是否可以让google assistant 连openai呢。由于openai是预训练模型能不能实时与assistant切换呢。在Actions on Google上可以搞定的么,有空再研究一下。

以上是关于逐句回答,流式返回,ChatGPT采用的Server-sent events后端实时推送协议Python3.10实现,基于Tornado6.1的主要内容,如果未能解决你的问题,请参考以下文章

看!前端新人如何用ChatGPT开发APP

用ChatGPT三分钟免费做出数字人视频- 提升自媒体魅力

ChatGPT 使用 拓展资料:用 Rasa Open Source 和 ChatGPT 回答有关结构化数据的问题

看ChatGPT如何回答微博签到数据相关问题。

ChatGPT爆火!它如何回答 Python 相关问题

ChatGPT爆火!它如何回答 Python 相关问题