Python从环境搭建到写出聊天机器人--保姆级教程,深入浅出带图详细流程

Posted 提笔忘字的帝国

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python从环境搭建到写出聊天机器人--保姆级教程,深入浅出带图详细流程相关的知识,希望对你有一定的参考价值。

目录

🐍环境搭建

一、下载Python

二、安装Python

三、检验Python 

四、下载PyCharm  

 五、安装IDE——PyCharm 

六、激活PyCharm  

七、创建Python程序

🐍正文部分

一、什么是Python? 

二、思路分析

三、准备阶段 

 四、代码实现

五、对代码的解释  

六、总结


大家好呀,我是爷爷的茶七里香,这个名字有没有让你想起周总的那两首歌呢?言归正传,本文将会从环境的搭建教大家一步步完成一个Ai智能回复机器人的制作,只需要这一篇文章哦,环境的搭建也会教大家如何激活使用IDE——PyCharm!认真看完你会有收获的!!!

🐍环境搭建

一、下载Python

我下载的Python版本是3.7.8的,各位可根据自身情况下载,如图:

二、安装Python

安装需要注意的地方:Add Python 3.7 to PATH一定要勾上(不勾选的话需要自己添加环境变量,比较麻烦);看图:

点击Install Now就会自行安装了:

安装完成后关闭即可:

三、检验Python 

好了,到这里我们不确定Python是否成功的安装了,我们可以测试一下;键盘按住win+r快捷键,在弹出的窗口中输入cmd:

然后输入以下命令:(意思是查看下Python的版本,如果能查看到版本说明安装是成功的)

Python -V

  

四、下载PyCharm  

版本需要小伙伴们自行选择哦,PyCharm我使用的版本是2019.3.5的,如图:

 

 五、安装IDE——PyCharm 

 这就是我们要写Python代码的软件啦,下载完成后需要右键使用管理员的方式打开哦:

 

下一步之后安装路径看自己情况来选择(我一般不放在C盘): 

         需要注意的地方:一定要勾选Add launchers dir to the PATH

 

等待安装。。。

到这我们的PyCharm 就安装好啦,下面就开始教大家怎么激活使用PyCharm 

六、激活PyCharm  

网上激活的方式有很多种,例如即将演示无限激活30天的方式;当安装好之后我们需要打开PyCharm,打开之后会有下面这个弹出:

往下走,选择Send Usage Statistics:

选择Skip Remaining and Set Defaults:

  随后会弹出一个主页面,让我们选择右下角的Configure -> Plugins :

 点击右边的一个加(+)号,然后添加地址:

https://plugins.zhile.io

然后搜索IDE Eval Reset这个插件名 :

重启完成后插件就已经安装好了,下面需要创建一个Python项目,进入里面对插件设置一下:

新建一个项目 

选择项目存放的路径然后下一步 

项目创建完成,关闭不必要的提示 

需要对刚刚下载的插件设置一下 

一定要勾选Auto reset before per restart这个选项,不勾选它不会自动重置 

 

 重启完PyCharm你就可以一直使用啦~

七、创建Python程序

给它起个名吧,不要用中文哦!

🐍正文部分

一、什么是Python? 

什么是Python?哈哈~我也不是很清楚啊~咱们看看维基百科怎么说的吧!

Python是一种广泛使用的解释型、高级和通用的编程语言。Python支持多种编程范型,包括函数式、指令式、反射式、结构化和面向对象编程。它拥有动态类型系统和垃圾回收功能,能够自动管理内存使用,并且其本身拥有一个巨大而广泛的标准库。它的语言结构以及面向对象的方法旨在帮助程序员为小型的和大型的项目编写清晰的、合乎逻辑的代码;

相必大多数人学Python是因为爬虫吧?

二、思路分析

我们如何来实现能聊天的机器人呢?你需要理解一个概念:

客户端发送请求 -> 服务器接受请求 -> 服务器处理请求并响应请求 -> 客户端接受响应

我们有现成的接口可以调用,也就是向服务器端发送请求之后就能获取到对应的数据,这样就达到了跟机器人聊天的目的;如果对接口不了解的可以理解成一个链接,简单的来说就是我们用这个链接发送什么,它就能回复我们什么!!!

三、准备阶段 

发送请求需要的东西:

 既然知道了浏览器可以向服务器发送请求可以获取到响应数据,那么代码中如何实现呢?这就要引入Python提供的一个库——urllib,这个库可以理解成是代码中的浏览器,只是为了方便你理解它,但不要把浏览器的概念用在它身上,我这么说肯定是不对的,也是为了方便小白能理解嘛!!!

两行代码就这样诞生啦:

其中import意思就是导入某个东西的意思,只有导入了我们才能在以下代码中使用它!

响应回来需要的东西: 

浏览器发送请求接受到响应数据之后会进行渲染的相关操作,也就是响应回来后的数据没法直接展示,还需要一些处理,当然在我们的代码接受到响应数据之后也是需要进行相关处理的 ,比如将数据转换成字典类型的数据,这样就能方便我们提取需要的东西,所以我们需要导入这么一个东西:

 四、代码实现

注:# -*- coding:utf-8 -*- 的意思是使用utf-8的编码格式

再看下效果:

有意思吧?代码已经放出来了,自己捣鼓捣鼓吧!!! 

注:# -*- coding:utf-8 -*- 的意思是使用utf-8的编码格式

# -*- coding:utf-8 -*-
# 向服务器发送请求
import urllib.request
# 对中文进行ASCII字符的转换
import urllib.parse
# 可以将字符串转换成字典类型
import json

# 从键盘输入
myStr = input("跟AI说句话吧:")
# 对输入的字转码
text = urllib.parse.quote(myStr)
# 将转码的内容拼接到链接上
url = '='.format('http://api.qingyunke.com/api.php?key=free&appid=0&msg', text)
# 向服务器发送请求
response = urllib.request.urlopen(url)
# 读取响应数据并转换成utf-8编码
responses = response.read().decode("utf-8")
# 转换成字典类型
responseText = json.loads(responses)
# 从字典类型取出键为content的值并打印输出
print(responseText["content"])

五、对代码的解释  

1、为什么要转码呢?

答:如果你输入的是英文的话是可以不需要进行转码的,但是中文的话需要进行转码,不然AI识别不出来你发的是啥!

2、为什么要将响应数据转换成utf-8编码?

答:不转换成utf-8编码的话读取的将会是乱码,后续也无法转换成字典类型

3、responseText["content"]是个啥?

 答:问这个问题的同学说明对Python的数据类型不了解,这是对Python中的字典类型取值的操作,那为啥双引号里面写的是content呢?看下图:

六、总结

 有意思吧?当你问它某地的天气情况,它也能回复你哟:

代码可以复制下来自己玩哟,当然你也可以自己加一些逻辑,比如一直循环下去,你就能一直跟它聊天啦!

🥇原创不易,还希望各位大佬支持一下!

👍点赞,你的认可是我创作的动力 !

🌟收藏,你的青睐是我努力的方向!

✏️评论,你的意见是我进步的财富! 

教程万字长文保姆级教你制作自己的多功能QQ机器人

转载请注明出处:小锋学长生活大爆炸( https://xfxuezhang.blog.csdn.net/)
若发现存在部分图片缺失,可以访问原文: 万字长文保姆级教你制作自己的多功能QQ机器人 - 小锋学长生活大爆炸

​​​​​​

目录

前言

功能清单

免费领取轻量应用云服务器

SSH连接服务器

常见Ubuntu软件安装与问题修复

搭建mirai环境

Python控制mirai篇

debug输出封装

交互授权

绑定bot

释放bot

未读消息的数量

获取最新的消息

解析消息内容

向好友发送消息

向群发送消息

向群发送富文本消息

Q群消息转发

类似QMsg酱的消息通知

多功能切换的实现设计

翻译查询

领取腾讯免费翻译API

机器人接入翻译功能

实时天气

领取免费的和风天气API

机器人接入天气功能

实时热搜

领取免费的天行热搜API

机器人接入热搜功能

照片上传

领取腾讯对象存储COS

机器人接入图片上传功能

自行添加小功能函数总结

控制树莓派舵机与屏显

腾讯云服务器搭建MQTT环境

待实现功能

接入控制ESP32(实现智能家居控制)

完整代码整理


前言

        QQ、微信是我们平常使用最多的通讯工具,网上也有很多通过软件去控制QQ/微信的开源工具,通过这些工具,我们可以实现许多有意思的效果,而不仅仅局限于消息聊天。
        自从微信网页版被官方禁用后,微信的软件工具几乎已经失效了,现有的一些是通过hook微信本身来实现,这种很容易被官方检测并封号。另一些是通过注册企业号来控制,但不直观且功能受限。
        这里我们借助相对更开放的QQ来制作我们的机器人,对比几款工具后,最终选择了mirai

先放一张整体结构图:

 

功能清单

        网上现有开源的机器人大多只是实现了类似“自动推送天气、接入图灵机器人自动聊天”等等,大多属于自娱自乐,没有发挥最大用途。
        因此,我们的QQ机器人(暂且取名为“小锋仔”)是根据日常所需而制,包含常用功能且设计得易于扩展。
目前包含的功能有:

  • 类似QMsg酱的消息通知
  • QQ群消息转发
  • 翻译查询
  • 照片上传
  • 实况天气
  • 实时热搜
  • 控制树莓派舵机
  • 控制树莓派屏显
  • ... ...

将来可能包含的功能有:

  • 接入控制ESP32(实现智能家居控制)

        接下来详细介绍如何自己搭建一个这样的QQ机器人。篇幅较长保姆级详细,建议收藏后慢慢看。

免费领取轻量应用云服务器

首先为了 能运行mirai,且随时随地能连接,我们需要有一个 具备公网IP的服务器。这里使用腾讯云的 免费服务器

        对于还不想买的童靴,可以免费领取腾讯云提供的1个月服务器试用套餐。步骤:

  1. 进入官网领取:云产品免费试用;需要选购的进:轻量应用服务器专场 (限时3年388,点我进);不清楚怎么操作的可以看教程:腾讯云产品免费试用教程
  2. 领取完成后,由于后面需要用到端口,因此这里我们提前开放2个端口:88889966

        这里腾讯云可能有个小特点。如果发现在控制台防火墙放行后,还是无法访问。需要再在服务器里放行一下端口。这里先写着,大家可以在后面一节中连接上了服务器,再回过来这里输入指令。

sudo apt install firewalld -y
sudo firewall-cmd --list-all
sudo firewall-cmd --permanent --zone=public --add-port=8888/tcp && sudo firewall-cmd --reload
sudo firewall-cmd --permanent --zone=public --add-port=9966/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service

SSH连接服务器

        服务器初始化完成后,就可以通过SSH去连接了。这里我们可以直接使用powershell来连接,其他SSH软件我强推mobaxterm!!安装包也已经准备好了:MobaXterm.exe

  1. 搜索打开powershell:

  1. 输入以下命令连接SSH:
ssh 用户名@<公网ip>
  1. 或者使用MobaXterm软件:

  1. 先更新一下软件库:
sudo apt upgrade -y
sudo apt autoremove -y
  1. 一般不建议使用管理员账户,因此我们要自己新建一个账户:
sudo adduser sxf


然后将账户加入sudoers组:

sudo apt install vim
sudo vim /etc/sudoers


        然后退出软件,重新用新建的账号登录即可。
        至此,服务器环境就搭建完成了。

常见Ubuntu软件安装与问题修复

        这篇博客里记录了很多我在使用过程中,常用软件的安装,非常详细且经过亲测,时不时也会更新内容,大家可以收藏以备下次使用。
Ubuntu20.04 + VirtualBox相关_小锋学长生活大爆炸的博客-CSDN博客


搭建mirai环境

        接下来就要在服务器上搭建QQ机器人(mirai)基础环境。搭建完成后,我们就可以远程跟机器人进行交互。
        官方mirai的github仓库:GitHub - mamoe/mirai: 高效率 QQ 机器人支持库
        由于github是国外的,而官方已经不再支持gitee的维护,因此如果大家无法访问上面的连接,可以用我帮大家下载下来的安装包:
        其他的一些文档:Mirai | mirai
        官方论坛:主页 | MiraiForum
下面开始正式安装:

  1. 先SSH连接上服务器,建议不要用root用户登录。
  2. 下载安装包mcl-installer-a02f711-linux-amd64:
mkdir qqbot
cd qqbot
wget http://xfxuezhang.cn/web/share/QQBot/mcl-installer-a02f711-linux-amd64
sudo chmod +x mcl-installer-a02f711-linux-amd64

        此时需要输入密码(在上面选购并装完服务器后会显示,当时要求记下的)。

./mcl-installer-a02f711-linux-amd64

        此时进入安装流程,弹出的几个选项都直接回车选默认即可。

  1. 安装完成后,还需要安装mirai-api-http。在当前页面下,继续输入:
./mcl --update-package net.mamoe:mirai-api-http --channel stable-v2 --type plugin
  1. 编辑_config/net.mamoe.mirai-api-http/setting.yml_配置文件 (没有则自行创建)
## 配置文件中的值,全为默认值

## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
  - http
  - ws

## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 1234567890

## 开启一些调式信息
debug: false

## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false

## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096

## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
  ## 详情看 http adapter 使用说明 配置
  http:
    # 0.0.0.0是允许远程访问,localhost只能同机器访问
    host: 0.0.0.0
    port: 8888
    cors: ["*"]
    unreadQueueMaxSize: 100
  
  ## 详情看 websocket adapter 使用说明 配置
  ws:
    host: localhost
    port: 8080
    reservedSyncId: -1
  1. 启动mirai即可:
./mcl

        首次启动会自动下载jar包。等待启动完成后,输入"?",可以查看所有支持的mcl命令。

  1. 使用以下命令即可登录QQ号:
/login  [password] 

        如果想要启动mcl后自动登录QQ号,可以用:

/autoLogin add   

        也可以设置不同的设备登录。

/autoLogin setConfig  protocol ANDROID_PAD

        它对应的配置文件其实就在:config/Console/AutoLogin.yml

  1. 现在QQ风控很严了,第一次登录很有可能遇到“需要滑动验证码”的。建议申请小号使用,以免发生不测。并且首次使用时在QQ“账号安全设置”中关闭“安全登录检查”、“陌生设备登录保护”。如果遇到验证码,可以尝试:

    1. 将Captcha link通过另一个QQ,发给待登录的mirai-QQ,手机登录mirai-QQ并点击链接,手动完成滑块验证,然后回到mobaxterm输入回车;

  1. 如果不行,就参考这个链接的方法:GitHub - project-mirai/mirai-login-solver-selenium: SliderCaptcha solver
  2. 还不行,再参考这个链接的方法:mirai-login-solver-selenium | mirai
  3. 还有一个小技巧可以尝试。在手机端先通过手机号登录QQ,如果没问题,再通过手机号在mirai上登录。手机建议先登录上mirai-QQ,有时可能会弹窗提示“是否允许陌生设备登录”等等,要手动点确认的。
  4. 另外,最新申请的QQ号,一般可以成功登录mirai。
  5. 如果以上都不行,目前的终极方案是使用miraiAndroidMiraiAndroid
  • 在手机上的MiraiAndroid登录QQ后导出device.json
  • 将cache目录下的3个文件account.secrets、servers.json、session.bin也复制出来

  • 接下来点击左上角, 再点击“工具”。选择你机器人的账号, 选择 导出 DEVICE.JSON 将其导出。

  • 再次回到服务器端,进入 “bots/<你的QQ号>” 下面, 将导出的 device.json 复制放入。对应的cache文件夹也复制放入。

  • 再次执行 ./mcl 启动 mirai-console 看看效果。
  • 若仍有问题,欢迎加入文末Q群交流。
  1. 至此,mcl就已经能正常接收QQ消息了。而我们的实现代码对mcl的控制,就是通过mirai-api-http插件来实现的。根据上面第4步配置的_setting.yml_文件,再参考官方API文档HttpAdapter文档,即可实现互联互通。(讲起来比较麻烦,no bibi,后面直接show me the code)。

Python控制mirai篇

        当服务器成功运行了mirai后,我们就可以在本地进行Python脚本的编写了。由于最新的mirai-api-http变更过接口规范,因此网上某些一两年前的代码已经失效了。本教程对应的mirai-api-http使用的是最新的2.x版本。
        接下来的操作,都默认已经完成“启动mcl并login了QQ号”
        在上面setting.yml中,有两个配置项值得注意,他是我们脚本可以控制的密钥:

verifyKey: 1234567890
http: port: 8888

debug输出封装

        简单封装下。直接用print也是可以的。

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()

交互授权

        在交互前,脚本需要先向mirai获取一个verifyKey,之后在每个请求时候,都需要带上这个key,也叫session。其中,参数auth_key对应了上面setting.yml里的verifyKey。

auth_key = '1234567890'

def verifySession(self, auth_key):
    """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
    data = "verifyKey": auth_key
    url = self.addr+'verify'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        return res['session']
    return None

绑定bot

        使用此方法校验并激活你的Session,同时将Session与一个已登录的Bot绑定。

qq = '121215'             # mirai登录的那个QQ
session = 'grge8484'     # 上面verifySession函数的返回值

def bindSession(self, session, qq):
    """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
    data = "sessionKey": session, "qq": qq
    url = self.addr + 'bind'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        self.session = session
        return True
    return False

释放bot

        使用此方式释放session及其相关资源(Bot不会被释放)

def releaseSession(self, session, qq):
    """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
    data = "sessionKey": session, "qq": qq
    url = self.addr + 'release'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        return True
    return False

未读消息的数量

        获取当前有多少条未读消息。

def getMessageCount(self, session):
    url = self.addr + 'countMessage?sessionKey='+session
    res = requests.get(url).json()
    if res['code'] == 0:
        return res['data']
    return 0

获取最新的消息

        获取消息后会从队列中移除。

def fetchLatestMessage(self, session):
    url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
    res = requests.get(url).json()
    if res['code'] == 0:
        return res['data']
    return None

解析消息内容

        简单实现了部分消息类型的解析,会有消息丢失,请根据使用需求自行调整。

data = 'xxx'  # 可以是上面getMsgFromGroup函数的返回值

def parseGroupMsg(self, data):
    res = []
    if data is None:
        return res
    for item in data:
        if item['type'] == 'GroupMessage':
            type = item['messageChain'][-1]['type']
            if type == 'Image':
                text = item['messageChain'][-1]['url']
            elif type == 'Plain':
                text = item['messageChain'][-1]['text']
            elif type == 'Face':
                text = item['messageChain'][-1]['faceId']
            else:
                logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append('text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name)
                return res

向好友发送消息

        向指定好友发送消息。

def sendFriendMessage(self, session, qq, msg):
    msg_list = msg.split(r'\\n')
    msg_chain = [ "type": "Plain", "text": m+'\\n'  for m in msg_list]

    data = 
        "sessionKey": session,
        "target": qq,
        "messageChain": msg_chain
    
    url = self.addr + 'sendFriendMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0

向群发送消息

        也只是简单实现。

def sendMsgToGroup(self, session, group, msg):
    text = msg['text']
    type = msg['type']
    name = msg['name']
    group_id = msg['groupId']
    group_name = msg['groupName']
    content1 = "【消息中转助手】\\n用户:\\n群号:\\n群名:\\n消息:\\n".format(
        name, group_id, group_name, text)
    content2 = "【消息中转助手】\\n用户:\\n群号:\\n群名:\\n消息:\\n".format(
        name, group_id, group_name)
    logger.DebugLog(">> 消息类型:" + type)
    if type == 'Plain':
        message = ["type": type, "text": content1]
    elif type == 'Image':
        message = [
            "type": 'Plain', "text": content2,
            "type": type, "url": text]
    elif type == 'Face':
        message = ["type": 'Plain', "text": content2,
                   "type": type, "faceId": text]
    else:
        logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
        return 0
    data = 
        "sessionKey": session,
        "group": group,
        "messageChain": message
    
    logger.DebugLog(">> 消息内容:" + str(data))
    url = self.addr + 'sendGroupMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

向群发送富文本消息

        跟上面的差不多,消息类型变了一下,从而支持类似HTML形式的消息发送。

def sendPlainTextToGroup(self, session, group, msg):
    msg_list = msg.split(r'\\n')
    msg_chain = [ "type": "Plain", "text": m+'\\n'  for m in msg_list]
    data = 
        "sessionKey": session,
        "group": group,
        "messageChain": msg_chain
    
    url = self.addr + 'sendGroupMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

        以上就是几个简单、常用的函数。基于这些函数,就已经可以实现蛮多有趣的功能了。

Q群消息转发

        这部分可以直接参考之前的博客:Q群消息转发例程。其实也就是把上面的函数整合一下,放一个完整版:

import requests
from time import sleep

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()

logger = Logger()
class QQBot:
    def __init__(self):
        self.addr = 'http://43.143.12.250:8888/'
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = "verifyKey": auth_key
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = "sessionKey": session, "qq": qq
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = "sessionKey": session, "qq": qq
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def fetchLatestMessage(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None

    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append('text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name)
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendPlainTextToGroup(self, session, group, msg):
        msg_list = msg.split(r'\\n')
        msg_chain = [ "type": "Plain", "text": m+'\\n'  for m in msg_list]
        data = 
          "sessionKey": session,
          "group": group,
          "messageChain": msg_chain
        
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\\n用户:\\n群号:\\n群名:\\n消息:\\n".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\\n用户:\\n群号:\\n群名:\\n消息:\\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = ["type": type, "text": content1]
        elif type == 'Image':
            message = [
                "type": 'Plain', "text": content2,
                "type": type, "url": text]
        elif type == 'Face':
            message = ["type": 'Plain', "text": content2,
                       "type": type, "faceId": text]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = 
                "sessionKey": session,
                "group": group,
                "messageChain": message
                
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!".format(g))

    def sendFriendMessage(self, session, qq, msg):
        msg_list = msg.split(r'\\n')
        msg_chain = [ "type": "Plain", "text": m+'\\n'  for m in msg_list]

        data = 
          "sessionKey": session,
          "target": qq,
          "messageChain": msg_chain
        
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => '.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.fetchLatestMessage(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)

        其中,conf.json内容为:


  "auth_key": "1234567890",
  "bind_qq":  "123456",                                                     # mirai登录的QQ (复制时记得删我)
  "sleep_time": 1,
  "receive_groups": ["913182235", "977307922"],  # 要接受消息的群 (复制时记得删我)
  "send_groups": ["913182235", "977307922"],         # 要发送消息的群 (复制时记得删我)
  "debug_level": "debug"

        下面,我们就先从类似QMsg酱的消息通知开始。

类似QMsg酱的消息通知

设计目标:通过调用指定的URL,小锋仔机器人就会给指定的好友发送指定的消息。
        关于QMsg酱的使用教程可以看:免费的QQ微信消息推送机器人
        前面我们特地开放了9966端口,因此可以使用Flask来监听这个端口。
        本着越简单越好的原则,我们把“发给好友还是群”、“目标好友或群的号”、“发送的内容”三部分都拼接到URL上,因此有:

http://43.143.12.250:9966/QQ/send/friend?target=123&msg=hello
http://43.143.12.250:9966/QQ/send/group?target=123&msg=hello

因此,代码可以写成:

from flask import Flask, request

app = Flask(__name__)

@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World! Friend!'

@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendPlainTextToGroup(bot.session, qq, msg)
    return 'Hello World! Group!'

if __name__ == '__main__':
    app.run(port='9966', host='0.0.0.0')

        由于Flask和小锋仔QQBot都要阻塞运行,因此稍微变动一下,让小锋仔以子线程的形式运行即可。

if __name__ == '__main__':
    t = threading.Thread(target=qqTransfer)
    t.setDaemon(True)
    t.start()

    app.run(port='9966', host='0.0.0.0')

        测试一下:

http://localhost:9966/QQ/send/friend?target=1061700625&msg=hello


        如果我们把这个脚本放到服务器上去运行,那么链接就变成了:

http://43.143.12.250:9966/QQ/send/friend?target=1061700625&msg=hello

        当然,能发消息的前提是“先加好友”或“加群”啦。

多功能切换的实现设计

        上面我们进行了简单地尝鲜。
1、从这部分开始,我们涉及的功能比较杂,为了能更好的区分功能,需要设计一个简单的交互协议。

  • 我们发送的内容可以分为:功能选择 与 消息详情
  • 为了区分他俩,可以在选择功能时添加指定前缀,如“CMD+翻译”;
  • 小锋仔接收到后,进入翻译模式准备;
  • 发送指令详情时,就不加前缀。而小锋仔则将收到的消息进行翻译,再把结果返回。

        根据以上内容,小锋仔需要记录的状态信息至少有:

class StatusStore:
    def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
        self.from_qq = from_qq          # 发送者的QQ号
        self.is_cmd = is_cmd            # 是否是指令(选择功能)
        self.func_name = func_name      # 选择的功能的名称
        self.need_second = need_second  # 是否需要经过两步:先发cmd指令,再发详细内容
        self.msg = msg                  # 本次发送的消息内容
    
    def detail(self):
        return self.__dict__

2、并且我们设置,只有从指定QQ发过来消息,才能响应。因此在接收到消息时,需要判断对方的信息。对于好友类型的消息,mirai返回格式如消息类型说明


  "type": "FriendMessage",
  "sender": 
    "id": 123,
    "nickname": "",
    "remark": ""
  ,
  "messageChain": [] // 数组,内容为下文消息类型

因此,我们可以从"type"和 "sender:id"入手判断。
3、我们暂时考虑只有一个主QQ能发送指令的情况。
4、定义一个类来专门管理不同功能的函数,例如:

class MultiFunction:
    """多功能函数集合"""
    def __init__(self) -> None:
        pass

    @staticmethod
    def translate(original:str, convert:str='zh2en') -> str:
        return '假装是翻译结果' 
    
    @staticmethod
    def uploadImage(image_path:str) -> str:
        return '假装是上传结果' 

    @staticmethod
    def weather(city:str) -> str:
        return '假装是天气结果' 
    
    @staticmethod
    def hotNews(status_store:StatusStore) -> str:
        return '假装是热搜结果' 


# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = 
    '翻译': 'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~', 
    '天气': 'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?', 
    '热搜': 'function': MultiFunction.hotNews, 'need_second': False


def choiceFunction(store_obj:StatusStore):
    res = ''
    if function_map.get(store_obj.func_name):
        res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    return res 

5、大致实现流程的想法是:


        对应代码实现:

def analyzeFriendMsg(self, data):
    if data is None or data['type'] != 'FriendMessage':
        return None, None, None
    sender_id = data['sender']['id']
    msg_type = data['messageChain'][-1]['type']
    if msg_type == 'Plain':
        msg_text = data['messageChain'][-1]['text']
    elif msg_type == 'Image':
        msg_text = data['messageChain'][-1]['url']
    else:
        msg_text = ''
        return sender_id, msg_type, msg_text

        最终的框架就是:

def xiaofengzai():
    auth_key = '1234567890'     # settings.yml中的verifyKey
    bind_qq = '3126229950'      # mirai登录的QQ
    target_qq = '1061700625'    # 我们自己用的主QQ
    target_qq = int(target_qq)  # 接收到的消息里,QQ是int类型的
    sleep_time = 1              # 轮询间隔
    status_store = 

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if not cnt:
            sleep(sleep_time)
            continue
        logger.DebugLog('>> 有消息了 => '.format(cnt))
        logger.DebugLog('获取消息内容')
        data = bot.fetchLatestMessage(session)
        if len(data) == 0:
            logger.DebugLog('消息为空')
            sleep(sleep_time)
            continue
        logger.DebugLog(data)
        logger.DebugLog('解析消息内容')

        sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
        if not sender_id or sender_id != target_qq:
            sleep(sleep_time)
            continue

        if msg_text.strip().lower().startswith('cmd'):
            _, func_name = msg_text.strip().split('\\n')[0].split()
            func_name = func_name.strip()
            store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
            # 不需要发两次,直接调用函数返回结果即可
            func_info = function_map.get(func_name)
            if not func_info:
                res = '指令[]暂不支持'.format(func_name)
            elif func_info.get('need_second'):
                res = '收到你的指令:\\n'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
                # 添加或更新记录
                status_store[sender_id] = store_obj
            else:
                res = '请求结果为:\\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        else:
            res = '请先发送指令哦...'
            store_obj = status_store.get(sender_id)
            if store_obj and store_obj.is_cmd:
                store_obj.msg = msg_text
                res = '请求结果为:\\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        
        bot.sendFriendMessage(session, qq=sender_id, msg=res)

        看一下效果:


        至此,骨架有了,接下来开始填充功能了。

翻译查询

        根据上面的骨架可知,我们只需要实现MultiFunction类下的translate函数即可。如果想快速测试函数效果,可以使用以下代码,而不用先启动mirai:

res = choiceFunction(StatusStore(func_name='翻译', msg='你好'))
print(res)

领取腾讯免费翻译API

        要做翻译,最方便的就是调用API了(没错,调包侠!)。
        这里使用腾讯的翻译API,可以免费领取:领取腾讯翻译API。点进链接后,往下拖到“云产品体验”专区,选择“人工智能”,下面有“机器翻译”。他的调用量是每月更新,非常的良心了。


        点击“立即体验”,进入控制台界面,虽然上面显示的是“开通付费版”,但不用担心,他是有免费额度的,更何况你账户里又没充余额,哈哈哈。

        支持很多类型的翻译,这次我们先选文本翻译机器翻译 文本翻译-API 文档-文档中心-腾讯云

        我们用SDK的方式,免去了自己封装复杂的加密步骤:

pip install --upgrade tencentcloud-sdk-python

        然后去获取密钥API密钥管理,记下APPID、SecretId、SecretKey


机器人接入翻译功能

        小锋仔bot结合翻译功能,直接上代码:

import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models

def translate(original:str, convert:str='en'):
    secretId = 'xxx'  # 从API控制台获取
    secretKey = 'xxx' # 从API控制台获取
    AppId = 12123     # 从API控制台获取
    try:
        cred = credential.Credential(secretId, secretKey)
        client = tmt_client.TmtClient(cred, "ap-guangzhou")
        req = models.TextTranslateRequest()
        params = 
            "SourceText": original,
            "Source": "auto",
            "Target": convert,
            "ProjectId": AppId
        
        req.from_json_string(json.dumps(params))
        resp = client.TextTranslate(req)
        # print(resp.to_json_string())
        return resp.TargetText
    except TencentCloudSDKException as err:
        print(err)
        return ''

        使用测试效果:

print(choiceFunction(StatusStore(func_name='翻译', msg='你好')))

# 输出:
"TargetText": "Hello", "Source": "zh", "Target": "en", "RequestId": "a1b17f47-751e-44cd-89a5-6a22e9f2c444"
Hello


实时天气

领取免费的和风天气API

        天气部分,我们是用免费的和风天气API:实时天气 - API
        首先也要进行登录并获取KEY,这个步骤官网讲的很详细,图文并茂的,这边就不多写了,大家可以跳转过去(注意我们选的是Web API):创建应用和KEY - RESOURCE

机器人接入天气功能

        同样的,直接上代码:

def weather(city:str) -> str:
    url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
    url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
    weather_key = 'xxxxx'  # 和风天气控制台的key

    # 实况天气
    def getCityId(city_kw):
        url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
        city = requests.get(url_v2).json()['location'][0]
        return city['id']

    city_name = '广州'
    city_id = getCityId(city_name)
    url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
    res = requests.get(url).json()
    text = "<天气信息获取失败>"
    if res['code'] == '200' or res['code'] == 200:
        text = '实时天气:\\n 亲爱的 小主, 您所在的地区为 ,\\n 现在的天气是 ,\\n 气温 °, 湿度 %,\\n 体感气温为 °,\\n 风向 , 风速 km/h'.format(
            city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed']) 
    return text 

        测试效果:


阿里内部第一本“凤凰架构”,保姆级教你构建可靠大型分布式系统

帝王级教你如何注册使用ChatGPT教程及登录注册安装方法

保姆级教你三字诀15天玩会Linux系统-避免踩坑!!!收藏不亏

Python应用实战案例-pyspark库从安装到实战保姆级讲解

Python如何使用图灵的API Key搭建聊天机器人?

程序汪若依微服务华为云Linux部署保姆教程