Tensorflow2.0|基于深度强化学习(DQN)实现动态柔性作业车间调度问题(DFJSP)
Posted 码丽莲梦露
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tensorflow2.0|基于深度强化学习(DQN)实现动态柔性作业车间调度问题(DFJSP)相关的知识,希望对你有一定的参考价值。
注:本文的python实现基于论文《Dynamic scheduling for flexible job shop with new job insertions by deep reinforcement learning》
论文详情可见:论文阅读|《用强化学习求解带插单的动态FJSP》
其他相关阅读可见个人CSDN专栏之《论文阅读与实现》,如:
论文阅读|《基于仿真的具有批次放行和扩展技术优先约束的动态作业车间调度问题调度规则研究》
论文阅读|《一种基于非支配排序的改进生物地理学优化算法求解多目标FJSP》
论文阅读|《面向多目标柔性作业车间调度的强化学习NSGA-Ⅱ算法》
论文阅读|《 基于强化学习的自适应遗传算法求解柔性作业车间调度问题》
作为一个刚入门的小白博主,非常愿意与各位同僚交流,欢迎叨扰!
1 代码
1.1 算例生成(Instance_Generator)
import random
import numpy as np
Total_Machine=[10,20,30,40,50] #全部机器
Initial_Job_num=20 #初始工件个数
Job_insert=[50,100,200] #工件新到达个数
DDT=[0.5,1.0,1.5] #工件紧急程度
E_ave=[50,100,200] #指数分布
def Instance_Generator(M_num,E_ave,New_insert,DDT):
'''
:param M_num: Machine Number
:param E_ave: exponetional distribution
:param New_insert: New Job insert
:param DDT:DDT
:return: Processing time,A:New Job arrive time,
D:Deliver time,
M_num: Machine Number,
Op_num: Operation Number,
J_num:Job NUMBER
'''
Initial_Job_num=20
Op_num=[random.randint(1,20) for i in range(New_insert+Initial_Job_num)]
Processing_time=[]
for i in range(Initial_Job_num+New_insert):
Job_i=[]
for j in range(Op_num[i]):
Ava_M = [random.randint(1, M_num) for i in range(Op_num[i])]
O_i=list(np.ones(M_num)*(-1))
for k in Ava_M:
T=list(range(M_num))
random.shuffle(T)
T=T[0:k+1]
for M_i in range(len(O_i)):
if M_i in T:
O_i[M_i]=random.randint(1,50)
Job_i.append(O_i)
Processing_time.append(Job_i)
A1=[0 for i in range(Initial_Job_num)]
A=np.random.exponential(E_ave, size=New_insert)
A=[int(A[i]) for i in range(len(A))]#New Insert Job arrive time
A1.extend(A)
T_ijave = []
for i in range(New_insert):
Tad = []
for j in range(Op_num[i]):
T_ijk = [k for k in Processing_time[i][j] if k != -1]
Tad.append(sum(T_ijk) / len(T_ijk))
T_ijave.append(sum(Tad))
D1=[int(T_ijave[i]*DDT) for i in range(Initial_Job_num)]
D=[int(A[i]+T_ijave[i]*DDT) for i in range(New_insert)]
D1.extend(D)
O_num=sum(Op_num)
J=dict(enumerate(Op_num))
J_num=Initial_Job_num+New_insert
return Processing_time,A1,D1,M_num,Op_num,J,O_num,J_num
Processing_time,A,D,M_num,Op_num,J,O_num,J_num=Instance_Generator(10,50,50,0.5)
1.2 柔性作业车间的机器和工件类(Object_for_FJSP)
class Object:
def __init__(self,I):
self.I=I
self.Start=[]
self.End=[]
self.T=[]
self.assign_for=[]
def _add(self,S,E,obs,t):
#obs:安排的对象
self.Start.append(S)
self.End.append(E)
self.Start.sort()
self.End.sort()
self.T.append(t)
self.assign_for.insert(self.End.index(E),obs)
def idle_time(self):
Idle=[]
try:
if self.Start[0]!=0:
Idle.append([0,self.Start[0]])
K=[[self.End[i],self.Start[i+1]] for i in range(len(self.End)) if self.Start[i+1]-self.End[i]>0]
Idle.extend(K)
except:
pass
return Idle
1.3 车间状态和动作(Job_shop)
import numpy as np
import random
from Instance_Generator import Processing_time,A,D,M_num,Op_num,J,O_num,J_num
from Object_for_FJSP import Object
class Situation:
def __init__(self,J_num,M_num,O_num,J,Processing_time,D,Ai):
self.Ai=Ai #工件到达时间
self.D=D #交货期
self.O_num=O_num #工序总数
self.M_num=M_num #机器数
self.J_num=J_num #工件数
self.J=J #工件对应的工序数
self.Processing_time = Processing_time # 加工时间
self.CTK=[0 for i in range(M_num)] #各机器上最后一道工序的完工时间列表
self.OP=[0 for i in range(J_num)] #各工件的已加工工序数列表
self.UK=[0 for i in range(M_num)] #各机器的实际使用率
self.CRJ=[0 for i in range(J_num)] #工件完工率
# 工件集:
self.Jobs=[]
for i in range(J_num):
F=Object(i)
self.Jobs.append(F)
#机器集
self.Machines = []
for i in range(M_num):
F = Object(i)
self.Machines.append(F)
#更新数据
def _Update(self,Job,Machine):
self.CTK[Machine]=max(self.Machines[Machine].End)
self.OP[Job]+=1
self.UK[Machine]=sum(self.Machines[Machine].T)/self.CTK[Machine]
self.CRJ[Job]=self.OP[Job]/self.J[Job]
#机器平均使用率
def Features(self):
#1 机器平均利用率
U_ave=sum(self.UK)/self.M_num
K=0
for uk in self.UK:
K+=np.square(uk-U_ave)
#2 机器的使用率标准差
U_std=np.sqrt(K/self.M_num)
#3 平均工序完成率
CRO_ave=sum(self.OP)/self.O_num
#4 平均工件工序完成率
CRJ_ave=sum(self.CRJ)/self.J_num
K = 0
for uk in self.CRJ:
K += np.square(uk - CRJ_ave)
#5 工件工序完成率标准差
CRJ_std=np.sqrt(K/self.J_num)
#6 Estimated tardiness rate Tard_e
T_cur=sum(self.CTK)/self.M_num
N_tard,N_left=0,0
for i in range(self.J_num):
if J[i]>self.OP[i]:
N_left+=self.J[i]-self.OP[i]
T_left=0
for j in range(self.OP[i]+1,J[i]):
M_ij=[k for k in self.Processing_time[i][j] if k>0 or k<999]
T_left+=sum(M_ij)/len(M_ij)
if T_left+T_cur>self.D[i]:
N_tard+=self.J[i]-j+1
try:
Tard_e=N_tard/N_left
except:
Tard_e =9999
#7 Actual tardiness rate Tard_a
N_tard, N_left = 0, 0
for i in range(self.J_num):
if J[i] > self.OP[i]:
N_left += self.J[i] - self.OP[i]
try:
if self.CTK[i] > self.D[i]:
N_tard += self.J[i] - j
except:
pass
Tard_a = N_tard / N_left
return U_ave,U_std,CRO_ave,CRJ_ave,CRJ_std,Tard_e,Tard_a
#Composite dispatching rule 1
#return Job,Machine
def rule1(self):
#T_cur:平均完工时间
T_cur = sum(self.CTK) / self.M_num
#Tard_Job:不能按期完成的工件
Tard_Job=[i for i in range(self.J_num) if self.OP[i]<self.J[i] and self.D[i]<T_cur]
UC_Job=[j for j in range(self.J_num) if self.OP[j]<self.J[j]]
if Tard_Job==[]:
Job_i=UC_Job[np.argmax([(self.D[i]-T_cur)/(self.J[i]-self.OP[i]) for i in UC_Job ])]
else:
T_ijave=[]
for i in Tard_Job:
Tad=[]
for j in range(self.OP[i],self.J[i]):
T_ijk=[k for k in self.Processing_time[i][j] if k!=-1]
Tad.append(sum(T_ijk)/len(T_ijk))
T_ijave.append(T_cur+sum(Tad)-self.D[i])
Job_i=Tard_Job[np.argmax(T_ijave)]
try:
C_ij = max(self.Jobs[Job_i].End)
except:
C_ij =0
A_ij=self.Ai[Job_i] #工件i的arrival time
On=len(self.Jobs[Job_i].End)
# print('这是各工件的工序加工情况:',self.OP[Job_i])
# print('工序数是否超出了工件的工序数',self.J[Job_i],On)
Mk=[]
for i in range(len(self.CTK)):
if self.Processing_time[Job_i][On][i]!=-1:
Mk.append(max(C_ij,A_ij,i))
else:
Mk.append(9999)
Machine=np.argmin(Mk)
return Job_i,Machine
#Composite dispatching rule 2
#return Job,Machine
def rule2(self):
# T_cur:平均完工时间
T_cur = sum(self.CTK) / self.M_num
# Tard_Job:不能按期完成的工件
Tard_Job = [i for i in range(self.J_num) if self.OP[i] < self.J[i] and self.D[i] < T_cur]
UC_Job = [j for j in range(self.J_num) if self.OP[j] < self.J[j]]
T_ijave = []
for i in range(self.J_num):
Tad = []
for j in range(self.OP[i], self.J[i]):
T_ijk = [k for k in self.Processing_time[i][j] if k != -1]
Tad.append(sum(T_ijk) / len(T_ijk))
T_ijave.append(sum(Tad))
if Tard_Job==[]:
Job_i=UC_Job[np.argmin([(self.D[i]-T_cur)/T_ijave[i] for i in UC_Job])]
else:
Job_i=Tard_Job[np.argmax([T_cur+T_ijave[i]-self.D[i] for i in Tard_Job ])]
try:
C_ij = max(self.Jobs[Job_i].End)
except:
C_ij =0
A_ij = self.Ai[Job_i] # 工件i的arrival time
On = len(self.Jobs[Job_i].End)
Mk = []
for i in range(len(self.CTK)):
if self.Processing_time[Job_i][On][i] != -1:
Mk.append(max(C_ij, A_ij, i))
else:
Mk.append(9999)
Machine = np.argmin(Mk)
return Job_i,Machine
#Composite dispatching rule 3
def rule3(self):
# T_cur:平均完工时间
T_cur = sum(self.CTK) / self.M_num
# Tard_Job:不能按期完成的工件
UC_Job = [j for j in range(self.J_num) if self.OP[j] < self.J[j]]
T_ijave = []
for i in UC_Job:
Tad = []
for j in range(self.OP[i], self.J[i]):
T_ijk = [k for k in self.Processing_time[i][j] if k != -1]
Tad.append(sum(T_ijk) / len(T_ijk))
T_ijave.append(T_cur + sum(Tad) - self.D[i])
Job_i = UC_Job[np.argmax(T_ijave)]
On = len(self.Jobs[Job_i].End)
if random.random()<0.5:
U=[]
for i in range(len(self.UK)):
if self.Processing_time[Job_i][On][i]==-1:
U.append(9999)
else:
U.append(self.UK[i])
Machine=np.argmin(U)
else:
MT=[]
for j in range(self.M_num):
if self.Processing_time[Job_i][On][j]==-1:
MT.append(9999)
else:
MT.append(sum(self.Machines[j].T))
Machine=np.argmin(MT)
return Job_i,Machine
#Composite dispatching rule 4
def rule4(self):
UC_Job = [j for j in range(self.J_num) if self.OP[j] < self.J[j]]
Job_i= random.choice(UC_Job)
try:
C_ij = max(self.Jobs[Job_i].End)
except:
C_ij =0
A_ij = self.Ai[Job_i] # 工件i的arrival time
On = len(self.Jobs[Job_i].End)
Mk = []
for i in range(len(self.CTK)):
if self.Processing_time[Job_i][On][i] != -1:
Mk.append(max(C_ij, A_ij, i))
else:
Mk.append(9999)
Machine = np.argmin(Mk)
return Job_i,Machine
#Composite dispatching rule 5
def rule5(self):
# T_cur:平均完工时间
T_cur = sum(self.CTK) / self.M_num
# Tard_Job:不能按期完成的工件
Tard_Job = [i for i in range(self.J_num) if self.OP[i] < self.J[i] and self.D[i] < T_cur]
UC_Job = [j for j in range(self.J_num) if self.OP[j] < self.J[j]]
if Tard_Job==[]:
Job_i=UC_Job[np.argmin([self.CRJ[i]*(self.D[i]-T_cur) for i in UC_Job])]
else:
T_ijave = []
for i in Tard_Job:
Tad = []
for j in range(self.OP[i], self.J[i]):
T_ijk = [k for k in self.Processing_time[i][j] if k != -1]
Tad.append(sum(T_ijk) / len(T_ijk))
T_ijave.append(1/(self.CRJ[i]+1)*(T_cur + sum(Tad) - self.D[i]))
Job_i = Tard_Job[np.argmax(T_ijave)]
try:
C_ij = max(self.Jobs[Job_i].End)
except:
C_ij =0
A_ij = self.Ai[Job_i] # 工件i的arrival time
On = len(self.Jobs[Job_i].End)
Mk = []
for i in range(len(self.CTK)):
if self.Processing_time[Job_i][On][i] != -1:
Mk.append(max(C_ij, A_ij, i))
else:
Mk.append(9999)
Machine = np.argmin(Mk)
return Job_i, Machine
#Composite dispatching rule 6
#return Job,Machine
def rule6(self):
# T_cur:平均完工时间
T_cur = sum(self.CTK) / self.M_num
UC_Job = [j for j in range(self.J_num) if self.OP[j] < self.J[j]]
T_ijave = []
for i in UC_Job:
Tad = []
for j in range(self.OP[i], self.J[i]):
T_ijk = [k for k in self.Processing_time[i][j] if k != -1]
Tad.append(sum(T_ijk) / len(T_ijk))
T_ijave.append(T_cur + sum(Tad) - self.D[i])
Job_i = UC_Job[np.argmax(T_ijave)]
try:
C_ij = max(self.Jobs[Job_i].End)
except:
C_ij =0
A_ij = self.Ai[Job_i] # 工件i的arrival time
On = len(self.Jobs[Job_i].End)
Mk = []
for i in range(len(self.CTK)):
if self.Processing_time[Job_i][On][i] != -1:
Mk.append(max(C_ij, A_ij, i))
else:
Mk.append(9999)
Machine = np.argmin(Mk)
return Job_i,Machine
def scheduling(self,action):
Job,Machine=action[0],action[1]
O_n=len(self.Jobs[Job].End)
# print(Job, Machine,O_n)
Idle=self.Machines[Machine].idle_time()
try:
last_ot=max(self.Jobs[Job].End) #上道工序加工时间
except:
last_ot=0
try:
last_mt=max(self.Machines[Machine].End) #机器最后完工时间
except:
last_mt=0
Start_time=max(last_ot,last_mt)
PT=self.Processing_time[Job][O_n][Machine] #工序加工时间
for i in range(len(Idle)):
if Idle[i][1]-Idle[i][0]>PT:
if Idle[i][0]>last_ot:
start_time=Idle[i][0]
if Idle[i][0]<last_ot and Idle[i][1]-last_ot>PT:
start_time=last_ot
end_time=Start_time+PT
self.Machines[Machine]._add(Start_time,end_time,Job,PT)
self.Jobs[Job]._add(Start_time,end_time,Machine,PT)
self._Update(Job,Machine)
def reward(self,Ta_t,Te_t,Ta_t1,Te_t1,U_t,U_t1):
'''
:param Ta_t: Tard_a(t)
:param Te_t: Tard_e(t)
:param Ta_t1: Tard_a(t+1)
:param Te_t1: Tard_e(t+1)
:param U_t: U_ave(t)
:param U_t1: U_ave(t+1)
:return: reward
'''
if Ta_t1<Ta_t:
rt=1
else:
if Ta_t1>Ta_t:
rt=-1
else:
if Te_t1<Te_t:
rt=1
else:
if Te_t1>Te_t:
rt=1
else:
if U_t1>U_t:
rt=1
else:
if U_t1>0.95*U_t:
rt=0
else:
rt=-1
return rt
Sit=Situation(J_num,M_num,O_num,J,Processing_time,D,A)
1.4 DQN
import numpy as np
import random
from collections import deque
from tensorflow.keras import layers,models
import tensorflow as tf
from Job_Shop import Sit,O_num
from tensorflow.keras.optimizers import Adam
class DQN:
def __init__(self,):
self.Hid_Size = 30
# ------------Hidden layer=5 30 nodes each layer--------------
model = models.Sequential()
model.add(layers.Input(shape=(7,)))
model.add(layers.Dense(self.Hid_Size, name='l1'))
model.add(layers.Dense(self.Hid_Size, name='l2'))
model.add(layers.Dense(self.Hid_Size, name='l3'))
model.add(layers.Dense(self.Hid_Size, name='l4'))
model.add(layers.Dense(self.Hid_Size, name='l5'))
model.add(layers.Dense(6, name='l6'))
model.compile(loss='mse',
optimizer=Adam(learning_rate=0.001))
# # model.summary()
self.model = model
#------------Q-network Parameters-------------
self.act_dim=[1,2,3,4,5,6] #神经网络的输出节点
self.obs_n=[0,0,0,0,0,0,0] #神经网路的输入节点
self.gama = 0.95 # γ经验折损率
# self.lr = 0.001 # 学习率
self.global_step = 0
self.update_target_steps = 200 # 更新目标函数的步长
self.target_model = self.model
#-------------------Agent-------------------
self.e_greedy=0.01
self.e_greedy_decrement=0.0003
self.L=10 #Number of training episodes L
#---------------Replay Buffer---------------
self.buffer=deque()
self.memory_size = 5000 # 记忆上限
self.memory_counter = 0 # 当前记忆数
self.Batch_size=32 # Batch Size of Samples to perform gradient descent
def replace_target(self):
self.target_model.get_layer(name='l1').set_weights(self.model.get_layer(name='l1').get_weights())
self.target_model.get_layer(name='l2').set_weights(self.model.get_layer(name='l2').get_weights())
self.target_model.get_layer(name='l3').set_weights(self.model.get_layer(name='l3').get_weights())
self.target_model.get_layer(name='l4').set_weights(self.model.get_layer(name='l4').get_weights())
self.target_model.get_layer(name='l5').set_weights(self.model.get_layer(name='l5').get_weights())
self.target_model.get_layer(name='l6').set_weights(self.model.get_layer(name='l6').get_weights())
def replay(self):
if self.global_step % self.update_target_steps == 0:
self.replace_target()
# replay the history and train the model
minibatch = random.sample(self.buffer, self.Batch_size)
for state, action, reward, next_state, done in minibatch:
target = reward
if not done:
k=self.target_model.predict(next_state)
target = (reward + self.gama *
np.argmax(self.target_model.predict(next_state)))
target_f = self.model.predict(state)
target_f[0][action] = target
self.model.fit(state, target_f, epochs=1, verbose=0)
self.global_step += 1
def Select_action(self,obs):
obs=np.expand_dims(obs,0)
if random.random()<self.e_greedy:
act=random.randint(0,6)
else:
act=np.argmax(self.model.predict(obs))
self.e_greedy = max(
0.01, self.e_greedy - self.e_greedy_decrement) # 随着训练逐步收敛,探索的程度慢慢降低
return act
def _append(self, exp):
self.buffer.append(exp)
def main(self,O_num):
Total_reward=0
for i in range(self.L):
print('-----------开始第',i,'次训练-----------')
obs=[0 for i in range(7)]
k=0
done=False
for i in range(O_num):
k+=1
at=self.Select_action(obs)
# print(at)
if at==0:
at_trans=Sit.rule1()
if at==1:
at_trans=Sit.rule2()
if at==2:
at_trans=Sit.rule3()
if at==3:
at_trans=Sit.rule4()
if at==4:
at_trans=Sit.rule5()
if at==5:
at_trans=Sit.rule6()
# at_trans=self.act[at]
print('执行action:',at)
print('---将工件---',at_trans[0],'---安排到机器---',at_trans[1])
Sit.scheduling(at_trans)
obs_t=Sit.Features()
if i==O_num-1:
done=True
obs = obs_t
obs_t = np.expand_dims(obs_t, 0)
obs = np.expand_dims(obs, 0)
# print(obs,obs_t)
r_t = Sit.reward(obs[0][6],obs[0][5],obs_t[0][6],obs_t[0][5],obs[0][0],obs_t[0][0])
self._append((obs,at,r_t,obs_t,done))
if k>self.Batch_size:
# batch_obs, batch_action, batch_reward, batch_next_obs,done= self.sample()
self.replay()
Total_reward+=r_t
M=Sit.Machines
E=0
for Mi in M:
if max(Mi.End)>E:
E=max(Mi.End)
print('完工时间:',E)
return Total_reward
act=[Sit.rule1(),Sit.rule2(),Sit.rule3(),Sit.rule4(),Sit.rule5(),Sit.rule6]
d=DQN()
d.main(O_num)
以上是关于Tensorflow2.0|基于深度强化学习(DQN)实现动态柔性作业车间调度问题(DFJSP)的主要内容,如果未能解决你的问题,请参考以下文章
神经网络与深度学习-基于Tensorflow2.0的手势识别(数据集和训练集)