AI Studio Reinforcement Learning Notes
Author: junkdood
Note: unfinished draft, to be expanded later.
Preface
Over the past 7 days I took part in Baidu's 7-day reinforcement learning camp and learned a great deal. Here I record the knowledge I picked up along with the related code.
Development History
- The Turing Test
The Turing test is one criterion for whether artificial intelligence has truly succeeded.
A human and a computer are placed in two separate rooms, and a person outside asks both the same questions. If the person outside cannot tell which answers come from the human and which from the computer, the computer can be said to possess artificial intelligence.
- Deep Learning
Deep learning builds more abstract high-level features by composing low-level ones. In computer vision, a deep learning model first learns low-level representations from the raw image, such as edge detectors, and then combines them to obtain higher-level representations, as in the sketch below.
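To make "composing low-level features into higher-level ones" concrete, here is a minimal sketch using PaddlePaddle's static-graph API; the layer names and sizes are illustrative choices of mine, not something from the course:

```python
import paddle.fluid as fluid

# Illustrative sketch only: each fc layer re-combines the features produced by the
# layer below it, so the representation becomes more abstract with depth.
# (Names and sizes are arbitrary, chosen just for illustration.)
image = fluid.layers.data(name='image', shape=[784], dtype='float32')  # raw pixels
low = fluid.layers.fc(input=image, size=256, act='relu')   # low-level features (edge-like)
high = fluid.layers.fc(input=low, size=128, act='relu')    # combinations of low-level features
logits = fluid.layers.fc(input=high, size=10, act=None)    # task-level representation (class scores)
```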
DQN Example
```python
import os
import copy
import random
import collections

import numpy as np
import gym
import parl
import paddle.fluid as fluid
from parl import layers
from parl.utils import logger

# Hyperparameters (the values below are the standard settings from the PARL DQN tutorial; adjust as needed)
LEARN_FREQ = 5            # learn every 5 steps instead of every step, for efficiency
MEMORY_SIZE = 20000       # capacity of the replay memory
MEMORY_WARMUP_SIZE = 200  # number of experiences to collect before training starts
BATCH_SIZE = 32           # mini-batch size sampled from the replay memory per update
LEARNING_RATE = 0.001     # learning rate
GAMMA = 0.99              # reward discount factor


class Model(parl.Model):
    def __init__(self, act_dim):
        hid1_size = 128
        hid2_size = 128
        # 3-layer fully connected network
        self.fc1 = layers.fc(size=hid1_size, act='relu')
        self.fc2 = layers.fc(size=hid2_size, act='relu')
        self.fc3 = layers.fc(size=act_dim, act=None)

    def value(self, obs):
        # Forward pass: given a state, output the Q-value of every action,
        # i.e. [Q(s, a1), Q(s, a2), Q(s, a3), ...]
        h1 = self.fc1(obs)
        h2 = self.fc2(h1)
        Q = self.fc3(h2)
        return Q


class DQN(parl.Algorithm):
    def __init__(self, model, act_dim=None, gamma=None, lr=None):
        """ DQN algorithm

        Args:
            model (parl.Model): forward network defining the Q-function
            act_dim (int): dimension of the action space, i.e. number of actions
            gamma (float): reward discount factor
            lr (float): learning rate
        """
        self.model = model
        self.target_model = copy.deepcopy(model)

        assert isinstance(act_dim, int)
        assert isinstance(gamma, float)
        assert isinstance(lr, float)
        self.act_dim = act_dim
        self.gamma = gamma
        self.lr = lr

    def predict(self, obs):
        """ Use self.model's value network to get [Q(s, a1), Q(s, a2), ...] """
        return self.model.value(obs)

    def learn(self, obs, action, reward, next_obs, terminal):
        """ Update self.model's value network with the DQN update rule """
        # Get max Q' from the target network to compute target_Q
        next_pred_value = self.target_model.value(next_obs)
        best_v = layers.reduce_max(next_pred_value, dim=1)
        best_v.stop_gradient = True  # block gradients through the target
        terminal = layers.cast(terminal, dtype='float32')
        target = reward + (1.0 - terminal) * self.gamma * best_v

        pred_value = self.model.value(obs)  # predicted Q-values
        # Convert action to a one-hot vector, e.g. 3 => [0, 0, 0, 1, 0]
        action_onehot = layers.one_hot(action, self.act_dim)
        action_onehot = layers.cast(action_onehot, dtype='float32')
        # Element-wise multiply to pick out Q(s, a) of the taken action, e.g.
        # pred_value = [[2.3, 5.7, 1.2, 3.9, 1.4]], action_onehot = [[0, 0, 0, 1, 0]]
        # ==> pred_action_value = [[3.9]]
        pred_action_value = layers.reduce_sum(
            layers.elementwise_mul(action_onehot, pred_value), dim=1)

        # MSE between Q(s, a) and target_Q gives the loss
        cost = layers.square_error_cost(pred_action_value, target)
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.Adam(learning_rate=self.lr)  # Adam optimizer
        optimizer.minimize(cost)
        return cost

    def sync_target(self):
        """ Copy the parameters of self.model into self.target_model """
        self.model.sync_weights_to(self.target_model)


class Agent(parl.Agent):
    def __init__(self,
                 algorithm,
                 obs_dim,
                 act_dim,
                 e_greed=0.1,
                 e_greed_decrement=0):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(Agent, self).__init__(algorithm)

        self.global_step = 0
        self.update_target_steps = 200  # copy model's parameters to target_model every 200 training steps

        self.e_greed = e_greed  # probability of picking a random action (exploration)
        self.e_greed_decrement = e_greed_decrement  # decay exploration as training converges

    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        with fluid.program_guard(self.pred_program):  # graph for predicting actions: define inputs/outputs
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.value = self.alg.predict(obs)

        with fluid.program_guard(self.learn_program):  # graph for updating the Q-network: define inputs/outputs
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            action = layers.data(name='act', shape=[1], dtype='int32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(
                name='next_obs', shape=[self.obs_dim], dtype='float32')
            terminal = layers.data(name='terminal', shape=[], dtype='bool')
            self.cost = self.alg.learn(obs, action, reward, next_obs, terminal)

    def sample(self, obs):
        sample = np.random.rand()  # random float in [0, 1)
        if sample < self.e_greed:
            act = np.random.randint(self.act_dim)  # explore: every action has a chance of being chosen
        else:
            act = self.predict(obs)  # exploit: pick the best action
        self.e_greed = max(
            0.01, self.e_greed - self.e_greed_decrement)  # decay exploration as training converges
        return act

    def predict(self, obs):  # pick the best action
        obs = np.expand_dims(obs, axis=0)
        pred_Q = self.fluid_executor.run(
            self.pred_program,
            feed={'obs': obs.astype('float32')},
            fetch_list=[self.value])[0]
        pred_Q = np.squeeze(pred_Q, axis=0)
        act = np.argmax(pred_Q)  # index of the largest Q, i.e. the corresponding action
        return act

    def learn(self, obs, act, reward, next_obs, terminal):
        # Sync the parameters of model and target_model every 200 training steps
        if self.global_step % self.update_target_steps == 0:
            self.alg.sync_target()
        self.global_step += 1

        act = np.expand_dims(act, -1)
        feed = {
            'obs': obs.astype('float32'),
            'act': act.astype('int32'),
            'reward': reward,
            'next_obs': next_obs.astype('float32'),
            'terminal': terminal
        }
        cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.cost])[0]  # one training step
        return cost


class ReplayMemory(object):
    def __init__(self, max_size):
        self.buffer = collections.deque(maxlen=max_size)

    # Add one experience to the replay memory
    def append(self, exp):
        self.buffer.append(exp)

    # Sample a batch of experiences from the replay memory
    def sample(self, batch_size):
        mini_batch = random.sample(self.buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []

        for experience in mini_batch:
            s, a, r, s_p, done = experience
            obs_batch.append(s)
            action_batch.append(a)
            reward_batch.append(r)
            next_obs_batch.append(s_p)
            done_batch.append(done)

        return np.array(obs_batch).astype('float32'), \
            np.array(action_batch).astype('float32'), \
            np.array(reward_batch).astype('float32'), \
            np.array(next_obs_batch).astype('float32'), \
            np.array(done_batch).astype('float32')

    def __len__(self):
        return len(self.buffer)


def run_episode(env, agent, rpm):
    total_reward = 0
    obs = env.reset()
    step = 0
    while True:
        step += 1
        action = agent.sample(obs)  # sample an action; every action can be tried
        next_obs, reward, done, _ = env.step(action)
        rpm.append((obs, action, reward, next_obs, done))

        # train model
        if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
            (batch_obs, batch_action, batch_reward, batch_next_obs,
             batch_done) = rpm.sample(BATCH_SIZE)
            train_loss = agent.learn(batch_obs, batch_action, batch_reward,
                                     batch_next_obs, batch_done)  # s, a, r, s', done

        total_reward += reward
        obs = next_obs
        if done:
            break
    return total_reward


# Evaluate the agent: run 5 episodes and average the total reward
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        episode_reward = 0
        while True:
            action = agent.predict(obs)  # greedy action only
            obs, reward, done, _ = env.step(action)
            episode_reward += reward
            if render:
                env.render()
            if done:
                break
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)


env = gym.make('CartPole-v0')  # CartPole-v0: final evaluation score expected > 180 (max is 200)
action_dim = env.action_space.n  # CartPole-v0: 2
obs_shape = env.observation_space.shape  # CartPole-v0: (4,)

rpm = ReplayMemory(MEMORY_SIZE)  # DQN replay memory

# Build the agent with the PARL framework
model = Model(act_dim=action_dim)
algorithm = DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
agent = Agent(
    algorithm,
    obs_dim=obs_shape[0],
    act_dim=action_dim,
    e_greed=0.1,  # probability of random actions (exploration)
    e_greed_decrement=1e-6)  # decay exploration as training converges

# Load a saved model
# save_path = './dqn_model.ckpt'
# agent.restore(save_path)

# Pre-fill the replay memory so the first updates have enough diverse samples
while len(rpm) < MEMORY_WARMUP_SIZE:
    run_episode(env, agent, rpm)

max_episode = 2000

# Start training
episode = 0
while episode < max_episode:  # train for max_episode episodes; test episodes are not counted
    # train part
    for i in range(0, 50):
        total_reward = run_episode(env, agent, rpm)
        episode += 1

    # test part
    eval_reward = evaluate(env, agent, render=False)  # render=True to watch the agent
    logger.info('episode:{} e_greed:{} test_reward:{}'.format(
        episode, agent.e_greed, eval_reward))

# Training finished; save the model
save_path = './dqn_model.ckpt'
agent.save(save_path)
```
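As a follow-up, here is a minimal usage sketch for reloading the checkpoint saved above and watching the trained policy; it assumes the script above has already been run in the same session, so `env`, `agent`, and `evaluate` are defined:

```python
# Minimal usage sketch (assumes the script above has run in this session and
# './dqn_model.ckpt' has been saved).
agent.restore('./dqn_model.ckpt')  # load the saved parameters back into the agent
evaluate(env, agent, render=True)  # watch the greedy policy for 5 episodes
```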