DQN神经网络小结(Pytorch版)

Posted Huterox

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DQN神经网络小结(Pytorch版)相关的知识,希望对你有一定的参考价值。

文章目录

前言

阅读此篇文章你将懂得,什么是QN ,什么是 Sarsa。如何手动编写环境,你将知道什么是DQN。此篇文章也是参考了大量的资料最后总结出来的,同时这个也是我两天的小结。

在开始之前我们需要介绍一下什么是 QN 然后什么是 DQN 什么是RL,这个东西能干什么?有什么神奇之处,这个其实也是我再次体会到数学的魅力的一次邂逅。

是什么是RL

RL 是一种机器学习方式叫做强化学习,它有什么特点,区别与监督学习它不需要我们手动提供标签,区别去无监督学习,它有一定的反馈机制,等价于提供了“标签” 。

所以什么是RL,这玩意是一种特殊的学习策略。为什么要学这个,其实也是巧合,有个老哥刚好在玩这个,然后我就发现这个和GAN有点像,它的策略。GAN我不行,这个我觉得我还是可以look look 的。而且我发现我似乎找到了一个好玩的点,所以抱着试一试的心态俺想look look。

今天也是会有两篇博文,一篇是这个,还有一篇是关于操作系统的,昨天晚上推那个DQN头大,就随便写了个银行家算法换个脑子,随便交个操作系统作业,水篇博文,弥补一下最近欠下的博文。我可是日更作者!地球不停转,老子不放假!

什么是QN与Sarsa

这两个玩意呢,其实是都是强化学习的一种策略,一种叫做在线学习,一种叫做离线学习。两者其实都是类似的。

QN

SARSA

看到这两张图有点奇怪,没关系,因为接下来我们要搞的就是这两个玩意。

场景假设

为了更好地去理解RL,这里我将举个简单的例子。
我们举一个下五子棋的例子。
假设这个是一张棋盘。

看到没有,上面你下五子棋的过程其实就是类似于强化学习的一个过程,一个你训练完之后,使用模型的过程。

理论上来说,只要我知道了棋盘的所有情况,并且我得到了每一种情况的最优解,然后我找到当前情况下,我原来知道的“得分情况”选择最大的就好了。换一句话说其实就是 按照棋谱 去打,只不过这个棋谱非常的详细,可以直接判断出下载哪儿。而这个棋谱 是如何得到的,是通过强化学习得到的。

所以这里引出一个问题,现在我们知道了,我们想要无敌需要一个 棋谱。 但是棋谱要怎么拿到手咧?

这个就是我们强化学习的训练过程了。

训练过程

我们前面说了,我们训练的目的就是为了得到一本 超级流批 的棋谱。

那么这个棋谱怎么来,这里就要扯到,马尔可夫决策了。

这里我们再举一个简单一点的例子,因为下五子棋的例子还是比较复杂的,我这边作图不方便。

我们重新设计一个游戏,这个例子我也是参考了这个家伙的例子
python实现强化学习

不过我们要在这个例子做一个改造,我们重新设计这样的游戏。我们叫这个游戏叫 走棋

我们的目标就是为了后面能够得到一个 “棋谱” 不过这个棋谱是张这个样子的

小人会这样干,在当前第一步的时候,他会看 得分最高的选择是什么,是往右,于是他会往右走
然后一直往右走,知道到达重点。(他那个例子其实是走5步)

ok ,回到主题,我们要如何得到这种表呢,每次就是用马尔科夫决策。

这里的理论解释可以查看这篇文章,我这边主要是解释大体的参数,解读一些方程的含义和参数。

【详解+推导!!】马尔可夫决策过程

累计回报

关于这个方程,我想说的是,这个G1代表的是其实是当前走了这一步,我能够拿到的奖励加上,我走了这一步,我后面几步可能会得到的奖励。GAMME 表示的是损失率。为啥,我们需要有累计回报这中东西,其实很简单。

用我们一句古话来总结就是 “眼观六路,耳听八方”。强化学习最神奇的地方就在于,明明我每一步走的都是当前最优(奖励最大)为什么最后我走到的是整体最优,而不会陷入局部最优,问题就在这里。我们决定要选择这一步的时候,不仅仅考虑到了当前这一步能够拿到的奖励,我还考虑到了下一步,下下步,下下下步能够拿到的奖励,这样一来我选择的这一步就是在这一段周期内,的最优,这样一来就没那么容易出现局部最优的情况。

那么t 是什么,是 step 呀。

例如我G1 就是 当前这一步 R1 + 下一步的奖励 * GAMME …
那么对于第二步 G2 不就是 G2 + 下一步* GAMME …

值函数

状态行为函数

这两个函数,对于值函数,这个我们不用管,那篇文章很清楚了。
我这理还有提的就是 状态行为函数。

这个函数其实就是我们刚刚说的表格。

并且这个函数在我们的伪代码和实际代码当中体现很明显。

Q-Leaning

ok,到这里我想理论推导应该是差不多了,接下来是我们实际的代码。

名词与数据结构定义

现在我们来定义几个名词

Reward: 这个就是R 也就是咱们的奖励那个q_table 填写的值就是这玩意
Action: 这个是行为,对于我们那个“走棋”里面的左右动作
Status/State 这个就是状态,在当前状态下你做出的Action 能够得到的 Reward。

接下来是咱们在代码当中需要的数据结构定义了。


这里我们显然只需要一个玩意
一个二维表呀。

代码流程

这个的话就需要看到我们刚刚的伪代码了。这里大概就几步。
迭代中:
计算我们的实际值(累计回报)
更新行为函数(Q表)
我们都是按照最大的更新。

编码

import numpy as np
import pandas as pd
import time

np.random.seed(2) 
#作用和pytorch的那个一样


N_STATES = 6
ACTIONS = ['left','right']
EPSILON = 0.9 #greedy
ALPHA = 0.1 #学习速率
GAMMER = 0.9
ECPHOS = 10 #轮数
FRESH_TIME = 0.01 #演示效果用的


def Init(n_states,actions):
    #初始化
    table = pd.DataFrame(
        np.zeros((n_states-1,len(actions))),
        columns=actions
    )
    return table


def ChoseAction(state:int,QTable:pd.DataFrame):
    state_actions = QTable.iloc[state,:]
    # 大于0.9 也就是0.1部分随便选择
    gailv = np.random.uniform()
    if(gailv>EPSILON or state_actions.all(0)==0):
        action = np.random.choice(ACTIONS)
    else:
        action = state_actions.idxmax()
    return action

def GetReward(S,A):
    #我觉得这个有点类似与一个损失函数,只是这个损失函数很特殊
    #这个是模拟游戏环境,同时也是假设如果我选择了a1 我会得到的R和下一步对应的步数是几
    if (A=="right"):
        if(S==N_STATES-2):
            #我们是假设往右走的
            S_= 'win'
            R = 1
        else:
            S_ = S+1
            R = 0
    else:
        R = 0
        if S==0:
            S_ = S
        else:
            S_ = S-1
    return S_,R

def updateEnvShow(S,ecpho,step_counter):
    #在终端可视化显示行走的过程
    env_list = ['-'] * (N_STATES-1) +['F']
    if(S=='win'):
        interaction = "当前训练轮数 %s: 胜利时走的步数= %s "%(ecpho+1,step_counter)
        print("\\r".format(interaction),end="")
        time.sleep(2)
        print("\\r",end="")
    else:
        env_list[S]='o'
        interaction=".".join(env_list)
        print("\\r".format(interaction),end='')
        time.sleep(FRESH_TIME)


def QLearning():
    QTable = Init(N_STATES, ACTIONS)  
    for ecpho in range(ECPHOS): 
        step_counter = 0
        S = 0
        # 是否回合结束
        isWin = False
        updateEnvShow(S, ecpho, step_counter)
        while not isWin:

            #选择行为
            A = ChoseAction(S, QTable)
            # 得到当前行为会得到的Reward,以及下一步的情况
            S_, R = GetReward(S, A)
            # 估算的(状态-行为)值
            q_predict = QTable.loc[S, A]
            if S_ != 'win':

                # 实际的(状态-行为)值 这个就是类似与G1
                q_target = R + GAMMER * QTable.iloc[S_, :].max()

            else:
                #  实际的(状态-行为)值 (回合结束)
                q_target = R
                isWin = True    

            QTable.loc[S, A] += ALPHA * (q_target - q_predict)  #  QTable 更新
            S = S_  # 探索者移动到下一个 state
            # 环境更新显示
            updateEnvShow(S, ecpho, step_counter+1)

            step_counter += 1
    return QTable
if __name__ == '__main__':
    QTable = QLearning()
    print("\\r\\nQ-table:\\n")
    print(QTable)


SARSA

现在我想大家应该已经理解了QLearning ,那么此时查看SARSA。

这个算法其实和QLearning很像

区别在哪,或者说为什么要有这个算法咧?

选择动作函数

其实区别就在于选择下一步的策略。
我们可以重点看到这个函数:

我们返回的动作有一定概率不是奖励最大的动作。

对于QLearning来说,他每一次都会现在新的动作,也就是每一次都有概率不是选择最优的动作。

但是Sarsa不是呀,它下一个动作就是选择下一步当中奖励最大的。

换一句话说QLearning 胆子大,Sarsa保守。有着不同的收敛性,相对而言QL会想着找到更好的路,Sarsa比较保守,走“稳路”。

代码

代码也是类似的,只需要修改一个函数

def SARSA():
    QTable = Init(N_STATES, ACTIONS)
    for ecpho in range(ECPHOS):
        step_counter = 0
        S = 0
        # 是否回合结束
        isWin = False
        updateEnvShow(S, ecpho, step_counter)
        A = ChoseAction(S, QTable)  # 先初始化选择行为
        while not isWin:

            S_, R = GetReward(S, A)
            try:
                A_ = ChoseAction(S_, QTable)
            except:
                # 这里说明已经到了终点(如果报错)
                pass
            q_predict = QTable.loc[S, A]
            if S_ != 'win':
                q_target = R + GAMMER * QTable.iloc[S_, :].max()

            else:
                q_target = R
                isWin = True

            QTable.loc[S, A] += ALPHA * (q_target - q_predict)  #  QTable 更新

            S = S_  
            A = A_  
            updateEnvShow(S, ecpho, step_counter+1)

            step_counter += 1
    return QTable

DQN 神经网络

OK,前面铺垫了辣么多,是时候来讲讲咱们的DQN了。
还记得我先前说的那个五子棋的玩意吗,我们说我们需要得到一个棋谱,这个棋谱记录了所有的状态,我们只要根据那个对应的状态得到的得分表去选择我们的得分最多的那个位置。但是问题来了,例如像围棋这种有很多情况的玩意,你压根没法去存储那样的棋谱,并且你寻找这样的情况也是非常困难的。

所以有什么方式能够优化呢。
我们原来说 Q(s,a) 是什么东西,是一个“状态行为函数”

是一个函数呀,既然如此,我为什么不能去直接使用一种函数呢,你只需要输入当前的State然后我就能告诉你对应的action 的值

那么我们怎么才能找到这样的函数呀?
什么东西能够达到这样的效果咧?

当然是神经网络呀!这玩意不是可以“拟合”出任何我想要的函数嘛!!!
于是我们使用神经网络来代替了我们的Q表

流程

此时我们的流程也发生了变化。
这里的话我们是有两个版本的一个是2013年的

还有一个是现在使用的2015年的

这里多引入了一个记忆库的玩意,原因是为了打破原来数据之间的连贯性,也就是数据之间的强相关性。

原来我们的数据和上一步或者上几步之间是紧密相连的,换一句话来说,对于相邻的数据之间的相关性较高,这个就会导致一个问题,例如我们在做拟合的时候,就会导致局部拟合效果很好但是全局效果差,也就是两个图像部分贴合但是整体贴合度差。(还好老子玩过数学建模)

所以为了降低这玩意,这里选择了存储一定步骤的数据,然后随机选择一部分,然后去训练,你拟合,这样就可以降低训练数据之间的相关性。

你可以这样理解。如果我的输入是连续的,不是打乱的,此时我在做拟合,那么,为了拟合当前的这一端,我可能就会破坏另一端,最后当我拟合完毕后,可能只有一端是贴合的。现在我数据大散,那么我的拟合的点在图像上是均匀的,那么总体上贴合就会比较好。

那么流程如下:

预估“表”与实际“表”

此外还有一个细节。在我们原来的时候

但是现在的话,由于我们是直接使用了这种“函数”所以我们可以单独使用两个神经网络去分别代表实际和估计(预测)



而我们的损失函数也很巧妙,什么时候我们会有最优解,显然最明显的特征是,我们的系统趋于稳定。
于是:

编码

坑点

在开始之前先来说说我踩到的坑。
第一个:

对于二维张量来说,max(a,1)是取行的最大值,因为我们这里的action是1x2的,要按照行来,但是一维的张量不行,于是只能取0

第二个:

是我们构建神经网络的坑,我原来使用的是ReLu()也就是nn下面的,但是报错,有毛病,后来我又试了一些Function下面的relu,结果ok了,得到了我想要的输出维度。

环境修改

现在俺们仿造一下gym,我们自己把刚刚的那个“走棋” 给做成环境出来。

我们封装一下:


import time

"""
这个环境是这样的,
每一步的observer只有一个,
到重点一共是5步,往右走一共6个格子,所以终点的下标是4
"""

class Env(object):

    def __init__(self):

        self.N_STATES = 6
        self.Actions = ['left','right']
        self.FRESH_TIME = 0.001
        self.Win = 'win'

    def updateEnvShow(self,S, ecpho, step_counter):
        # 在终端可视化显示行走的过程
        env_list = ['-'] * (self.N_STATES - 1) + ['F']
        if (S == self.Win):
            interaction = "当前训练轮数 %s: 胜利时走的步数= %s " % (ecpho + 1, step_counter)
            print("\\r".format(interaction), end="")
            time.sleep(2)
            print("\\r", end="")
        else:

            env_list[S] = 'o'
            interaction = ".".join(env_list)
            print("\\rins:当前训练轮数:ec,当前步数st".format(ins=interaction,ec=ecpho+1,st=step_counter),end="")
            time.sleep(self.FRESH_TIME)

    def getStatus(self):
        return 1

    def getActions(self):
        return self.Actions

    def GetReward(self,S, A):
        # 我觉得这个有点类似与一个损失函数,只是这个损失函数很特殊
        # 这个是模拟游戏环境,同时也是假设如果我选择了a1 我会得到的R和下一步对应的步数是几

        if (A == 1):
            if (S ==  - 2):
                # 我们是假设往右走的
                S_ = self.Win
                R = 1
            else:
                S_ = S + 1
                R = 0
        else:
            R = 0
            if S == 0:
                S_ = S
            else:
                S_ = S - 1
        return S_, R
    def reset(self):
        #开始
        self.updateEnvShow(0,0,0)
        return 0

运行代码

现在我们开始进行完整的编码

import torch
from torch.nn import *
from torch import nn
from DQNNetWork.Env import Env
import numpy as np
import torch.nn.functional as F

#初始化运行环境
env = Env()

N_Status= env.getStatus()
HIDDEN_NUMBERS = 5
ACTIONS = env.getActions()
ACTIONSLEN = len(ACTIONS)
MEMORY_CAPACITY = 30
LR = 0.01
EPSILON = 0.9
BATCH_SIZE = 2
TARGET_REPLACE_ITER = 10
GAMMA = 0.9
ECPHOS = 50
class NetWork(nn.Module):
    #代替表格Q的神经网络
    def __init__(self, ):
        super(NetWork, self).__init__()
        self.fc1 = nn.Linear(N_Status, 50)
        self.fc1.weight.data.normal_(0, 0.1)   # initialization
        self.out = nn.Linear(50, ACTIONSLEN)
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)
        return actions_value


class DQN(object):
    def __init__(self):

        #在我们使用表格的时候,我们需要的是一个现实计算的V,和一个估计的
        #然而实际上我们做的时候其实只是把上一轮的作为预估的表然后更新下一轮
        #那么现在我们直接使用两个神经网络一个就是实际的“表”一个是预估的表
        #还有一个好处,原来用一个表是预估和实际的步长只有1,现在我可以随意设置
        #于是这个过程就变成了两个网络相互学习的过程
        self.eval_net, self.target_net = NetWork(), NetWork()

        self.LearnStepCount = 0
        self.MemoryCount = 0                                         # 记录了几条记忆
        self.Memory= np.zeros((MEMORY_CAPACITY, N_Status * 2 + 2))     # 初始化记忆
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        self.loss_func = nn.MSELoss()


    #我们的Status是只有一个状态的
    def ChooseAction(self,s):
        s = torch.FloatTensor([s])

        if np.random.uniform() < EPSILON:  # greedy
            actions_value = self.eval_net.forward(s)
            #由于我们是1x2的所以我们是0
            action = torch.max(actions_value, 0)[1].data.numpy()
            #选择出我们value最大对应的下标0,1
        else:
            action = np.random.randint(0, ACTIONSLEN)
        return action

    def Remember(self, s, a, r, s_):
        #这个是我们的记忆体
        transition = np.hstack((s, [a, r], s_))
        index = self.MemoryCount % MEMORY_CAPACITY
        self.Memory[index, :] = transition
        self.MemoryCount += 1

    def Learn(self):
        # 达到一定的训练步数后,我们更新我们的target网络
        #我们的这个函数的目的只有一个得到俺们的Q表,Q神经网络
        if self.LearnStepCount % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.LearnStepCount += 1

        # sample batch transitions
        SelectMemory = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
        selectM = self.Memory[SelectMemory, :]
        S_s = torch.FloatTensor(selectM[:, :N_Status])
        S_a = torch.LongTensor(selectM[:, N_Status:N_Status+1].astype(int))
        S_r = torch.FloatTensor(selectM[:, N_Status+1:N_Status+2])
        S_s_ = torch.FloatTensor(selectM[:, -N_Status:])

        #这一步得到了我们一个batch_size的最佳值(最佳)
        q_eval = self.eval_net(S_s)
        q_eval = q_eval.gather(1, S_a)
        #这个是下一步动作对应的值
        q_next = self.target_net(S_s_).detach()
        #更新我们的G
        # shape (batch, 1)
        q_target = S_r + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 

以上是关于DQN神经网络小结(Pytorch版)的主要内容,如果未能解决你的问题,请参考以下文章

强化学习 DQN pytorch实例

DQN 强化学习

基于Pytorch的强化学习(DQN)之REINFORCE VS A2C

基于Pytorch的强化学习(DQN)之价值学习

《动手学深度学习》线性回归(PyTorch版)

pytorch 笔记: DQN(experience replay)