目錄
一、強化學(xué)習(xí)的主要構(gòu)成
二、基于python的強化學(xué)習(xí)框架
三、gym
四、DQN算法
1.DQN算法兩個特點
(1)經(jīng)驗回放
(2)目標(biāo)網(wǎng)絡(luò)
2.DQN算法的流程
五、使用pytorch實現(xiàn)DQN算法
1.replay memory
2.神經(jīng)網(wǎng)絡(luò)部分
3.Agent
4.模型訓(xùn)練函數(shù)
5.訓(xùn)練模型
6.實驗結(jié)果
六、補充說明
一、強化學(xué)習(xí)的主要構(gòu)成
強化學(xué)習(xí)主要由兩部分組成:智能體(agent)和環(huán)境(env)。在強化學(xué)習(xí)過程中,智能體與環(huán)境一直在交互。智能體在環(huán)境里面獲取某個狀態(tài)后,它會利用該狀態(tài)輸出一個動作(action)。然后這個動作會在環(huán)境之中被執(zhí)行,環(huán)境會根據(jù)智能體采取的動作,輸出下一個狀態(tài)以及當(dāng)前這個動作帶來的獎勵。智能體的目的就是盡可能多地從環(huán)境中獲取獎勵。
二、基于python的強化學(xué)習(xí)框架
本次我使用到的框架是pytorch,因為DQN算法的實現(xiàn)包含了部分的神經(jīng)網(wǎng)絡(luò),這部分對我來說使用pytorch會更順手,所以就選擇了這個。
三、gym
gym?定義了一套接口,用于描述強化學(xué)習(xí)中的環(huán)境這一概念,同時在其官方庫中,包含了一些已實現(xiàn)的環(huán)境。
四、DQN算法
傳統(tǒng)的強化學(xué)習(xí)算法使用的是Q表格存儲狀態(tài)價值函數(shù)或者動作價值函數(shù),但是實際應(yīng)用時,問題在的環(huán)境可能有很多種狀態(tài),甚至數(shù)不清,所以這種情況下使用離散的Q表格存儲價值函數(shù)會非常不合理,所以DQN(Deep Q-learning)算法,使用神經(jīng)網(wǎng)絡(luò)擬合動作價值函數(shù)。
通常DQN算法只能處理動作離散,狀態(tài)連續(xù)的情況,使用神經(jīng)網(wǎng)絡(luò)擬合出動作價值函數(shù), 然后針對動作價值函數(shù),選擇出當(dāng)狀態(tài)state固定的Q值最大的動作a。
1.DQN算法兩個特點
(1)經(jīng)驗回放
每一次的樣本都放到樣本池中,所以可以多次反復(fù)的使用一個樣本,重復(fù)利用。訓(xùn)練時一次隨機抽取多個數(shù)據(jù)樣本來進行訓(xùn)練。
(2)目標(biāo)網(wǎng)絡(luò)
DQN算法的更新目標(biāo)時讓逼近, 但是如果兩個Q使用一個網(wǎng)絡(luò)計算,那么Q的目標(biāo)值也在不斷改變, 容易造成神經(jīng)網(wǎng)絡(luò)訓(xùn)練的不穩(wěn)定。DQN使用目標(biāo)網(wǎng)絡(luò),訓(xùn)練時目標(biāo)值Q使用目標(biāo)網(wǎng)絡(luò)來計算,目標(biāo)網(wǎng)絡(luò)的參數(shù)定時和訓(xùn)練網(wǎng)絡(luò)的參數(shù)同步。
2.DQN算法的流程
五、使用pytorch實現(xiàn)DQN算法
import time
import random
import torch
from torch import nn
from torch import optim
import gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque, namedtuple # 隊列類型
from tqdm import tqdm # 繪制進度條用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))
1.replay memory
class ReplayMemory(object):
def __init__(self, memory_size):
self.memory = deque([], maxlen=memory_size)
def sample(self, batch_size):
batch_data = random.sample(self.memory, batch_size)
state, action, reward, next_state, done = zip(*batch_data)
return state, action, reward, next_state, done
def push(self, *args):
# *args: 把傳進來的所有參數(shù)都打包起來生成元組形式
# self.push(1, 2, 3, 4, 5)
# args = (1, 2, 3, 4, 5)
self.memory.append(Transition(*args))
def __len__(self):
return len(self.memory)
2.神經(jīng)網(wǎng)絡(luò)部分
class Qnet(nn.Module):
def __init__(self, n_observations, n_actions):
super(Qnet, self).__init__()
self.model = nn.Sequential(
nn.Linear(n_observations, 128),
nn.ReLU(),
nn.Linear(128, n_actions)
)
def forward(self, state):
return self.model(state)
3.Agent
class Agent(object):
def __init__(self, observation_dim, action_dim, gamma, lr, epsilon, target_update):
self.action_dim = action_dim
self.q_net = Qnet(observation_dim, action_dim).to(device)
self.target_q_net = Qnet(observation_dim, action_dim).to(device)
self.gamma = gamma
self.lr = lr
self.epsilon = epsilon
self.target_update = target_update
self.count = 0
self.optimizer = optim.Adam(params=self.q_net.parameters(), lr=lr)
self.loss = nn.MSELoss()
def take_action(self, state):
if np.random.uniform(0, 1) < 1 - self.epsilon:
state = torch.tensor(state, dtype=torch.float).to(device)
action = torch.argmax(self.q_net(state)).item()
else:
action = np.random.choice(self.action_dim)
return action
def update(self, transition_dict):
states = transition_dict.state
actions = np.expand_dims(transition_dict.action, axis=-1) # 擴充維度
rewards = np.expand_dims(transition_dict.reward, axis=-1) # 擴充維度
next_states = transition_dict.next_state
dones = np.expand_dims(transition_dict.done, axis=-1) # 擴充維度
states = torch.tensor(states, dtype=torch.float).to(device)
actions = torch.tensor(actions, dtype=torch.int64).to(device)
rewards = torch.tensor(rewards, dtype=torch.float).to(device)
next_states = torch.tensor(next_states, dtype=torch.float).to(device)
dones = torch.tensor(dones, dtype=torch.float).to(device)
# update q_values
# gather(1, acitons)意思是dim=1按行號索引, index=actions
# actions=[[1, 2], [0, 1]] 意思是索引出[[第一行第2個元素, 第1行第3個元素],[第2行第1個元素, 第2行第2個元素]]
# 相反,如果是這樣
# gather(0, acitons)意思是dim=0按列號索引, index=actions
# actions=[[1, 2], [0, 1]] 意思是索引出[[第一列第2個元素, 第2列第3個元素],[第1列第1個元素, 第2列第2個元素]]
# states.shape(64, 4) actions.shape(64, 1), 每一行是一個樣本,所以這里用dim=1很合適
predict_q_values = self.q_net(states).gather(1, actions)
with torch.no_grad():
# max(1) 即 max(dim=1)在行向找最大值,這樣的話shape(64, ), 所以再加一個view(-1, 1)擴增至(64, 1)
max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)
q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)
l = self.loss(predict_q_values, q_targets)
self.optimizer.zero_grad()
l.backward()
self.optimizer.step()
if self.count % self.target_update == 0:
# copy model parameters
self.target_q_net.load_state_dict(self.q_net.state_dict())
self.count += 1
4.模型訓(xùn)練函數(shù)
def run_episode(env, agent, repalymemory, batch_size):
state = env.reset()
reward_total = 0
while True:
action = agent.take_action(state)
next_state, reward, done, _ = env.step(action)
# print(reward)
repalymemory.push(state, action, reward, next_state, done)
reward_total += reward
if len(repalymemory) > batch_size:
state_batch, action_batch, reward_batch, next_state_batch, done_batch = repalymemory.sample(batch_size)
T_data = Transition(state_batch, action_batch, reward_batch, next_state_batch, done_batch)
# print(T_data)
agent.update(T_data)
state = next_state
if done:
break
return reward_total
def episode_evaluate(env, agent, render):
reward_list = []
for i in range(5):
state = env.reset()
reward_episode = 0
while True:
action = agent.take_action(state)
next_state, reward, done, _ = env.step(action)
reward_episode += reward
state = next_state
if done:
break
if render:
env.render()
reward_list.append(reward_episode)
return np.mean(reward_list).item()
def test(env, agent, delay_time):
state = env.reset()
reward_episode = 0
while True:
action = agent.take_action(state)
next_state, reward, done, _ = env.step(action)
reward_episode += reward
state = next_state
if done:
break
env.render()
time.sleep(delay_time)
5.訓(xùn)練CartPole-v0環(huán)境模型
模型訓(xùn)練使用到的環(huán)境時gym提供的CartPole游戲(Cart Pole - Gymnasium Documentation (farama.org)),這個環(huán)境比較經(jīng)典,小車運行結(jié)束的要求有三個:
(1)桿子的角度超過度
(2)小車位置大于(小車中心到達顯示屏邊緣)
(3)小車移動步數(shù)超過200(v1是500)
小車每走一步獎勵就會+1,所以在v0版本環(huán)境中,小車一次episode的最大獎勵為200。
if __name__ == "__main__":
# print("prepare for RL")
env = gym.make("CartPole-v0")
env_name = "CartPole-v0"
observation_n, action_n = env.observation_space.shape[0], env.action_space.n
# print(observation_n, action_n)
agent = Agent(observation_n, action_n, gamma=0.98, lr=2e-3, epsilon=0.01, target_update=10)
replaymemory = ReplayMemory(memory_size=10000)
batch_size = 64
num_episodes = 200
reward_list = []
# print("start to train model")
# 顯示10個進度條
for i in range(10):
with tqdm(total=int(num_episodes/10), desc="Iteration %d" % i) as pbar:
for episode in range(int(num_episodes / 10)):
reward_episode = run_episode(env, agent, replaymemory, batch_size)
reward_list.append(reward_episode)
if (episode+1) % 10 == 0:
test_reward = episode_evaluate(env, agent, False)
# print("Episode %d, total reward: %.3f" % (episode, test_reward))
pbar.set_postfix({
'episode': '%d' % (num_episodes / 10 * i + episode + 1),
'return' : '%.3f' % (test_reward)
})
pbar.update(1) # 更新進度條
test(env, agent, 0.5) # 最后用動畫觀看一下效果
episodes_list = list(range(len(reward_list)))
plt.plot(episodes_list, reward_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Double DQN on {}'.format(env_name))
plt.show()
6.實驗結(jié)果
六、補充說明
想要開啟動畫的話,這句代碼里面的False更改為True。
test_reward = episode_evaluate(env, agent, False)
參考資料:
蘑菇書EasyRL (datawhalechina.github.io)文章來源:http://www.zghlxwxcb.cn/news/detail-419260.html
DQN 算法 (boyuai.com)文章來源地址http://www.zghlxwxcb.cn/news/detail-419260.html
到了這里,關(guān)于使用Pytorch實現(xiàn)強化學(xué)習(xí)——DQN算法的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!