使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理

Posted Alex_996

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理相关的知识,希望对你有一定的参考价值。

在我们的自动驾驶汽车的第四部分,Carla, Python, TensorFlow,和强化学习项目,我们将为我们的实际代理编码。在前一篇教程中,我们研究了环境类,我们的代理将与之交互。

在考虑如何创建代理时,我们还必须考虑模型本身。假设你已经完成了强化学习教程(你最好这样做,否则事情会让你感到非常困惑),你知道代理所采取的每一步都伴随着一个合理的预测(取决于探索/epsilon),而且还伴随着一个调整!这意味着我们要同时进行训练和预测,但我们的代理必须获得尽可能高的FPS(帧/秒)。

为了实现这一点,我们可以使用多处理或线程。线程处理允许我们保持事情相当简单。以后,我可能至少会在某个时候为这个任务开放一些多处理代码,但现在,它是线程化的。

import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla


SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"self.im_width")
        self.rgb_cam.set_attribute("image_size_y", f"self.im_height")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer= 0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None

现在,我们将创建一个新类,DQNAgent类。我们将从:

class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

这和强化学习教程中的概念是一样的,我们有一个不断进化的主网络,然后是目标网络,我们更新每n个东西,其中n是你想要的东西,比如步骤或情节。

当我们训练时,我们从回放记忆中随机选择的数据中进行训练:

        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

出于与之前(RL教程)相同的原因,我们将修改TensorBoard:

        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/MODEL_NAME-int(time.time())")

现在,我们可以通过设置一些最终值来包装init方法:

        self.target_update_counter = 0  # will track when it's time to update the target model
        self.graph = tf.get_default_graph()

        self.terminate = False  # Should we quit?
        self.last_logged_episode = 0
        self.training_initialized = False  # waiting for TF to get rolling

我们要用self.training_initialized用于跟踪TensorFlow何时准备就绪。当一个模型开始的时候,第一次的预测/调整会花费额外的时间,所以我们会先传递一些无意义的信息,让我们的模型能够真正开始运行。

现在让我们创建我们的模型:

    def create_model(self):
        base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH,3))

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
        return model

这里,我们将使用预先制作好的Xception模型,但是您也可以制作其他模型,或者导入一个不同的模型。请注意,我们将GlobalAveragePooling添加到我们的输出层,同时显然还添加了3个神经元输出,这是代理要采取的每个可能的动作。让我们为这个方法做一些必需的导入:

from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model

在我们的DQNAgent中,我们需要一个快速的方法来更新重放内存:

    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)

只要我们的方法这么简单,我们可能甚至不需要一个方法来做这个,但我先把它留到现在。

现在来谈谈训练方法。首先,我们只想在回放记忆中样本最少的情况下进行训练:

    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

因为我们有很多像这样的常数,让我们继续,把它们一次性塞进去,以免我忘记:

REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10

把它们放在脚本的顶部,显然,在所有类/函数/方法等之外。如果您在任何时候对内容的走向感到困惑,您可以查看本教程的结尾,查看到此为止的代码。

回到我们的训练方法:

    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

如果我们没有足够的样品,我们就会回来,然后结束。如果我们做到了,我们就会开始训练。首先,我们需要随机抽取一小批:

        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

一旦我们有了我们的小批处理,我们想要获取我们当前和未来的q值:

        current_states = np.array([transition[0] for transition in minibatch])/255
        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

        new_current_states = np.array([transition[3] for transition in minibatch])/255
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)

回忆一下过渡是: transition = (current_state, action, reward, new_state, done)

现在,我们创建输入(X)和输出(y):

        X = []
        y = []

        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

这里没有什么新奇的东西,几乎与我们的强化学习教程(DQN)相同。

        log_this_step = False
        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_log_episode = self.tensorboard.step

我们只尝试记录每一集,而不是实际的训练步骤,所以我们将使用上面的方法来跟踪。

接下来,我们将配合:

        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)

注意,只有当log_this_step为真时,我们才会设置tensorboard回调。如果它是假的,我们仍然适合,我们只是不会登录到TensorBoard。

接下来,我们想继续跟踪日志记录:

        if log_this_step:
            self.target_update_counter += 1

最后,我们来看看是不是该更新我们的target_model:

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

现在我们只需要另外2个方法,我们就完成了代理类。首先,我们需要一个方法来获得q值(基本上是为了进行预测)

    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

最后,我们只需要进行培训:

    def train_in_loop(self):
        X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
        y = np.random.uniform(size=(1, 3)).astype(np.float32)
        with self.graph.as_default():
            self.model.fit(X,y, verbose=False, batch_size=1)

        self.training_initialized = True

首先,我们使用一些随机数据,如上面的初始化,然后我们开始无限循环:

        while True:
            if self.terminate:
                return
            self.train()
            time.sleep(0.01)

这是我们的DQNAgent类。完整的代码到这一点:

import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math
from collections import deque
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model


try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla


SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10






class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"self.im_width")
        self.rgb_cam.set_attribute("image_size_y", f"self.im_height")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time(以上是关于使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理的主要内容,如果未能解决你的问题,请参考以下文章

使用 Carla 和 Python 的自动驾驶汽车第 3 部分 —— 强化学习环境

使用 Carla 和 Python 的自动驾驶汽车第 1 部分 —— 介绍

使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理

自动驾驶 7-1 Carla 概述 - 自动驾驶汽车模拟Carla Overview - Self-Driving Car Simulation

自动驾驶7-2 最终项目概述 Final Project Overview

自动驾驶模拟软件Carla---环境搭建和编译