强化学习笔记(五)价值函数的近似(Value Function Approximation)及Pytorch实现简单DQN
- Q1:值函数近似的形式和意义?
- Q2:梯度下降法公式中的真值 和 是如何处理的?
- Q3:如何理解DQN中的经验回放(Experience Relay)机制?
- Q4:Pytorch实现一个简单的Q-Network
表格型的近似求解方法只适用于小规模的问题。对于复杂庞大状态-动作空间的问题,我们不可能有足够的内存去存放和维护这样一个Q_Table. 因此强化学习和深度学习的结合是发展历程中必经的道路,而在Atari游戏,围棋比赛等良好的表现让DNN+RL的模式充满了潜力。
UCL本节课的分标题是Incremental Methods和Batch Methods,区别也是一目了然。前者是阶段性(step by step)地优化函数,比如蒙卡走完一个episode就更新所有,TD直接每个状态就可以更新一下。而Batch的方法跟深度学习里的批处理是一样的,取用的是价值误差的最小二乘作为损失函数,批量地去更新参数。实际问题通常还是取后者,因为前者更新完一个数据就丢弃了,后者则是对总体做了一个总结,而且计算效率更高。
Q1:值函数近似的形式和意义?
值函数的表示主要有以上三种,最常用的是最后一种,原因有二:1. 输入简单;2. 输出的是输入状态对应的动作价值函数,可以直接决策。
本质上这就是一个黑箱模型,整个流程就是对价值函数进行再编码的过程。只有w参数的个数远远小于q的个数,这个数据压缩才是有意义的。
上课时还提到了,Q表的方法跟值函数近似二者并不是分离的。Q表可以看成是一种特殊的Value Approximation. 针对第一个模型,我们如果把状态向量S用One-Hot向量表示,权重的维度即为状态维度n,而权重的值也就是各个状态价值。后两种模型其实是同理,相当于一个直接映射,没有做任何处理。
Q2:梯度下降法公式中的真值 和 是如何处理的?
课上首先以线性函数近似为例,推导了梯度下降的增量公式。我们的目的就是要让黑箱模型尽可能准确地模拟Value Function。因此在优化函数
中使用了二范数去逼近真值
。在监督学习里,我们有原始数据和标签,做分类我们有类别标签,做分割我们有分割的Ground Truth;但是强化学习里我们没有真值,只有Reward. 因此问题来了,我们根本不知道准确值是多少,怎么去逼近它?
答案还是很简单,从经验中选择正确的结果。对于蒙特卡洛学习,使用收获作为返回值;对于TD,我们有TD(0)公式和TD(
)公式。把这种返回值作为Supervisor信号。
比如对一个蒙卡序列
,我们可以把这些数据作为训练集,尽量让每个Gain都适用这个黑箱模型。
对于TD模型来说,这是个有偏模型。因为我们要用贝尔曼方程,而下一个时刻的状态价值
我们得通过黑箱模型去模拟,跟蒙卡相比并不是走一个Episode之后的真实收获。
最后重申一点:一般使用动作价值函数比状态价值函数更好,因为它可以直接反映决策。
Q3:如何理解DQN中的经验回放(Experience Relay)机制?
经验回放机制对应子标题的Batch methods. 在普通的q-learning中,我们更新迭代的是所有的价值函数,而且是不连续地,把每个状态下的价值函数往TD Target处偏移。但是加入Deep Q Network,我们一次更新的参数变成了维数很高的权重
. 因为要更新的项数很多,所以出现了梯度法。
经验回放即将每次和环境交互得到的奖励与状态更新情况都保存起来(即一个transition,{
}). 用于后面目标Q值的更新。在每次更新的时候,我们随机挑选经验回放中的m个序列计算当前目标Q值(监督信号),从而更新权重
.
Q4:Pytorch实现一个简单的Q-Network
我参考了刘老师的代码,地址为:https://github.com/ljpzzz/machinelearning/blob/master/reinforcement-learning/dqn.py
刘老师使用的环境是python3.6 + tensorflow 1.8.0 .按道理说只要tensorflow是1x版本,同时装了gym的游戏库基本都可以顺利地运行。
我在这篇代码的基础上做了些改动,把网络用pytorch重新搭建了,输入的状态向量、动作向量等转化成了torch_tensor;添加了GPU训练方式。
这个游戏名叫CartPole-v0. 在控制领域其实就是一个单极倒立摆控制问题。其实很多经典控制理论已经将其解决了,像PID控制。控制方法解决的思路比较严谨,首先获取摆杆的质量、重心、导轨的摩擦力等等参数,对系统物理建模,然后再分析它的平衡状态,用控制理论去求解。
而强化学习显得简单粗暴,我直接去疯狂地尝试,最后总结经验。二者其实就是分别从正面和反面去求解问题。控制方法不会允许失败,它通过不断去分析问题的本源,以解释性和理论的角度直到问题破解。强化学习方法更像是一种复盘,从失败中学习,总结经验。 二者各有好坏吧。
这里有些标题党了,说是DQN,实际上网络就是个两层的感知机,一个输入层,一个隐藏层和一个输出层。隐藏层的节点有20个。
运行结果如下,首先训练的时候会调出一个游戏交互框,这样可以实时地看到快速的游戏过程。在运行窗口也会显示每100个episode游戏后获得的平均奖励,可以看到前400个epoch奖励是一直上升的,也就是游戏水平在不断进步。
因为训练采样是随机的,所以结果好坏要看运气。有时候很快收敛了,有时候又很慢。另外使用CUDA加速后,如果同时添加了env.render()渲染画面可能出现卡死的情况。我猜测是游戏时间很快,有时候一把很快就死了,训练过程快于画面展示,所以要等待画面渲染的时间。建议训练的时候关掉render. 我这里使用env.unwrapped()解除了游戏最高为200分的限制,可以看到第400轮跑到了223.5分。
放一下代码,我使用的Conda环境是Python3.7 + Pytorch1.4.0 + Gym 0.17.1
""" @ Author: Peter Xiao @ Date: 2020.7.16 @ Filename: dqn.py @ Brief: 使用 DQN训练CartPole-v0 """import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
import time
from collections import deque# Hyper Parameters for DQN
GAMMA = 0.9 # discount factor for target Q
INITIAL_EPSILON = 0.5 # starting value of epsilon
FINAL_EPSILON = 0.01 # final value of epsilon
REPLAY_SIZE = 10000 # experience replay buffer size
EXPLORE = 10000
BATCH_SIZE = 32 # size of minibatch
LR = 0.0001 # learning rate# Use GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cudnn.enabled = False # 非确定性算法class QNetwork(nn.Module):def __init__(self, state_dim, action_dim):super(QNetwork, self).__init__()self.fc1 = nn.Linear(state_dim, 20)self.fc2 = nn.Linear(20, action_dim)def forward(self, x):out = F.relu(self.fc1(x))out = self.fc2(out)return outclass DQN(object):# dqn Agentdef __init__(self, env): # 初始化# 状态空间和动作空间的维度self.state_dim = env.observation_space.shape[0]self.action_dim = env.action_space.n# init experience replayself.replay_buffer = deque() # 经验回放池# init network parametersself.network = QNetwork(state_dim=self.state_dim, action_dim=self.action_dim).to(device)self.optimizer = torch.optim.Adam(self.network.parameters(), lr=LR)self.loss_func = nn.MSELoss()# init some parametersself.time_step = 0self.epsilon = INITIAL_EPSILON # epsilon值是随机不断变小的def perceive(self, state, action, reward, next_state, done):one_hot_action = np.zeros(self.action_dim) # 用独热向量保存动作one_hot_action[action] = 1 # 选中的动作为1,其余为0# 将该Transition保存到经验回放池self.replay_buffer.append((state, one_hot_action, reward, next_state, done))if len(self.replay_buffer) > REPLAY_SIZE: # 如果经验回放池溢出,扔掉左边的数据self.replay_buffer.popleft()if len(self.replay_buffer) > BATCH_SIZE: # 只有经验回放池大于mini_batch数了才能采样训练self.train_Q_network()def train_Q_network(self):self.time_step += 1# Step 1: obtain random minibatch from replay memoryminibatch = random.sample(self.replay_buffer, BATCH_SIZE) # 32的liststate_batch = torch.FloatTensor([data[0] for data in minibatch]).to(device) # 32*4action_batch = torch.LongTensor([data[1] for data in minibatch]).to(device) # 32*2reward_batch = torch.FloatTensor([data[2] for data in minibatch]).to(device) # 32*1next_state_batch = torch.FloatTensor([data[3] for data in minibatch]).to(device) # 32*4done = torch.FloatTensor([data[4] for data in minibatch]).to(device)done = done.unsqueeze(1)reward_batch = reward_batch.unsqueeze(1)# q_val = self.network.forward(state_batch) # 32*2action_index = action_batch.argmax(dim=1).unsqueeze(1) # 32*1eval_q = self.network.forward(state_batch).gather(1, action_index) # 32*1# Step 2: calculate yQ_value_batch = self.network.forward(next_state_batch)next_action_batch = torch.unsqueeze(torch.max(Q_value_batch, 1)[1], 1)next_q = self.network.forward(next_state_batch).gather(1, next_action_batch)y_batch = reward_batch + GAMMA * next_q * (1 - done)# y_batch = torch.tensor(y_batch).unsqueeze(1)# 更新网络loss = self.loss_func(eval_q, y_batch)self.optimizer.zero_grad()loss.backward()self.optimizer.step()def egreedy_action(self, state): # epsilon-greedy策略state = torch.unsqueeze(torch.FloatTensor(state).to(device), 0) # 给state加一个batch_size的维度,此时batch_size为1Q_value = self.network.forward(state.to(device))# Q_value = self.Q_value.eval(feed_dict={# self.state_input: [state]# })[0]if random.random() <= self.epsilon:self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000return random.randint(0, self.action_dim - 1)else:self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000return torch.max(Q_value, 1)[1].data.to('cpu').numpy()[0]def action(self, state): # 贪婪选择state = torch.unsqueeze(torch.FloatTensor(state).to(device), 0) # 给state加一个batch_size的维度,此时batch_size为1Q_value = self.network.forward(state.to(device))return torch.max(Q_value, 1)[1].data.to('cpu').numpy()[0]# ---------------------------------------------------------
# Hyper Parameters
ENV_NAME = 'CartPole-v0'
EPISODE = 3000 # Episode limitation
STEP = 300 # Step limitation in an episode
TEST = 10 # The number of experiment test every 100 episodedef main():# initialize OpenAI Gym env and dqn agentenv = gym.make(ENV_NAME)env = env.unwrapped # 打开限制操作agent = DQN(env)for episode in range(EPISODE):# initialize taskstate = env.reset()# Trainfor step in range(STEP):action = agent.egreedy_action(state) # e-greedy action for trainnext_state, reward, done, _ = env.step(action)# Define reward for agentreward = -1 if done else 0.1agent.perceive(state, action, reward, next_state, done)state = next_stateif done:break# Test every 100 episodesif episode % 100 == 0:total_reward = 0for i in range(TEST):state = env.reset()for j in range(STEP):env.render()action = agent.action(state) # direct action for teststate, reward, done, _ = env.step(action)total_reward += rewardif done:breakave_reward = total_reward/TESTprint ('episode: ', episode, 'Evaluation Average Reward:', ave_reward)if __name__ == '__main__':time_start = time.time()main()time_end = time.time()print('The total time is ', time_end - time_start)