
[Baidu AI Reinforcement Learning Series] 4. Solving RL with Policy Gradient (Using PG to Solve Pong)

Contents

  • Policy Gradient
  • Formula Derivation
  • Expected Return
  • Optimization Objective and Policy Gradient
  • Project Exercise (Solving Pong with PG)
    • Practice Requirements
    • Network Structure
    • Instructor's Solution on GitHub

    Policy Gradient


    In reinforcement learning there are two broad families of methods: value-based and policy-based. Policy Gradient (PG) belongs to the latter. Unlike DQN, PG does not need to fit Q-values; it directly outputs the Agent's next action, much like an end-to-end neural network with no intermediate step, as shown in the figure below.
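
    The difference can be made concrete with a tiny sketch (the helper names here are hypothetical, not from the course code): a value-based agent fits Q(s, a) and takes the argmax, while a policy-based agent outputs action probabilities directly and samples from them.

    import numpy as np

    def value_based_act(q_values):
        # value-based: score every action with a fitted Q(s, a), then take the argmax
        return int(np.argmax(q_values))

    def policy_based_act(action_probs):
        # policy-based: the network outputs pi_theta(a | s) directly; sample from it
        return int(np.random.choice(len(action_probs), p=action_probs))

    print(value_based_act(np.array([0.1, 2.3, -0.5])))   # -> 1
    print(policy_based_act(np.array([0.2, 0.5, 0.3])))   # -> a randomly sampled action index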

    Formula Derivation

    Expected Return

    Within an episode, the Agent's behavior is represented by a sequence $\tau = \{ s_1, a_1, s_2, a_2, \ldots, s_T, a_T \}$, where $s_T$ denotes the state at step $T$ and $a_T$ denotes the action at step $T$.


    $\pi_\theta(a_T \mid s_T)$ is the probability, predicted by the network, of the Agent taking action $a_T$ in state $s_T$.
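
    The expected-return objective itself appears to have been an image in the original post. A standard reconstruction consistent with the notation above is given below as a sketch; here $\rho$ denotes the initial-state and transition distributions, $r(s_t, a_t)$ the per-step reward, and $R(\tau)$ the total return of a trajectory.

    % probability of a trajectory under the policy \pi_\theta
    p_\theta(\tau) = \rho(s_1) \prod_{t=1}^{T} \pi_\theta(a_t \mid s_t)\, \rho(s_{t+1} \mid s_t, a_t)

    % expected return: the objective J(\theta) to be maximized
    J(\theta) = \mathbb{E}_{\tau \sim p_\theta(\tau)} \big[ R(\tau) \big]
              = \sum_{\tau} p_\theta(\tau)\, R(\tau),
    \qquad R(\tau) = \sum_{t=1}^{T} r(s_t, a_t)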

    Optimization Objective and Policy Gradient


    In equation ③, the terms $\rho(s_1)$ and $\rho(s_{t+1} \mid s_t, a_t)$ do not depend on $\theta$, so they drop out when differentiating with respect to $\theta$, which gives equation ④.
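
    Equations ③ and ④ also appear to have been images in the original post; the standard derivation they refer to (the log-derivative trick behind REINFORCE) is sketched below, using the same symbols as the reconstruction above.

    \nabla_\theta J(\theta)
      = \nabla_\theta \sum_{\tau} p_\theta(\tau)\, R(\tau)
      = \sum_{\tau} p_\theta(\tau)\, \nabla_\theta \log p_\theta(\tau)\, R(\tau)
    % \log p_\theta(\tau) = \log \rho(s_1)
    %   + \sum_{t} \big[ \log \pi_\theta(a_t \mid s_t) + \log \rho(s_{t+1} \mid s_t, a_t) \big],
    % and only the \pi_\theta term depends on \theta, so
      = \mathbb{E}_{\tau \sim p_\theta} \Big[ \sum_{t=1}^{T} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, R(\tau) \Big]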

    Project Exercise (Solving Pong with PG)

    The course and exercises are provided by the Baidu AI Reinforcement Learning 7-Day Bootcamp (course link).

    Practice Requirements

    The score starts at -21 and gradually rises until it converges. The Test reward should converge to above 0 (meaning the agent beats the opponent); the higher, the better.

    Network Structure

    At the time of writing I am still tuning frantically. I have looked at other solutions, and most of them seem to need several rounds of training before converging to a good result. I want to keep experimenting to see what can speed up convergence, and I will publish my own code then. Below is the instructor's network structure:

    # imports used by the Model, Agent, and training code below (not listed in the original post)
    import os
    import gym
    import numpy as np
    import parl
    from parl import layers            # wraps paddle.fluid.layers
    from paddle import fluid
    from parl.utils import logger


    class Model(parl.Model):
        def __init__(self, act_dim):
            act_dim = act_dim
            hid1_size = 256
            hid2_size = 64
            self.fc1 = layers.fc(size=hid1_size, act='relu')
            self.fc2 = layers.fc(size=hid2_size, act='relu')
            self.fc3 = layers.fc(size=act_dim, act='softmax')

        def forward(self, obs):
            h1 = self.fc1(obs)
            h2 = self.fc2(h1)
            out = self.fc3(h2)
            return out

    Agent:

    class Agent(parl.Agent):
        def __init__(self, algorithm, obs_dim, act_dim):
            self.obs_dim = obs_dim
            self.act_dim = act_dim
            super(Agent, self).__init__(algorithm)

        def build_program(self):
            self.pred_program = fluid.Program()
            self.learn_program = fluid.Program()

            with fluid.program_guard(self.pred_program):  # build the computation graph for predicting actions; define input/output variables
                obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
                self.act_prob = self.alg.predict(obs)

            with fluid.program_guard(self.learn_program):  # build the computation graph for updating the policy network; define input/output variables
                obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
                act = layers.data(name='act', shape=[1], dtype='int64')
                reward = layers.data(name='reward', shape=[], dtype='float32')
                self.cost = self.alg.learn(obs, act, reward)

        def sample(self, obs):
            obs = np.expand_dims(obs, axis=0)  # add a batch dimension
            act_prob = self.fluid_executor.run(
                self.pred_program,
                feed={'obs': obs.astype('float32')},
                fetch_list=[self.act_prob])[0]
            act_prob = np.squeeze(act_prob, axis=0)  # remove the batch dimension
            act = np.random.choice(range(self.act_dim), p=act_prob)  # sample an action according to the action probabilities
            return act

        def predict(self, obs):
            obs = np.expand_dims(obs, axis=0)
            act_prob = self.fluid_executor.run(
                self.pred_program,
                feed={'obs': obs.astype('float32')},
                fetch_list=[self.act_prob])[0]
            act_prob = np.squeeze(act_prob, axis=0)
            act = np.argmax(act_prob)  # pick the action with the highest probability
            return act

        def learn(self, obs, act, reward):
            act = np.expand_dims(act, axis=-1)
            feed = {
                'obs': obs.astype('float32'),
                'act': act.astype('int64'),
                'reward': reward.astype('float32')
            }
            cost = self.fluid_executor.run(
                self.learn_program, feed=feed, fetch_list=[self.cost])[0]
            return cost
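
    The PolicyGradient algorithm that the Agent wraps (self.alg) and that main() constructs below is not shown in the post. Here is a sketch of what it typically looks like in the fluid-based PARL examples; the exact loss formulation and the LEARNING_RATE value are assumptions, not taken from the post.

    LEARNING_RATE = 1e-3  # assumed value; main() uses LEARNING_RATE but the post never defines it


    class PolicyGradient(parl.Algorithm):
        def __init__(self, model, lr=None):
            self.model = model      # the policy network defined above
            self.lr = lr

        def predict(self, obs):
            # action probabilities pi_theta(a | s) from the softmax head
            return self.model(obs)

        def learn(self, obs, action, reward):
            # REINFORCE loss: mean over the batch of -log pi_theta(a_t | s_t) * G_t
            act_prob = self.model(obs)
            log_prob = layers.cross_entropy(act_prob, action)  # -log prob of the taken action
            cost = layers.reduce_mean(log_prob * reward)
            optimizer = fluid.optimizer.Adam(learning_rate=self.lr)
            optimizer.minimize(cost)
            return cost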

    Training setup:

    def run_episode(env, agent):
        obs_list, action_list, reward_list = [], [], []
        obs = env.reset()
        while True:
            obs = preprocess(obs)  # from shape (210, 160, 3) to (6400,)
            obs_list.append(obs)
            action = agent.sample(obs)
            action_list.append(action)
            obs, reward, done, info = env.step(action)
            reward_list.append(reward)
            if done:
                break
        return obs_list, action_list, reward_list


    # evaluate the agent: run 5 episodes and average the total reward
    def evaluate(env, agent, render=False):
        eval_reward = []
        for i in range(5):
            obs = env.reset()
            episode_reward = 0
            while True:
                obs = preprocess(obs)  # from shape (210, 160, 3) to (6400,)
                action = agent.predict(obs)
                obs, reward, isOver, _ = env.step(action)
                episode_reward += reward
                if render:
                    env.render()
                if isOver:
                    break
            eval_reward.append(episode_reward)
        return np.mean(eval_reward)


    def preprocess(image):
        """Preprocess a 210x160x3 uint8 frame into a 6400-dim (80x80) 1-D float vector."""
        image = image[35:195]  # crop
        image = image[::2, ::2, 0]  # downsample by a factor of 2
        image[image == 144] = 0  # erase background (background type 1)
        image[image == 109] = 0  # erase background (background type 2)
        image[image != 0] = 1  # binarize: everything except the background becomes 1
        return image.astype(np.float).ravel()


    def calc_reward_to_go(reward_list, gamma=0.99):
        """calculate discounted reward"""
        reward_arr = np.array(reward_list)
        for i in range(len(reward_arr) - 2, -1, -1):
            # G_t = r_t + γ·r_t+1 + ... = r_t + γ·G_t+1
            reward_arr[i] += gamma * reward_arr[i + 1]
        # normalize episode rewards
        reward_arr -= np.mean(reward_arr)
        reward_arr /= np.std(reward_arr)
        return reward_arr


    def main():
        env = gym.make('Pong-v0')
        obs_dim = 80 * 80
        act_dim = env.action_space.n
        logger.info('obs_dim {}, act_dim {}'.format(obs_dim, act_dim))

        # build the agent with the PARL framework
        model = Model(act_dim=act_dim)
        alg = PolicyGradient(model, lr=LEARNING_RATE)
        agent = Agent(alg, obs_dim=obs_dim, act_dim=act_dim)

        # load a saved model
        # if os.path.exists('./model.ckpt'):
        #     agent.restore('./model.ckpt')

        for i in range(1000):
            obs_list, action_list, reward_list = run_episode(env, agent)
            if i % 10 == 0:
                logger.info("Train Episode {}, Reward Sum {}.".format(i, sum(reward_list)))

            batch_obs = np.array(obs_list)
            batch_action = np.array(action_list)
            batch_reward = calc_reward_to_go(reward_list)

            agent.learn(batch_obs, batch_action, batch_reward)
            if (i + 1) % 100 == 0:
                total_reward = evaluate(env, agent, render=False)
                logger.info('Episode {}, Test reward: {}'.format(i + 1, total_reward))

        # save the parameters to ./model.ckpt
        agent.save('./model.ckpt')
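
    The post does not show how main() is launched; a minimal entry point (an assumption, not part of the instructor's snippet) would be:

    if __name__ == '__main__':
        main()  # trains for 1000 episodes, evaluating every 100 and saving the model at the end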

    Instructor's Solution on GitHub

    The repository also contains solutions for several other hands-on projects (very useful). The link goes to the instructor's solutions for the practical projects; feel free to go in and give it a star!
