Python状态机(transitions模块)

Posted Jenrey

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python状态机(transitions模块)相关的知识,希望对你有一定的参考价值。

文章目录

1、为什么要用状态机?

用python做一个比较复杂的小项目,需要根据不同的输入,控制摄像头采集执行不同的任务。虽然用流程方式实现了,但阅读起来费劲,还容易出错。所以就用了状态机。

2、状态机是什么?

有限状态机(Finite-state machine, FSM),又称有限状态自动机,简称状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模型。

FSM是一种算法思想,简单而言,有限状态机由一组状态、一个初始状态、输入和根据输入及现有状态转换为下一个状态的转换函数组成。

有限状态机是有限个状态以及在这些状态之间的转移和动作等行为的数学模型,例如交通信号灯系统。

3、状态图是什么?

一般可以用状态图来对一个状态机进行精确地描述。

例如这个可乐机的状态图,这里可乐卖50美分 。

4、transitions是什么?

transitions是专门设计好的应用于python的一个有限状态机设计库。

当然状态机的设计可以有很多其他的方法,选用该方法其一是因为更加简单,其二是专业人士写的状态机代码相比于自己写的肯定更加完备一点。

官网

https://github.com/pytransitions/transitions

安装

$ pip3 install transitions

使用状态机必须定义的两个要素

  • state:状态节点
  • transition:状态转移。用于从一个状态节点移动到另一个状态节点

二、实战应用

以物体状态变化为例

1、规划state、transition

state:状态节点的说明

所谓状态机,最先接触的肯定是状态,因此在设计状态机之前需要先明确有哪几种状态。

大自然中物质有4种状态:固态、液态、气态、等离子态(等离子态是热的带电气体)。

所以这里我们规划了物质的4种状态,如下表

条件/当前状态固态 solid液态 liquid气态 gas等离子态 plasma
熔化 melt液态 liquid
蒸发 evaporate气态 gas
升华 sublimate气态 gas
电离 ionize等离子态 plasma

transition:状态转移

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vV8fPnDL-1673197686411)(…/pic/image-20230108094511221.png)]

固态→熔化→液态
液态→蒸发→气态
固态→升华→气态
气态→电离→等离子态

2、编写代码

创建一个继承object的类Matter的实体对象lump,然后调用transitions.Machine()将状态机绑定到这个实体对象上。

最终得到了两个东西,一个是状态机对象machine,一个是具体的实体对象lump

之后设定状态机是用machine,运行状态机是用具体的实体对象lump

创建一个基础类

先定义一个类,把它当成基础模型,Matter就是物质类,lump不好理解的话就理解成’冰’。

## 1、创建基础类
class Matter:
    pass

lump = Matter()

state:状态的定义

支持两种定义方式

方式一:状态可以是列表的形式,列表里的形式可以是类、字符串或字典:

from transitions import Machine, State

## 1、创建基础类
class Matter:
    pass

lump = Matter()


## 2、定义状态
states = [
    State(name='solid'),  # 类,<State('solid')@4472798992>
    'liquid',  # 字符串
    'name': 'gas',  # 字典
    "plasma"
]


## 3、将状态加入到状态机,并将状态机绑定到lump实例对象上
machine = Machine(lump, states)  # <transitions.core.Machine object at 0x10bb1a8b0>

方式二:也可以应用到transitions模块提供的State类进行初始化,再用add_states()进行添加:

from transitions import Machine, State

## 1、创建基础类
class Matter:
    pass

lump = Matter()


## 2、绑定
machine = Machine(lump)


## 3、定义状态
solid = State('solid')
liquid = State('liquid')
gas = State('gas')
plasma = State('plasma')


## 4、将状态加入到状态机上
machine.add_states([solid, liquid, gas]) # <transitions.core.Machine object at 0x10ac6df10>

callbacks相关

有三种方式:

  • 使用states的on_enteron_exit属性进行设置。
    • on_enter:进入该状态后做的事,此时状态已经转换了。
    • on_exit:即将离开某状态时做的事,此时状态未转换。
  • 给基础类增加动态方法:on_enter_stateNameon_exit_stateName
  • transition的before属性、after属性
    • before: 在 属性转换方法 调用即将开始前进行回调,此时属性未转变。
    • after: 在 属性转换方法 调用即将结束前进行回调,此时属性已改变。

先后顺序(使用转换条件进行状态转换,即执行lump.melt(),实现由solid -> liquid):

  1. transition的回调before属性
  2. state=solid的回调on_exit属性
  3. 基础类动态方法的回调on_exit_solid()方法
  4. 【此时状态已改变】
  5. state=liquid的回调on_enter属性
  6. 基础类动态方法的回调on_enter_liquid()方法
  7. transition的回调after属性

第一种:states的on_enteron_exit

可以理解为“回调”或者“钩子”。

  • on_enter:进入该状态后做的事(会执行Matter类中相应名字的函数),注意此时状态已经转换了。
  • on_exit:即将离开某状态时做的事(会执行Matter类中相应名字的函数),注意此时状态未转换。
from transitions import Machine, State

## 1、创建基础类
class Matter(object):
    def say_hello(self):
        print("hello, new state!")

    def say_goodbye(self):
        print("goodbye, old state!")

lump = Matter()


## 2、定义状态,同时给某些状态设置回调
states = [
    State(name='solid', on_exit=['say_goodbye']),
    'liquid',
    'name': 'gas', 'on_exit': ['say_goodbye'],
    'name': 'plasma', 'on_enter': ['say_hello']
]

## 3、绑定并将状态加入到状态机上
machine = Machine(lump, states=states)


## 4、追加设置某些状态的回调
machine.on_enter_liquid('say_hello')  # 使用'on_enter_stateName,动态方法'的方式设置进入liquid状态时执行的回调

第二种:给基础类增加动态方法:on_enter_stateNameon_exit_stateName

# 代码看不懂可以先往下看,回头再来看即可
from transitions import State, Machine

class Matter:
    def say_hello(self):
        print('hello, ' + self.state + ' state!')

    def say_goodbye(self):
        print('goodbye, ' + self.state + ' state!')

    def on_exit_solid(self):
        print('Current ' + self.state + ' state!')

    def on_enter_liquid(self):
        print('Current ' + self.state + ' state!')

    def __init__(self):
        states = [
            State(name='solid', on_enter='say_hello', on_exit='say_goodbye'),
            State(name='liquid', on_enter='say_hello', on_exit='say_goodbye'),
            State(name='gas', on_enter='say_hello', on_exit='say_goodbye'),
            State(name='plasma', on_enter='say_hello', on_exit='say_goodbye'),
        ]

        self.machine = Machine(model=self, states=states, initial=states[0])

        self.machine.add_transition(trigger='melt', source='solid', dest='liquid')
        self.machine.add_transition(trigger='evaporate', source='liquid', dest='gas')
        self.machine.add_transition(trigger='sublimate', source='solid', dest='gas')
        self.machine.add_transition(trigger='ionize', source='gas', dest='plasma')


lump = Matter()
print(lump.state)  # solid

lump.melt()
# goodbye, solid state!
# Current solid state!
# hello, liquid state!
# Current liquid state!

print(lump.state)  # liquid
print(lump.is_solid())  # Fasle
print(lump.is_liquid())  # True

第三种:transition的回调before属性、after属性

from transitions import State, Machine

class Matter:
    def say_hello(self):
        print('hello, ' + self.state + ' state!')

    def say_goodbye(self):
        print('goodbye, ' + self.state + ' state!')

    def on_exit_solid(self):
        print('exit Current ' + self.state + ' state!')

    def on_enter_liquid(self):
        print('enter Current ' + self.state + ' state!')

    def hello(self):
        print('hello after, ' + self.state + ' state!')

    def goodbye(self):
        print('goodbye before, ' + self.state + ' state!')

    def __init__(self):
        states = [
            State(name='solid', on_enter='say_hello', on_exit='say_goodbye'),
            State(name='liquid', on_enter='say_hello', on_exit='say_goodbye'),
            State(name='gas', on_enter='say_hello', on_exit='say_goodbye'),
            State(name='plasma', on_enter='say_hello', on_exit='say_goodbye'),
        ]

        self.machine = Machine(model=self, states=states, initial=states[0])

        # before: 在 属性转换方法 调用开始之前进行回调,此时属性未转变
        # after: 在 属性转换方法 调用即将结束前进行回调,此时属性已改变
        self.machine.add_transition(trigger='melt', source='solid', dest='liquid', before='goodbye', after='hello')
        self.machine.add_transition(trigger='evaporate', source='liquid', dest='gas')
        self.machine.add_transition(trigger='sublimate', source='solid', dest='gas')
        self.machine.add_transition(trigger='ionize', source='gas', dest='plasma')

lump = Matter()
print(lump.state)  # solid
lump.melt()
# goodbye before, solid state!      # transition的回调`before`属性
# goodbye, solid state!             # state=solid的回调`on_exit`属性
# exit Current solid state!         # 基础类动态方法的回调`on_exit_stateName`方法

# hello, liquid state!              # state=liquid的回调`on_enter`属性
# enter Current liquid state!       # 基础类动态方法的回调`on_enter_stateName`方法
# hello after, liquid state!        # transition的回调`after`属性

print(lump.state)  # liquid
print(lump.is_solid())  # Fasle
print(lump.is_liquid())  # True

transition:状态转换的定义

支持两种定义方式

方式一:列表的形式,状态切换的格式如下所示:

from transitions import Machine, State

## 1、创建基础类
class Matter(object):
    def say_hello(self):
        print("hello, new state!")

    def say_goodbye(self):
        print("goodbye, old state!")

lump = Matter()


## 2、定义状态,同时给某些状态设置回调
states = [
    State(name='solid', on_exit=['say_goodbye']),
    'liquid',
    'name': 'gas', 'on_exit': ['say_goodbye'],
    State(name='plasma', on_enter=['say_hello'])
]


## 3、定义转换规则
transitions = [
     'trigger': 'melt', 'source': 'solid', 'dest': 'liquid' ,
     'trigger': 'evaporate', 'source': 'liquid', 'dest': 'gas' ,
     'trigger': 'sublimate', 'source': 'solid', 'dest': 'gas' ,
     'trigger': 'ionize', 'source': 'gas', 'dest': 'plasma' ,
     'trigger': 'any', 'source': '*', 'dest': 'solid'  # *为任意状态
]


## 4、绑定并将状态及转换规则加入到状态机上
machine = Machine(model=lump, states=states, transitions=transitions)

定义转换规则可以简化为:

transitions = [
    ['melt', 'solid', 'liquid'],
    ['evaporate', 'liquid', 'gas'],
    ['sublimate', 'solid', 'gas'],
    ['ionize', 'gas', 'plasma'],
    ['any', '*', 'solid']
]

方式二:通过add_transition函数添加转换:

from transitions import Machine, State

## 1、创建基础类
class Matter(object):
    def say_hello(self):
        print("hello, new state!")

    def say_goodbye(self):
        print("goodbye, old state!")

lump = Matter()


## 2、定义状态,同时给某些状态设置回调
states = [
    State(name='solid', on_exit=['say_goodbye']),
    'liquid',
    'name': 'gas', 'on_exit': ['say_goodbye'],
    State(name='plasma', on_enter=['say_hello'])
]


# 3、绑定并将状态加入到状态机上,同时指定初始状态为'solid'
# 如果未指定初始状态则默认为'initial'
machine = Machine(model=lump, states=states, initial='solid')


# 4、添加转换条件
machine.add_transition('melt', source='solid', dest='liquid')
# machine.add_transition('melt', source='solid', dest='plasma') # 只有第一个匹配melt的transition有效,因为它们trigger相同,source也相同,所以即使写了此行也不会生效。
# machine.add_transition('melt', source='gas', dest='plasma') # 相同trigger时source不同的择机匹配,即melt()的source支持solid或gas,如果source都不是则抛出异常
machine.add_transition('evaporate', source='liquid', dest='gas')
machine.add_transition('sublimate', source='solid', dest='gas')
machine.add_transition('ionize', source='gas', dest='plasma')

使用

## 把 完成"状态转换的定义"后的任意代码放到此处即可

# 打印当前状态
print(lump.state)

# is_stateName判断当前状态是否为solid
print(lump.is_solid()) # 返回布尔值

# (非常不推荐使用,会无视设置的转换条件)强制转换到liquid状态
lump.to_liquid()

# (推荐)使用转换条件进行状态转换,如不符合设置的转换条件则会抛出异常
lump.melt()

# (也可)使用trigger方法调用melt转化,如不符合设置的转换条件则会抛出异常
lump.trigger("melt")

# 获取solid状态触发器
machine.get_triggers('solid')

3、代码总结

单纯绘图

主要用来前期理清 state、transition思路

from transitions.extensions import GraphMachine

"""
    GraphMachine依赖graphviz,需要先安装`pip3 install graphviz`
    并确保Graphviz可执行文件在您的系统上PATH
    https://github.com/xflr6/graphviz
    http://graphviz.org/
"""


class Matter:
    pass


states = ['固态 solid', '液态 liquid', '气态 gas', '等离子态 plasma']
transitions = [
    'trigger': '熔化 melt', 'source': '固态 solid', 'dest': '液态 liquid',
    'trigger': '蒸发 evaporate', 'source': '液态 liquid', 'dest': '气态 gas',
    'trigger': '升华 sublimate', 'source': '固态 solid', 'dest': '气态 gas',
    'trigger': '电离 ionize', 'source': '气态 gas', 'dest': '等离子态 plasma'
]
machine = GraphMachine(Matter(), states=states, transitions=transitions, initial='固态 solid')
# machine = GraphMachine(lump, states=states, transitions=transitions, initial='solid', show_auto_transitions=True)  # 完整流程
graph = machine.get_graph()
graph.edge_attr['fontname'] = 'Microsoft Yahei'
graph.node_attr['fontname'] = 'Microsoft Yahei'
graph.graph_attr['fontname'] = 'Microsoft Yahei'
graph.graph_attr['dpi'] = '300'  # 设置分辨率
graph.graph_attr.pop('label')  # 删除标题
graph.draw('result.png', prog='dot')

封装写法

from transitions import Machine


class Matter:
    states = ['solid', 'liquid', 'gas', 'plasma']  # 状态有固态、液态、气态、等离子态
    transitions = [
        'trigger': 'melt', 'source': 'solid', 'dest': 'liquid'

c# 状态机实现

c#仿boost statechart的状态机。去年转到unity使用c#,statechart原来的风格蛮爽的,缺点是编译忒慢,在c#则编译根本不是问题。

不一样的地方首先是简单!因为没做一些东西如region。其次是每个状态是持久存在的,不像boost statechart当transit时重建。所以entry,exit自己管一下清理。

重入时不包括双方最近的共同outer state,个人喜好。

这不是一个快速的状态机,在它该用的地方保证很爽就是了。不该用的地方比如字符匹配就不要用了。

 

最近想实现一下行为树,但重新回顾这个状态机,我发现就AI来说,2者其实并无什么质变。该状态机如果实现动态的树结构,那其实就是另一个变异的行为树了。

 

using System;
using System.Collections;
using System.Collections.Generic;

namespace StateChart
{
    public enum EResult
    { 
        None,
        Forward,
        Resume,
        Defered,
    }

    public enum EHistory
    {
        Shallow,
        Deep,
    }

public abstract class IEvent { public Type type { get { return GetType(); } } }
public delegate void Reaction<T>(T fsm); public delegate EResult Reaction<U, T>(U fsm, T evt); interface IReaction { EResult Execute<FSM, EVENT>(FSM fsm_, EVENT evt); } public class CReaction<FSM, EVENT> : IReaction { Reaction<FSM, EVENT> reaction; public CReaction(Reaction<FSM, EVENT> reaction_) { reaction = reaction_; } public EResult Execute<F, E>(F fsm_, E evt) { return reaction((FSM)(object)fsm_, (EVENT)(object)evt); } } public abstract class IState<FSM> where FSM : IStateMachine<FSM> { public Type type { get { return GetType(); } } public EHistory History { get; set; } public Reaction<FSM> Entry { get; set; } public Reaction<FSM> Exit { get; set; } //could calc on runtime, but we need more fast spped this time. public int Depth { get; set; } public IState<FSM> OuterState { get; set; } public IState<FSM> ActiveState { get; set; } Dictionary<Type, IReaction> reactions = new Dictionary<Type, IReaction>(); Dictionary<Type, Type> transitions = new Dictionary<Type, Type>(); List<IState<FSM>> subStates = new List<IState<FSM>>(); IState<FSM> initState = null; public IState<FSM> InitState { get { if (initState == null) if (subStates.Count > 0) initState = subStates[0]; return initState; } set { initState = value; } } public IState(IState<FSM> ostate) { History = EHistory.Shallow; OuterState = ostate; if (OuterState != null) OuterState.AddSubState(this); } public IState(IState<FSM> ostate, EHistory history_) { OuterState = ostate; History = history_; if (OuterState != null) OuterState.AddSubState(this); } public void DoEntry(FSM fsm_) { //UnityEngine.Debug.Log("Entry: " + type.ToString()); Console.WriteLine("Entry: " + type.ToString()); if (Entry != null) Entry(fsm_); else OnEntry(fsm_); } public void DoExit(FSM fsm_) { //UnityEngine.Debug.Log("Exit : " + type.ToString()); Console.WriteLine("Exit : " + type.ToString()); if (Exit != null) Exit(fsm_); else OnExit(fsm_); } protected virtual void OnEntry(FSM fsm_) { } protected virtual void OnExit(FSM fsm_) { } public EResult Process<EVENT>(FSM fsm_, EVENT evt) where EVENT : IEvent { IReaction reaction = null; bool hasit = reactions.TryGetValue(evt.type, out reaction); if (!hasit) return EResult.Forward; return reaction.Execute<FSM, EVENT>(fsm_, evt); } public void Bind<EVENT>(Reaction<FSM, EVENT> reaction) where EVENT : IEvent { if (transitions.ContainsKey(typeof(EVENT))) throw new System.InvalidOperationException(); IReaction ireaction = new CReaction<FSM, EVENT>(reaction); reactions.Add(typeof(EVENT), ireaction); } public void Bind<EVENT, TSTATE>() where EVENT : IEvent where TSTATE : IState<FSM> { if (reactions.ContainsKey(typeof(EVENT))) throw new System.InvalidOperationException(); transitions.Add(typeof(EVENT), typeof(TSTATE)); } public void AddSubState(IState<FSM> sstate) { IState<FSM> state = subStates.Find((x) => x.type == sstate.type); if (state != null) return; subStates.Add(sstate); } public IEnumerable<IState<FSM>> IterateSubState() { foreach (IState<FSM> state in subStates) yield return state; } } }

 

using System;
using System.Collections;
using System.Collections.Generic;

namespace StateChart
{

    public abstract class IStateMachine<HOST>  where HOST : IStateMachine<HOST>
    {
        Dictionary<Type, IState<HOST>> typeStates = new Dictionary<Type, IState<HOST>>();
        List<IState<HOST>> activeStates = new List<IState<HOST>>();
        Queue<IEvent> eventQueue = new Queue<IEvent>();
        IState<HOST> outestState = null;
        bool bSuspend = false;

        public IStateMachine() { }

        public void Init(IState<HOST> state) 
        {
            IState<HOST> pstate = state;

            //add outer states
            while (pstate.OuterState != null) {
                pstate.OuterState.ActiveState = pstate;
                activeStates.Add(pstate);
                pstate = pstate.OuterState;
            } 
            activeStates.Add(pstate);
            outestState = pstate;
            
            //build global type-to-state table
            BuildStateTable(outestState, 0);

            //add init sub states
            pstate = state;
            while (pstate.InitState != null) {
                pstate.ActiveState = pstate.InitState;
                pstate = state.InitState;
                if(pstate != null) activeStates.Add(pstate);
            }

            activeStates.Sort((x, y) => x.Depth - y.Depth);
            foreach (IState<HOST> astate in activeStates) {
                astate.DoEntry((HOST)this);
            }
        }

        void BuildStateTable(IState<HOST> state, int depth_) 
        {
            if (state == null) return;
            state.Depth = depth_;
            typeStates.Add(state.type, state);
            foreach (IState<HOST> sstate in state.IterateSubState()) { 
                BuildStateTable(sstate, depth_ + 1); 
            }
        }

        EResult Transit(IState<HOST> state)
        {
            IState<HOST> lstate = null;

            lstate = outestState;
            while (lstate.ActiveState != null) {  // we could save it if state tree is too high.
                lstate = lstate.ActiveState;
            }

            IState<HOST> rstate = state;
            if (state.History == EHistory.Shallow)
                while (rstate.InitState != null)
                    rstate = state.InitState;
            else
                while (rstate.ActiveState != null)
                    rstate = rstate.ActiveState;


            IState<HOST> ltail = lstate;  //save tail of active states
            IState<HOST> rtail = rstate;    //save tail of init states

            int dis = lstate.Depth - rstate.Depth;
            if (dis > 0)
                { IState<HOST> tstate = lstate; lstate = rstate; rstate = tstate; } //rstate will be deepest state

            dis = Math.Abs(dis);
            for (int i = 0; i < dis; i++)  {
                rstate = rstate.OuterState;
            }
            if (rstate == lstate)  //is family
                return EResult.None;
            do
            { //find nearest outer state
                rstate = rstate.OuterState;
                lstate = lstate.OuterState;
            } while (lstate != rstate);

            do  // call exit chain 
            {
                ltail.DoExit((HOST)this);
                ltail = ltail.OuterState;
            } while (ltail != lstate);

            //add tail chain active states
            activeStates.RemoveRange(rstate.Depth + 1, activeStates.Count - rstate.Depth - 1);
            do
            {
                activeStates.Add(rtail);
                lstate = rtail;
                rtail = rtail.OuterState;
                rtail.ActiveState = lstate;
            } while (rtail != rstate);

            // do entry chain
            while (rstate.ActiveState != null)
            {
                rstate = rstate.ActiveState;
                rstate.DoEntry((HOST)this);
            }

            activeStates.Sort((x, y) => x.Depth - y.Depth);
            return EResult.None;
        }

        public EResult Transit(Type stateType)
        {
            IState<HOST> state = null;
            if (!typeStates.TryGetValue(stateType, out state))
                return EResult.None;
            return Transit(state);
        }

        public EResult Transit<TSTATE>()
        { return Transit(typeof(TSTATE)); }

        public void Process<EVENT>(EVENT evt) where EVENT : IEvent
        {
            if (bSuspend) return;

            eventQueue.Enqueue(evt);
            int eventCount = eventQueue.Count;
            while (eventCount > 0){
                eventCount--;            
                IEvent pevent = eventQueue.Dequeue();
                foreach (IState<HOST> state in activeStates)
                    if (bSuspend || state.Process((HOST)this, pevent) == EResult.None)
                        break;
            }
        }

        public void PostEvent<EVENT>(EVENT evt) where EVENT : IEvent
        {
            if (bSuspend) return;
            eventQueue.Enqueue(evt);
        }

        public void Suspend()
        { bSuspend = true; }
        public void Resume()
        { bSuspend = false; }
    }

 

以上是关于Python状态机(transitions模块)的主要内容,如果未能解决你的问题,请参考以下文章

状态机的一般实现

过渡 - transition

Qt移动应用开发:应用粒子特效

c# 状态机实现

有没有典型的状态机实现模式?

Python之路第八篇:堡垒机实例以及数据库操作