python TORCS包裹在openai健身房环境中
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python TORCS包裹在openai健身房环境中相关的知识,希望对你有一定的参考价值。
import gym
from gym import spaces
import numpy as np
# from os import path
import snakeoil3_gym as snakeoil3
import numpy as np
import copy
import collections as col
import os
import time
class TorcsEnv:
terminal_judge_start = 100 # If after 100 timestep still no progress, terminated
termination_limit_progress = 5 # [km/h], episode terminates if car is running slower than this limit
default_speed = 50
initial_reset = True
def __init__(self, vision=False, throttle=False, gear_change=False):
self.vision = vision
self.throttle = throttle
self.gear_change = gear_change
self.initial_run = True
##print("launch torcs")
os.system('pkill torcs')
time.sleep(0.5)
if self.vision is True:
os.system('torcs -nofuel -nodamage -nolaptime -vision &')
else:
os.system('torcs -nofuel -nolaptime &')
time.sleep(0.5)
os.system('sh autostart.sh')
time.sleep(0.5)
"""
# Modify here if you use multiple tracks in the environment
self.client = snakeoil3.Client(p=3101, vision=self.vision) # Open new UDP in vtorcs
self.client.MAX_STEPS = np.inf
client = self.client
client.get_servers_input() # Get the initial input from torcs
obs = client.S.d # Get the current full-observation from torcs
"""
if throttle is False:
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,))
else:
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,))
if vision is False:
high = np.array([1., np.inf, np.inf, np.inf, 1., np.inf, 1., np.inf])
low = np.array([0., -np.inf, -np.inf, -np.inf, 0., -np.inf, 0., -np.inf])
self.observation_space = spaces.Box(low=low, high=high)
else:
high = np.array([1., np.inf, np.inf, np.inf, 1., np.inf, 1., np.inf, 255])
low = np.array([0., -np.inf, -np.inf, -np.inf, 0., -np.inf, 0., -np.inf, 0])
self.observation_space = spaces.Box(low=low, high=high)
def step(self, u):
#print("Step")
# convert thisAction to the actual torcs actionstr
client = self.client
this_action = self.agent_to_torcs(u)
# Apply Action
action_torcs = client.R.d
# Steering
action_torcs['steer'] = this_action['steer'] # in [-1, 1]
# Simple Autnmatic Throttle Control by Snakeoil
if self.throttle is False:
target_speed = self.default_speed
if client.S.d['speedX'] < target_speed - (client.R.d['steer']*50):
client.R.d['accel'] += .01
else:
client.R.d['accel'] -= .01
if client.R.d['accel'] > 0.2:
client.R.d['accel'] = 0.2
if client.S.d['speedX'] < 10:
client.R.d['accel'] += 1/(client.S.d['speedX']+.1)
# Traction Control System
if ((client.S.d['wheelSpinVel'][2]+client.S.d['wheelSpinVel'][3]) -
(client.S.d['wheelSpinVel'][0]+client.S.d['wheelSpinVel'][1]) > 5):
action_torcs['accel'] -= .2
else:
action_torcs['accel'] = this_action['accel']
action_torcs['brake'] = this_action['brake']
# Automatic Gear Change by Snakeoil
if self.gear_change is True:
action_torcs['gear'] = this_action['gear']
else:
# Automatic Gear Change by Snakeoil is possible
action_torcs['gear'] = 1
if self.throttle:
if client.S.d['speedX'] > 50:
action_torcs['gear'] = 2
if client.S.d['speedX'] > 80:
action_torcs['gear'] = 3
if client.S.d['speedX'] > 110:
action_torcs['gear'] = 4
if client.S.d['speedX'] > 140:
action_torcs['gear'] = 5
if client.S.d['speedX'] > 170:
action_torcs['gear'] = 6
# Save the privious full-obs from torcs for the reward calculation
obs_pre = copy.deepcopy(client.S.d)
# One-Step Dynamics Update #################################
# Apply the Agent's action into torcs
client.respond_to_server()
# Get the response of TORCS
client.get_servers_input()
# Get the current full-observation from torcs
obs = client.S.d
# Make an obsevation from a raw observation vector from TORCS
self.observation = self.make_observaton(obs)
# Reward setting Here #######################################
# direction-dependent positive reward
track = np.array(obs['track'])
trackPos = np.array(obs['trackPos'])
sp = np.array(obs['speedX'])
damage = np.array(obs['damage'])
rpm = np.array(obs['rpm'])
progress = sp*np.cos(obs['angle']) - np.abs(sp*np.sin(obs['angle'])) - sp * np.abs(obs['trackPos'])
reward = progress
# collision detection
if obs['damage'] - obs_pre['damage'] > 0:
reward = -1
# Termination judgement #########################
episode_terminate = False
#if (abs(track.any()) > 1 or abs(trackPos) > 1): # Episode is terminated if the car is out of track
# reward = -200
# episode_terminate = True
# client.R.d['meta'] = True
#if self.terminal_judge_start < self.time_step: # Episode terminates if the progress of agent is small
# if progress < self.termination_limit_progress:
# print("No progress")
# episode_terminate = True
# client.R.d['meta'] = True
if np.cos(obs['angle']) < 0: # Episode is terminated if the agent runs backward
episode_terminate = True
client.R.d['meta'] = True
if client.R.d['meta'] is True: # Send a reset signal
self.initial_run = False
client.respond_to_server()
self.time_step += 1
return self.get_obs(), reward, client.R.d['meta'], {}
def reset(self, relaunch=False):
#print("Reset")
self.time_step = 0
if self.initial_reset is not True:
self.client.R.d['meta'] = True
self.client.respond_to_server()
## TENTATIVE. Restarting TORCS every episode suffers the memory leak bug!
if relaunch is True:
self.reset_torcs()
print("### TORCS is RELAUNCHED ###")
# Modify here if you use multiple tracks in the environment
self.client = snakeoil3.Client(p=3101, vision=self.vision) # Open new UDP in vtorcs
self.client.MAX_STEPS = np.inf
client = self.client
client.get_servers_input() # Get the initial input from torcs
obs = client.S.d # Get the current full-observation from torcs
self.observation = self.make_observaton(obs)
self.last_u = None
self.initial_reset = False
return self.get_obs()
def end(self):
os.system('pkill torcs')
def get_obs(self):
return self.observation
def reset_torcs(self):
#print("relaunch torcs")
os.system('pkill torcs')
time.sleep(0.5)
if self.vision is True:
os.system('torcs -nofuel -nodamage -nolaptime -vision &')
else:
os.system('torcs -nofuel -nolaptime &')
time.sleep(0.5)
os.system('sh autostart.sh')
time.sleep(0.5)
def agent_to_torcs(self, u):
torcs_action = {'steer': u[0]}
if self.throttle is True: # throttle action is enabled
torcs_action.update({'accel': u[1]})
torcs_action.update({'brake': u[2]})
if self.gear_change is True: # gear change action is enabled
torcs_action.update({'gear': int(u[3])})
return torcs_action
def obs_vision_to_image_rgb(self, obs_image_vec):
image_vec = obs_image_vec
r = image_vec[0:len(image_vec):3]
g = image_vec[1:len(image_vec):3]
b = image_vec[2:len(image_vec):3]
sz = (64, 64)
r = np.array(r).reshape(sz)
g = np.array(g).reshape(sz)
b = np.array(b).reshape(sz)
return np.array([r, g, b], dtype=np.uint8)
def make_observaton(self, raw_obs):
if self.vision is False:
names = ['focus',
'speedX', 'speedY', 'speedZ', 'angle', 'damage',
'opponents',
'rpm',
'track',
'trackPos',
'wheelSpinVel',
'roll', 'pitch', 'yaw', 'yaw_rate',
'accel_x', 'accel_y', 'accel_z',
'trackR',
'toMiddleDist']
# roll, pitch, yaw, yaw_rate, accel_x, accel_y, accel_z, trackR, toMiddleDist are added by wwf
Observation = col.namedtuple('Observation', names)
return Observation(focus=np.array(raw_obs['focus'], dtype=np.float32)/200.,
speedX=np.array(raw_obs['speedX'], dtype=np.float32)/300.0,
speedY=np.array(raw_obs['speedY'], dtype=np.float32)/300.0,
speedZ=np.array(raw_obs['speedZ'], dtype=np.float32)/300.0,
angle=np.array(raw_obs['angle'], dtype=np.float32)/3.1416,
damage=np.array(raw_obs['damage'], dtype=np.float32),
opponents=np.array(raw_obs['opponents'], dtype=np.float32)/200.,
rpm=np.array(raw_obs['rpm'], dtype=np.float32)/10000,
track=np.array(raw_obs['track'], dtype=np.float32)/200.,
trackPos=np.array(raw_obs['trackPos'], dtype=np.float32)/1.,
wheelSpinVel=np.array(raw_obs['wheelSpinVel'], dtype=np.float32),
roll=np.array(raw_obs['roll'], dtype=np.float32),
pitch=np.array(raw_obs['pitch'], dtype=np.float32),
yaw=np.array(raw_obs['yaw'], dtype=np.float32),
yaw_rate=np.array(raw_obs['yaw_rate'], dtype=np.float32),
accel_x=np.array(raw_obs['accel_x'],dtype=np.float32),
accel_y=np.array(raw_obs['accel_y'], dtype=np.float32),
accel_z=np.array(raw_obs['accel_z'], dtype=np.float32),
trackR=np.array(raw_obs['trackR'], dtype=np.float32),
toMiddleDist=np.array(raw_obs['toMiddleDist'], dtype=np.float32))
else:
names = ['focus',
'speedX', 'speedY', 'speedZ', 'angle',
'opponents',
'rpm',
'track',
'trackPos',
'wheelSpinVel',
'img',
'roll', 'pitch', 'yaw', 'yaw_rate',
'accel_x', 'accel_y', 'accel_z',
'trackR',
'toMiddleDist'
]
Observation = col.namedtuple('Observaion', names)
# Get RGB from observation
image_rgb = self.obs_vision_to_image_rgb(raw_obs[names[8]])
return Observation(focus=np.array(raw_obs['focus'], dtype=np.float32)/200.,
speedX=np.array(raw_obs['speedX'], dtype=np.float32)/self.default_speed,
speedY=np.array(raw_obs['speedY'], dtype=np.float32)/self.default_speed,
speedZ=np.array(raw_obs['speedZ'], dtype=np.float32)/self.default_speed,
opponents=np.array(raw_obs['opponents'], dtype=np.float32)/200.,
rpm=np.array(raw_obs['rpm'], dtype=np.float32),
track=np.array(raw_obs['track'], dtype=np.float32)/200.,
trackPos=np.array(raw_obs['trackPos'], dtype=np.float32)/1.,
wheelSpinVel=np.array(raw_obs['wheelSpinVel'], dtype=np.float32),
roll=np.array(raw_obs['roll'], dtype=np.float32),
pitch=np.array(raw_obs['pitch'], dtype=np.float32),
yaw=np.array(raw_obs['yaw'], dtype=np.float32),
yaw_rate=np.array(raw_obs['yaw_rate'], dtype=np.float32),
accel_x=np.array(raw_obs['accel_x'], dtype=np.float32),
accel_y=np.array(raw_obs['accel_y'], dtype=np.float32),
accel_z=np.array(raw_obs['accel_z'], dtype=np.float32),
trackR=np.array(raw_obs['trackR'], dtype=np.float32),
toMiddleDist=np.array(raw_obs['toMiddleDist'], dtype=np.float32),
img=image_rgb)
以上是关于python TORCS包裹在openai健身房环境中的主要内容,如果未能解决你的问题,请参考以下文章