CartPole Reinforcement Learning Explained 2 - Policy Gradient

Published: 2024-10-04 17:01

Also known as the REINFORCE algorithm. Original implementation: examples/reinforce.py at main · pytorch/examples · GitHub

Reference code: https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow/blob/master/contents/7_Policy_gradient_softmax/RL_brain.py

1. Basic Principles

This section draws heavily on the following articles:

强化学习 # Policy gradient_真·skysys的博客-CSDN博客

pytorch笔记:policy gradient_UQI-LIUWJ的博客-CSDN博客_pytorch 策略梯度

策略梯度PG( Policy Gradient) 的pytorch代码实现示例 cart-pole游戏_李莹斌XJTU的博客-CSDN博客_pytorch策略梯度

Question 1: Why train episode by episode instead of step by step as in DQN?

The most basic PG algorithm is REINFORCE. Its derivation rests on maximizing the expected cumulative return over a whole sequence, so it needs full trajectories as input (this is the Monte Carlo aspect of the PG algorithm).
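
Concretely (standard REINFORCE notation, not taken verbatim from the referenced posts), the quantity being maximized is the expected discounted return of an entire trajectory sampled from the current policy:

J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[ \sum_{t=0}^{T} \gamma^{t} r_t \right]

This expectation can only be estimated once an episode has finished, which is why training proceeds episode by episode (Monte Carlo) rather than step by step.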

Question 2: Where does the loss formula come from?

强化学习(九)--Policy Gradient推导过程 - 知乎 gives a very detailed derivation. One thing to note: log X there is the same as ln X, since no base is written.
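
For reference, the end result of that derivation in its standard REINFORCE form (this is the loss implemented in the code below):

\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[ \sum_{t} G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t) \right]
\quad\Rightarrow\quad
\text{loss} = -\sum_{t} G_t \log \pi_\theta(a_t \mid s_t)

where G_t is the discounted return from step t onward, so gradient descent on this loss is gradient ascent on J(\theta). This is exactly what policy_loss accumulates in learn().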

Question 3: What is the update procedure, and how are these quantities obtained in the program?

[Figure 1: update procedure]

Note that the cumulative reward G here must be computed backwards from the end of the episode, because the episode's return has to be propagated back onto the earlier steps: the return at each step t is the immediate reward plus the discounted future return.
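
Written as a recursion, this is exactly what the backward loop in learn() computes:

G_t = r_t + \gamma \, G_{t+1}, \qquad G_{T+1} = 0

so iterating t from the last step of the episode back to the first yields every G_t in a single pass.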

2. Code

The code below draws on the articles above. During testing, the action is chosen greedily, i.e. the action with the highest probability is taken. gymTest/CartPolePolicyGradient.py at main · BITcsy/gymTest · GitHub

from matplotlib import animation
import matplotlib.pyplot as plt
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

class Net(nn.Module):
    def __init__(self, n_states, n_actions):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_states, 10)
        self.fc2 = nn.Linear(10, n_actions)
        self.fc1.weight.data.normal_(0, 0.1)
        self.fc2.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        action_score = self.fc2(x)
        return F.softmax(action_score, dim=1)   # softmax turns the raw scores into a probability distribution over actions

class PolicyGradient(nn.Module):
    def __init__(self, n_states, n_actions):
        super(PolicyGradient, self).__init__()
        self.net = Net(n_states, n_actions)
        self.optimizer = optim.Adam(self.net.parameters(), lr=1e-2)
        self.saved_log_prob_list = []   # log p(a|s) recorded at each time step
        self.reward_list = []           # reward received after the action at each time step
        self.gamma = 0.99
        self.eps = np.finfo(np.float64).eps.item()  # small constant to avoid division by zero

    def choose_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.net(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_prob_list.append(m.log_prob(action))  # what if continuous output? (see the note after the code)
        return action.item()

    def choose_best_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.net(state)
        max_action = torch.max(probs, 1)[1].data.numpy()[0]
        return max_action

    def store_transition(self, reward):
        self.reward_list.append(reward)

    def learn(self):
        R = 0
        policy_loss = []
        returns = []
        for r in self.reward_list[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)
        returns = torch.tensor(returns)
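        # normalize the returns to zero mean / unit variance for lower-variance gradients;
        # self.eps guards against division by zero when the std is 0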
        returns = (returns - returns.mean()) / (returns.std() + self.eps)
        for log_prob, R in zip(self.saved_log_prob_list, returns):
            policy_loss.append(-log_prob * R)
        self.optimizer.zero_grad()
        policy_loss = torch.cat(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()
        self.saved_log_prob_list.clear()
        self.reward_list.clear()

def save_gif(frames):
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)
    anim.save('./CartPolePolicyGradient.gif', writer='imagemagick', fps=30)

if __name__ == "__main__":
    env = gym.make('CartPole-v1')
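    # note: this script assumes the classic Gym API (reset() returns only the observation,
    # step() returns a 4-tuple); newer gym / gymnasium releases changed both signatures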
    # train
    done_step = 0
    learn_episode_num = 200000
    data_collect_num = 1000
    negative_reward = -10.0
    positive_reward = 10.0
    x_bound = 1.0
    state = env.reset()
    model = PolicyGradient(
        n_states=4,
        n_actions=2
    )  # the policy-gradient agent
    model.saved_log_prob_list.clear()
    model.reward_list.clear()
    running_reward = 10
    reward_threshold = 4000
    log_interval = 50
    train_mode = True

    if (train_mode):
        for i in range(learn_episode_num):
            sum_reward = 0
            for j in range(data_collect_num):
                # env.render()
                action = model.choose_action(state)
                state, reward, done, _ = env.step(action)
                x, x_dot, theta, theta_dot = state
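                # reward shaping: r1 rewards keeping the cart near the center of the track,
                # r2 rewards keeping the pole near vertical; each lies in [-5, 5]
                # given negative_reward = -10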
                if (abs(x) > x_bound):
                    r1 = 0.5 * negative_reward
                else:
                    r1 = negative_reward * abs(x) / x_bound + 0.5 * (-negative_reward)
                if (abs(theta) > env.theta_threshold_radians):
                    r2 = 0.5 * negative_reward
                else:
                    r2 = negative_reward * abs(theta) / env.theta_threshold_radians + 0.5 * (-negative_reward)
                reward = r1 + r2
                if (done) and (done_step < 499):
                    reward += negative_reward
                # print("reward: x = %lf, r1 = %lf, theta = %lf, r2 = %lf" % (x, r1, theta, r2))
                model.store_transition(reward)
                sum_reward += reward
                done_step += 1
                if (done):
                    # print("reset env! done_step = %d, epsilon = %lf" % (done_step, epsilon))
                    state = env.reset()
                    done_step = 0
                    break
            running_reward = 0.05 * sum_reward + (1 - 0.05) * running_reward  # exponential moving average of the episode reward
            model.learn()
            if i % log_interval == 0:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i, sum_reward, running_reward))
            if running_reward > reward_threshold:
                print("Solved! Running reward is now {} and learn_episode is {}".format(running_reward, i))
                torch.save(model, 'PolicyGradient.ptl')
                break
    else: # test mode
        frames = []
        state = env.reset()
        model_test = torch.load('PolicyGradient.ptl')
        for _ in range(400):
            frames.append(env.render(mode='rgb_array'))
            action = model_test.choose_best_action(state)
            state, reward, done, info = env.step(action)
            if (done):
                state = env.reset()
                print("test try again")
                break
        save_gif(frames)
    env.close()
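
Regarding the "# what if continuous output?" comment in choose_action: with a continuous action space, Categorical no longer applies. A common substitute is to have the network output the mean of a Gaussian and sample from torch.distributions.Normal, whose log_prob is stored exactly like before. A minimal sketch under that assumption (GaussianPolicy and choose_action_continuous are illustrative names, not part of the script above):

import torch
import torch.nn as nn
from torch.distributions import Normal

class GaussianPolicy(nn.Module):
    # hypothetical continuous-action policy head, for illustration only
    def __init__(self, n_states, n_actions):
        super(GaussianPolicy, self).__init__()
        self.fc1 = nn.Linear(n_states, 10)
        self.mu_head = nn.Linear(10, n_actions)               # mean of the Gaussian
        self.log_std = nn.Parameter(torch.zeros(n_actions))   # learnable log standard deviation

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.mu_head(x), self.log_std.exp()

def choose_action_continuous(policy, state):
    mu, std = policy(state)
    m = Normal(mu, std)                      # replaces Categorical(probs)
    action = m.sample()
    log_prob = m.log_prob(action).sum(-1)    # sum log-probs over action dimensions
    return action, log_prob                  # store log_prob; learn() stays unchanged

The rest of the REINFORCE update (backward discounted returns, normalization, -log_prob * R) carries over without modification.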

The training results are shown below:

[Figure 2: training results]

3. Training Notes

No real difficulties came up during this training run; once a few bugs of my own were fixed, it worked normally. Episode-based training is genuinely tedious, since it cannot be batched, but the final result still came quickly: it trained successfully within 1000 episodes.
