Published: 2022-09-11 08:00
Also known as the REINFORCE algorithm. Original implementation: examples/reinforce.py at main · pytorch/examples · GitHub
Reference code: https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow/blob/master/contents/7_Policy_gradient_softmax/RL_brain.py
The following articles were referenced in depth:
强化学习 # Policy gradient_真·skysys的博客-CSDN博客
pytorch笔记:policy gradient_UQI-LIUWJ的博客-CSDN博客_pytorch 策略梯度 代码解读
策略梯度PG( Policy Gradient) 的pytorch代码实现示例 cart-pole游戏_李莹斌XJTU的博客-CSDN博客_pytorch策略梯度
Question 1: Why train episode by episode instead of step by step, as DQN does?
The most basic PG algorithm is REINFORCE, which is derived from maximizing the expected cumulative return over a trajectory, so it needs whole trajectories as input (this corresponds to the Monte Carlo method referred to in PG algorithms).
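To spell that out (a standard statement of the REINFORCE objective, added here for reference, with \(\tau\) a trajectory, \(\gamma\) the discount factor and \(r_t\) the per-step reward):

\[
J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\Big[\sum_{t=0}^{T} \gamma^{t} r_t\Big]
\]

A single transition by itself does not give a sample of this trajectory-level expectation, which is why a whole episode is collected before every update.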
Question 2: Where does the loss formula come from?
强化学习(九)--Policy Gradient推导过程 - 知乎 gives a very detailed derivation. One thing to note: log X there is the same as ln X, since no base is written.
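For reference, the end result of that derivation (the usual REINFORCE form, restated here rather than taken from that post) is

\[
\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\Big[\sum_{t} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\Big]
\]

so the per-episode surrogate loss minimized in the code below is \(-\sum_t \log \pi_\theta(a_t \mid s_t)\, G_t\), i.e. the term -log_prob * R summed over the episode.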
Question 3: How is the update performed, and how are these quantities obtained in the program?
Note that the cumulative reward G has to be computed backwards from the end of the episode, because the episodic return must be propagated back onto the earlier steps: the return at each time t is the immediate reward plus the discounted future return.
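Concretely (with gamma the discount factor, self.gamma in the code):

\[
G_t = \sum_{k=0}^{T-t} \gamma^{k}\, r_{t+k} = r_t + \gamma\, G_{t+1}, \qquad G_{T+1} = 0
\]

which is why the code walks through reward_list in reverse and accumulates R = r + gamma * R.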
Drawing on the code from the articles above, the full code is given below. During testing, the action is simply chosen greedily as the one with the highest probability. gymTest/CartPolePolicyGradient.py at main · BITcsy/gymTest · GitHub
from matplotlib import animation
import matplotlib.pyplot as plt
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical


class Net(nn.Module):
    def __init__(self, n_states, n_actions):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_states, 10)
        self.fc2 = nn.Linear(10, n_actions)
        self.fc1.weight.data.normal_(0, 0.1)
        self.fc2.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        action_score = self.fc2(x)
        # why softmax? it turns the logits into a probability distribution
        # over actions, which Categorical samples from
        return F.softmax(action_score, dim=1)


class PolicyGradient(nn.Module):
    def __init__(self, n_states, n_actions):
        super(PolicyGradient, self).__init__()
        self.net = Net(n_states, n_actions)
        self.optimizer = optim.Adam(self.net.parameters(), lr=1e-2)
        self.saved_log_prob_list = []  # log p(a|s) recorded at each time step
        self.reward_list = []          # reward received after each action
        self.gamma = 0.99

    def choose_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.net(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_prob_list.append(m.log_prob(action))  # what if continuous output? (see the note after this listing)
        return action.item()

    def choose_best_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.net(state)
        max_action = torch.max(probs, 1)[1].data.numpy()[0]
        return max_action

    def store_transition(self, reward):
        self.reward_list.append(reward)

    def learn(self):
        R = 0
        policy_loss = []
        returns = []
        # accumulate discounted returns from the last step backwards: G_t = r_t + gamma * G_{t+1}
        for r in self.reward_list[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)
        returns = torch.tensor(returns)
        # normalize the returns; eps avoids division by zero
        eps = np.finfo(np.float64).eps.item()
        returns = (returns - returns.mean()) / (returns.std() + eps)
        for log_prob, R in zip(self.saved_log_prob_list, returns):
            policy_loss.append(-log_prob * R)
        self.optimizer.zero_grad()
        policy_loss = torch.cat(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()
        self.saved_log_prob_list.clear()
        self.reward_list.clear()


def save_gif(frames):
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)
    anim.save('./CartPortPolicyGradient.gif', writer='imagemagick', fps=30)


if __name__ == "__main__":
    env = gym.make('CartPole-v1')
    # train
    done_step = 0
    learn_episode_num = 200000
    data_collect_num = 1000
    negative_reward = -10.0
    positive_reward = 10.0
    x_bound = 1.0
    state = env.reset()
    model = PolicyGradient(
        n_states=4,
        n_actions=2
    )  # the algorithm model
    model.saved_log_prob_list.clear()
    model.reward_list.clear()
    running_reward = 10
    reward_threshold = 4000
    log_interval = 50
    train_mode = True
    if (train_mode):
        for i in range(learn_episode_num):
            sum_reward = 0
            for j in range(data_collect_num):
                # env.render()
                action = model.choose_action(state)
                state, reward, done, _ = env.step(action)
                x, x_dot, theta, theta_dot = state
                # reshaped reward: penalize distance from the track center and from upright
                if (abs(x) > x_bound):
                    r1 = 0.5 * negative_reward
                else:
                    r1 = negative_reward * abs(x) / x_bound + 0.5 * (-negative_reward)
                if (abs(theta) > env.theta_threshold_radians):
                    r2 = 0.5 * negative_reward
                else:
                    r2 = negative_reward * abs(theta) / env.theta_threshold_radians + 0.5 * (-negative_reward)
                reward = r1 + r2
                if (done) and (done_step < 499):
                    reward += negative_reward
                # print("reward: x = %lf, r1 = %lf, theta = %lf, r2 = %lf" % (x, r1, theta, r2))
                model.store_transition(reward)
                sum_reward += reward
                done_step += 1
                if (done):
                    # print("reset env! done_step = %d" % done_step)
                    state = env.reset()
                    done_step = 0
                    break
            running_reward = 0.05 * sum_reward + (1 - 0.05) * running_reward
            model.learn()
            if i % log_interval == 0:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i, sum_reward, running_reward))
            if running_reward > reward_threshold:
                print("Solved! Running reward is now {} and learn_episode is {}".format(running_reward, i))
                torch.save(model, 'PolicyGradient.ptl')
                break
    else:  # test mode
        frames = []
        state = env.reset()
        model_test = torch.load('PolicyGradient.ptl')
        for _ in range(400):
            frames.append(env.render(mode='rgb_array'))
            action = model_test.choose_best_action(state)
            state, reward, done, info = env.step(action)
            if (done):
                state = env.reset()
                print("test try again")
                break
        save_gif(frames)
    env.close()
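An aside on the "what if continuous output?" comment in choose_action: for a continuous action space, one common option is to have the network output the parameters of a Gaussian and sample from torch.distributions.Normal, with log_prob playing the same role as in the discrete case. The sketch below only illustrates that idea and is not part of the original project; the class name, layer sizes, and helper function are made up here.

import torch
import torch.nn as nn
from torch.distributions import Normal

class GaussianPolicy(nn.Module):
    # Hypothetical continuous-action policy head, for illustration only.
    def __init__(self, n_states, n_actions):
        super().__init__()
        self.fc1 = nn.Linear(n_states, 10)
        self.mu_head = nn.Linear(10, n_actions)              # mean of the Gaussian
        self.log_std = nn.Parameter(torch.zeros(n_actions))  # learnable, state-independent log std

    def forward(self, x):
        h = torch.relu(self.fc1(x))
        return self.mu_head(h), self.log_std.exp()

def choose_action_continuous(policy, state):
    # state: a 1-D numpy array, as returned by env.reset()/env.step() in old-style gym
    state = torch.from_numpy(state).float().unsqueeze(0)
    mu, std = policy(state)
    dist = Normal(mu, std)
    action = dist.sample()
    log_prob = dist.log_prob(action).sum(dim=-1)  # sum log-probs over action dimensions
    return action.squeeze(0).numpy(), log_prob

The returned log_prob would be appended to saved_log_prob_list exactly as in the discrete version.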
The training result is shown below:
There were no real difficulties in this training run; once I had fixed a few bugs of my own, it worked normally. Episode-based training is really tedious because it cannot be batched, but the final result still came quickly: it trained within about 1000 episodes.