深度强化学习-DDPG算法原理与代码

Posted indigo love

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度强化学习-DDPG算法原理与代码相关的知识,希望对你有一定的参考价值。

深度强化学习-DDPG算法原理与代码

引言

1 DDPG算法简介

2 DDPG算法原理

2.1 经验回放

2.2 目标网络

2.2.1 算法更新过程

2.2.2 目标网络的更新

2.2.3 引入目标网络的目的

2.3 噪声探索

3 DDPG算法伪代码

 4 代码实现

5 实验结果

6 结论


引言

Deep Deterministic Policy Gradient (DDPG)算法是DeepMind团队提出的一种专门用于解决连续控制问题的在线式(on-line)深度强化学习算法,它其实本质上借鉴了Deep Q-Network (DQN)算法里面的一些思想。本文就带领大家了解一下这个算法,论文和代码的链接见下方。

论文:https://arxiv.org/pdf/1509.02971.pdf

代码:https://github.com/indigoLovee/DDPG

喜欢的话请点个star哦!

1 DDPG算法简介

在DDPG算法之前,我们在求解连续动作空间问题时,主要有两种方式:一是对连续动作做离散化处理,然后再利用强化学习算法(例如DQN)进行求解。二是使用Policy Gradient (PG)算法 (例如Reinforce) 直接求解。但是对于方式一,离散化处理在一定程度上脱离了工程实际;对于方式二,PG算法在求解连续控制问题时效果往往不尽人意。为此,DDPG算法横空出世,在许多连续控制问题上取得了非常不错的效果。

DDPG算法是Actor-Critic (AC) 框架下的一种在线式深度强化学习算法,因此算法内部包括Actor网络和Critic网络,每个网络分别遵从各自的更新法则进行更新,从而使得累计期望回报最大化。

2 DDPG算法原理

DDPG算法将确定性策略梯度算法和DQN算法中的相关技术结合在一起,之前我们在讲DQN算法时,详细说明了其中的两个重要的技术:经验回放和目标网络。具体而言,DDPG算法主要包括以下三个关键技术:

(1)经验回放:智能体将得到的经验数据放入Replay Buffer中,更新网络参数时按照批量采样。

(2)目标网络:在Actor网络和Critic网络外再使用一套用于估计目标的Target Actor网络和Target Critic网络。在更新目标网络时,为了避免参数更新过快,采用软更新方式。

(3)噪声探索:确定性策略输出的动作为确定性动作,缺乏对环境的探索。在训练阶段,给Actor网络输出的动作加入噪声,从而让智能体具备一定的探索能力。

2.1 经验回放

经验回放就是一种让经验概率分布变得稳定的技术,可以提高训练的稳定性。经验回放主要有“存储”和“回放”两大关键步骤:

存储:将经验以形式存储在经验池中。

回放:按照某种规则从经验池中采样一条或多条经验数据。

存储的角度来看,经验回放可以分为集中式回放和分布式回放:
集中式回放:智能体在一个环境中运行,把经验统一存储在经验池中。

分布式回放:多个智能体同时在多个环境中运行,并将经验统一存储在经验池中。由于多个智能体同时生成经验,所以能够使用更多资源的同时更快地收集经验。

采样的角度来看,经验回放可以分为均匀回放和优先回放:
均匀回放:等概率从经验池中采样经验。
优先回放:为经验池中每条经验指定一个优先级,在采样经验时更倾向于选择优先级更高的经验。一般的做法是,如果某条经验(例如经验)的优先级为,那么选取该经验的概率为:

优先回放可以具体参照这篇论文 :优先经验回放

经验回放的优点:

1.在训练Q网络时,可以打破数据之间的相关性,使得数据满足独立同分布,从而减小参数更新的方差,提高收敛速度。

2.能够重复使用经验,数据利用率高,对于数据获取困难的情况尤其有用。

经验回放的缺点:

无法应用于回合更新和多步学习算法。但是将经验回放应用于Q学习,就规避了这个缺点。

 代码中采用集中式均匀回放,具体如下:

import numpy as np


class ReplayBuffer:
    def __init__(self, max_size, state_dim, action_dim, batch_size):
        self.mem_size = max_size
        self.batch_size = batch_size
        self.mem_cnt = 0

        self.state_memory = np.zeros((self.mem_size, state_dim))
        self.action_memory = np.zeros((self.mem_size, action_dim))
        self.reward_memory = np.zeros((self.mem_size, ))
        self.next_state_memory = np.zeros((self.mem_size, state_dim))
        self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)

    def store_transition(self, state, action, reward, state_, done):
        mem_idx = self.mem_cnt % self.mem_size

        self.state_memory[mem_idx] = state
        self.action_memory[mem_idx] = action
        self.reward_memory[mem_idx] = reward
        self.next_state_memory[mem_idx] = state_
        self.terminal_memory[mem_idx] = done

        self.mem_cnt += 1

    def sample_buffer(self):
        mem_len = min(self.mem_size, self.mem_cnt)
        batch = np.random.choice(mem_len, self.batch_size, replace=False)

        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.next_state_memory[batch]
        terminals = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminals

    def ready(self):
        return self.mem_cnt >= self.batch_size

2.2 目标网络

由于DDPG算法是基于AC框架,因此算法中必然含有Actor和Critic网络。另外每个网络都有其对应的目标网络,所以DDPG算法中包括四个网络,分别是Actor网络,Critic网络,Target Actor网络和Target Critic网络 。本节主要介绍一下DDPG算法的更新过程,目标网络的更新方式以及引入目标网络的目的

2.2.1 算法更新过程

算法更新主要更新的是Actor和Critic网络的参数,其中Actor网络通过最大化累积期望回报来更新,Critic网络通过最小化评估值与目标值之间的误差来更新在训练阶段,我们从Replay Buffer中采样一个批次的数据,假设采样到的一条数据为,Actor和Critic网络更新过程如下。

Critic网络更新过程:利用Target Actor网络计算出状态下的动作:

 这里需要注意:计算出动作后不需要加入噪声。然后利用Target Critic网络计算出状态动作对的目标值:

接着利用 Critic网络计算出状态动作对的评估值:

 最后利用梯度下降算法最小化评估值和期望值之间的差值,从而对Critic网络中的参数进行更新:

 上述过程其实和DQN算法非常类似。

Actor网络更新过程:利用Actor网络计算出状态下的动作:

这里需要注意:计算出动作后不需要加入噪声。然后利用Critic网络计算出状态动作对的评估值(即累积期望回报):

最后利用梯度上升算法最大化累积期望回报(代码实现是采用梯度下降算法优化,其实本质上都是一样的),从而对Actor网络中的参数进行更新。

至此我们就完成了对Actor和Critic网络的更新。

2.2.2 目标网络的更新

对于目标网络的更新,DDPG算法中采用软更新方式,也可以称为指数平均移动 (Exponential Moving Average, EMA)。即引入一个学习率(或者成为动量),将旧的目标网络参数和新的对应网络参数做加权平均,然后赋值给目标网络:

Target Actor网络更新过程:

Target Critic网络更新过程:

 学习率(动量),通常取值0.005。

2.2.3 引入目标网络的目的

我们在前面提到过,引入目标网络的目的其实和DQN算法的想法是一样的。由于目标网络中的参数是通过软更新的方式来缓慢更新的,因此它的输出会更加稳定,利用目标网络来计算目标值自然也会更加稳定,从而进一步保证Critic网络的学习过程更加平稳。试想,如果直接使用Critic网络来计算目标值

那么由于Critic网络在不断更新,网络波动剧烈,自然目标值的变化也很剧烈。在学习过程中,让Critic网络的评估值追逐一个变化剧烈的目标,很容易出现网络震荡,从而导致整个学习过程坍塌。

上述是一种目的,其实还有另外一个目的。当使用Critic网络来计算目标值时(如上式所示),它其实本质上是一种自举 (Bootstrapping) 的过程。然后让不断逼近,很容易导致网络过估计。因为当出现过估计时,会将其回传至,导致该项也出现了过估计,从而形成了一种正反馈,最终导致整个网络出现过估计。

自举 (Bootstrapping)

表示在当前值函数的计算过程中,会利用到后续的状态值函数或动作值函数,即利用到后续的状态或者状态动作对。

那么过估计会出现什么问题呢?如果过估计是均匀的,对于最终的决策不会造成影响;但是如果不均匀,对于最终的决策会产生很大影响。我们举个栗子吧,大家很容易就能明白。

 上图中我们假设有三个动作,每个动作的实际动作价值依次是200,100和230,显然动作3的动作价值是最高的,智能体会选择动作3。如果网络出现过估计,并且是均匀的,假设过估计的量是100,那么网络评估出来的动作价值就依次是300,200和330,显然动作3的动作价值依然是最高的,智能体依旧会选择动作3。因此,均匀过估计对于最终的决策并不会产生影响。

 同样我们假设有三个动作,每个动作的实际动作价值依次是200,100和230,显然动作3的动作价值是最高的,智能体会选择动作3。如果网络出现不均匀过估计,评估出来的动作价值依次是280,300和240,此时显然动作2的动作价值是最高的,智能体会选择动作2。但是实际上动作2的真实动作价值是最低的,即该动作是最差的。因此,不均匀过估计对于最终的决策会产生很大的影响。

然而实际上网络的过估计是非均匀的,因此需要避免这个问题,本质上就是要解决Bootstrapping问题。采用目标网络后就能解决这个问题

此时我们再让逼近目标值时,就已经不再是自举了(大家可以对照自举的含义仔细观察一下)。

2.3 噪声探索

 探索对于智能体来说是至关重要的,而确定性策略“天生”就缺乏探索能力,因此我们需要人为地给输出的动作上加入噪声,从而让智能体具备探索能力。在DDPG算法中,作者采用Ornstein Uhlenbeck过程作为动作噪声。Ornstein Uhlenbeck过程是用下列随机微分方程定义的 (以一维的情况为例):

其中是参数(),是标准Brownain运动。当初始扰动是在原点的单点分布(即限定),并且时,上述方程的解为

 (证明:将代入,化简可得。将此式从0积到,得。当时化简可得结果。)

这个解得均值为0,方差为,协方差为

(证明:由于均值为0,所以。另外,Ito Isometry告诉我们,所以,进一步化简可得结果。)

对于总有,所以 。据此可知,使用Ornstein Uhlenbeck过程让相邻扰动正相关,进而让动作向相近的方向偏移。

OU噪声的代码实现:

class OUActionNoise:
    def __init__(self, mu, sigma=0.15, theta=0.2, dt=1e-2, x0=None):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.reset()

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \\
                self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x

        return x

    def reset(self):
        self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)

看完OU噪声后,可能很多小伙伴是懵的,这个也太复杂了。不过我会告诉大家,其实OU噪声是没必要的,因为我们完全可以采用服从正态分布的噪声来取代它,实验结果也证实了这一点。因此Twin Delayed Deep Deterministic policy gradient (TD3)算法舍弃了OU噪声,而是采用服从正态分布的噪声,实现起来更加简单。

另外还需要提醒大家一点:噪声只会加在训练阶段Actor网络输出的动作上,推理阶段不要加上噪声,以及在更新网络参数时也不要加上噪声,前面已经提醒过了。因为我们只需要在训练阶段让智能体具备探索能力,推理时是不需要的。

3 DDPG算法伪代码

 4 代码实现

Actor和Critic网络的代码实现(networks.py):

import torch as T
import torch.nn as nn
import torch.optim as optim

device = T.device("cuda:0" if T.cuda.is_available() else "cpu")


def weight_init(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)
    elif isinstance(m, nn.BatchNorm1d):
        nn.init.constant_(m.weight, 1.0)
        nn.init.constant_(m.bias, 0.0)


class ActorNetwork(nn.Module):
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
        super(ActorNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, fc1_dim)
        self.ln1 = nn.LayerNorm(fc1_dim)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.ln2 = nn.LayerNorm(fc2_dim)
        self.action = nn.Linear(fc2_dim, action_dim)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)
        self.apply(weight_init)
        self.to(device)

    def forward(self, state):
        x = T.relu(self.ln1(self.fc1(state)))
        x = T.relu(self.ln2(self.fc2(x)))
        action = T.tanh(self.action(x))

        return action

    def save_checkpoint(self, checkpoint_file):
        T.save(self.state_dict(), checkpoint_file)

    def load_checkpoint(self, checkpoint_file):
        self.load_state_dict(T.load(checkpoint_file))


class CriticNetwork(nn.Module):
    def __init__(self, beta, state_dim, action_dim, fc1_dim, fc2_dim):
        super(CriticNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, fc1_dim)
        self.ln1 = nn.LayerNorm(fc1_dim)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.ln2 = nn.LayerNorm(fc2_dim)
        self.fc3 = nn.Linear(action_dim, fc2_dim)
        self.q = nn.Linear(fc2_dim, 1)

        self.optimizer = optim.Adam(self.parameters(), lr=beta, weight_decay=0.001)
        self.apply(weight_init)
        self.to(device)

    def forward(self, state, action):
        x_s = T.relu(self.ln1(self.fc1(state)))
        x_s = self.ln2(self.fc2(x_s))
        x_a = self.fc3(action)
        x = T.relu(x_s + x_a)
        q = self.q(x)

        return q

    def save_checkpoint(self, checkpoint_file):
        T.save(self.state_dict(), checkpoint_file)

    def load_checkpoint(self, checkpoint_file):
        self.load_state_dict(T.load(checkpoint_file))

注:Actor和Critic网络中前面两个Linear层后面都跟上了Layer Normalization (LN)层。因为我在实验时发现了一个非常有趣的现象,如果不加LN层,或者加入Batch Normalization (BN)层,整个训练过程很容易坍塌或者训练效果很差,具体原因我也不是特别清楚。感兴趣的小伙伴可以把代码git下来跑一遍,如果知道原因的话不妨一起交流。

DDPG算法的代码实现(DDPG.py):

import torch as T
import torch.nn.functional as F
import numpy as np
from networks import ActorNetwork, CriticNetwork
from buffer import ReplayBuffer

device = T.device("cuda:0" if T.cuda.is_available() else "cpu")


class DDPG:
    def __init__(self, alpha, beta, state_dim, action_dim, actor_fc1_dim,
                 actor_fc2_dim, critic_fc1_dim, critic_fc2_dim, ckpt_dir,
                 gamma=0.99, tau=0.005, action_noise=0.1, max_size=1000000,
                 batch_size=256):
        self.gamma = gamma
        self.tau = tau
        self.action_noise = action_noise
        self.checkpoint_dir = ckpt_dir

        self.actor = ActorNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                  fc1_dim=actor_fc1_dim, fc2_dim=actor_fc2_dim)
        self.target_actor = ActorNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                         fc1_dim=actor_fc1_dim, fc2_dim=actor_fc2_dim)
        self.critic = CriticNetwork(beta=beta, state_dim=state_dim, action_dim=action_dim,
                                    fc1_dim=critic_fc1_dim, fc2_dim=critic_fc2_dim)
        self.target_critic = CriticNetwork(beta=beta, state_dim=state_dim, action_dim=action_dim,
                                           fc1_dim=critic_fc1_dim, fc2_dim=critic_fc2_dim)

        self.memory = ReplayBuffer(max_size=max_size, state_dim=state_dim, action_dim=action_dim,
                                   batch_size=batch_size)

        self.update_network_parameters(tau=1.0)

    def update_network_parameters(self, tau=None):
        if tau is None:
            tau = self.tau

        for actor_params, target_actor_params in zip(self.actor.parameters(),
                                                     self.target_actor.parameters()):
            target_actor_params.data.copy_(tau * actor_params + (1 - tau) * target_actor_params)

        for critic_params, target_critic_params in zip(self.critic.parameters(),
                                                       self.target_critic.parameters()):
            target_critic_params.data.copy_(tau * critic_params + (1 - tau) * target_critic_params)

    def remember(self, state, action, reward, state_, done):
        self.memory.store_transition(state, action, reward, state_, done)

    def choose_action(self, observation, train=True):
        self.actor.eval()
        state = T.tensor([observation], dtype=T.float).to(device)
        action = self.actor.forward(state).squeeze()

        if train:
            noise = T.tensor(np.random.normal(loc=0.0, scale=self.action_noise),
                             dtype=T.float).to(device)
            action = T.clamp(action+noise, -1, 1)
        self.actor.train()

        return action.detach().cpu().numpy()

    def learn(self):
        if not self.memory.ready():
            return

        states, actions, reward, states_, terminals = self.memory.sample_buffer()
        states_tensor = T.tensor(states, dtype=T.float).to(device)
        actions_tensor = T.tensor(actions, dtype=T.float).to(device)
        rewards_tensor = T.tensor(reward, dtype=T.float).to(device)
        next_states_tensor = T.tensor(states_, dtype=T.float).to(device)
        terminals_tensor = T.tensor(terminals).to(device)

        with T.no_grad():
            next_actions_tensor = self.target_actor.forward(next_states_tensor)
            q_ = self.target_critic.forward(next_states_tensor, next_actions_tensor).view(-1)
            q_[terminals_tensor] = 0.0
            target = rewards_tensor + self.gamma * q_
        q = self.critic.forward(states_tensor, actions_tensor).view(-1)

        critic_loss = F.mse_loss(q, target.detach())
        self.critic.optimizer.zero_grad()
        critic_loss.backward()
        self.critic.optimizer.step()

        new_actions_tensor = self.actor.forward(states_tensor)
        actor_loss = -T.mean(self.critic(states_tensor, new_actions_tensor))
        self.actor.optimizer.zero_grad()
        actor_loss.backward()
        self.actor.optimizer.step()

        self.update_network_parameters()

    def save_models(self, episode):
        self.actor.save_checkpoint(self.checkpoint_dir + 'Actor/DDPG_actor_.pth'.format(episode))
        print('Saving actor network successfully!')
        self.target_actor.save_checkpoint(self.checkpoint_dir +
                                          'Target_actor/DDPG_target_actor_.pth'.format(episode))
        print('Saving target_actor network successfully!')
        self.critic.save_checkpoint(self.checkpoint_dir + 'Critic/DDPG_critic_'.format(episode))
        print('Saving critic network successfully!')
        self.target_critic.save_checkpoint(self.checkpoint_dir +
                                           'Target_critic/DDPG_target_critic_'.format(episode))
        print('Saving target critic network successfully!')

    def load_models(self, episode):
        self.actor.load_checkpoint(self.checkpoint_dir + 'Actor/DDPG_actor_.pth'.format(episode))
        print('Loading actor network successfully!')
        self.target_actor.load_checkpoint(self.checkpoint_dir +
                                          'Target_actor/DDPG_target_actor_.pth'.format(episode))
        print('Loading target_actor network successfully!')
        self.critic.load_checkpoint(self.checkpoint_dir + 'Critic/DDPG_critic_'.format(episode))
        print('Loading critic network successfully!')
        self.target_critic.load_checkpoint(self.checkpoint_dir +
                                           'Target_critic/DDPG_target_critic_'.format(episode))
        print('Loading target critic network successfully!')

算法仿真环境是gym库中的LunarLanderContinuous-v2环境,因此需要先配置好gym库。进入Aanconda中对应的Python环境中,执行下面的指令

pip install gym

但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏环境和经典控制环境,无法使用LunarLanderContinuous-v2。因此还要安装一些其他依赖项,具体可以参照这篇blog: AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法。如果已经配置好环境,那请忽略这一段。

训练脚本(train.py):

import gym
import numpy as np
import argparse
from DDPG import DDPG
from utils import create_directory, plot_learning_curve, scale_action

parser = argparse.ArgumentParser("DDPG parameters")
parser.add_argument('--max_episodes', type=int, default=1000)
parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints/DDPG/')
parser.add_argument('--figure_file', type=str, default='./output_images/reward.png')

args = parser.parse_args()


def main():
    env = gym.make('LunarLanderContinuous-v2')
    agent = DDPG(alpha=0.0003, beta=0.0003, state_dim=env.observation_space.shape[0],
                 action_dim=env.action_space.shape[0], actor_fc1_dim=400, actor_fc2_dim=300,
                 critic_fc1_dim=400, critic_fc2_dim=300, ckpt_dir=args.checkpoint_dir,
                 batch_size=256)
    create_directory(args.checkpoint_dir,
                     sub_paths=['Actor', 'Target_actor', 'Critic', 'Target_critic'])

    reward_history = []
    avg_reward_history = []
    for episode in range(args.max_episodes):
        done = False
        total_reward = 0
        observation = env.reset()
        while not done:
            action = agent.choose_action(observation, train=True)
            action_ = scale_action(action.copy(), env.action_space.high, env.action_space.low)
            observation_, reward, done, info = env.step(action_)
            agent.remember(observation, action, reward, observation_, done)
            agent.learn()
            total_reward += reward
            observation = observation_

        reward_history.append(total_reward)
        avg_reward = np.mean(reward_history[-100:])
        avg_reward_history.append(avg_reward)
        print('Ep:  Reward: :.1f AvgReward: :.1f'.format(episode+1, total_reward, avg_reward))

        if (episode + 1) % 200 == 0:
            agent.save_models(episode+1)

    episodes = [i+1 for i in range(args.max_episodes)]
    plot_learning_curve(episodes, avg_reward_history, title='AvgReward',
                        ylabel='reward', figure_file=args.figure_file)


if __name__ == '__main__':
    main()

训练脚本中有三个参数,max_episodes表示训练幕数,checkpoint_dir表示训练权重保存路径,figure_file表示训练结果的保存路径(其实是一张累积奖励曲线图),按照默认设置即可。

训练时还会用到画图函数和创建文件夹函数,它们都被放在utils.py脚本中:

import os
import numpy as np
import matplotlib.pyplot as plt


class OUActionNoise:
    def __init__(self, mu, sigma=0.15, theta=0.2, dt=1e-2, x0=None):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.reset()

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \\
                self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x

        return x

    def reset(self):
        self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)


def create_directory(path: str, sub_paths: list):
    for sub_path in sub_paths:
        if not os.path.exists(path + sub_path):
            os.makedirs(path + sub_path, exist_ok=True)
            print('Create path:  successfully'.format(path+sub_path))
        else:
            print('Path:  is already existence'.format(path+sub_path))


def plot_learning_curve(episodes, records, title, ylabel, figure_file):
    plt.figure()
    plt.plot(episodes, records, color='r', linestyle='-')
    plt.title(title)
    plt.xlabel('episode')
    plt.ylabel(ylabel)

    plt.show()
    plt.savefig(figure_file)


def scale_action(action, high, low):
    action = np.clip(action, -1, 1)
    weight = (high - low) / 2
    bias = (high + low) / 2
    action_ = action * weight + bias

    return action_

另外我们还提供了测试代码,主要用于测试训练效果以及观察环境的动态渲染 (test.py):

import gym
import imageio
import argparse
from DDPG import DDPG
from utils import scale_action

parser = argparse.ArgumentParser()
parser.add_argument('--filename', type=str, default='./output_images/LunarLander.gif')
parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints/DDPG/')
parser.add_argument('--save_video', type=bool, default=True)
parser.add_argument('--fps', type=int, default=30)
parser.add_argument('--render', type=bool, default=True)

args = parser.parse_args()


def main():
    env = gym.make('LunarLanderContinuous-v2')
    agent = DDPG(alpha=0.0003, beta=0.0003, state_dim=env.observation_space.shape[0],
                 action_dim=env.action_space.shape[0], actor_fc1_dim=400, actor_fc2_dim=300,
                 critic_fc1_dim=400, critic_fc2_dim=300, ckpt_dir=args.checkpoint_dir,
                 batch_size=256)
    agent.load_models(1000)
    video = imageio.get_writer(args.filename, fps=args.fps)

    done = False
    observation = env.reset()
    while not done:
        if args.render:
            env.render()
        action = agent.choose_action(observation, train=True)
        action_ = scale_action(action.copy(), env.action_space.high, env.action_space.low)
        observation_, reward, done, info = env.step(action_)
        observation = observation_
        if args.save_video:
            video.append_data(env.render(mode='rgb_array'))


if __name__ == '__main__':
    main()

测试脚本中包括五个参数,filename表示环境动态图的保存路径,checkpoint_dir表示加载的权重路径,save_video表示是否要保存动态图,fps表示动态图的帧率,rander表示是否开启环境渲染。大家只需要调整save_video和rander这两个参数,其余保持默认即可。

5 实验结果

通过平均奖励曲线可以看出,大概迭代到700步左右时算法趋于收敛。 

这是测试效果图,智能体能够很好地完成降落任务! 

6 结论

本文主要讲解了DDPG算法的原理以及代码实现。尽管它是一个非常优秀的算法,但是仍然存在一些问题需要改进,例如过估计。后面我们会讲解一下TD3算法,它其实就是在DDPG算法的基础做了一些改进工作,克服了DDPG算法中的一些问题,从而让算法的性能得到显著提升。

以上如果有出现错误的地方,欢迎各位怒斥!

深度强化学习-Dueling DQN算法原理与代码

Dueling Deep Q Network(Dueling DQN)是对DQN算法的改进,有效提升了算法的性能。如果对DQN算法还不太了解的话,可以参考我的这篇博文:深度强化学习-DQN算法原理与代码,里面详细讲述了DQN算法的原理和代码实现。本文就带领大家了解一下Dueling DQN算法,论文链接见下方。

论文:http://proceedings.mlr.press/v48/wangf16.pdf

代码:https://github.com/indigoLovee/Dueling_DQN

喜欢的话可以点个star呢。

1 Dueling DQN算法简介

Dueling DQN算法提出了一种新的神经网络结构——对偶网络(duel network)。网络的输入与DQN和DDQN算法的输入一样,均为状态信息,但是输出却有所不同。Dueling DQN算法的输出包括两个分支,分别是该状态的状态价值V(标量)和每个动作的优势值A(与动作空间同维度的向量)。DQN和DDQN算法的输出只有一个分支,为该状态下每个动作的动作价值(与动作空间同维度的向量)。具体的网络结构如下图所示:

单分支网络结构(DQN和DDQN)

 对偶网络结构(Dueling DQN)

在DQN算法的网络结构中,输入为一张或多张照片,利用卷积网络提取图像特征,之后经过全连接层输出每个动作的动作价值;在Dueling DQN算法的网络结构中,输入同样为一张或多张照片,然后利用卷积网络提取图像特征获取特征向量,输出时会经过两个全连接层分支,分别对应状态价值和优势值,最后将状态价值和优势值相加即可得到每个动作的动作价值(即绿色连线操作)。

为什么要采用对偶网络结构?

其实动机很简单:很多游戏的Q值,只受当前状态影响,无论采取什么动作区别不大。如下图所示:

 这是Atari game中的一个赛车游戏,Value表示状态价值,Advantage表示动作优势值,图中黄色部分表示注意力。当前方没有车辆时,智能体左右移动并没有影响,说明动作对Q值没有影响,但是状态对Q值很有影响。从上面两幅图可以看出,Value更加关注远方道路,因为开的越远,对应的状态值越大;Advantage没有特别注意的地方,说明动作没有影响。当前方存在车辆阻挡时,智能体的动作选择至关重要,说明动作对Q值存在影响,同样状态对Q值也会存在影响。从下面两幅图可以看出,Value同样更加关注远方道路,但是Advantge此时会关注前方车辆,因为如果不采取相应动作,智能体很有可能会发生碰撞,分数就会很低。对偶网络思想符合很多场景的设定。

2 Dueling DQN算法原理

2.1 优势函数(Advantage function)

在讲优势函数之前,我们先来回顾一些基本概念吧。

折扣回报(Discounted return):

表示从t时刻开始,未来所有奖励的加权求和。在t时刻,是未知的,它依赖于未来所有状态和动作。

动作价值函数(Action-value function):

 是回报的条件期望,将t+1时刻以后的状态和动作全部消掉。依赖于状态、动作以及策略

状态价值函数(State-value function):

的期望,将中的动作消掉。依赖于状态  和策略

最优动作价值函数(Optimal action-value function):

 动作价值函数依赖于策略,对关于求最大值,消除掉,得到只依赖于状态和动作,不依赖于策略用于评价在状态下做动作的好坏。

 最优状态价值函数(Optimal state_value function):

关于求 最大值,消除掉,得到只依赖于状态, 不依赖于策略用于评价状态的好坏。

 最优优势函数(Optimal advantage function):

 将作为baseline,则优势函数表示动作相对于baseline的优势。动作越好,其优势越大。

 2.2 优势函数性质

 为了推导优势函数的性质,这里给出定理1(大家可以从确定性策略角度来理解定理1,这里不给出该定理的证明):

定理1:

针对优势函数

 两边同时对动作取最大值,得到

 根据定理1进一步得到

 接着对优势函数做变换,得到

不妨在上式右边减去 ,由于其等于0,等号不变,得到

看到这里小伙伴可能会疑惑,为什么要 减去 ,这不是多此一举吗?不急,这个后面2.4节会解释。至此,我们得到了定理2,这是一个非常重要的等式,Dueling Network就是根据定理2得到的。

定理2:

 2.3 对偶网络

在DQN算法中,我们利用神经网络来近似最优动作值函数,从而得到最优策略,其中为网络参数。但是在Dueling Network中,我们利用神经网路来近似最优优势函数,利用神经网络来近似最优状态值函数,其中均为网络参数。

根据定理2,将其中的用神经网络来替代,用神经网络来代替,得到

其中近似,它被成为Dueling Network。令,则

现在,我们发现Dueling Network与DQN完全一致,包括输入和输出,唯一不同在于神经网络的结构。Dueling Network的网络结构更好,因此它的表现比DQN更好。另外,DQN中的一些技巧也同样可以用于Dueling Network中,例如优先经验回放、Double DQN和Multi-step TD target等。

2.4 唯一性

 观察下面两个等式:

下面的式子相较于上面的式子只是减去了,前面已经讨论过 ,既然如此,这一项有必要的吗,直接用上面的式子不就行了吗?

答案是有必要的。因为上面的式子存在一个缺点:无法通过学习来唯一确定举个栗子:令,那么

因此结果不唯一。这种不唯一性会带来什么问题呢?

如果神经网络V和A上下波动,幅度相同,方向相反,那么网络的输出相同,但是由于两个网络都在上下波动,两个网络都不稳定,因此都训练不好。

针对上面式子的不唯一性,下面的式子可以解决。通过加入最大化强可以有效避免不唯一性问题,仍然用上面的栗子来分析,令,那么

 此时发生了变化,解决了不唯一性问题,这就是要加上最大化项的原因了。

2.5 两种数学形式

 通过前面的推导,我们得到了Dueling Network的数学形式为

 实际中将最大化形式变成均值形式效果更好,更稳定,其数学形式如下

 这种替换没有任何理论依据,就是实验效果好,简单!粗暴!有效!

3 Dueling DQN算法代码

经验回放采用集中式均匀回放,代码如下(脚本buffer.py):

import numpy as np


class ReplayBuffer:
    def __init__(self, state_dim, action_dim, max_size, batch_size):
        self.mem_size = max_size
        self.mem_cnt = 0
        self.batch_size = batch_size

        self.state_memory = np.zeros((self.mem_size, state_dim))
        self.action_memory = np.zeros((self.mem_size, ))
        self.reward_memory = np.zeros((self.mem_size, ))
        self.next_state_memory = np.zeros((self.mem_size, state_dim))
        self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)

    def store_transition(self, state, action, reward, state_, done):
        mem_idx = self.mem_cnt % self.mem_size

        self.state_memory[mem_idx] = state
        self.action_memory[mem_idx] = action
        self.reward_memory[mem_idx] = reward
        self.next_state_memory[mem_idx] = state_
        self.terminal_memory[mem_idx] = done

        self.mem_cnt += 1

    def sample_buffer(self):
        mem_len = min(self.mem_size, self.mem_cnt)

        batch = np.random.choice(mem_len, self.batch_size, replace=False)

        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.next_state_memory[batch]
        terminals = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminals

    def ready(self):
        return self.mem_cnt > self.batch_size

目标网络的更新方式为软更新,Dueling DQN的实现代码如下(脚本Dueling_DQN.py):

import torch as T
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from buffer import ReplayBuffer

device = T.device("cuda:0" if T.cuda.is_available() else "cpu")


class DuelingDeepQNetwork(nn.Module):
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
        super(DuelingDeepQNetwork, self).__init__()

        self.fc1 = nn.Linear(state_dim, fc1_dim)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.V = nn.Linear(fc2_dim, 1)
        self.A = nn.Linear(fc2_dim, action_dim)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)
        self.to(device)

    def forward(self, state):
        x = T.relu(self.fc1(state))
        x = T.relu(self.fc2(x))

        V = self.V(x)
        A = self.A(x)

        return V, A

    def save_checkpoint(self, checkpoint_file):
        T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)

    def load_checkpoint(self, checkpoint_file):
        self.load_state_dict(T.load(checkpoint_file))


class DuelingDQN:
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
                 gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-4,
                 max_size=1000000, batch_size=256):
        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size
        self.epsilon = epsilon
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.checkpoint_dir = ckpt_dir
        self.action_space = [i for i in range(action_dim)]

        self.q_eval = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                          fc1_dim=fc1_dim, fc2_dim=fc2_dim)
        self.q_target = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                            fc1_dim=fc1_dim, fc2_dim=fc2_dim)

        self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
                                   max_size=max_size, batch_size=batch_size)

        self.update_network_parameters(tau=1.0)

    def update_network_parameters(self, tau=None):
        if tau is None:
            tau = self.tau

        for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
            q_target_params.data.copy_(tau * q_eval_params + (1 - tau) * q_target_params)

    def remember(self, state, action, reward, stata_, done):
        self.memory.store_transition(state, action, reward, stata_, done)

    def decrement_epsilon(self):
        self.epsilon = self.epsilon - self.eps_dec \\
            if self.epsilon > self.eps_min else self.eps_min

    def choose_action(self, observation, isTrain=True):
        state = T.tensor([observation], dtype=T.float).to(device)
        _, A = self.q_eval.forward(state)
        action = T.argmax(A).item()

        if (np.random.random() < self.epsilon) and isTrain:
            action = np.random.choice(self.action_space)

        return action

    def learn(self):
        if not self.memory.ready():
            return

        states, actions, rewards, next_states, terminals = self.memory.sample_buffer()
        batch_idx = T.arange(self.batch_size, dtype=T.long).to(device)
        states_tensor = T.tensor(states, dtype=T.float).to(device)
        actions_tensor = T.tensor(actions, dtype=T.long).to(device)
        rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
        next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
        terminals_tensor = T.tensor(terminals).to(device)

        with T.no_grad():
            V_, A_ = self.q_target.forward(next_states_tensor)
            q_ = V_ + A_ - T.mean(A_, dim=-1, keepdim=True)
            q_[terminals_tensor] = 0.0
            target = rewards_tensor + self.gamma * T.max(q_, dim=-1)[0]
        V, A = self.q_eval.forward(states_tensor)
        q = (V + A - T.mean(A, dim=-1, keepdim=True))[batch_idx, actions_tensor]

        loss = F.mse_loss(q, target.detach())
        self.q_eval.optimizer.zero_grad()
        loss.backward()
        self.q_eval.optimizer.step()

        self.update_network_parameters()
        self.decrement_epsilon()

    def save_models(self, episode):
        self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_.pth'.format(episode))
        print('Saving Q_eval network successfully!')
        self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_.pth'.format(episode))
        print('Saving Q_target network successfully!')

    def load_models(self, episode):
        self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_.pth'.format(episode))
        print('Loading Q_eval network successfully!')
        self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_.pth'.format(episode))
        print('Loading Q_target network successfully!')

算法仿真环境为gym库中的LunarLander-v2,因此需要先配置好gym库。进入Anaconda3中对应的Python环境中,执行下面的指令

pip install gym

但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏和经典控制环境,无法使用LunarLander-v2。因此还需要安装一些其他依赖项,具体可以参考我的这篇博文:AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法

让智能体在环境中训练500轮,训练代码如下(脚本train.py):

import gym
import numpy as np
import argparse
from utils import plot_learning_curve, create_directory
from Dueling_DQN import DuelingDQN

parser = argparse.ArgumentParser()
parser.add_argument('--max_episodes', type=int, default=500)
parser.add_argument('--ckpt_dir', type=str, default='./checkpoints/DuelingDQN/')
parser.add_argument('--reward_path', type=str, default='./output_images/reward.png')
parser.add_argument('--epsilon_path', type=str, default='./output_images/epsilon.png')

args = parser.parse_args()


def main():
    env = gym.make('LunarLander-v2')
    agent = DuelingDQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
                       fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
                       eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)
    create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])
    total_rewards, avg_rewards, eps_history = [], [], []

    for episode in range(args.max_episodes):
        total_reward = 0
        done = False
        observation = env.reset()
        while not done:
            action = agent.choose_action(observation, isTrain=True)
            observation_, reward, done, info = env.step(action)
            agent.remember(observation, action, reward, observation_, done)
            agent.learn()
            total_reward += reward
            observation = observation_

        total_rewards.append(total_reward)
        avg_reward = np.mean(total_rewards[-100:])
        avg_rewards.append(avg_reward)
        eps_history.append(agent.epsilon)
        print('EP: reward: avg_reward: epsilon:'.
              format(episode + 1, total_reward, avg_reward, agent.epsilon))

        if (episode + 1) % 50 == 0:
            agent.save_models(episode + 1)

    episodes = [i for i in range(args.max_episodes)]
    plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
    plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)


if __name__ == '__main__':
    main()

训练时还会用到画图函数和创建文件夹函数,它们均放置在utils.py脚本中,具体代码如下:

import os
import matplotlib.pyplot as plt


def plot_learning_curve(episodes, records, title, ylabel, figure_file):
    plt.figure()
    plt.plot(episodes, records, linestyle='-', color='r')
    plt.title(title)
    plt.xlabel('episode')
    plt.ylabel(ylabel)

    plt.show()
    plt.savefig(figure_file)


def create_directory(path: str, sub_dirs: list):
    for sub_dir in sub_dirs:
        if os.path.exists(path + sub_dir):
            print(path + sub_dir + ' is already exist!')
        else:
            os.makedirs(path + sub_dir, exist_ok=True)
            print(path + sub_dir + ' create successfully!')

仿真结果如下图所示:

 平均累积奖励曲线

epsilon变化曲线

 通过平均累积奖励可以看出,Dueling DQN算法大约在250步时趋于收敛。

以上是关于深度强化学习-DDPG算法原理与代码的主要内容,如果未能解决你的问题,请参考以下文章

深度强化学习-Dueling DQN算法原理与代码

深度强化学习-DQN算法原理与代码

深度强化学习(Deep Reinforcement Learning)入门:RL base & DQN-DDPG-A3C introduction

DDPG算法解析

强化学习调参技巧一: DDPG算法训练动作选择边界值_分析解决

《机器学习-原理算法与应用》配套PPT第四部分(深度学习概论自动编码器强化学习聚类算法半监督学习等)