强化学习入门级实践教学

Posted 微笑小星

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了强化学习入门级实践教学相关的知识,希望对你有一定的参考价值。

参考视频:https://www.bilibili.com/video/BV1yv411i7xd

代码下载:https://github.com/PaddlePaddle/PARL

可以先阅读我的文章强化学习纲要,本文针对强化学习的入门级讲解。代码主要参考强化学习算法框架库:PARL

资料推荐

  • 书籍:《Reinforcement Learning: An Introduction》

  • 视频:David Silver经典强化学习公开课、UC Berkeley CS285、斯坦福CS234

  • 经典论文:

    • DQN:https://arxiv.org/pdf/1312.5602.pdf

    • A3C: https://www.jmlr.org/proceedings/papers/v48/mniha16.pdf

    • DDPG: https://arxiv.org/pdf/1509.02971

    • PPO: https://arxiv.org/pdf/1707.06347

  • 前沿研究方向:Model-base RL、Hierarchical RL、Multi Agent RL、Meta Learning。

序章

Agent学习的两种方案

一种是基于价值(value-based),一种是基于策略(policy-based)

基于价值:我们给每一个状态都赋予一个“价值”的概念,例如上图C状态的价值大于A状态,也就是往C走离奖励更近。我们的做法是让Agent总是往价值高的地方移动,这样就能找出一个最优的策略。一旦函数优化到最优了,相同的输入永远是同一个输出。

代表方法:Sarsa、Q-learning、DQN。

基于策略:我们直接让一条策略走到底,然后用最后的reward来判断策略是好是坏。好的策略能在以后的行为中获得更高的触发几率。由于输出的是几率,因此同样的输入会获得不同的输出,随机性更强。

代表方法:Policy Gradient。

RL概览分类

强化学习分为无模型(model-free)和基于模型(model-based),无模型的研究会更加热门,上面的两种方案都是无模型的分类。

Model-free: 不需要知道状态之间的转移概率(transition probability)。

Model-based: 需要知道状态之间的转移概率。

而在Model-free中又有三种分类:

算法库

这里我们使用强化学习算法框架库:PARL。这里涵盖了很多的经典算法,又能复现一些最新的流行算法。看代码的学习方式,也是最快上手的学习方式。直接扫左上角二维码来学习。其中多智能体是一个热门的研究方向,被认为更接近人类社会的交互形式。

RL编程实践:GYM

前面的是和算法相关的库。另外一类是和环境相关的库。GYM是学术界比较喜欢的环境库。环境分为离散控制场景和连续控制场景,离散控制是输出只有有限个量,例如控制方向时只有向左向右,一般使用atari环境评估。连续控制是输出是一个连续的量,例如机械臂旋转的角度,一般使用mujoco环境来评估。

环境安装

在安装了python环境的前提下,打开cmd,输入以下命令:

pip install paddlepaddle==1.6.3 # 网络超时加上-i https://pypi.tuna.tsinghua.edu.cn/simple,安装GPU版本要用paddlepaddle-gpu
pip install parl==1.3.1     #如果安装不成功,在install后加上--user
pip install gym

程序示例及PARL的优势

import gym
from gridworld import CliffWalkingWapper
import turtle

# 创建环境
env = gym.make("CliffWalking-v0")
# 绘制一个图形界面,不写这一行只有文字界面
env = CliffWalkingWapper(env)
# 重置界面,开始新的一轮
env.reset()
# 展示界面
env.render()

# 跟环境交互一步,如果有返回值第一个是纵坐标,第二个是reward,第三个是一轮是否结束
# step(1)是往右走,step(2)是往下走,step(3)是向左走
env.step(0)
env.render()

# 让程序运行结束后界面不立刻关闭
turtle.exitonclick()

PARL对框架库做了很好的抽象,所有的算法几乎都集中于Model,Algorithm,Agent三个类。其中Algoritm已经实现了核心的部分,另外两部分开发者可以很方便实现订制。在上面下载的PARL中,有许多算法的Example。

从上面随便找一个example,然后对model和Agent文件做一定修改,就可以移植到新的应用场景中。

PARL的工业应用能力很强。只需要加两行代码,就可以把单机训练变成多机训练。PARL的并行能力很强,和Python不同,真正做到多线程运行,节省大量时间

配置好环境后,直接运行QuickStart里面的程序,看看是否成功,此处本人没有运行成功,报错信息如下:

C:\\Python39\\lib\\site-packages\\parl\\remote\\communication.py:38: FutureWarning: 'pyarrow.default_serialization_context' is deprecated as of 2.0.0 and will be removed in a future version. Use pickle or the pyarrow IPC functionality instead.
  context = pyarrow.default_serialization_context()
W0718 17:57:30.795331  4240 device_context.cc:404] Please NOTE: device: 0, GPU Compute Capability: 6.1, Driver API Version: 11.1, Runtime API Version: 10.2
[07-18 17:57:30 MainThread @train.py:73] obs_dim 4, act_dim 2
W0718 17:57:30.798295  4240 dynamic_loader.cc:238] Note: [Recommend] copy cudnn into CUDA installation directory. 
 For instance, download cudnn-10.0-windows10-x64-v7.6.5.32.zip from NVIDIA's official website, 
then, unzip it and copy it into C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v10.0
You should do this according to your CUDA installation directory and CUDNN version.

安装CUDNN依旧测试失败,如果有人知道缘由,请私聊。

基于表格型方法求解RL

强化学习MDP四元组

这就是强化学习MDP四元组。我们用状态转移概率表述:在 s t s_t st状态选择 a t a_t at的动作转移到 s t + 1 s_t+1 st+1而且拿到奖励 r t r_t rt的概率。这样的决策过程是马尔科夫决策过程(MDP)。这是一个序列决策的经典表达方式。

状态转移和序列决策

每次我们只能走其中的一条完整通路,这就是一个观察状态和执行决策不断循环的过程。每次我们都用两个函数描述环境。一个是Probability Function(P函数),反映了环境的随机性。一个状态如果必然转移到下一个特定的状态,那么状态转移概率是100%。如果有多种可能的情况,每个的状态转移概率在0到100%之间。和状态转移概率成对的是Reward Function(R函数),描述了环境的好坏。P函数和R函数已知,则称这个环境已知,也称为Model-based。**如果这些条件已知,那么我们就可以用动态规划来寻找最优策略。**但这里我们不讲动态规划。

这里我们针对的是在环境未知的情况下的解决方法,因为在实际问题中的状态转移概率往往是未知的。对于P函数和R函数未知的状况我们称为Model-free

Q表格

Q表格指导每一个Step的动作选择,目标导向是未来的总收益。由于收益往往具有延迟,因此未来的总收益才能代表当前动作选择的价值。但是由于前面的动作对越往后的状态影响越小,因此我们需要引入衰减因子。

**强化的概念是我们可以用下一个状态的价值更新现在状态的价值。**因此我们每一个Step都可以更新Q表格,这是一种时序差分的更新方法。

时序差分(Temporal Difference)

推荐一个有意思的网站:https://cs.stanford.edu/people/karpathy/reinforcejs/gridworld_td.html

只有一个格子的reward为1,有很多格子reward为-1,随着小球的探索,有价值的格子也会把周围格子的分数拉高,形成一个整体的“评分体系”。只需要沿着分数增加的方向走一定能找到reward为1的点。

这就是一种时序差分的更新方法。

我们要让 Q ( S t , A t ) Q(S_t,A_t) Q(St,At)逐步逼近理想的值,因此我们每次只更新一点点,α就是更新的速率。通过不断的更新我们可以让每一个状态的 G t G_t Gt都满足:
G t = R t + 1 + γ G t + 1 G_t = R_t+1 + \\gamma G_t+1 Gt=Rt+1+γGt+1
这就是Sarsa算法,命名方式是按照这五个值来进行计算。上面的格子的价值更新就是按照上面的公式来的。

Sarsa算法代码解析

Agent最重要的是实现两个功能,一个是根据算法选Action,第二个是学习Q表格。并且每一个step都包含这两步。

我们要另开一个agent.py文件,声明一个Agent类,Sample函数选择要输出Action,learn函数来更新Q。

class SarsaAgent(object):
    def __init__(self,obs_n,act_n,learning_rate=0.01,gamma=0.9,e_greed=0.1):
        self.act_n = act_n  # 动作维度,有几个动作可选
        self.lr = learning_rate  # 学习率
        self.gamma = gamma  # reward的衰减率
        self.epsilon = e_greed  # 按一定概率随机选动作
        self.Q = np.zeros((obs_n, act_n)) #生成一个矩阵作为Q表格,参数分别是状态的维度和动作维度,有多少格子就有多少状态

    # 根据输入观察值,返回输出的Action对应的索引,带探索
    def sample(self, obs):
        if np.random.uniform(0, 1) < (1.0 - self.epsilon):  #根据table的Q值选动作
            action = self.predict(obs)
        else:
            action = np.random.choice(self.act_n)  #有一定概率随机探索选取一个动作
        return action

    # 根据输入观察值,输出分数最高的一个动作值
    def predict(self, obs):
        Q_list = self.Q[obs, :]
        maxQ = np.max(Q_list)
        #where函数返回一个包含数组和数据类型的元组,取第一个才是数组
        action_list = np.where(Q_list == maxQ)[0]  # maxQ可能对应多个action
        action = np.random.choice(action_list)
        return action

    # 学习方法,也就是更新Q-table的方法,采用Sarsa算法来更新Q表格
    def learn(self, obs, action, reward, next_obs, next_action, done):
        """ on-policy
            obs: 交互前的obs, s_t
            action: 本次交互选择的action, a_t
            reward: 本次动作获得的奖励r
            next_obs: 本次交互后的obs, s_t+1
            next_action: 根据当前Q表格, 针对next_obs会选择的动作, a_t+1
            done: episode是否结束
        """
        predict_Q = self.Q[obs, action]
        if done:
            target_Q = reward  # 没有下一个状态了
        else:
            target_Q = reward + self.gamma * self.Q[next_obs,
                                                    next_action]  # Sarsa
        self.Q[obs, action] += self.lr * (target_Q - predict_Q)  # 修正q

    # 保存Q表格
    def save(self):
        npy_file = './q_table.npy'
        np.save(npy_file, self.Q)
        print(npy_file + ' saved.')
	# 读取Q表格
    def restore(self, npy_file='./q_table.npy'):
        self.Q = np.load(npy_file)
        print(npy_file + ' loaded.')

然后再开一个文件写我们的main函数:

import gym
from gridworld import CliffWalkingWapper, FrozenLakeWapper
from agent import SarsaAgent
import time


def run_episode(env, agent, render=False):
    total_steps = 0  # 记录每个episode走了多少step
    total_reward = 0

    obs = env.reset()  # 重置环境, 重新开一局(即开始新的一个episode)
    action = agent.sample(obs)  # 根据算法选择一个动作

    while True:
        next_obs, reward, done, _ = env.step(action)  # 执行动作,action在0~3代表不同方向,并返回下一个状态
        next_action = agent.sample(next_obs)  # 根据算法选择下一个动作
        # 训练 Sarsa 算法,更新Q表格
        agent.learn(obs, action, reward, next_obs, next_action, done)

        action = next_action # 把下一个动作更新为现在的动作
        obs = next_obs  # 把下一个状态更新为现在的状态
        total_reward += reward
        total_steps += 1  # 计算step数
        if render:
            env.render()  #渲染新的一帧图形
        if done:
            break
    return total_reward, total_steps

# 测试函数
def test_episode(env, agent):
    total_reward = 0
    obs = env.reset()
    while True:
        action = agent.predict(obs)  # greedy
        next_obs, reward, done, _ = env.step(action)
        total_reward += reward
        obs = next_obs
        time.sleep(0.5)
        env.render()
        if done:
            print('test reward = %.1f' % (total_reward))
            break


def main():
    # env = gym.make("FrozenLake-v0", is_slippery=False)  # 0 left, 1 down, 2 right, 3 up
    # env = FrozenLakeWapper(env)

    env = gym.make("CliffWalking-v0")  # 0 up, 1 right, 2 down, 3 left
    # 创建一个图形界面
    env = CliffWalkingWapper(env)

    # 实例化agent
    agent = SarsaAgent(
        obs_n=env.observation_space.n,
        act_n=env.action_space.n,
        learning_rate=0.1,
        gamma=0.9,
        e_greed=0.1)

    is_render = False
    for episode in range(500):
        ep_reward, ep_steps = run_episode(env, agent, is_render)
        print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps,
                                                          ep_reward))

        # 每隔20个episode渲染一下看看效果
        if episode % 20 == 0:
            is_render = True
        else:
            is_render = False
    # 训练结束,查看算法效果
    test_episode(env, agent)


if __name__ == "__main__":
    main()

生成图形环境的文件gridworld.py的代码如下:

import gym
import turtle
import numpy as np

# turtle tutorial : https://docs.python.org/3.3/library/turtle.html


def GridWorld(gridmap=None, is_slippery=False):
    if gridmap is None:
        gridmap = ['SFFF', 'FHFH', 'FFFH', 'HFFG']
    env = gym.make("FrozenLake-v0", desc=gridmap, is_slippery=False)
    env = FrozenLakeWapper(env)
    return env


class FrozenLakeWapper(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.max_y = env.desc.shape[0]
        self.max_x = env.desc.shape[1]
        self.t = None
        self.unit = 50

    def draw_box(self, x, y, fillcolor='', line_color='gray'):
        self.t.up()
        self.t.goto(x * self.unit, y * self.unit)
        self.t.color(line_color)
        self.t.fillcolor(fillcolor)
        self.t.setheading(90)
        self.t.down()
        self.t.begin_fill()
        for _ in range(4):
            self.t.forward(self.unit)
            self.t.right(90)
        self.t.end_fill()

    def move_player(self, x, y):
        self.t.up()
        self.t.setheading(90)
        self.t.fillcolor('red')
        self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)

    def render(self):
        if self.t == None:
            self.t = turtle.Turtle()
            self.wn = turtle.Screen()
            self.wn.setup(self.unit * self.max_x + 100,
                          self.unit * self.max_y + 100)
            self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
                                        self.unit * self.max_y)
            self.t.shape('circle')
            self.t.width(2)
            self.t.speed(0)
            self.t.color('gray')
            for i in range(self.desc.shape[0]):
                for j in range(self.desc.shape[1]):
                    x = j
                    y = self.max_y - 1 - i
                    if self.desc[i][j] == b'S':  # Start
                        self.draw_box(x, y, 'white')
                    elif self.desc[i][j] == b'F':  # Frozen ice
                        self.draw_box(x, y, 'white')
                    elif self.desc[i][j] == b'G':  # Goal
                        self.draw_box(x, y, 'yellow')
                    elif self.desc[i][j] == b'H':  # Hole
                        self.draw_box(x, y, 'black')
                    else:
                        self.draw_box(x, y, 'white')
            self.t.shape('turtle')

        x_pos = self.s % self.max_x
        y_pos = self.max_y - 1 - int(self.s / self.max_x)
        self.move_player(x_pos, y_pos)


class CliffWalkingWapper(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.t = None
        self.unit = 50
        self.max_x = 12
        self.max_y = 4

    def draw_x_line(self, y, x0, x1, color='gray'):
        assert x1 > x0
        self.t以上是关于强化学习入门级实践教学的主要内容,如果未能解决你的问题,请参考以下文章

利用AI强化学习训练50级比卡超单挑70级超梦!

利用AI强化学习训练50级比卡超单挑70级超梦!

利用AI强化学习训练50级比卡超单挑70级超梦!

利用 AI 强化学习算法,训练50级比卡超,单挑70级超梦!

利用 AI 强化学习算法,训练50级比卡超,单挑70级超梦!

递归分类最佳实践:如何在强化学习中用示例代替奖励