《Reinforcement learning based parameters adaption method for particleswarm optimization》代码复现
Posted Huterox
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Reinforcement learning based parameters adaption method for particleswarm optimization》代码复现相关的知识,希望对你有一定的参考价值。
文章目录
前言
前几天通过阅读这篇文献:
《Reinforcement learning based parameters adaption method for particleswarm optimization》
发现有些点还是比较新颖的,所以今天对论文的代码进行了整体的复现。整个过程大概花费了1天半(编码调试,不包括实验)
(PS:如果 不想看论文的话,请查看这篇博客:关于强化学习优化粒子群算法的论文解读
在本篇博文将完整分析这篇论文的思路以及工作流程。而且说实话这篇论文其实我感觉复现起来没有一点难度,有些点还是比较新颖的,可以玩玩,顺便作为一个强化学习项目练练手。
版权
郑重提示:本文版权归本人所有,任何人不得抄袭,搬运,使用需征得本人同意!
2022.6.27~2022.6.28
日期:2022.6.27~2022.6.28
项目结构
整个的项目结构如下:
这里不做过多的解释了,不过值的一提的是,我这里是没有使用矩阵的写法的,因为整个项目一开始的目的就是为了使用Python作为实验,然后把Python代码转换为Java代码上Flink的,所以设计之初就是使用一个对象来存储一个粒子的,这样做的好处就是使用一个对象代替了好几个大的矩阵,也就是说不需要去维护矩阵了,而且写出来的代码可读性很高,并且刚好论文当中有使用CLPSO的速度更新方程来进行变体,所以他这里实现的话,也是很难直接使用矩阵来实现这个粒子之间的跟踪,以及锦标赛选择滴。
哦,对了额外说明一下这篇论文是发在arxiv上面的,不是什么IEEE这种顶刊,所以有些地方,他的描述是不严谨的,所以代码的总体的设计是按照论文来的,但是有些细节是不太一样的,不然代码都跑不起来。
然后这个项目也是验证跑了一下的,发现效果真的挺厉害的,说实话如果不是因为这个东西加了个DDPG,我想要玩玩这个神经网络,我根本就不会去想要复现这个玩意,而且一开始也是抱着怀疑的态度编写的,不过现在来看,还是挺厉害的,我一共训练了300轮每一轮PSO算法跑1000次。也就是说这里是跑了30万然后100个粒子,也就是3000万,本来的话,上午我是可以发出这篇文章记录一下的,但是后来改了几个bug然后调了几个参数,其实原来我还在训练3亿次的网络,但是实在顶不住了,最后改到0.3亿。
传统PSO(这里我没有展示优化过后的(原来我优化的)因为结果都一样被吊打)
都是跑1000次
传统:
我自己优化的只能到-58
DDPG优化:
而且是在50多次就出来了:
这怎么玩?!!!
说实话,没有想过会有这种效果。
当然废话不多说了,我们直接来看实现。
数据结构定义
这里的话,基于先去的数据结构,这里定义了一个Bird专门用来储存一些信息。
#coding=utf-8
#这里我们使用定义Class的方式实现PSO,便于后面调整
from ONEPSO.Config import *
import random
class Bird(object):
#这个是从1开始的
ID = 1
#这个是划分多种群时的种群ID.他是从0开始的
CID = 0
#用于存放粒子到每个子种群的粒子的距离
DIST = []
Iterate = 0
Y = None
X = None
#记录个体的最优值,这个关系到后面求取全局最优值
# 因为论文当中也采用了CLPSO,所以需要这个破玩意
# 不过在这里还是需要Pbest与Gbest
Follow = None
#这个玩意是用来判断当前粒子是不是需要换一个追随者了的
NoChange = 0
PbestY = None
PBestX = None
PBestV = None
GBestX = None
GBestY = None
GBestV = None
CBestY=None
CBestX=None
CBestV =None
V = None
def __init__(self,ID):
self.ID = ID
self.V = [random.random() *(V_max-V_min) + V_min for _ in range(DIM)]
self.X = [random.random() *(X_up-X_down) + X_down for _ in range(DIM)]
def __str__(self):
return "ID:"+str(self.ID)+" -Fintess:%.2e:"%(self.Y)+" -X"+str(self.X)+" -PBestFitness:%.2e"%(self.PbestY)+" -PBestX:"+str(self.PBestX)+\\
"\\n -GBestFitness:%.2e"%(self.GBestY)+" -GBestX:"+str(self.GBestX)
基本粒子群实现
这里的基本的粒子群实现其实和原来的是现实是一样的,只是我这里为了做到通用,我是直接在上面加功能。
#coding=utf-8
#这个是最基础的PSO算法实现,用于测试当前算法架构的合理性
#此外这个玩意还是用来优化的工具类,整个算法是为了求取最小值
import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(os.getcwd())))
import math
from ONEPSO.RLPSOEmersion.TargetRL import Target
from ONEPSO.RLPSOEmersion.ConfigRL import *
from ONEPSO.RLPSOEmersion.BirdRL import Bird
import random
import time
class NewPsoRL(object):
#这里是用来记录上一轮的GBestY的,主要是为了给ENV用(不影响这个算法的正常的基础PSO使用)
GBestYLast = None
#这个是用来记录全局最优的,为了记录停摆的
GBestY = None
NoChangeFlag = False
Population = None
Iterate_num = 0
Random = random.random
target = Target()
W_RL = W
C1_RL = C1
C2_RL = C2
C3_RL = C3
#先实例出来一个对象,避免下次再启动它很慢
Math = math
Count_Div = 0
##################################
"""
这三个破玩意是要输入神经网络的几个参数
这三个玩意后面每一个玩意展开都是5维的
"""
F_iterate_num = None
F_diversity = None
F_no_increase = None
"""
接下来这些是为了得到上面三个破玩意而设置的辅助变量
"""
F_no_increase_time = 0
##################################
def __init__(self):
#为了方便,我们这边直接先从1开始
self.Population = [Bird(ID) for ID in range(1,PopulationSize+1)]
def __GetDiversity(self,Population):
#先计算出平均数
x_pa = [0 for _ in range(DIM)]
for bird in Population:
for i in range(DIM):
x_pa[i]+=bird.X[i]
for i in range(DIM):
x_pa[i]/=PopulationSize
#现在计算出Diversity
Diversity = 0.
for bird in Population:
sum = 0
for i in range(DIM):
sum+=abs(bird.X[i]-x_pa[i])
Diversity+=self.Math.sqrt(sum)
Diversity = Diversity/PopulationSize
return Diversity
def __SinX(self,params):
"""
这个就是论文提到的那个sin方法映射
这个在论文里面是说从0-4,也就是5个参数返回
:param params:
:return:
"""
res = []
for i in range(5):
tem_ = self.Math.sin(params*self.Math.pow(2,i))
res.append(tem_)
return res
def __cat(self,states,params):
for param in params:
states.append(param)
def GetStatus(self,Population):
"""
按照要求,我们将在这里实现对现在粒子群的一个状态的输入
:return:
"""
self.F_iterate_num = self.Iterate_num / IterationsNumber
self.F_diversity = self.__GetDiversity(Population)
self.F_no_increase = (self.Iterate_num-self.F_no_increase_time)/IterationsNumber
#此时经过这个sin函数进行映射
self.F_iterate_num = self.__SinX(self.F_iterate_num)
self.F_diversity = self.__SinX(self.F_diversity)
self.F_no_increase = self.__SinX(self.F_no_increase)
states = []
self.__cat(states,self.F_iterate_num);self.__cat(states,self.F_diversity);self.__cat(states,self.F_no_increase)
return states
def ChangeBird(self,bird,Population):
#这个主要是实现锦标赛法来对粒子的跟踪对象进行更新
while True:
#被跟踪的粒子不能和自己一样,也不能和上一个一样
a,b = random.sample(range(PopulationSize),2)
a = Population[a];b=Population[b]
follow = a
if(a.PbestY>b.PbestY):
follow = b
if(follow.ID!=bird.ID):
if(bird.Follow):
if(bird.Follow.ID !=follow.ID):
bird.Follow = follow
return
else:
bird.Follow = follow
return
def __PCi(self,i,ps):
"""
论文当中的PCi的算子
:return:
"""
pci = 0.05+0.45*((self.Math.exp(10*(i-1)/(ps-1)))/(self.Math.exp(10)-1))
return pci
def NewComputeV(self,bird,params):
"""
:param bird:
:param params: 传入的数据格式为:[[w,c1,c2,c3],[],[],[],[]] 这里一共是5组共设置100个粒子
:return:
这里按照ID的顺序来调用不同的参数
"""
NewV = []
w, c1, c2, c3 = params[self.Count_Div]
if(bird.ID%ClusterSize==0):
if(self.Count_Div<ClusterNumber-1):
self.Count_Div+=1
for i in range(DIM):
v = bird.V[i]*w
if(self.Random()<self.__PCi((i+1),PopulationSize)):
pbestfi = bird.PBestX[i]
else:
pbestfi=bird.PBestX[i]
v=v+c1*self.Random()*(pbestfi-bird.X[i])+c2*self.Random()*(bird.GBestX[i]-bird.X[i])\\
+c3*self.Random()*(bird.PBestX[i]-bird.X[i])
if(v>V_max):
v = V_max
elif(v<V_min):
v = V_min
NewV.append(v)
return NewV
def NewComputeX(self,bird:Bird,params):
NewX = []
NewV = self.NewComputeV(bird,params)
bird.V = NewV
for i in range(DIM):
x = bird.X[i]+NewV[i]
if(x>X_up):
x = X_up
elif(x<X_down):
x = X_down
NewX.append(x)
return NewX
def ComputeV(self,bird):
#这个方法是用来计算速度滴
#现在这个粒子群的这个计算V的算法需要进行改动,但是改动不会太大。
NewV=[]
for i in range(DIM):
v = bird.V[i]*self.W_RL + self.C1_RL*self.Random()*(bird.PBestX[i]-bird.X[i])\\
+self.C2_RL*self.Random()*(bird.GBestX[i]-bird.X[i])
#这里注意判断是否超出了范围
if(v>V_max):
v = V_max
elif(v<V_min):
v = V_min
NewV.append(v)
return NewV
def ComputeX(self,bird:Bird):
NewX = []
NewV = self.ComputeV(bird)
bird.V = NewV
for i in range(DIM):
x = bird.X[i]+NewV[i]
if(x>X_up):
x = X_up
elif(x<X_down):
x = X_down
NewX.append(x)
return NewX
def InitPopulation(self):
#初始化种群
GBestX = [0. for _ in range(DIM)]
Flag = float("inf")
for bird in self.Population:
bird.PBestX = bird.X
bird.Y = self.target.SquareSum(bird.X)
bird.PbestY = bird.Y
if(bird.Y<=Flag):
GBestX = bird.X
Flag = bird.Y
#便利了一遍我们得到了全局最优的种群
self.GBestY = Flag
for bird in self.Population:
bird.GBestX = GBestX
bird.GBestY = Flag
self.Iterate_num+=1
def InitPopulationRL(self):
#初始化种群,不过是给ENV调用的,因为这个里面有一个CLPSO的思想
GBestX = [0. for _ in range(DIM)]
Flag = float("inf")
for bird in self.Population:
bird.PBestX = bird.X
bird.Y = self.target.SquareSum(bird.X)
bird.PbestY = bird.Y
if(bird.Y<=Flag):
GBestX = bird.X
Flag = bird.Y
#便利了一遍我们得到了全局最优的种群
self.GBestY = Flag
for bird in self.Population:
bird.GBestX = GBestX
bird.GBestY = Flag
#现在是初始化,所以这个这样算是没问题的
self.GBestYLast = Flag
#给每一个粒子找到一个追随者
self.ChangeBird(bird,self.Population)
def Running(self):
"""
这个方法是用来正常运行基础版本的PSO算法的
这里没有必要删除这些算法,最起码可以做对比
:return:
"""
for iterate in range(1,IterationsNumber+1):
w = LinearW(iterate)
#这个算的GBestX其实始终是在算下一轮的最好的玩意
GBestX = [0. for _ in range(DIM)]
Flag = float("inf")
for bird in self.Population:
#更改为线性权重
self.W_RL = w
x = self.ComputeX(bird)
y = self.target.SquareSum(x)
# 这里还是要无条件更细的,不然这里的话C1就失效了
# if(y<=bird.Y):
# bird.X = x
# bird.Y = y
bird.X = x
bird.Y = y
if(bird.Y<=bird.PbestY):
bird.PBestX=bird.X
bird.PbestY = bird.Y
#个体中的最优一定包含了全局经历过的最优值
if(bird.PbestY<=Flag):
GBestX = bird.PBestX
Flag = bird.PbestY
for bird in self.Population:
bird.GBestX = GBestX
bird.GBestY=Flag
if __name__ == '__main__':
start = time.time()
basePso = NewPsoRL()
basePso.InitPopulation()
basePso.Running()
end = time.time()
# for bird in basePso.Population:
# print(bird)
print(basePso.Population[0])
print("花费时长:",end-start)
这个类其实是有两个功能的,一个是传统的粒子群算法,对每次这里是封装了一个传统粒子群算法的,然后就是基于论文改造的一些方法,那么这部分的调用是通过别的类来调用的。这玩意可以提取出来,但是也有重复的地方(和基本的粒子群算法)所以我就没有单独提取出来了。
配置
这里也是有一个统一的配置中心进行管控的。
不过其实还有一些东西我是没有提取到这个配置里面的,懒得搞了。
#coding=utf-8
# 相关参数的设置通过配置中心完成
import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(os.getcwd())))
C1=2.0
C2=2.0
C3=2.0
#如果超过四轮还没有得到更新的话,那么需要重新选择一个follow对于当前的粒子来说
M_follow = 4
W = 0.4
#单目标下定义的维度
DIM =5
#运行1000次(可以理解为训练1次这个粒子群要跑一千次)
IterationsNumber = 1000
X_down = -10.0
X_up = 10
V_min = -5.0
V_max = 5
#这个是按照论文进行的描述,分为5组,100个粒子是自己后面加的
ClusterSize = 20
ClusterNumber = 5
EPOCH = 20 #训练N轮
PopulationSize = 100
def LinearW(iterate):
#传入迭代次数
Wmax = 0.9
Wmin = 0.4
w = Wmax-(iterate*((Wmax-Wmin)/IterationsNumber))
return w
训练实现
那么接下来就是如何实现论文当中的训练部分了。
因为论文也是说要先预训练嘛,所以你懂的。
神经网络的定义
这里我们想要实现这个玩意,我们需要先定义两个神经网络。
class Actor(nn.Module):
"""
生成动作网络输入三个参数
"""
def __init__(self,state):
super(Actor, self).__init__()
self.fc1 = nn.Linear(state,64)
self.fc1.weight.data.normal_(0, 0.1)
self.fc2 = nn.Linear(64,64)
self.fc2.weight.data.normal_(0, 0.1)
self.以上是关于《Reinforcement learning based parameters adaption method for particleswarm optimization》代码复现的主要内容,如果未能解决你的问题,请参考以下文章
reinforcement learning和deep learning的区别
Reinforcement Learning Q-learning 算法学习-4
Deep Reinforcement Learning 深度增强学习资源
repost: Deep Reinforcement Learning