QLearning方法有著明顯的局限性,當(dāng)狀態(tài)和動(dòng)作空間是離散的且維數(shù)不高時(shí)可使用Q-Table存儲(chǔ)每個(gè)狀態(tài)動(dòng)作的Q值,而當(dāng)狀態(tài)和動(dòng)作時(shí)高維連續(xù)時(shí),該方法便不太適用。可以將Q-Table的更新問(wèn)題變成一個(gè)函數(shù)擬合問(wèn)題,通過(guò)更新參數(shù)θ使得Q函數(shù)逼近最優(yōu)Q值。DL是解決參數(shù)學(xué)習(xí)的有效方法,可以通過(guò)引進(jìn)DL來(lái)解決強(qiáng)化學(xué)習(xí)RL中擬合Q值函數(shù)問(wèn)題,但是要先解決一系列問(wèn)題:
- DL需要大量帶標(biāo)簽的樣本進(jìn)行監(jiān)督學(xué)習(xí),但RL只有reward返回值
- DL樣本獨(dú)立,但RL前后State狀態(tài)有關(guān)
- DL目標(biāo)分布固定,但RL的分布一直變化
- 使用非線(xiàn)性網(wǎng)絡(luò)表示值函數(shù)時(shí)會(huì)不穩(wěn)定
QLearning實(shí)現(xiàn):https://www.cnblogs.com/N3ptune/p/17341434.html
Deep Q-Network
此處將使用DQN來(lái)解決上述問(wèn)題,其算法流程包括:
- 首先初始化深度神經(jīng)網(wǎng)絡(luò),它將作為 Q 函數(shù)的近似值函數(shù)
- 初始化經(jīng)驗(yàn)回放緩沖區(qū),用于存儲(chǔ)智能體的經(jīng)驗(yàn),其中包括狀態(tài)、動(dòng)作、獎(jiǎng)勵(lì)、下一狀態(tài)等信息
- 智能體在環(huán)境中采取行動(dòng),根據(jù)行動(dòng)獲得獎(jiǎng)勵(lì),得到下一個(gè)狀態(tài),并將這些經(jīng)驗(yàn)添加到經(jīng)驗(yàn)回放緩沖區(qū)中
- 從經(jīng)驗(yàn)回放緩沖區(qū)中采樣一批經(jīng)驗(yàn),用于訓(xùn)練神經(jīng)網(wǎng)絡(luò)
- 根據(jù)神經(jīng)網(wǎng)絡(luò)計(jì)算每個(gè)動(dòng)作的 Q 值
- 選擇一個(gè)動(dòng)作,可以使用 ε-greedy 策略或者 softmax 策略等
- 根據(jù)選擇的動(dòng)作與環(huán)境互動(dòng),得到獎(jiǎng)勵(lì)和下一個(gè)狀態(tài),將經(jīng)驗(yàn)添加到經(jīng)驗(yàn)回放緩沖區(qū)中
- 使用經(jīng)驗(yàn)回放緩沖區(qū)中的數(shù)據(jù)對(duì)神經(jīng)網(wǎng)絡(luò)進(jìn)行訓(xùn)練,目標(biāo)是最小化 Q 值函數(shù)的平均誤差
- 將神經(jīng)網(wǎng)絡(luò)中的參數(shù)復(fù)制到目標(biāo)網(wǎng)絡(luò)中,每隔一段時(shí)間更新目標(biāo)網(wǎng)絡(luò),以提高穩(wěn)定性和收斂性
- 重復(fù)執(zhí)行步驟3-9,直到達(dá)到指定的訓(xùn)練輪數(shù)或者 Q 值函數(shù)收斂
此處要說(shuō)明的是,DQN要使用Reward來(lái)構(gòu)造標(biāo)簽,通過(guò)經(jīng)驗(yàn)回放來(lái)解決相關(guān)性以及非靜態(tài)分布問(wèn)題,使用一個(gè)CNN(Policy-Net)產(chǎn)生當(dāng)前Q值,使用另外一個(gè)CNN(Target-Net)產(chǎn)生Target Q值
在本問(wèn)題中,動(dòng)作空間依然是上下左右四個(gè)方向,以整個(gè)迷宮為狀態(tài),用0來(lái)標(biāo)記道路、-1表示障礙、1表示起點(diǎn)和終點(diǎn),2表示已經(jīng)走過(guò)的路徑
損失函數(shù)
Q的目標(biāo)值:
Q的預(yù)測(cè)值:
因此損失函數(shù)為:
經(jīng)驗(yàn)回放
經(jīng)驗(yàn)回放機(jī)制,不斷地將智能體與環(huán)境交互產(chǎn)生的經(jīng)驗(yàn)存儲(chǔ)到一個(gè)經(jīng)驗(yàn)池中,然后從這個(gè)經(jīng)驗(yàn)池中隨機(jī)抽取一定數(shù)量的經(jīng)驗(yàn),用于訓(xùn)練神經(jīng)網(wǎng)絡(luò),避免了數(shù)據(jù)的相關(guān)性和非靜態(tài)分布性。
經(jīng)驗(yàn)回放機(jī)制的優(yōu)點(diǎn)在于可以將不同時(shí)間點(diǎn)收集到的經(jīng)驗(yàn)混合在一起,使得訓(xùn)練的樣本具有更大的多樣性,避免了訓(xùn)練樣本的相關(guān)性,從而提高了訓(xùn)練的穩(wěn)定性和效率。此外,經(jīng)驗(yàn)回放機(jī)制還可以減少因?yàn)闃颖痉植嫉母淖兌斐傻挠?xùn)練不穩(wěn)定問(wèn)題。
在DQN中,經(jīng)驗(yàn)回放機(jī)制的具體實(shí)現(xiàn)方式是將智能體與環(huán)境的交互序列(state, action, reward, next state)存儲(chǔ)在一個(gè)經(jīng)驗(yàn)池中,當(dāng)神經(jīng)網(wǎng)絡(luò)進(jìn)行訓(xùn)練時(shí),從經(jīng)驗(yàn)池中隨機(jī)抽取一定數(shù)量的經(jīng)驗(yàn)序列,用于訓(xùn)練網(wǎng)絡(luò)。這種方法可以減少數(shù)據(jù)的相關(guān)性,同時(shí)還可以重復(fù)利用之前的經(jīng)驗(yàn),提高數(shù)據(jù)的利用率。
代碼實(shí)現(xiàn)
首先實(shí)現(xiàn)一個(gè)神經(jīng)網(wǎng)絡(luò),如上述分析,該網(wǎng)絡(luò)用于擬合Q函數(shù),接收一個(gè)狀態(tài)作為輸入,然后在其隱藏層中執(zhí)行一系列非線(xiàn)性轉(zhuǎn)換,最終輸出狀態(tài)下所有可能動(dòng)作的Q值。這些Q值可以被用來(lái)選擇下一步要執(zhí)行的動(dòng)作。
# Deep Q Network
class DQNet(nn.Module):
def __init__(self):
super(DQNet,self).__init__()
self.conv1 = nn.Conv2d(1,32,kernel_size=3,stride=1,padding=1)
self.conv2 = nn.Conv2d(32,64,kernel_size=3,stride=1,padding=1)
self.fc1 = nn.Linear(64*8*8,256)
self.dropout = nn.Dropout(p=0.5)
self.fc2 = nn.Linear(256,4)
def forward(self,x):
x = x.view(-1,1,8,8)
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
x = x.view(-1,64*8*8)
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
定義經(jīng)驗(yàn)回放緩沖
class ReplayBuffer:
# 初始化緩沖區(qū)
def __init__(self,capacity):
self.capacity = capacity
self.buffer = []
# 將一條經(jīng)驗(yàn)數(shù)據(jù)添加到緩沖區(qū)中
def push(self,state,action,reward,next_state,done):
if len(self.buffer) >= self.capacity:
self.buffer.pop(0)
self.buffer.append((state,action,reward,next_state,done))
# 隨機(jī)從緩沖區(qū)抽取batch_size大小的經(jīng)驗(yàn)數(shù)據(jù)
def sample(self,batch_size):
states,actions,rewards,next_states,dones = zip(*random.sample(self.buffer,batch_size))
return states,actions,rewards,next_states,dones
def __len__(self):
return len(self.buffer)
定義智能體:
class DQNAgent:
def __init__(self,state_size,action_size):
self.state_size = state_size # 狀態(tài)空間
self.action_size = action_size # 動(dòng)作空間
self.q_net = DQNet() # 估計(jì)動(dòng)作價(jià)值 神經(jīng)網(wǎng)絡(luò)
self.target_q_net = DQNet() # 計(jì)算目標(biāo)值 神經(jīng)網(wǎng)絡(luò)
self.target_q_net.load_state_dict(self.q_net.state_dict())
self.optimizer = optim.Adam(self.q_net.parameters(),lr=0.001) # 初始化Adam優(yōu)化器
self.memory = ReplayBuffer(capacity=10000) # 經(jīng)驗(yàn)回放緩沖區(qū)
self.gamma = 0.99 # 折扣因子
self.epsilon = 1.0 # 探索率
self.epsilon_decay = 0.99995 # 衰減因子
self.epsilon_min = 0.01 # 探索率最小值
self.batch_size = 64 # 經(jīng)驗(yàn)回放每個(gè)批次大小
self.update_rate = 200 # 網(wǎng)絡(luò)更新頻率
self.steps = 0 # 總步數(shù)
# 探索策略 在給定狀態(tài)下采取動(dòng)作
def get_action(self,state):
if np.random.rand() <= self.epsilon:
return np.random.choice(self.action_size) # 隨機(jī)選擇動(dòng)作
state = torch.from_numpy(state).float().unsqueeze(0)
q_values = self.q_net(state)
return torch.argmax(q_values,dim=1).item()
# 將狀態(tài)轉(zhuǎn)移元組存儲(chǔ)到經(jīng)驗(yàn)回放緩沖區(qū)
def remember(self,state,action,reward,next_state,done):
self.memory.push(state,action,reward,next_state,done)
# 從經(jīng)驗(yàn)回放緩沖區(qū)抽取一個(gè)批次的轉(zhuǎn)移樣本
def relay(self):
if len(self.memory) < self.batch_size:
return
# 從回放經(jīng)驗(yàn)中抽取數(shù)據(jù)
states,actions,rewards,next_states,dones = self.memory.sample(self.batch_size)
states = torch.from_numpy(np.array(states)).float()
actions = torch.from_numpy(np.array(actions)).long()
rewards = torch.from_numpy(np.array(rewards)).float()
next_states = torch.from_numpy(np.array(next_states)).float()
dones = torch.from_numpy(np.array(dones)).long()
q_targets = self.target_q_net(next_states).detach() # 計(jì)算下一狀態(tài)Q值
q_targets[dones] = 0.0 # 對(duì)于已完成狀態(tài) 將Q值設(shè)置為0
# 計(jì)算目標(biāo)Q值
q_targets = rewards.unsqueeze(1) + self.gamma * torch.max(q_targets,dim=1)[0].unsqueeze(1)
q_expected = self.q_net(states).gather(1,actions.unsqueeze(1)) # 計(jì)算當(dāng)前狀態(tài)Q值
# 計(jì)算損失值
loss = F.mse_loss(q_expected,q_targets)
# 通過(guò)反向傳播更新神經(jīng)網(wǎng)絡(luò)的參數(shù)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.steps += 1
# 隔一定步數(shù) 更新目標(biāo)網(wǎng)絡(luò)
if self.steps % self.update_rate == 0:
self.target_q_net.load_state_dict(self.q_net.state_dict())
# 更新epsilon值 使得探索時(shí)間衰減
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def train(self,env,episodes):
steps = []
for episode in range(episodes):
env.reset(complete=False)
step = 0
while True:
step += 1
action = self.get_action(env.state) # 獲取動(dòng)作
next_state,reward,done = env.step(action) # 執(zhí)行動(dòng)作
agent.remember(env.state,action,reward,next_state,done)
agent.relay()
env.state = next_state # 更新地圖狀態(tài)
if done or step > 200:
break
steps.append(step)
return steps
def test(self,env):
step = 0
while True:
step += 1
action = self.get_action(env.state)
next_state,reward,done = env.step(action)
env.state = next_state
if done or step > 1000:
break
def save(self,path):
torch.save(self.q_net.state_dict(),path+"/value_model.pt")
torch.save(self.target_q_net.state_dict(),path+"/target_model.pt")
def load(self,path):
self.q_net.load_state_dict(torch.load(path+"/value_model.pt"))
self.target_q_net.load_state_dict(torch.load(path+"/target_model.pt"))
在定義中,init函數(shù)用于初始化對(duì)象:
def __init__(self,state_size,action_size):
self.state_size = state_size # 狀態(tài)空間
self.action_size = action_size # 動(dòng)作空間
self.q_net = DQNet() # 估計(jì)動(dòng)作價(jià)值 神經(jīng)網(wǎng)絡(luò)
self.target_q_net = DQNet() # 計(jì)算目標(biāo)值 神經(jīng)網(wǎng)絡(luò)
self.target_q_net.load_state_dict(self.q_net.state_dict())
self.optimizer = optim.Adam(self.q_net.parameters(),lr=0.001) # 初始化Adam優(yōu)化器
self.memory = ReplayBuffer(capacity=10000) # 經(jīng)驗(yàn)回放緩沖區(qū)
self.gamma = 0.99 # 折扣因子
self.epsilon = 1.0 # 探索率
self.epsilon_decay = 0.99995 # 衰減因子
self.epsilon_min = 0.01 # 探索率最小值
self.batch_size = 64 # 經(jīng)驗(yàn)回放每個(gè)批次大小
self.update_rate = 200 # 網(wǎng)絡(luò)更新頻率
self.steps = 0 # 總步數(shù)
上述包含了一些DQN的重要參數(shù)
在智能體選取動(dòng)作時(shí),依然使用QL中的貪婪策略
# 探索策略 在給定狀態(tài)下采取動(dòng)作
def get_action(self,state):
if np.random.rand() <= self.epsilon:
return np.random.choice(self.action_size) # 隨機(jī)選擇動(dòng)作
state = torch.from_numpy(state).float().unsqueeze(0)
q_values = self.q_net(state)
return torch.argmax(q_values,dim=1).item()
與QL不同的是,Q值由神經(jīng)網(wǎng)絡(luò)求得
下述函數(shù)用于將五元組存儲(chǔ)到經(jīng)驗(yàn)回放緩沖區(qū)
# 將狀態(tài)轉(zhuǎn)移元組存儲(chǔ)到經(jīng)驗(yàn)回放緩沖區(qū)
def remember(self,state,action,reward,next_state,done):
self.memory.push(state,action,reward,next_state,done)
經(jīng)驗(yàn)回放:
# 從經(jīng)驗(yàn)回放緩沖區(qū)抽取一個(gè)批次的轉(zhuǎn)移樣本
def relay(self):
if len(self.memory) < self.batch_size:
return
# 從回放經(jīng)驗(yàn)中抽取數(shù)據(jù)
states,actions,rewards,next_states,dones = self.memory.sample(self.batch_size)
states = torch.from_numpy(np.array(states)).float()
actions = torch.from_numpy(np.array(actions)).long()
rewards = torch.from_numpy(np.array(rewards)).float()
next_states = torch.from_numpy(np.array(next_states)).float()
dones = torch.from_numpy(np.array(dones)).long()
q_targets = self.target_q_net(next_states).detach() # 計(jì)算下一狀態(tài)Q值
q_targets[dones] = 0.0 # 對(duì)于已完成狀態(tài) 將Q值設(shè)置為0
# 計(jì)算目標(biāo)Q值
q_targets = rewards.unsqueeze(1) + self.gamma * torch.max(q_targets,dim=1)[0].unsqueeze(1)
q_expected = self.q_net(states).gather(1,actions.unsqueeze(1)) # 計(jì)算當(dāng)前狀態(tài)Q值
# 計(jì)算損失值
loss = F.mse_loss(q_expected,q_targets)
# 通過(guò)反向傳播更新神經(jīng)網(wǎng)絡(luò)的參數(shù)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.steps += 1
# 隔一定步數(shù) 更新目標(biāo)網(wǎng)絡(luò)
if self.steps % self.update_rate == 0:
self.target_q_net.load_state_dict(self.q_net.state_dict())
# 更新epsilon值 使得探索時(shí)間衰減
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
在每個(gè)時(shí)間步,從經(jīng)驗(yàn)回放池中隨機(jī)選擇一批先前觀察到的狀態(tài)和動(dòng)作,然后計(jì)算它們的Q值。之后可以使用這些Q值來(lái)計(jì)算一個(gè)損失函數(shù),該函數(shù)衡量當(dāng)前的Q函數(shù)與理論上的Q函數(shù)之間的差距。最后使用反向傳播算法來(lái)更新神經(jīng)網(wǎng)絡(luò)的權(quán)重,以最小化損失函數(shù)。
訓(xùn)練函數(shù):
def train(self,env,episodes):
steps = []
for episode in range(episodes):
env.reset(complete=False)
step = 0
while True:
step += 1
action = self.get_action(env.state) # 獲取動(dòng)作
next_state,reward,done = env.step(action) # 執(zhí)行動(dòng)作
agent.remember(env.state,action,reward,next_state,done)
agent.relay()
env.state = next_state # 更新地圖狀態(tài)
if done or step > 200:
break
steps.append(step)
return steps
在該函數(shù)中會(huì)讓智能體進(jìn)行數(shù)次游戲,每次游戲開(kāi)始時(shí)會(huì)重置狀態(tài),但不重置迷宮,并且設(shè)置一個(gè)閾值,讓智能體步數(shù)達(dá)到這個(gè)值時(shí)終止游戲,否則智能體有概率不斷滯留。在智能體每次選擇動(dòng)作并執(zhí)行后,會(huì)將這次的狀態(tài)和動(dòng)作以及獎(jiǎng)賞儲(chǔ)存到經(jīng)驗(yàn)池中,之后進(jìn)行經(jīng)驗(yàn)回放,訓(xùn)練網(wǎng)絡(luò)。
定義環(huán)境
定義一個(gè)迷宮環(huán)境,和智能體進(jìn)行交互:
class MazeEnv:
def __init__(self,size):
self.size = size
self.actions = [0,1,2,3]
self.maze,self.start,self.end = self.generate(size)
self.state = np.expand_dims(self.maze,axis=2).copy()
def reset(self,complete=False):
if complete:
# 重置迷宮
self.maze,self.start,self.end = self.generate(self.size)
self.state = np.expand_dims(self.maze,axis=2)
self.position = self.start
self.goal = self.end
self.path = [self.start]
return self.state
def step(self, action):
# 執(zhí)行動(dòng)作
next_position = None
if action == 0 and self.position[0] > 0:
next_position = (self.position[0]-1, self.position[1])
elif action == 1 and self.position[0] < self.size-1:
next_position = (self.position[0]+1, self.position[1])
elif action == 2 and self.position[1] > 0:
next_position = (self.position[0], self.position[1]-1)
elif action == 3 and self.position[1] < self.size-1:
next_position = (self.position[0], self.position[1]+1)
else:
next_position = self.position
if next_position == self.goal:
reward = 500
elif self.maze[next_position] == -1:
reward = -300
else:
reward = -10
self.position = next_position # 更新位置
self.path.append(self.position) # 加入路徑
next_state = self.state.copy()
next_state[self.position] = 2 # 標(biāo)記路徑
done = (self.position == self.goal) # 判斷是否結(jié)束
return next_state, reward, done
@staticmethod
# 生成迷宮圖像
def generate(size):
maze = np.zeros((size, size))
# Start and end points
start = (random.randint(0, size-1), 0)
end = (random.randint(0, size-1), size-1)
maze[start] = 1
maze[end] = 1
# Generate maze walls
for i in range(size * size):
x, y = random.randint(0, size-1), random.randint(0, size-1)
if (x, y) == start or (x, y) == end:
continue
if random.random() < 0.2:
maze[x, y] = -1
if np.sum(np.abs(maze)) == size*size - 2:
break
return maze, start, end
@staticmethod
# BFS求出路徑
def solve_maze(maze, start, end):
size = maze.shape[0]
visited = np.zeros((size, size))
solve = np.zeros((size,size))
queue = [start]
visited[start[0],start[1]] = 1
while queue:
x, y = queue.pop(0)
if (x, y) == end:
break
for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
nx, ny = x + dx, y + dy
if nx < 0 or nx >= size or ny < 0 or ny >= size or visited[nx, ny] or maze[nx, ny] == -1:
continue
queue.append((nx, ny))
visited[nx, ny] = visited[x, y] + 1
if visited[end[0],end[1]] == 0:
return solve,[]
path = [end]
x, y = end
while (x, y) != start:
for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
nx, ny = x + dx, y + dy
if nx < 0 or nx >= size or ny < 0 or ny >= size or visited[nx, ny] != visited[x, y] - 1:
continue
path.append((nx, ny))
x, y = nx, ny
break
points = path[::-1] # 倒序
for point in points:
solve[point[0]][point[1]] = 1
return solve, points
模型訓(xùn)練
初始化,這里針對(duì)8*8迷宮
maze_size = 8
input_shape = (maze_size,maze_size,1)
num_actions = 4
agent = DQNAgent(input_shape,num_actions)
env = MazeEnv(maze_size)
定義一個(gè)函數(shù),用于繪制迷宮:
from PIL import Image
def maze_to_image(maze, path):
size = maze.shape[0]
img = Image.new('RGB', (size, size), (255, 255, 255))
pixels = img.load()
for i in range(size):
for j in range(size):
if maze[i, j] == -1:
pixels[j, i] = (0, 0, 0)
elif maze[i, j] == 1:
pixels[j, i] = (0, 255, 0)
for x, y in path:
pixels[y, x] = (255, 0, 0)
return np.array(img)
執(zhí)行訓(xùn)練:
for epoch in range(100):
steps = agent.train(env,50)
plt.imshow(maze_to_image(env.maze,[]))
plt.savefig(f"mazes/{epoch+1}.png") # 保存迷宮原始圖像
plt.clf()
plt.plot(steps)
plt.xlabel('Episode')
plt.ylabel('Steps')
plt.title('Training Steps')
plt.savefig(f"train/{epoch+1}.png") # 保存訓(xùn)練圖像
plt.clf()
solve = maze_to_image(env.maze,env.path)
plt.imshow(solve)
plt.savefig(f"solves/{epoch+1}.png") # 保存最后一次路線(xiàn)
plt.clf()
env.reset(complete=True) # 完全重置環(huán)境
agent.save("model")
抽取一些訓(xùn)練時(shí)的圖片:
第1次訓(xùn)練:
迷宮圖像:

最后一次路線(xiàn)圖:

訓(xùn)練圖像:
執(zhí)行步數(shù)不穩(wěn)定,有多次超出閾值
第10次:



第50次:

盡管效率很高,但依然觸碰了障礙物
第100次:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-421803.html



這次不僅沒(méi)有觸碰障礙物,并且非常接近最優(yōu)解文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-421803.html
到了這里,關(guān)于基于深度強(qiáng)化學(xué)習(xí)(DQN)的迷宮尋路算法的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!