《Reinforcement learning based parameters adaption method for particleswarm optimization》代码复现

Posted Huterox

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Reinforcement learning based parameters adaption method for particleswarm optimization》代码复现相关的知识,希望对你有一定的参考价值。

文章目录

前言

前几天通过阅读这篇文献:
《Reinforcement learning based parameters adaption method for particleswarm optimization》
发现有些点还是比较新颖的,所以今天对论文的代码进行了整体的复现。整个过程大概花费了1天半(编码调试,不包括实验)
(PS:如果 不想看论文的话,请查看这篇博客:关于强化学习优化粒子群算法的论文解读
在本篇博文将完整分析这篇论文的思路以及工作流程。而且说实话这篇论文其实我感觉复现起来没有一点难度,有些点还是比较新颖的,可以玩玩,顺便作为一个强化学习项目练练手。

版权

郑重提示:本文版权归本人所有,任何人不得抄袭,搬运,使用需征得本人同意!

2022.6.27~2022.6.28

日期:2022.6.27~2022.6.28

项目结构

整个的项目结构如下:

这里不做过多的解释了,不过值的一提的是,我这里是没有使用矩阵的写法的,因为整个项目一开始的目的就是为了使用Python作为实验,然后把Python代码转换为Java代码上Flink的,所以设计之初就是使用一个对象来存储一个粒子的,这样做的好处就是使用一个对象代替了好几个大的矩阵,也就是说不需要去维护矩阵了,而且写出来的代码可读性很高,并且刚好论文当中有使用CLPSO的速度更新方程来进行变体,所以他这里实现的话,也是很难直接使用矩阵来实现这个粒子之间的跟踪,以及锦标赛选择滴。

哦,对了额外说明一下这篇论文是发在arxiv上面的,不是什么IEEE这种顶刊,所以有些地方,他的描述是不严谨的,所以代码的总体的设计是按照论文来的,但是有些细节是不太一样的,不然代码都跑不起来。

然后这个项目也是验证跑了一下的,发现效果真的挺厉害的,说实话如果不是因为这个东西加了个DDPG,我想要玩玩这个神经网络,我根本就不会去想要复现这个玩意,而且一开始也是抱着怀疑的态度编写的,不过现在来看,还是挺厉害的,我一共训练了300轮每一轮PSO算法跑1000次。也就是说这里是跑了30万然后100个粒子,也就是3000万,本来的话,上午我是可以发出这篇文章记录一下的,但是后来改了几个bug然后调了几个参数,其实原来我还在训练3亿次的网络,但是实在顶不住了,最后改到0.3亿。

传统PSO(这里我没有展示优化过后的(原来我优化的)因为结果都一样被吊打)
都是跑1000次
传统:

我自己优化的只能到-58
DDPG优化:

而且是在50多次就出来了:

这怎么玩?!!!
说实话,没有想过会有这种效果。

当然废话不多说了,我们直接来看实现。

数据结构定义

这里的话,基于先去的数据结构,这里定义了一个Bird专门用来储存一些信息。

#coding=utf-8
#这里我们使用定义Class的方式实现PSO,便于后面调整

from ONEPSO.Config import *
import random

class Bird(object):

    #这个是从1开始的
    ID = 1
    #这个是划分多种群时的种群ID.他是从0开始的
    CID = 0
    #用于存放粒子到每个子种群的粒子的距离
    DIST = []
    Iterate = 0
    Y = None
    X = None
    #记录个体的最优值,这个关系到后面求取全局最优值
    # 因为论文当中也采用了CLPSO,所以需要这个破玩意
    # 不过在这里还是需要Pbest与Gbest
    Follow = None
    #这个玩意是用来判断当前粒子是不是需要换一个追随者了的
    NoChange = 0
    PbestY = None
    PBestX = None
    PBestV = None
    GBestX = None
    GBestY = None
    GBestV = None
    CBestY=None
    CBestX=None
    CBestV =None
    V = None

    def __init__(self,ID):
        self.ID = ID
        self.V = [random.random() *(V_max-V_min) + V_min for _ in range(DIM)]
        self.X = [random.random() *(X_up-X_down) + X_down for _ in range(DIM)]

    def __str__(self):
        return "ID:"+str(self.ID)+" -Fintess:%.2e:"%(self.Y)+" -X"+str(self.X)+" -PBestFitness:%.2e"%(self.PbestY)+" -PBestX:"+str(self.PBestX)+\\
            "\\n -GBestFitness:%.2e"%(self.GBestY)+" -GBestX:"+str(self.GBestX)

基本粒子群实现

这里的基本的粒子群实现其实和原来的是现实是一样的,只是我这里为了做到通用,我是直接在上面加功能。

#coding=utf-8
#这个是最基础的PSO算法实现,用于测试当前算法架构的合理性
#此外这个玩意还是用来优化的工具类,整个算法是为了求取最小值
import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(os.getcwd())))
import math
from ONEPSO.RLPSOEmersion.TargetRL import Target
from ONEPSO.RLPSOEmersion.ConfigRL import *
from ONEPSO.RLPSOEmersion.BirdRL import Bird
import random
import time
class NewPsoRL(object):

    #这里是用来记录上一轮的GBestY的,主要是为了给ENV用(不影响这个算法的正常的基础PSO使用)
    GBestYLast = None
    #这个是用来记录全局最优的,为了记录停摆的
    GBestY = None
    NoChangeFlag = False

    Population = None
    Iterate_num = 0
    Random = random.random
    target = Target()
    W_RL = W
    C1_RL = C1
    C2_RL = C2
    C3_RL = C3
    #先实例出来一个对象,避免下次再启动它很慢
    Math = math
    Count_Div = 0
    ##################################
    """
    这三个破玩意是要输入神经网络的几个参数
    这三个玩意后面每一个玩意展开都是5维的
    """
    F_iterate_num = None
    F_diversity = None
    F_no_increase = None
    """
    接下来这些是为了得到上面三个破玩意而设置的辅助变量
    """
    F_no_increase_time = 0
    ##################################


    def __init__(self):
        #为了方便,我们这边直接先从1开始
        self.Population = [Bird(ID) for ID in range(1,PopulationSize+1)]
    def __GetDiversity(self,Population):
        #先计算出平均数
        x_pa = [0 for _ in range(DIM)]
        for bird in Population:
            for i in range(DIM):
                x_pa[i]+=bird.X[i]
        for i in range(DIM):
            x_pa[i]/=PopulationSize

        #现在计算出Diversity
        Diversity = 0.
        for bird in Population:
            sum = 0
            for i in range(DIM):
                sum+=abs(bird.X[i]-x_pa[i])

            Diversity+=self.Math.sqrt(sum)
        Diversity  = Diversity/PopulationSize
        return Diversity
    def __SinX(self,params):
        """
        这个就是论文提到的那个sin方法映射
        这个在论文里面是说从0-4,也就是5个参数返回
        :param params:
        :return:
        """
        res = []
        for i in range(5):
            tem_ = self.Math.sin(params*self.Math.pow(2,i))
            res.append(tem_)
        return res

    def __cat(self,states,params):
        for param in params:
            states.append(param)

    def GetStatus(self,Population):
        """
        按照要求,我们将在这里实现对现在粒子群的一个状态的输入
        :return:
        """
        self.F_iterate_num = self.Iterate_num / IterationsNumber
        self.F_diversity = self.__GetDiversity(Population)
        self.F_no_increase = (self.Iterate_num-self.F_no_increase_time)/IterationsNumber
        #此时经过这个sin函数进行映射
        self.F_iterate_num = self.__SinX(self.F_iterate_num)
        self.F_diversity = self.__SinX(self.F_diversity)
        self.F_no_increase = self.__SinX(self.F_no_increase)

        states  = []

        self.__cat(states,self.F_iterate_num);self.__cat(states,self.F_diversity);self.__cat(states,self.F_no_increase)
        return states

    def ChangeBird(self,bird,Population):
        #这个主要是实现锦标赛法来对粒子的跟踪对象进行更新

        while True:
            #被跟踪的粒子不能和自己一样,也不能和上一个一样
            a,b = random.sample(range(PopulationSize),2)
            a = Population[a];b=Population[b]
            follow = a
            if(a.PbestY>b.PbestY):
                follow = b
            if(follow.ID!=bird.ID):
                if(bird.Follow):
                    if(bird.Follow.ID !=follow.ID):
                        bird.Follow = follow
                        return
                else:
                    bird.Follow = follow
                    return

    def __PCi(self,i,ps):
        """
        论文当中的PCi的算子
        :return:
        """
        pci = 0.05+0.45*((self.Math.exp(10*(i-1)/(ps-1)))/(self.Math.exp(10)-1))
        return pci
    def NewComputeV(self,bird,params):
        """

        :param bird:
        :param params: 传入的数据格式为:[[w,c1,c2,c3],[],[],[],[]] 这里一共是5组共设置100个粒子
        :return:
        这里按照ID的顺序来调用不同的参数
        """
        NewV = []
        w, c1, c2, c3 = params[self.Count_Div]
        if(bird.ID%ClusterSize==0):
            if(self.Count_Div<ClusterNumber-1):
                self.Count_Div+=1

        for i in range(DIM):
            v = bird.V[i]*w
            if(self.Random()<self.__PCi((i+1),PopulationSize)):
                pbestfi = bird.PBestX[i]
            else:
                pbestfi=bird.PBestX[i]
            v=v+c1*self.Random()*(pbestfi-bird.X[i])+c2*self.Random()*(bird.GBestX[i]-bird.X[i])\\
            +c3*self.Random()*(bird.PBestX[i]-bird.X[i])

            if(v>V_max):
                v = V_max
            elif(v<V_min):
                v = V_min
            NewV.append(v)

        return NewV

    def NewComputeX(self,bird:Bird,params):
        NewX = []
        NewV = self.NewComputeV(bird,params)
        bird.V = NewV
        for i in range(DIM):
            x = bird.X[i]+NewV[i]
            if(x>X_up):
                x = X_up
            elif(x<X_down):
                x = X_down
            NewX.append(x)
        return NewX



    def ComputeV(self,bird):
        #这个方法是用来计算速度滴
        #现在这个粒子群的这个计算V的算法需要进行改动,但是改动不会太大。
        NewV=[]
        for i in range(DIM):
            v = bird.V[i]*self.W_RL + self.C1_RL*self.Random()*(bird.PBestX[i]-bird.X[i])\\
            +self.C2_RL*self.Random()*(bird.GBestX[i]-bird.X[i])
            #这里注意判断是否超出了范围
            if(v>V_max):
                v = V_max
            elif(v<V_min):
                v = V_min
            NewV.append(v)

        return NewV

    def ComputeX(self,bird:Bird):
        NewX = []
        NewV = self.ComputeV(bird)
        bird.V = NewV
        for i in range(DIM):
            x = bird.X[i]+NewV[i]
            if(x>X_up):
                x = X_up
            elif(x<X_down):
                x = X_down
            NewX.append(x)
        return NewX

    def InitPopulation(self):
        #初始化种群
        GBestX = [0. for _ in range(DIM)]
        Flag = float("inf")
        for bird in self.Population:
            bird.PBestX = bird.X
            bird.Y = self.target.SquareSum(bird.X)
            bird.PbestY = bird.Y
            if(bird.Y<=Flag):
                GBestX = bird.X
                Flag = bird.Y
        #便利了一遍我们得到了全局最优的种群
        self.GBestY = Flag
        for bird in self.Population:
            bird.GBestX = GBestX
            bird.GBestY = Flag

        self.Iterate_num+=1

    def InitPopulationRL(self):
        #初始化种群,不过是给ENV调用的,因为这个里面有一个CLPSO的思想
        GBestX = [0. for _ in range(DIM)]
        Flag = float("inf")
        for bird in self.Population:
            bird.PBestX = bird.X
            bird.Y = self.target.SquareSum(bird.X)
            bird.PbestY = bird.Y
            if(bird.Y<=Flag):
                GBestX = bird.X
                Flag = bird.Y

        #便利了一遍我们得到了全局最优的种群
        self.GBestY = Flag
        for bird in self.Population:
            bird.GBestX = GBestX
            bird.GBestY = Flag
            #现在是初始化,所以这个这样算是没问题的
            self.GBestYLast = Flag
            #给每一个粒子找到一个追随者
            self.ChangeBird(bird,self.Population)



    def Running(self):
        """
        这个方法是用来正常运行基础版本的PSO算法的
        这里没有必要删除这些算法,最起码可以做对比
        :return:
        """
        for iterate in range(1,IterationsNumber+1):
            w = LinearW(iterate)
            #这个算的GBestX其实始终是在算下一轮的最好的玩意
            GBestX = [0. for _ in range(DIM)]
            Flag = float("inf")
            for bird in self.Population:
                #更改为线性权重
                self.W_RL = w
                x = self.ComputeX(bird)
                y = self.target.SquareSum(x)
                # 这里还是要无条件更细的,不然这里的话C1就失效了
                # if(y<=bird.Y):
                #     bird.X = x
                #     bird.Y = y
                bird.X = x
                bird.Y = y
                if(bird.Y<=bird.PbestY):
                    bird.PBestX=bird.X
                    bird.PbestY = bird.Y

                #个体中的最优一定包含了全局经历过的最优值
                if(bird.PbestY<=Flag):
                    GBestX = bird.PBestX
                    Flag = bird.PbestY
            for bird in self.Population:
                bird.GBestX = GBestX
                bird.GBestY=Flag

if __name__ == '__main__':

    start = time.time()
    basePso = NewPsoRL()
    basePso.InitPopulation()
    basePso.Running()
    end = time.time()
    # for bird in basePso.Population:
    #     print(bird)
    print(basePso.Population[0])
    print("花费时长:",end-start)



这个类其实是有两个功能的,一个是传统的粒子群算法,对每次这里是封装了一个传统粒子群算法的,然后就是基于论文改造的一些方法,那么这部分的调用是通过别的类来调用的。这玩意可以提取出来,但是也有重复的地方(和基本的粒子群算法)所以我就没有单独提取出来了。

配置

这里也是有一个统一的配置中心进行管控的。
不过其实还有一些东西我是没有提取到这个配置里面的,懒得搞了。

#coding=utf-8
# 相关参数的设置通过配置中心完成
import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(os.getcwd())))

C1=2.0
C2=2.0
C3=2.0
#如果超过四轮还没有得到更新的话,那么需要重新选择一个follow对于当前的粒子来说
M_follow = 4
W = 0.4
#单目标下定义的维度
DIM =5
#运行1000次(可以理解为训练1次这个粒子群要跑一千次)
IterationsNumber = 1000
X_down = -10.0
X_up = 10

V_min = -5.0
V_max = 5

#这个是按照论文进行的描述,分为5组,100个粒子是自己后面加的
ClusterSize = 20
ClusterNumber = 5
EPOCH = 20 #训练N轮
PopulationSize = 100

def LinearW(iterate):
    #传入迭代次数
    Wmax = 0.9
    Wmin = 0.4

    w = Wmax-(iterate*((Wmax-Wmin)/IterationsNumber))

    return w


训练实现

那么接下来就是如何实现论文当中的训练部分了。
因为论文也是说要先预训练嘛,所以你懂的。

神经网络的定义

这里我们想要实现这个玩意,我们需要先定义两个神经网络。

class Actor(nn.Module):
    """
    生成动作网络输入三个参数
    """
    def __init__(self,state):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state,64)
        self.fc1.weight.data.normal_(0, 0.1)
        self.fc2 = nn.Linear(64,64)
        self.fc2.weight.data.normal_(0, 0.1)
        self.以上是关于《Reinforcement learning based parameters adaption method for particleswarm optimization》代码复现的主要内容,如果未能解决你的问题,请参考以下文章

reinforcement learning和deep learning的区别

Reinforcement Learning Q-learning 算法学习-4

Deep Reinforcement Learning 深度增强学习资源

repost: Deep Reinforcement Learning

Reinforcement Learning Q-learning 算法学习-3

Reinforcement Learning