强化学习入门级实践教学
Posted 微笑小星
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了强化学习入门级实践教学相关的知识,希望对你有一定的参考价值。
参考视频:https://www.bilibili.com/video/BV1yv411i7xd
代码下载:https://github.com/PaddlePaddle/PARL
可以先阅读我的文章强化学习纲要,本文针对强化学习的入门级讲解。代码主要参考强化学习算法框架库:PARL
资料推荐
-
书籍:《Reinforcement Learning: An Introduction》
-
视频:David Silver经典强化学习公开课、UC Berkeley CS285、斯坦福CS234
-
经典论文:
-
DQN:https://arxiv.org/pdf/1312.5602.pdf
-
A3C: https://www.jmlr.org/proceedings/papers/v48/mniha16.pdf
-
DDPG: https://arxiv.org/pdf/1509.02971
-
PPO: https://arxiv.org/pdf/1707.06347
-
-
前沿研究方向:Model-base RL、Hierarchical RL、Multi Agent RL、Meta Learning。
序章
Agent学习的两种方案
一种是基于价值(value-based),一种是基于策略(policy-based)
基于价值:我们给每一个状态都赋予一个“价值”的概念,例如上图C状态的价值大于A状态,也就是往C走离奖励更近。我们的做法是让Agent总是往价值高的地方移动,这样就能找出一个最优的策略。一旦函数优化到最优了,相同的输入永远是同一个输出。
代表方法:Sarsa、Q-learning、DQN。
基于策略:我们直接让一条策略走到底,然后用最后的reward来判断策略是好是坏。好的策略能在以后的行为中获得更高的触发几率。由于输出的是几率,因此同样的输入会获得不同的输出,随机性更强。
代表方法:Policy Gradient。
RL概览分类
强化学习分为无模型(model-free)和基于模型(model-based),无模型的研究会更加热门,上面的两种方案都是无模型的分类。
Model-free: 不需要知道状态之间的转移概率(transition probability)。
Model-based: 需要知道状态之间的转移概率。
而在Model-free中又有三种分类:
算法库
这里我们使用强化学习算法框架库:PARL。这里涵盖了很多的经典算法,又能复现一些最新的流行算法。看代码的学习方式,也是最快上手的学习方式。直接扫左上角二维码来学习。其中多智能体是一个热门的研究方向,被认为更接近人类社会的交互形式。
RL编程实践:GYM
前面的是和算法相关的库。另外一类是和环境相关的库。GYM是学术界比较喜欢的环境库。环境分为离散控制场景和连续控制场景,离散控制是输出只有有限个量,例如控制方向时只有向左向右,一般使用atari环境评估。连续控制是输出是一个连续的量,例如机械臂旋转的角度,一般使用mujoco环境来评估。
环境安装
在安装了python环境的前提下,打开cmd,输入以下命令:
pip install paddlepaddle==1.6.3 # 网络超时加上-i https://pypi.tuna.tsinghua.edu.cn/simple,安装GPU版本要用paddlepaddle-gpu
pip install parl==1.3.1 #如果安装不成功,在install后加上--user
pip install gym
程序示例及PARL的优势
import gym
from gridworld import CliffWalkingWapper
import turtle
# 创建环境
env = gym.make("CliffWalking-v0")
# 绘制一个图形界面,不写这一行只有文字界面
env = CliffWalkingWapper(env)
# 重置界面,开始新的一轮
env.reset()
# 展示界面
env.render()
# 跟环境交互一步,如果有返回值第一个是纵坐标,第二个是reward,第三个是一轮是否结束
# step(1)是往右走,step(2)是往下走,step(3)是向左走
env.step(0)
env.render()
# 让程序运行结束后界面不立刻关闭
turtle.exitonclick()
PARL对框架库做了很好的抽象,所有的算法几乎都集中于Model,Algorithm,Agent三个类。其中Algoritm已经实现了核心的部分,另外两部分开发者可以很方便实现订制。在上面下载的PARL中,有许多算法的Example。
从上面随便找一个example,然后对model和Agent文件做一定修改,就可以移植到新的应用场景中。
PARL的工业应用能力很强。只需要加两行代码,就可以把单机训练变成多机训练。PARL的并行能力很强,和Python不同,真正做到多线程运行,节省大量时间
配置好环境后,直接运行QuickStart里面的程序,看看是否成功,此处本人没有运行成功,报错信息如下:
C:\\Python39\\lib\\site-packages\\parl\\remote\\communication.py:38: FutureWarning: 'pyarrow.default_serialization_context' is deprecated as of 2.0.0 and will be removed in a future version. Use pickle or the pyarrow IPC functionality instead.
context = pyarrow.default_serialization_context()
W0718 17:57:30.795331 4240 device_context.cc:404] Please NOTE: device: 0, GPU Compute Capability: 6.1, Driver API Version: 11.1, Runtime API Version: 10.2
[07-18 17:57:30 MainThread @train.py:73] obs_dim 4, act_dim 2
W0718 17:57:30.798295 4240 dynamic_loader.cc:238] Note: [Recommend] copy cudnn into CUDA installation directory.
For instance, download cudnn-10.0-windows10-x64-v7.6.5.32.zip from NVIDIA's official website,
then, unzip it and copy it into C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v10.0
You should do this according to your CUDA installation directory and CUDNN version.
安装CUDNN依旧测试失败,如果有人知道缘由,请私聊。
基于表格型方法求解RL
强化学习MDP四元组
这就是强化学习MDP四元组。我们用状态转移概率表述:在 s t s_t st状态选择 a t a_t at的动作转移到 s t + 1 s_t+1 st+1而且拿到奖励 r t r_t rt的概率。这样的决策过程是马尔科夫决策过程(MDP)。这是一个序列决策的经典表达方式。
状态转移和序列决策
每次我们只能走其中的一条完整通路,这就是一个观察状态和执行决策不断循环的过程。每次我们都用两个函数描述环境。一个是Probability Function(P函数),反映了环境的随机性。一个状态如果必然转移到下一个特定的状态,那么状态转移概率是100%。如果有多种可能的情况,每个的状态转移概率在0到100%之间。和状态转移概率成对的是Reward Function(R函数),描述了环境的好坏。P函数和R函数已知,则称这个环境已知,也称为Model-based。**如果这些条件已知,那么我们就可以用动态规划来寻找最优策略。**但这里我们不讲动态规划。
这里我们针对的是在环境未知的情况下的解决方法,因为在实际问题中的状态转移概率往往是未知的。对于P函数和R函数未知的状况我们称为Model-free。
Q表格
Q表格指导每一个Step的动作选择,目标导向是未来的总收益。由于收益往往具有延迟,因此未来的总收益才能代表当前动作选择的价值。但是由于前面的动作对越往后的状态影响越小,因此我们需要引入衰减因子。
**强化的概念是我们可以用下一个状态的价值更新现在状态的价值。**因此我们每一个Step都可以更新Q表格,这是一种时序差分的更新方法。
时序差分(Temporal Difference)
推荐一个有意思的网站:https://cs.stanford.edu/people/karpathy/reinforcejs/gridworld_td.html
只有一个格子的reward为1,有很多格子reward为-1,随着小球的探索,有价值的格子也会把周围格子的分数拉高,形成一个整体的“评分体系”。只需要沿着分数增加的方向走一定能找到reward为1的点。
这就是一种时序差分的更新方法。
我们要让
Q
(
S
t
,
A
t
)
Q(S_t,A_t)
Q(St,At)逐步逼近理想的值,因此我们每次只更新一点点,α就是更新的速率。通过不断的更新我们可以让每一个状态的
G
t
G_t
Gt都满足:
G
t
=
R
t
+
1
+
γ
G
t
+
1
G_t = R_t+1 + \\gamma G_t+1
Gt=Rt+1+γGt+1
这就是Sarsa算法,命名方式是按照这五个值来进行计算。上面的格子的价值更新就是按照上面的公式来的。
Sarsa算法代码解析
Agent最重要的是实现两个功能,一个是根据算法选Action,第二个是学习Q表格。并且每一个step都包含这两步。
我们要另开一个agent.py文件,声明一个Agent类,Sample函数选择要输出Action,learn函数来更新Q。
class SarsaAgent(object):
def __init__(self,obs_n,act_n,learning_rate=0.01,gamma=0.9,e_greed=0.1):
self.act_n = act_n # 动作维度,有几个动作可选
self.lr = learning_rate # 学习率
self.gamma = gamma # reward的衰减率
self.epsilon = e_greed # 按一定概率随机选动作
self.Q = np.zeros((obs_n, act_n)) #生成一个矩阵作为Q表格,参数分别是状态的维度和动作维度,有多少格子就有多少状态
# 根据输入观察值,返回输出的Action对应的索引,带探索
def sample(self, obs):
if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作
action = self.predict(obs)
else:
action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作
return action
# 根据输入观察值,输出分数最高的一个动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
#where函数返回一个包含数组和数据类型的元组,取第一个才是数组
action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个action
action = np.random.choice(action_list)
return action
# 学习方法,也就是更新Q-table的方法,采用Sarsa算法来更新Q表格
def learn(self, obs, action, reward, next_obs, next_action, done):
""" on-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
next_action: 根据当前Q表格, 针对next_obs会选择的动作, a_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
target_Q = reward # 没有下一个状态了
else:
target_Q = reward + self.gamma * self.Q[next_obs,
next_action] # Sarsa
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q
# 保存Q表格
def save(self):
npy_file = './q_table.npy'
np.save(npy_file, self.Q)
print(npy_file + ' saved.')
# 读取Q表格
def restore(self, npy_file='./q_table.npy'):
self.Q = np.load(npy_file)
print(npy_file + ' loaded.')
然后再开一个文件写我们的main函数:
import gym
from gridworld import CliffWalkingWapper, FrozenLakeWapper
from agent import SarsaAgent
import time
def run_episode(env, agent, render=False):
total_steps = 0 # 记录每个episode走了多少step
total_reward = 0
obs = env.reset() # 重置环境, 重新开一局(即开始新的一个episode)
action = agent.sample(obs) # 根据算法选择一个动作
while True:
next_obs, reward, done, _ = env.step(action) # 执行动作,action在0~3代表不同方向,并返回下一个状态
next_action = agent.sample(next_obs) # 根据算法选择下一个动作
# 训练 Sarsa 算法,更新Q表格
agent.learn(obs, action, reward, next_obs, next_action, done)
action = next_action # 把下一个动作更新为现在的动作
obs = next_obs # 把下一个状态更新为现在的状态
total_reward += reward
total_steps += 1 # 计算step数
if render:
env.render() #渲染新的一帧图形
if done:
break
return total_reward, total_steps
# 测试函数
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done, _ = env.step(action)
total_reward += reward
obs = next_obs
time.sleep(0.5)
env.render()
if done:
print('test reward = %.1f' % (total_reward))
break
def main():
# env = gym.make("FrozenLake-v0", is_slippery=False) # 0 left, 1 down, 2 right, 3 up
# env = FrozenLakeWapper(env)
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
# 创建一个图形界面
env = CliffWalkingWapper(env)
# 实例化agent
agent = SarsaAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
is_render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps,
ep_reward))
# 每隔20个episode渲染一下看看效果
if episode % 20 == 0:
is_render = True
else:
is_render = False
# 训练结束,查看算法效果
test_episode(env, agent)
if __name__ == "__main__":
main()
生成图形环境的文件gridworld.py的代码如下:
import gym
import turtle
import numpy as np
# turtle tutorial : https://docs.python.org/3.3/library/turtle.html
def GridWorld(gridmap=None, is_slippery=False):
if gridmap is None:
gridmap = ['SFFF', 'FHFH', 'FFFH', 'HFFG']
env = gym.make("FrozenLake-v0", desc=gridmap, is_slippery=False)
env = FrozenLakeWapper(env)
return env
class FrozenLakeWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.max_y = env.desc.shape[0]
self.max_x = env.desc.shape[1]
self.t = None
self.unit = 50
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for _ in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
def render(self):
if self.t == None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for i in range(self.desc.shape[0]):
for j in range(self.desc.shape[1]):
x = j
y = self.max_y - 1 - i
if self.desc[i][j] == b'S': # Start
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'F': # Frozen ice
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'G': # Goal
self.draw_box(x, y, 'yellow')
elif self.desc[i][j] == b'H': # Hole
self.draw_box(x, y, 'black')
else:
self.draw_box(x, y, 'white')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
class CliffWalkingWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.t = None
self.unit = 50
self.max_x = 12
self.max_y = 4
def draw_x_line(self, y, x0, x1, color='gray'):
assert x1 > x0
self.t以上是关于强化学习入门级实践教学的主要内容,如果未能解决你的问题,请参考以下文章
利用 AI 强化学习算法,训练50级比卡超,单挑70级超梦!