python alicebot(ok,这个alicebot先停一下,看Nonebot)

Posted 蹇爱黄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python alicebot(ok,这个alicebot先停一下,看Nonebot)相关的知识,希望对你有一定的参考价值。

关于Alicebot

介绍

AliceBot 是一个简单的 Python 异步多后端机器人框架,支持多种协议适配,可以轻松地编写易用学习和使用的插件来拓展其功能。
AliceBot 仅支持 Python 3.8+ 版本。

它是如何工作的?

AliceBot 在使用时,首先需要你实例化一个 Bot 对象,Bot 对象负责加载配置文件,维护一个插件列表和一个适配器列表并提供了一个供适配器使用的通用的事件分发方法。

协议适配器(Adapter)负责和协议后端进行通讯,当协议后端发送一个新的事件(Event)给适配器后,适配器会按插件优先级分发事件给各个插件进行处理。

插件(Plugin)是由你编写的用于处理事件的类,它必须具有两个方法,rule() 和 handle() ,rule() 方法要求返回一个 bool 值,表示当前事件是否要交由此插件处理,当 rule() 方法返回 True 时,handle() 方法会被调用以进行事件处理。

按priority(优先权)来分配

使用Alicebot

安装

使用 Python 软件包安装程序(pip)进行安装:

pip install alicebot

安装适配器

AliceBot 本身只是一个聊天机器人框架,需要额外安装对应协议的适配器来支持特定的协议,你可以使用 pip 安装协议适配器:
每条都执行以下就行了

pip install alicebot-adapter-cqhttp
pip install alicebot-adapter-mirai
pip install alicebot-adapter-dingtalk

第一个项目

1、新建项目

我是在windows上进行使用的,所以直接打开pycharm新建一个项目就行了

2.创建一个 main.py 文件并写入以下内容

from alicebot import Bot

bot = Bot()

if __name__ == "__main__":
    bot.run()

3.创建一个 config.toml 文件并写入以下内容

[bot]
plugin_dirs = ["plugins"]
adapters = ["alicebot.adapter.cqhttp"]

4.创建一个 plugins 目录

mkdir plugins

5.试试运行 main.py 吧!

python main.py
你应该会看到以下输出的日志

目录结构

AliceBot 推荐的目录结构如下:

.
├── plugins (插件目录)
│   └── xxx.py
├── config.toml (配置文件)
└── main.py

其中 main.py 和 config.toml 文件如上文所示。

配置协议端

上面的例子中使用了 alicebot.adapter.cqhttp 协议适配器,它是 OneBot v11 协议(原 CKYU 平台的 CQHTTP 协议)的适配器,需要一个兼容 OneBot 协议的协议端进行通讯,以下是一些常用的支持 OneBot 协议的 QQ 协议端:
go-cqhttp
mirai + onebot-kotlin
oicq
更多信息详见 CQHTTP 协议使用指南 。

你也可以安装其他协议适配器或者尝试自己写一个协议适配器。

基本配置

配置文件

AliceBot 的所有配置均储存在配置文件 config.toml 文件中。
config.toml 文件为标准的 TOML v1.0.0 文件。TOML 是一种“语义明显且易于阅读的最小化配置文件格式”。建议在开始之前先简单了解一下 TOML 语言的基础格式。
AliceBot 的配置项将存储在不同的表(table)中,AliceBot 自身的配置位于 bot 表中,而所有适配器和插件的配置则分别位于 adapter 和 plugin 表中。
AliceBot 自身拥有如下配置项:

  • plugins

将被加载的插件列表,将被 Bot 类的 load_plugins() 方法加载。

  • plugin_dirs

将被加载的插件目录列表,将被 Bot 类的 load_plugins_from_dirs() 方法加载。

  • adapters

将被加载的适配器列表,将依次被 Bot 类的 load_adapters() 方法加载。

与日志记录相关的配置位于 bot.log 表中,如下:

  • level

日志级别。

  • verbose_exception

详细的异常记录,设置为 True 时会在日志中添加异常的 Traceback。

不同的适配器或插件的配置将位于 adapter 和 plugin 的子表中。
例如一个包含 cqhttp 适配器配置的配置文件如下:

[bot]
## 这里是 AliceBot 自身的配置
plugins = []
plugin_dirs = ["plugins"]
adapters = ["alicebot.adapter.cqhttp"]

[bot.log]
## 这里是 AliceBot 自身日志相关的配置
level = "INFO"
verbose_exception = true

[adapter.cqhttp]
## 这里是 CQHTTP 适配器的配置
adapter_type = "reverse-ws"
host = "127.0.0.1"
port = 8080
url = "/cqhttp/ws"

你可以写入任意未被定义的配置项,它们同样会被 AliceBot 加载,这些自定义配置可以用于插件中,如你可以通过定义 superuser 来表示控制当前当前机器的特殊用户,或者用 nickname 表示当前机器人的昵称。

## 这里是任意未被使用的键名
superuser = 10001
nickname = "小爱"

[bot]
## 这里是 AliceBot 自身的配置
plugins = []
plugin_dirs = ["plugins"]
adapters = ["alicebot.adapter.cqhttp"]

在插件中,整个配置可以通过 self.bot.config 来访问。例如:

from alicebot import Plugin


class HalloAlice(Plugin):
    async def handle(self) -> None:
        await self.event.reply(f"Hello, I am self.bot.config.nickname!")

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return (
            self.event.user_id == self.bot.config.superuser
            and str(self.event.message).lower() == "hello"
        )

自定义配置文件或不使用配置文件

你可以在实例化 Bot 对象时传入 config_file 或 config_dict 属性来实现自定义配置文件或者不使用配置文件或者直接在 Python 文件中配置。

AliceBot 会判断 config_file 的拓展名,允许 .toml 或 .json 文件。如果配置文件是 JSON 文件,则要求文件是使用 UTF-8 编码的标准 JSON 文件,内容等同于上述 TOML 格式配置文件对应的 JSON 格式内容。

当指定 config_dict 属性时,AliceBot 将不再读取配置文件,并直接从给定的配置字典读取配置。

# 自定义配置文件名
bot = Bot(config_file="my_config.json")

# 不使用配置文件
bot = Bot(
    config_dict=
        "bot": 
            "plugin_dirs": ["plugins"],
            "adapters": ["alicebot.adapter.cqhttp"],
        
    
)

插件基础

加载插件

在 快速上手 一章中我们创建了一个 plugins 目录,并在配置文件中设置其为插件目录。所以,放入 plugins 目录的任何不以 _ 开头的 Python 模块都会被自动加载并测试是否是插件。你也可以不配置 plugin_dirs 并通过以下方式“程序式”地加载插件。
但通常情况下,不需要使用下面的方法,推荐通过配置 plugin_dirs 或 plugins 配置项来加载插件。

加载插件目录

在 main.py 文件中添加以下行:

from alicebot import Bot

bot = Bot()
bot.load_plugins_from_dirs(["plugins", "/home/xxx/alicebot/plugins"])

if __name__ == "__main__":
    bot.run()

这实际上和配置 plugin_dirs 为 ["plugins", "/home/xxx/alicebot/plugins"] 是相同的,目录可以是相对路径或者绝对路径。以 _ 开头的插件不会被加载。

加载单个插件

在 main.py 文件中添加以下行:

from alicebot import Bot

class TestPlugin(Plugin):
    pass

bot = Bot()
bot.load_plugins("plugins.hello", TestPlugin)

if __name__ == "__main__":
    bot.run()

load_plugins 方法可以传入插件类、字符串或者 pathlib.Path 对象,如果为后两者会分别视为插件模块的名称(格式和 Python import 语句相同)和插件模块文件的路径进行加载。

编写插件

每个插件即是一个插件类。

虽然并非强制,但建议将一个插件类存放在一个单独的 Python 模块中。

.
├── plugins
│   ├── a.py (单个 Python 文件)
│   └── b (Python 包)
│      └── __init__.py
├── config.toml
└── main.py

插件类都必须是 Plugin 类的子类,并必须实现 rule() 和 handle() 方法。

from alicebot import Plugin


class TestPlugin(Plugin):
    priority: int = 0
    block: bool = False

    async def handle(self) -> None:
        pass

    async def rule(self) -> bool:
        return True

其中类名 TestPlugin 建议不要重复。
priority 属性表示插件的优先级,数字越小表示优先级越高。

block 属性表示当前插件执行结束后是否阻止事件的传播,如果设置为 True ,则当前插件执行完毕后会停止当前事件传播,比当前插件优先级低的插件将不会被执行。

以上两个属性都是可选的,默认为 0 和 False 。

如 它是如何工作的? 一节中所说,当协议适配器产生一个事件(比如机器人接收到了一条消息)后,会按照优先级分发事件给各个插件,AliceBot 会依次执行每个插件的 rule() 方法,根据返回值判断是否要执行 handle() 方法。

Plugin 类内置了一些属性和方法:

  • self.event :当前正在被此插件处理的事件。
  • self.name :插件类名称。
  • self.bot :机器人对象。
  • self.config :插件配置。
  • self.state :插件状态。
  • self.stop() :停止当前事件传播。
  • self.skip() :跳过自身继续当前事件传播。
    除了 self.event 之外的属性和方法将在 插件进阶 一节中详细介绍。

不同适配器产生的事件是不同的,下文以 CQHTTP 适配器为例编写一个 Hello 插件。

编写 rule() 方法

from alicebot import Plugin


class HalloAlice(Plugin):
    async def handle(self) -> None:
        pass

    async def rule(self) -> bool:
        return (
            self.event.adapter.name == "cqhttp"
            and self.event.type == "message"
            and str(self.event.message).lower() == "hello"
        )

如果你觉得把所有条件都写在一行不够好看的话也可以这样写:

from alicebot import Plugin


class HalloAlice(Plugin):
    async def handle(self) -> None:
        pass

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return str(self.event.message).lower() == "hello"

由于不同的适配器产生的事件是不同的,所以应该先判断产生当前事件的适配器名称。

然后应该判断当前事件的类型,CQHTTP 适配器产生的事件的类型有:message 、 notice 和 request ,只有 message 类型的适配器才有 message 属性,这个插件只对消息事件进行响应。

CQHTTP 适配器消息事件的 message 属性表示当前接收到的消息,类型是 CQHTTPMessage ,是 AliceBot 内置 Message 类的子类。

AliceBot 内置的 Message 类实现了许多实用的方法,建议所有适配器开发者尽可能使用,具体使用在 插件进阶 中有提到。在这里可以直接使用 str() 函数将 Message 类型的 self.event.message 转换为字符串。

除此之外,在 rule() 方法中常用的还有 self.event.message.startswith(\'xxx\') 和 self.event.message.endswith(\'xxx\') 。相当于字符串的 startswith() 和 endswith() 方法。

编写 handle() 方法

from alicebot import Plugin


class HalloAlice(Plugin):
    async def handle(self) -> None:
        await self.event.reply("Hello, Alice!")

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return str(self.event.message).lower() == "hello"

正如上面所说的,当 rule() 方法返回 True 时, handle() 方法将会被调用。这里,我们使用了 message 类型的事件的一个方法,reply() 用于快捷回复当前消息,而不需要额外指定发送消息的接收者。

reply() 方法是一个异步方法,所以在调用它时必须加上 await ,表示等待它直到返回结果。

下面,让我们再看一个例子来学习更多用法。

示例:天气插件

from alicebot import Plugin
from alicebot.exceptions import GetEventTimeout


class Weather(Plugin):
    async def handle(self) -> None:
        args = self.event.get_plain_text().split(" ")
        if len(args) >= 2:
            await self.event.reply(await self.get_weather(args[1]))
        else:
            await self.event.reply("请输入想要查询天气的城市:")
            try:
                city_event = await self.event.adapter.get(
                    lambda x: x.type == "message", timeout=10
                )
            except GetEventTimeout:
                return
            else:
                await self.event.reply(
                    await self.get_weather(city_event.get_plain_text())
                )

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return self.event.message.startswith("天气")

    @staticmethod
    async def get_weather(city):
        if city not in ["北京", "上海"]:
            return "你想查询的城市暂不支持!"
        return f"city的天气是..."

你可以通过向机器人发送 天气 北京 格式的消息来获取天气信息,同时,也可以只发送 天气 ,这时机器人会向你询问城市,再次发送要查询的城市名称即可查询天气。

在这个例子中,插件需要去进一步获取接收到的下一条消息,这里,我们使用了 get() 方法,它的使用方法可以参考 API 文档

简而言之,它就是用于获取符合条件的事件的,同样的,它也是一个异步方法,请使用 await 等待。

需要注意的是, get() 方法可以指定超时时间,以避免插件一直等待获取事件,当超时发生时,它会引发 alicebot.exception.GetEventTimeout 异常,请留心对这种情况的处理。

此外,这里的 get_weather() 方法并没有接入真正的天气数据,你可以使用任意的天气 API 代替,但请注意,在进行网络请求时需要使用基于协程异步的网络库,如 aiohttp 或 httpx ,而不能使用如 requests 等同步的网络请求库,这会导致程序被阻塞。

相信阅读到这里,你应该已经可以写出一个 AliceBot 插件了。接下来建议你继续阅读 插件进阶 和你将要使用的适配器的教程。

插件进阶

事件传播控制

有时候,我们可能需要对事件的传播进行一些控制,除了基础的定义 block 属性来决定此插件执行结束后是否继续进行事件传播外,AliceBot 还提供了一些方法以供高级的逻辑控制。

skip() 方法

skip() 方法用于跳过当前插件继续事件传播。通常来说,你也可以在 handle() 方法使用 return 语句实现类似的效果,但这个方法在某些情况下可以简化一些操作。如:

from alicebot import Plugin


class TestPlugin(Plugin):
    async def handle(self) -> None:
        await self.foo()

    async def rule(self) -> bool:
        return True

    async def foo(self):
        self.skip()

也就是 skip() 方法可以在插件类的任何方法内被调用并立刻生效。

stop() 方法

stop() 方法用于结束当前事件传播。但请注意,当此方法被调用后当前事件传播并不会被立即结束,与本插件同优先级的插件仍会被执行完毕。也就是说,本方法的作用是使比当前插件优先级低的插件不被执行。

stop() 方法和 block 属性

你可能会发现,设置 block 属性为 True 和在 handle() 方法最后加一句 self.stop() 没什么区别。实际上,它们在大多数情况下确实没有区别,除了一种情况,即当 handle() 方法中出现异常时,最后的 self.stop() 语句不会被执行,但 block 属性仍会起效。也就是, block 属性为 True 和下面的示例效果大致上等同:

from alicebot import Plugin


class TestPlugin(Plugin):
    async def handle(self) -> None:
        try:
            # do something
            pass
        finally:
            self.stop()

    async def rule(self) -> bool:
        return True

原理

实际上,这两个方法是使用 Python 的异常处理实现的,所以,在编写插件时请注意不要捕获所有异常,如:

# 错误:skip() 方法不会生效
from alicebot import Plugin


class TestPlugin(Plugin):
    async def handle(self) -> None:
        try:
            self.skip()
        except BaseException:
            pass

    async def rule(self) -> bool:
        return True

而应该至少使用:

from alicebot import Plugin


class TestPlugin(Plugin):
    async def handle(self) -> None:
        try:
            self.skip()
        except Exception:
            pass

    async def rule(self) -> bool:
        return True

状态存储

插件类在每次处理事件时都会被实例化,也就是说,对于每个事件,都是不同的 Plugin 实例,但有时候,我们可能会希望插件可以存储一些数据,每次实例化后都可以使用,这时候就要用到状态存储了。
插件类有一个 state 属性,你可以将其视为一个普通的属性,初始值是 None ,但是,它会被永久存储下来,即使在同一个插件的不同的实例中访问它也会得到相同的值。
下面我们使用一个计数插件为例:

from alicebot import Plugin


class Count(Plugin):
    async def handle(self) -> None:
        if self.state is None:
            self.state = 0
        self.state += 1
        await self.event.reply(f"count: self.state")

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return self.event.message.get_plain_text() == "count"

每次收到 count 消息,插件都会回复一个数字,并且是递增的。

你可以根据需要对 state 赋值为任何数据。

但是 state 是被存储在内存中的,则意味着,当你关闭 AliceBot 再次打开,并不会得到存储的状态,你可以通过 Python 自带的 pickle 或 json 库进行序列化,将状态保存到文件中。

此外, state 每个插件是独立的,它可以在同一个插件的不同实例中共享,但不能在不同插件中共享,如果你需要在不同插件中共享状态,可以使用全局状态,即 self.bot.global_state 属性,全局状态是一个 Python 字典。

下面的示例是两个不同的插件,实现分别对一个数字全局状态进行增减。

from alicebot import Plugin


class GlobalStateTest1(Plugin):
    async def handle(self) -> None:
        if self.bot.global_state.get("count", None) is None:
            self.bot.global_state["count"] = 0
        self.bot.global_state["count"] += 1
        await self.event.reply(f\'add: self.bot.global_state["count"]\')

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return self.event.message.get_plain_text() == "add"
from alicebot import Plugin


class GlobalStateTest2(Plugin):
    async def handle(self) -> None:
        if self.bot.global_state.get("count", None) is None:
            self.bot.global_state["count"] = 0
        self.bot.global_state["count"] -= 1
        await self.event.reply(f"sub: self.bot.global_state[\'count\']")

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return self.event.message.get_plain_text() == "sub"

插件配置

在 基础配置 一节中提到,你可以直接通过 self.bot.config 访问当前机器人的配置。但有时我们可能会希望插件配置像适配器配置一样放在一个独立的键中,以提高这个插件的可移植性,那么我们可以这样处理:

from alicebot import Plugin, ConfigModel


class TestPlugin(Plugin):
    class Config(ConfigModel):
        __config_name__ = "test_plugin"
        a: str = "test"
        b: int = 1

    async def handle(self) -> None:
        print(self.config.test_plugin.a)
        print(self.config.test_plugin.b)

    async def rule(self) -> bool:
        return True

config.toml 文件对应如下:

[plugin.test_plugin]
a = "abc"
b = 123

需要在插件类内编写一个名称为 Config 的继承于 alicebot.ConfigModel 的类,这是一个 pydantic 的模型类,具体可以参考 pydantic 的文档 ,简而言之,格式为:

变量名称: 类型[ = 默认值]

如果不指定默认值,且类型不是 Optional[...], Union[None, ...] 或 Any,则这个字段是必填的,在非必需的情况下,建议不要在插件中使用必填字段,这会导致不指定这个字段时,不只是这个插件,整个 AliceBot 都不能运行。

特别的是,Config 类中必须要有一个 config_name 属性,表示这个插件在配置文件中对应的键名。

如果你将插件类放在 Python 包中的话,建议把 Config 类放在单独的 config.py 文件中。

然后在 init.py 中导入:

from alicebot import Plugin

from .config import Config


class TestPlugin(Plugin):
    Config = Config

    async def handle(self) -> None:
        print(self.config.test_plugin.a)
        print(self.config.test_plugin.b)

    async def rule(self) -> bool:
        return True

注意

配置类的类名必须为 Config 且必须包含 config_name 属性。

get() 方法

在 插件基础 中相信你已经使用过了 get() 方法。

实际上,Bot 和 Adapter 类都有一个 get() 方法。

self.bot.get()
self.event.adapter.get()

它们的区别在于,Bot 的 get() 方法会捕获所有适配器产生的事件,而 Adapter 的 get() 方法仅会捕获此适配器产生的事件,即相当于在条件中隐含了判断适配器是自身的条件。

初始化后处理

插件类会在事件传播时被实例化,对于每个事件都会实例化一个新的插件类,如果你需要在被实例化时就进行一些操作,为了避免一些不必要的麻烦,建议不要直接重写 init() 方法。为此,AliceBot 提供了一个 post_init() 方法。post_init() 方法将被 init() 方法在最后调用,默认没有定义任何内容,你可以通过重写该方法进行初始化后处理。

初始化后处理

插件类会在事件传播时被实例化,对于每个事件都会实例化一个新的插件类,如果你需要在被实例化时就进行一些操作,为了避免一些不必要的麻烦,建议不要直接重写 init() 方法。为此,AliceBot 提供了一个 post_init() 方法。post_init() 方法将被 init() 方法在最后调用,默认没有定义任何内容,你可以通过重写该方法进行初始化后处理。

from alicebot import Plugin


class TestPlugin(Plugin):
    def __post_init__(self):
        pass

    async def handle(self) -> None:
        pass

    async def rule(self) -> bool:
        return True

输出日志

如果你想要在插件中输出到控制台什么东西的话,建议不要直接使用 print(),可以这样输出日志:

from alicebot import Plugin
from alicebot.log import logger


class TestPlugin(Plugin):
    async def handle(self) -> None:
        logger.info("TestPlugin Processing!")

    async def rule(self) -> bool:
        return True

为多个插件的共用部分创建基类

如果你编写了许多个有着不少相同的部分的插件,编写多次相同代码显然不是一个优雅的解决方案,我们可以利 用类的继承创建一个所有插件共用的基类。

from abc import ABC

class BasePlugin(Plugin, ABC):
    ...

注意:这个插件基类必须像上面那样显式继承自 abc.ABC 类。

插件类的泛型支持

插件类 Plugin 是一个泛型类,可以提供更加完善的类型提示。
比如::

from typing import Union

from alicebot import Plugin, ConfigModel
from alicebot.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent


class Config(ConfigModel):
    prefix: str = "count: "
    suffix: str = ""


class Count(Plugin[Union[PrivateMessageEvent, GroupMessageEvent], int, Config]):
    Config = Config

    async def handle(self) -> None:
        if self.state is None:
            self.state = 0
        self.state += 1
        await self.event.reply(
            self.config.prefix + str(self.state) + self.config.suffix
        )

    async def rule(self) -> bool:
        if self.event.adapter.name != "cqhttp":
            return False
        if self.event.type != "message":
            return False
        return self.event.message.get_plain_text() == "count"

这里相当于指明了此插件所处理的事件的类型为 Union[PrivateMessageEvent, GroupMessageEvent] ,储存的状态类型为 int ,配置类为 Config。

内置消息

AliceBot 内置了一个消息类,并且建议所有适配器开发者尽可能使用,它提供了许多实用的功能,可以方便地构造富文本消息。

AliceBot 内置了 Message 和 MessageSegment 类,即消息和消息字段。

大多数适配器的消息类均是内置消息类的子类,但又有一些特殊的用法,可以参考适配器的文档。

内置的消息类和消息字段类基本上是对 OneBot 协议消息类的一个实现。

消息类

消息类(Message)是 list 的子类,可以视作是消息字段的列表,但额外提供了以下功能:

重写了 init() 方法可以在初始化时传入 str, Mapping, Iterable[Mapping], MessageSegment, Message 类型的对象,其中 str 原生并不支持,需要适配器开发者实现。当传入与自身类型相同的 Message 对象时,会产生一个新的内容相同的 Message 对象。当传入 MessageSegment 对象时,会将此消息字段添加到列表中。而 Mapping 和 Iterable[Mapping] 主要是为了在适配器产生事件时方便使用 pydantic 进行处理,普通用户无需关心。

msg_seg = MessageSegment()
mas_seg.type = "text"
msg_seg["text"] = "Hello"
msg = Message(msg_seg)

msg = Message("Hello")  # 内置的 Message 原生并不支持这种用法

msg = Message(msg)

实现了 + 和 += 运算符可以直接与 Message,MessageSegment, str 类型的对象相加。

msg = Message()

msg_seg = MessageSegment()
mas_seg.type = "text"
msg_seg["text"] = "Hello"

msg += msg_seg
msg = msg + "Hello"  # 内置的 Message 原生并不支持这种用法

实现了 startswith() , endswith() 和 replace() 方法,类似字符串的对应方法,但可以传入 MessageSegment 或 str 类型的对象,具体请参考 API文档 。

msg.startswith("a")

消息字段

消息字段类(MessageSegment)是一个数据类,同时继承自 Mapping,之所以没有使用 pydantic 的模型类是为了方便在适配器中转化为 json。

它拥有两个字段 type 和 data ,分别表示消息字段的类型和内容。type 类型是 str, data 是 dict,你可以直接对 MessageSegment 对象使用对字典的相关操作,这和对 data 字段进行操作是相同的。如:

from alicebot.message import MessageSegment

msg_seg = MessageSegment("text")
# msg_seg.type = "text"

msg_seg["text"] = "Hello"  # 等同与 mag_seg.data[\'text\'] = \'Hello\'
print(msg_seg.get("text"))  # 等同与 print(msg_seg.data.get(\'text\'))
print(msg_seg.data)

消息字段对象也可以直接与其他对象相加,会返回一个消息类。

from alicebot.message import MessageSegment

msg_seg = MessageSegment("text")
# msg_seg.type = "text"

msg_seg["text"] = "Hello"  # 等同与 mag_seg.data[\'text\'] = \'Hello\'
print(msg_seg.get("text"))  # 等同与 print(msg_seg.data.get(\'text\'))
print(msg_seg.data)

msg_seg1 = MessageSegment("text1")
msg_seg += msg_seg1
print(msg_seg)

示例

下面让我们来实践一下,尝试使用 CQHTTP 协议适配器的消息类构建一个富文本消息。
ok我第一次看富文本的概念:富文本消息是一种包含丰富格式和多媒体内容的消息。相比纯文本消息,富文本消息可以包括更多样化的内容元素,如图片、视频、链接、表情等,同时还可以对文本进行排版、加粗、斜体、下划线、颜色等格式设置。这种消息形式通常用于在线聊天、社交媒体、邮件等场景中,能够更好地传递信息和表达情感。
下图的代码意思是:回复一条包含文本和图片的QQ消息,以响应用户对"hello"的输入。
···python
from alicebot import Plugin
from alicebot.adapter.cqhttp.message import CQHTTPMessage, CQHTTPMessageSegment

class Hello(Plugin):
async def handle(self) -> None:
msg = CQHTTPMessage()
msg += CQHTTPMessageSegment.text("Hello")
msg += CQHTTPMessageSegment.image("file:///path/hello.png")
await self.event.reply(msg)

async def rule(self) -> bool:
    if self.event.adapter.name != "cqhttp":
        return False
    if self.event.type != "message":
        return False
    return str(self.event.message) == "hello"

ok,这个alicebot先停一下,看Nonebot

全局变量如何在 Python 并行编程中工作?

【中文标题】全局变量如何在 Python 并行编程中工作?【英文标题】:How global variable works in parallel programming with Python? 【发布时间】:2021-12-16 15:24:00 【问题描述】:

我有这个代码。在顺序方法中打印消息“no ok”,而在并行方法中,打印消息 ["ok", "ok", "ok"] 而不是 ["not ok", "not ok", "不好”] 我所期望的。

如何更改变量 globVar 而不将其作为“测试”函数中的参数?

import multiprocessing

global globVar
globVar = 'ok'

def test(arg1):
    print(arg1)
    return globVar
    
if __name__ == "__main__" :
    globVar = 'not ok'

    #Sequential
    print(test(0))    
    
    #Parallel 
    pool = multiprocessing.Pool()
    argList = [0,1,2]
    result = pool.map(test,argList)
    pool.close()

【问题讨论】:

子进程将脚本作为外部模块加载,因此它们会忽略 if __name__ == '__main__' 位中的任何内容。所以他们看到globVar,因为它是在它之外定义的,如'ok' @AJ Biffl 谢谢你的回答。所以如果我把它作为参数,我只能修改变量 globVar 的值? 【参考方案1】:

TL;DR.您可以跳到最后一段以获得解决方案或阅读所有内容以了解实际情况。

您没有使用您的平台(例如windowslinux)标记您的问题,作为发布带有multiprocessing 标记的问题的指南要求您这样做;全局变量的行为(盎格鲁语的“行为”)很大程度上取决于平台。

在使用spawn 方法创建新进程的平台(例如 Windows)上,在使用 pool = multiprocessing.Pool() 语句创建的池中创建和初始化每个进程时,会创建一个新的空地址空间并创建一个新的启动 Python 解释器,重新读取并重新执行源程序,以便在最终调用工作函数 test 之前初始化地址空间。这意味着全局范围内的每个语句,即导入语句、变量声明、函数声明等,都是为此目的而执行的。但是,在新的子流程变量中,__name__not 为“__main__”,因此不会执行 if __name__ == "__main__" : 块中的任何语句。这就是为什么对于 Windows 平台,您必须将创建新进程的代码放在这样的块中。如果不这样做,将导致无限递归的进程创建循环,否则未被检测到。

因此,如果您在 Windows 下运行,您的主进程在创建池之前已将 globVar 设置为“不正常”。但是,当进程在调用test 之前被初始化时,您的源代码会被重新执行,并且每个进程(在其自己的地址空间中运行,因此拥有自己的globVar 副本)将该变量重新初始化回'ok'。 test 将看到的值,前面的语句暗示修改globVar 的本地副本不会反映回主进程。

现在在使用fork 创建新进程的平台上,例如Linux,情况有些不同。创建子进程时,每个子进程都以只读方式继承父进程的地址空间,并且只有在尝试修改内存时才会获得副本(“写入时复制”)。这显然是一种更有效的流程创建机制。所以在这种情况下,test 将看到globVar 的值为“not ok”,因为这是创建子流程时的值。但是如果test 更新globVar,“写入时复制”机制将确保它正在更新存在于本地地址空间中的globVar。所以再次主进程不会看到更新的值。

因此,让工作函数返回值作为test 函数正在执行的操作是反映主进程结果的标准方式。 您的问题是 您没有以您期望的 globVar 值开始这可以通过使用正确的 globVar 值初始化池的进程来解决initializerinitargs 参数到Pool 构造函数(参见documentation):

import multiprocessing

global globVar
globVar = 'ok'

def init_processes(gVar):
    global globVar
    globVar = gVar

def test(arg1):
    print(arg1)
    return globVar

if __name__ == "__main__" :
    globVar = 'not ok'

    #Sequential
    print(test(0))

    #Parallel
    pool = multiprocessing.Pool(initializer=init_processes, initargs=(globVar,))
    argList = [0,1,2]
    result = pool.map(test,argList)
    pool.close()
    print(result)

打印:

0
not ok
0
1
2
['not ok', 'not ok', 'not ok']

【讨论】:

感谢您的回答!它有效,您帮助我了解了问题所在。

以上是关于python alicebot(ok,这个alicebot先停一下,看Nonebot)的主要内容,如果未能解决你的问题,请参考以下文章

NLP开发Python实现聊天机器人(ALICE)

Learn Python 009: Dictionary

石子游戏 java实现及python实现

石子游戏 java实现及python实现

python---BFS

Python第四章