精华内容
下载资源
问答
  • 【强化学习】揭开PPO2算法的神秘面纱(二)

    千次阅读 多人点赞 2020-02-02 15:16:32
    PPO受到与TRPO相同的问题的启发:我们如何才能使用当前拥有的数据在策略上采取最大可能的改进步骤,而又不会走得太远而导致意外导致性能下降?在TRPO试图通过复杂的二阶方法解决此问题的地方...其次PPO算法也是AC架构。

    一、PPO主体

    1、主结构

    PPO主体主要分为两个部分,初始化部分init用来设定网络的一些超参数,以及构建网络,第二部分train则用于更新网络参数(实际代码中,该PPO主体继承自另外一个主要用于设定超参数的类)。

    class PPO():
    	def __init__(...):
    		pass
    		
    	def train(self, states, actions, advantages, logp_olds, returns):
            pass
    

    2、初始化部分

    根据动作类型选取合适的网络模型,关于不同网络模型的代码实现参考上一篇文章

        def __init__(
                self,
                state_shape,
                action_dim,
                is_discrete,
                max_action=1.,
                actor_units=[256, 256],
                critic_units=[256, 256],
                lr_actor=1e-3,
                lr_critic=3e-3,
                const_std=0.3,
                hidden_activation_actor="relu",
                hidden_activation_critic="relu",
                clip_ratio=0.2,
                name="PPO",
                **kwargs):
            super().__init__(name=name, **kwargs)
            self.clip_ratio = clip_ratio
            self._is_discrete = is_discrete
            
    		# 创建网络模型
            if is_discrete:
                self.actor = CategoricalActor(
                    state_shape, action_dim, actor_units)
            else:
                self.actor = GaussianActor(
                    state_shape, action_dim, max_action, actor_units,
                    hidden_activation=hidden_activation_actor,
                    const_std=const_std)
    
            self.critic = CriticV(state_shape, critic_units,
                                  hidden_activation=hidden_activation_critic)
    		# 创建优化器
            self.actor_optimizer = tf.keras.optimizers.Adam(
                learning_rate=lr_actor)
            self.critic_optimizer = tf.keras.optimizers.Adam(
                learning_rate=lr_critic)
    
            # This is used to check if input state to `get_action` is multiple (batch) or single
            self._state_ndim = np.array(state_shape).shape[0]
    

    3、训练部分

    a、训练actor

    因为这里actor跟critic分开两个网络进行,不共享网络参数,因此将value_loss独立开单独进行梯度计算,因此损失函数用以下公式表示( c 2 c_2 c2取0.01):

    L t C L I P + S = L t C L I P ( θ ) + c 2 S [ π θ ] ( s t ) L_t^{CLIP+S} = L_t^{CLIP}(\theta) + c_2S[\pi_{\theta}](s_t) LtCLIP+S=LtCLIP(θ)+c2S[πθ](st)

    代码如下:

        @tf.function
        def _train_actor_body(self, states, actions, advantages, logp_olds):
            with tf.device(self.device):
                # Update actor
                with tf.GradientTape() as tape:
                	# 计算熵
                    ent = tf.reduce_mean(
                        self.actor.compute_entropy(states))
                    if self.clip:
                    	# 计算新策略的概率
                        logp_news = self.actor.compute_log_probs(
                            states, actions)
                        # 计算概率比例
                        ratio = tf.math.exp(logp_news - tf.squeeze(logp_olds))
                        # 对比例进行裁剪
                        min_adv = tf.clip_by_value(
                            ratio,
                            1.0 - self.clip_ratio,
                            1.0 + self.clip_ratio) * tf.squeeze(advantages)
                        # loss = (l_clip + entropy)
                        actor_loss = -tf.reduce_mean(tf.minimum(
                            ratio * tf.squeeze(advantages),
                            min_adv))
                        actor_loss -= self.entropy_coef * ent
                    else:
                        raise NotImplementedError
                actor_grad = tape.gradient(
                    actor_loss, self.actor.trainable_variables)
                self.actor_optimizer.apply_gradients(
                    zip(actor_grad, self.actor.trainable_variables))
    
            return actor_loss, logp_news, ratio, ent
    

    熵值的计算:

        def compute_entropy(self, state):
        	param = self._compute_dist(states)
            log_stds = param["log_std"]
            return tf.reduce_sum(log_stds + tf.math.log(tf.math.sqrt(2 * np.pi * np.e)), axis=-1)
    
    

    b、训练critic

    损失函数如下(这里 c 1 c_1 c1取0.5):
    l o s s t d = 1 T ∑ c 1 ∗ ( V θ ( s t ) − V t t a r g e t ) 2 loss_{td} = \frac{1}{T} \sum c_1* (V_{\theta}(s_t)-V_t^{target})^2 losstd=T1c1(Vθ(st)Vttarget)2
    其中T指的是该序列 τ \tau τ的长度,代码如下:

        @tf.function
        def _train_critic_body(self, states, returns):
            with tf.device(self.device):
                # Train baseline
                with tf.GradientTape() as tape:
                    current_V = self.critic(states)
                    td_errors = tf.squeeze(returns) - current_V
                    critic_loss = tf.reduce_mean(0.5 * tf.square(td_errors))
                critic_grad = tape.gradient(
                    critic_loss, self.critic.trainable_variables)
                self.critic_optimizer.apply_gradients(
                    zip(critic_grad, self.critic.trainable_variables))
    
            return critic_loss
    
    

    二、环境交互

    1、 交互部分主结构

    class OnPolicyTrainer(object):
    	def __init__(self,...):
    		'''
    		初始化训练参数,导入环境,policy等
    		'''
    		pass
    
    	 def __call__(self):
    	 	'''
    		主循环,采集数据,更新网络
    		'''
    	 	pass
    
    	def finish_horizon(self, last_val=0):
    		'''
    		每一个序列T采集完的时候调用
    		用于计算adv,存储buffer
    		'''
    		pass
    
    	def evaluate_policy(self, total_steps):
    		'''
    		用于检验决策模型得分
    		'''
    		pass
    
    	def _set_from_args(self, args):
    		'''
    		设置参数
    		'''
    		pass
    	
    	@staticmethod
        def get_argument(parser=None):
        	'''
    		获取参数
    		'''
        	pass
    

    2、初始化部分

        def __init__(self, policy,
                     env,
                     args,
                     test_env=None):
            self._set_from_args(args)
            self._policy = policy
            self._env = env
            self._test_env = self._env if test_env is None else test_env
    		
    		# 正则化状态
    		# obs-mean/(var+ 1e8)
            if self._normalize_obs:
                self._env = NormalizeObsEnv(self._env)
                self._test_env = NormalizeObsEnv(self._test_env)
    		
    		...
    		# 省略部分用于监测数据的代码
    		...
    

    3、调用

    ppo2算法里面,规定序列 T T T长度,即每一轮的最大步数,当步数达到最大值或者该轮结束时,通过以下式子进行网络更新,其中 k ⊆ [ 0 , T − 1 ] k \subseteq [0,T-1] k[0T1]
    θ k + 1 = arg ⁡ max ⁡ θ E s , a ∼ π θ k [ L ( s , a , θ k , θ ) ] \theta_{k+1} = \arg \max_{\theta} \underset{s,a \sim \pi_{\theta_k}}{{\mathrm E}}\left[ L(s,a,\theta_k, \theta)\right] θk+1=argθmaxs,aπθkE[L(s,a,θk,θ)]

    buffer的存储调用利用cpprb库提供的api实现。回调函数如下

    
        def __call__(self):
    
    		# 准备每一轮更新用的buffer
            # Prepare buffer
            self.replay_buffer = get_replay_buffer(
                self._policy, self._env)
            kwargs_local_buf = get_default_rb_dict(
                size=self._policy.horizon, env=self._env)
            kwargs_local_buf["env_dict"]["logp"] = {}
            kwargs_local_buf["env_dict"]["val"] = {}
            if is_discrete(self._env.action_space):
                kwargs_local_buf["env_dict"]["act"]["dtype"] = np.int32
            self.local_buffer = ReplayBuffer(**kwargs_local_buf)
    
            episode_steps = 0
            episode_return = 0
            episode_start_time = time.time()
            total_steps = np.array(0, dtype=np.int32)
            n_epoisode = 0
            obs = self._env.reset()
    
            tf.summary.experimental.set_step(total_steps)
            while total_steps < self._max_steps:
                # Collect samples
                for _ in range(self._policy.horizon):
                    act, logp, val = self._policy.get_action_and_val(obs)
                    next_obs, reward, done, _ = self._env.step(act)
    
                    episode_steps += 1
                    total_steps += 1
                    episode_return += reward
    
                    done_flag = done
                    if hasattr(self._env, "_max_episode_steps") and \
                            episode_steps == self._env._max_episode_steps:
                        done_flag = False
                    self.local_buffer.add(
                        obs=obs, act=act, next_obs=next_obs,
                        rew=reward, done=done_flag, logp=logp, val=val)
                    obs = next_obs
    				
    
                    if done or episode_steps == self._episode_max_steps:
                        tf.summary.experimental.set_step(total_steps)
                        self.finish_horizon()
                        obs = self._env.reset()
                        n_epoisode += 1
                        fps = episode_steps / (time.time() - episode_start_time)
                        self.logger.info(
                            "Total Epi: {0: 5} Steps: {1: 7} Episode Steps: {2: 5} Return: {3: 5.4f} FPS: {4:5.2f}".format(
                                n_epoisode, int(total_steps), episode_steps, episode_return, fps))
                       
                        episode_steps = 0
                        episode_return = 0
                        episode_start_time = time.time()
    
                self.finish_horizon(last_val=val)
    
                tf.summary.experimental.set_step(total_steps)
    
                # 更新参数
                if self._policy.normalize_adv:
                    samples = self.replay_buffer._encode_sample(np.arange(self._policy.horizon))
                    mean_adv = np.mean(samples["adv"])
                    std_adv = np.std(samples["adv"])
                with tf.summary.record_if(total_steps % self._save_summary_interval == 0):
                    for _ in range(self._policy.n_epoch):
                        samples = self.replay_buffer._encode_sample(
                            np.random.permutation(self._policy.horizon))
                        if self._policy.normalize_adv:
                            adv = (samples["adv"] - mean_adv) / (std_adv + 1e-8)
                        else:
                            adv = samples["adv"]
                        for idx in range(int(self._policy.horizon / self._policy.batch_size)):
                            target = slice(idx * self._policy.batch_size,
                                           (idx + 1) * self._policy.batch_size)
                            self._policy.train(
                                states=samples["obs"][target],
                                actions=samples["act"][target],
                                advantages=adv[target],
                                logp_olds=samples["logp"][target],
                                returns=samples["ret"][target])
    
            
    

    4、计算adv

    在ppo2里面,优势值通过以下方式计算:
    A t = δ t + ( γ λ ) δ t + 1 + . . . + ( γ λ ) T − t + 1 δ T − 1 A_t = \delta_t + (\gamma\lambda )\delta_{t+1}+ ... + (\gamma\lambda)^{T-t+1}\delta_{T-1} At=δt+(γλ)δt+1+...+(γλ)Tt+1δT1

    其中,

    δ t = r t + γ V ( s t + 1 ) − V ( s t ) \delta _t = r_t +\gamma V(s_{t+1})-V(s_t) δt=rt+γV(st+1)V(st)

        def finish_horizon(self, last_val=0):
            samples = self.local_buffer._encode_sample(
                np.arange(self.local_buffer.get_stored_size()))
            rews = np.append(samples["rew"], last_val)
            vals = np.append(samples["val"], last_val)
    
            # GAE-Lambda advantage calculation
            deltas = rews[:-1] + self._policy.discount * vals[1:] - vals[:-1]
            if self._policy.enable_gae:
                advs = discount_cumsum(
                    deltas, self._policy.discount * self._policy.lam)
            else:
                advs = deltas
    
            # Rewards-to-go, to be targets for the value function
            rets = discount_cumsum(rews, self._policy.discount)[:-1]
            self.replay_buffer.add(
                obs=samples["obs"], act=samples["act"], done=samples["done"],
                ret=rets, adv=advs, logp=np.squeeze(samples["logp"]))
            self.local_buffer.clear()
    

    其中,discount_cumsum函数用以下方式实现

    def discount_cumsum(x, discount):
        """
        Forked from rllab for computing discounted cumulative sums of vectors.
    
        :param x (np.ndarray or tf.Tensor)
            vector of [x0, x1, x2]
        :return output:
            [x0 + discount * x1 + discount^2 * x2,
             x1 + discount * x2,
             x2]
        """
        return lfilter(
            b=[1],
            a=[1, float(-discount)],
            x=x[::-1],
            axis=0)[::-1]
    

    5、检验函数

        def evaluate_policy(self, total_steps):
            if self._normalize_obs:
                self._test_env.normalizer.set_params(
                    *self._env.normalizer.get_params())
            avg_test_return = 0.
            if self._save_test_path:
                replay_buffer = get_replay_buffer(
                    self._policy, self._test_env, size=self._episode_max_steps)
            for i in range(self._test_episodes):
                episode_return = 0.
                frames = []
                obs = self._test_env.reset()
                for _ in range(self._episode_max_steps):
                    act, _ = self._policy.get_action(obs, test=True)
                    act = act if not hasattr(self._env.action_space, "high") else \
                        np.clip(act, self._env.action_space.low, self._env.action_space.high)
                    next_obs, reward, done, _ = self._test_env.step(act)
                    if self._save_test_path:
                        replay_buffer.add(
                            obs=obs, act=act, next_obs=next_obs,
                            rew=reward, done=done)
                            
                    episode_return += reward
                    obs = next_obs
                    if done:
                        break
                prefix = "step_{0:08d}_epi_{1:02d}_return_{2:010.4f}".format(
                    total_steps, i, episode_return)
                
            return avg_test_return / self._test_episodes
    
    

    三、 run_ppo

    导入相关模块。utils主要涵盖了一些琐碎的功能,例如跟环境相关的。

    import tensorflow as tf
    
    from ppo import PPO
    from on_policy_trainer import OnPolicyTrainer
    from utils import is_discrete, get_act_dim
    

    主程序,先从trainer那里获取默认的超参数,然后设定跟训练集测试集相关的参数。ppo算法的是随机连续决策算法,根据openAI官方推荐,其模型输出的方差不是一个函数并且与环境无关。

    官方说法:
    There is a single vector of log standard deviations, log ⁡ σ \log \sigma logσ, which is not a function of state: the log ⁡ σ \log \sigma logσ are standalone parameters. (You Should Know: our implementations of VPG, TRPO, and PPO do it this way.)

    if __name__ == '__main__':
        parser = OnPolicyTrainer.get_argument()
        parser = PPO.get_argument(parser)
        parser.add_argument('--env-name', type=str,
                            default="Pendulum-v0")
        parser.set_defaults(test_interval=20480)
        parser.set_defaults(max_steps=int(1e7))
        parser.set_defaults(horizon=2048)
        parser.set_defaults(batch_size=64)
        parser.set_defaults(gpu=-1)
        parser.set_defaults(episode_max_steps=200)
        args = parser.parse_args()
    
        env = gym.make(args.env_name)
        
        test_env = gym.make(args.env_name)
    
        policy = PPO(
            state_shape=env.observation_space.shape,
            action_dim=get_act_dim(env.action_space),
            is_discrete=is_discrete(env.action_space),
            max_action=None if is_discrete(
                env.action_space) else env.action_space.high[0],
            batch_size=args.batch_size,
            actor_units=[128, 64],
            critic_units=[128, 64],
            n_epoch=10,
            n_epoch_critic=10,
            lr_actor=3e-4,
            lr_critic=3e-4,
            discount=0.99,
            lam=0.95,
            hidden_activation=tf.nn.relu,
            horizon=args.horizon,
            normalize_adv=args.normalize_adv,
            enable_gae=args.enable_gae,
            gpu=args.gpu)
        trainer = OnPolicyTrainer(policy, env, args)
        trainer()
    

    最后来看下结果,大约在400k步的时候就开始收敛了,如果想收敛地更快可以自己尝试一下调整参数

    在这里插入图片描述


    如果觉得ok,点个赞,点个关注,也欢迎给个打赏支持一下编者的工作
    展开全文
  • 【强化学习】揭开PPO2算法的神秘面纱(一)

    千次阅读 多人点赞 2020-02-01 19:30:35
    PPO受到与TRPO相同的问题的启发:我们如何才能使用当前拥有的数据在策略上采取最大可能的改进步骤,而又不会走得太远而导致意外导致性能下降?在TRPO试图通过复杂的二阶方法解决此问题的地方...其次PPO算法也是AC架构。

    在上一篇强化学习应该知道的一些概念当中我们已经介绍了许多相关理论要点以及给出部分公式,下面就结合相关代码进行实践,由于篇幅过长,会分为两部分(或以上)进行讲解。

    另外,可能有些地方名字不一样但指的是同一个东西的地方也请大家见谅,因为很多专业名词都暂时没有找到相关的翻译,后续会慢慢更改的~

    一、算法简介

    PPO受到与TRPO相同的问题的启发:我们如何才能使用当前拥有的数据在策略上采取最大可能的改进步骤,而又不会走得太远而导致意外导致性能下降?在TRPO试图通过复杂的二阶方法解决此问题的地方,PPO使用的是一阶方法,它使用其他一些技巧来使新策略接近于旧策略。PPO方法实施起来非常简单,而且从经验上看,其性能至少与TRPO相同。其次PPO算法也是AC架构。

    PPO有两种主要形式:PPO-Penalty和PPO-Clip。

    PPO-Penalty:近似地解决了TRPO之类的受KL约束的更新,但对目标函数中的KL偏离进行了惩罚而不是使其成为硬约束,并在训练过程中自动调整惩罚系数,以便对其进行适当缩放。

    PPO-Clip:在目标中没有KL散度项,也完全没有约束。取而代之的是依靠对目标函数的专门裁剪来减小新老策略的差异。

    在这里,我们只讨论PPO-Clip(OpenAI使用的主要形式)。

    OPENAI的baseline里面有两个版本的ppo算法,安装他们的注释,ppo2应该才是正式版,其实两者差别不大,主要体现在算法的优化上面,基本思想还是限制新老策略的差别以防更新过快。

    1、关键点

    1.1 损失函数的设计

    ppo算法最精髓的地方就是加入了一项比例用以描绘新老策略的差异,用 r t ( θ ) = π θ ( a t ∣ s t ) π θ k ( a t ∣ s t ) r_t(\theta)=\frac{\pi_{\theta}(a_t|s_t)}{\pi_{\theta _k}(a_t|s_t)} rt(θ)=πθk(atst)πθ(atst)表示,损失函数如下:

    L ( s , a , θ k , θ ) = min ⁡ [ r ( θ ) A π θ k ( s , a ) ,      clip ( r ( θ ) , 1 − ϵ , 1 + ϵ ) A π θ k ( s , a ) ] L(s,a,\theta_k,\theta) = \min\left[ r(\theta) A^{\pi_{\theta_k}}(s,a), \;\; \text{clip}\left(r(\theta), 1 - \epsilon, 1+\epsilon \right) A^{\pi_{\theta_k}}(s,a) \right] L(s,a,θk,θ)=min[r(θ)Aπθk(s,a),clip(r(θ),1ϵ,1+ϵ)Aπθk(s,a)]

    其中, ϵ \epsilon ϵ是一个(很小的)超参数,用以粗略限制 r ( θ ) r(\theta) r(θ)的变化范围。为了表达方便,我们将上面将该损失函数用 L c l i p ( θ ) L^{clip}(\theta) Lclip(θ)表示
     

    ppo2里面,如果critic和actor共享参数的话,损失函数定义为如下:
    L C L I P + V F + S = L C L I P ( θ ) − c 1 L V F ( θ ) + c 2 S [ π θ ] ( s t ) L^{CLIP + VF + S} = L^{CLIP}(\theta) - c_1 L^{VF}(\theta) +c_2S[\pi_{\theta}](s_t) LCLIP+VF+S=LCLIP(θ)c1LVF(θ)+c2S[πθ](st)

    其中, c 1 , c 2 c_1,c_2 c1,c2是一个系数, S S S用于计算熵值, L V F L^{VF} LVF是value函数的损失函数,形式为: ( V θ ( s t ) − V t t a r g e t ) 2 (V_{\theta}(s_t)-V_t^{target})^2 (Vθ(st)Vttarget)2

    1.2 优势函数设计

    给定长度为 T T T的序列,t作为时间 [ 0 , T ] [0,T] [0,T]的下标
    A t = δ t + ( γ λ ) δ t + 1 + . . . + ( γ λ ) T − t + 1 δ T − 1 A_t = \delta_t + (\gamma\lambda )\delta_{t+1}+ ... + (\gamma\lambda)^{T-t+1}\delta_{T-1} At=δt+(γλ)δt+1+...+(γλ)Tt+1δT1
    其中,
    δ t = r t + γ V ( s t + 1 ) − V ( s t ) \delta _t = r_t +\gamma V(s_{t+1})-V(s_t) δt=rt+γV(st+1)V(st)

    2、算法流程

    PPO1的算法流程,ppo2里面大同小异,只是替换了里面某些公式。
    在这里插入图片描述

    3、代码结构

    • 根据动作类型创建决策网络
    • 计算loss
    • 环境交互以及参数更新
    class PPO(OnPolicyAgent):
        def __init__(
                self,
                state_shape,
                action_dim,
                is_discrete,
                max_action=1.,
                actor_units=[256, 256],
                critic_units=[256, 256],
                lr_actor=1e-3,
                lr_critic=3e-3,
                const_std=0.3,
                hidden_activation_actor="relu",
                hidden_activation_critic="relu",
                clip=True,
                clip_ratio=0.2,
                name="PPO",
                **kwargs):
            super().__init__(name=name, **kwargs)
            self.clip = clip
            self.clip_ratio = clip_ratio
            self._is_discrete = is_discrete
    
            if is_discrete:
                self.actor = CategoricalActor(
                    state_shape, action_dim, actor_units)
            else:
                self.actor = GaussianActor(
                    state_shape, action_dim, max_action, actor_units,
                    hidden_activation=hidden_activation_actor,
                    const_std=const_std)
    
            self.critic = CriticV(state_shape, critic_units,
                                  hidden_activation=hidden_activation_critic)
    
            self.actor_optimizer = tf.keras.optimizers.Adam(
                learning_rate=lr_actor)
            self.critic_optimizer = tf.keras.optimizers.Adam(
                learning_rate=lr_critic)
    
            # This is used to check if input state to `get_action` is multiple (batch) or single
            self._state_ndim = np.array(state_shape).shape[0]
    
        def train(self, states, actions, advantages, logp_olds, returns):
            # Train actor and critic
            
            actor_loss, logp_news, ratio, ent = self._train_actor_body(
                    states, actions, advantages, logp_olds)
            critic_loss = self._train_critic_body(states, returns)
            return actor_loss, critic_loss
    
        @tf.function
        def _train_actor_critic_body(
            pass
    
        @tf.function
        def _train_actor_body(self, states, actions, advantages, logp_olds):
            pass
    
        @tf.function
        def _train_critic_body(self, states, returns):
            pass
    
        def get_action(self, state, test=False):
    		pass
    
        def get_action_and_val(self, state, test=False):
            pass
    
        @tf.function
        def _get_action_logp_v_body(self, state, test):
            pass
    
    

    二、决策模型(policies)

    决策模型根据动作值是否由网络直接输出分为两种类型:确定性决策、随机决策。就流程上而言,随机决策的网络部分是跟确定性决策一致,而后者则少了采样、似然函数所引申出来的相关功能函数,因此本篇主要针对随机决策进行讨论

    1、确定性决策

    动作值直接由网络生成,其形式可以表达为:
    a t = μ θ ( s t ) a_t = \mu_{\theta}(s_t) at=μθ(st)

    2、随机决策

    随机决策根据动作连续与否分为分类决策和高斯决策,决策模型输出的是动作的概率分布。其形式如下:

    a t ∼ π θ ( ⋅ ∣ s t ) a_t \sim \pi_{\theta}(\cdot|s_t) atπθ(st)

    不管是哪一种,都包含以下两个技术要点:

    • 对动作进行采样
    • 计算动作的似然函数

    2.1 分类决策

    分类决策网络输出层的激活函数使用softmax,将logit转换为概率。所以该决策网络的输出就是各种可能动作的概率,是一组向量。如果动作为k维向量,输出即为每个动作发生的概率所组成的向量。

    2.1.1 创建模型

    代码如下,重写keras.Model类,默认情况下只有两个隐藏层。输入的是时间 t t t的状态,输出动作,动作的概率(用字典表达),以及其对数概率,给出的代码里面省略了两个功能函数,往后会详谈。

    
    class CategoricalActor(tf.keras.Model):
        def __init__(self, state_shape, action_dim, units=[256, 256],
                     name="CategoricalActor"):
            super().__init__(name=name)
            self.dist = Categorical(dim=action_dim)
            self.action_dim = action_dim
    
            self.l1 = Dense(units[0], activation='relu')
            self.l2 = Dense(units[1], activation='relu')
            self.prob = Dense(action_dim, activation='softmax')
    
            self(tf.constant(
                np.zeros(shape=(1,)+state_shape, dtype=np.float32)))
    
        def _compute_feature(self, states):
            features = self.l1(states)
            return self.l2(features)
    
        def _compute_dist(self, states):
            """
            Compute categorical distribution
    
            :param states (np.ndarray or tf.Tensor): Inputs to neural network.
                NN outputs probabilities of K classes
            :return: Categorical distribution
            """
            features = self._compute_feature(states)
            probs = self.prob(features)
            return {"prob": probs}
    
        def call(self, states, test=False):
            """
            Compute actions and log probability of the selected action
    
            :return action (tf.Tensors): Tensor of actions
            :return log_probs (tf.Tensor): Tensors of log probabilities of selected actions
            """
            param = self._compute_dist(states)
            if test:
                action = tf.math.argmax(param["prob"], axis=1)  # (size,)
            else:
                action = tf.squeeze(self.dist.sample(param), axis=1)  # (size,)
            log_prob = self.dist.log_likelihood(
                tf.one_hot(indices=action, depth=self.action_dim), param)
    
            return action, log_prob, param
    
        def compute_entropy(self, states):
            pass
    
        def compute_log_probs(self, states, actions):
            '''
    		该函数用于计算动作概率分布的似然函数,
    		在2.1.3里面有具体实现,这是只是实现了调用。故省略
    		'''
    
    

    2.1.2 采样函数

    根据概率分布随机选取一个特定的动作。probs是神经网络输出的动作概率分布,

        def sample(self, param):
            probs = param["prob"]
            # 注意: `tf.random.categorical` 的输入是对数概率 
            # 文档https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/random/categorical
            # 输出的形状[probs.shape[0], 1]
            return tf.random.categorical(tf.math.log(probs), 1)
    

    2.1.3似然函数

    将动作概率分布转换成对数概率。其值是一个由对数概率所组成的向量。使用该函数前,需要对采样得到的动作进行one-hot编码,才能作为x输入。parm是一个字典,里面包含了动作概率分布。
    log ⁡ π θ ( a ∣ s ) = log ⁡ [ P θ ( s ) ] a . \log \pi_{\theta}(a|s) = \log \left[P_{\theta}(s)\right]_a. logπθ(as)=log[Pθ(s)]a.

        def log_likelihood(self, x, param):
            """
            Compute log likelihood as:
                \log \sum(p_i * x_i)
    
            :param x (tf.Tensor or np.ndarray): Values to compute log likelihood
            :param param (Dict): Dictionary that contains probabilities of outputs
            :return (tf.Tensor): Log probabilities
            """
            probs = param["prob"]
            assert probs.shape == x.shape, \
                "Different shape inputted. You might have forgotten to convert `x` to one-hot vector."
            return tf.math.log(tf.reduce_sum(probs * x, axis=1) + self._tiny)
    
    

    2.2 连续决策(Diagonal Gaussian Policies)

    高斯决策(连续决策)的特点是产生连续动作,其决策模型输出的是均值 μ \mu μ和对数方差 log ⁡ ( σ ) \log(\sigma) log(σ),其中对数方差有两种取值方式,一种是固定值,一种是由决策网络给出,下面我们只介绍第一种取值方法。(具体的理论部分可以去查看开头所引用的文章。

    2.2.1 模型创建

    代码结构跟分类决策类似,主要是对象返回的内容不一样,这里返回的是动作值,其对数概率,以及字典保存的均值和方差

    class GaussianActor(tf.keras.Model):
        LOG_SIG_CAP_MAX = 2  # np.e**2 = 7.389
        LOG_SIG_CAP_MIN = -20  # np.e**-10 = 4.540e-05
        EPS = 1e-6
    
        def __init__(self, state_shape, action_dim, max_action,
                     units=[256, 256], hidden_activation="relu",
                     const_std=0.1,
                     squash=False, name='GaussianPolicy'):
            super().__init__(name=name)
            self.dist = DiagonalGaussian(dim=action_dim)
            self._const_std = const_std
            self._max_action = max_action
            self._squash = squash
    
            self.l1 = keras.layers.Dense(units[0], name="L1", activation=hidden_activation)
            self.l2 = keras.layers.Dense(units[1], name="L2", activation=hidden_activation)
            self.out_mean = keras.layers.Dense(action_dim, name="L_mean")
    
            self.out_log_std = tf.Variable(
                initial_value=-0.5*np.ones(action_dim, dtype=np.float32),
                dtype=tf.float32, name="logstd")
    
            self(tf.constant(
                np.zeros(shape=(1,)+state_shape, dtype=np.float32)))
    
        def _compute_dist(self, states):
            """
            Compute multivariate normal distribution
    
            :param states (np.ndarray or tf.Tensor): Inputs to neural network.
                NN outputs mean and standard deviation to compute the distribution
            :return (Dict): Multivariate normal distribution
            """
            features = self.l1(states)
            features = self.l2(features)
            mean = self.out_mean(features)
            log_std = tf.ones_like(mean) * tf.math.log(self._const_std)
    
            return {"mean": mean, "log_std": log_std}
    
        def call(self, states, test=False):
            """
            Compute actions and log probabilities of the selected action
            """
            param = self._compute_dist(states)
            if test:
                raw_actions = param["mean"]
            else:
                raw_actions = self.dist.sample(param)
            logp_pis = self.dist.log_likelihood(raw_actions, param)
    
            actions = raw_actions
    
            if self._squash:
                actions = tf.tanh(raw_actions)
                logp_pis = self._squash_correction(logp_pis, actions)
    
            return actions * self._max_action, logp_pis, param
    
        def compute_log_probs(self, states, actions):
            '''
            该功能函数用于计算似然函数,在2.2.3里面有具体实现,略
            '''
        def compute_entropy(self, states):
            pass
    
        def _squash_correction(self, logp_pis, actions):
            pass
    
    

    2.2.2 采样

    给定输出动作的均值 μ θ ( s ) \mu_{\theta}(s) μθ(s)以及对数方差求动作最终输出值,其中 z ∼ N ( 0 , 1 ) z \sim N(0,1) zN(0,1)
    a = μ θ ( s ) + σ θ ( s ) ⊙ z a = \mu_{\theta}(s) + \sigma_{\theta}(s) \odot z a=μθ(s)+σθ(s)z

        def sample(self, param):
            means = param["mean"]
            log_stds = param["log_std"]
            
            # tf.math.exp : e^x
            # log_std: ln (x)
            # std = e^log_std
            return means + tf.random.normal(shape=means.shape) * tf.math.exp(log_stds)
    
    

    2.2.3 似然函数

    对于 k k k维动作 a a a,其均值为 μ = μ θ ( s ) \mu=\mu_{\theta}(s) μ=μθ(s),方差为 σ = σ θ ( s ) \sigma=\sigma_{\theta}(s) σ=σθ(s),似然函数由以下公式给出:
    log ⁡ π θ ( a ∣ s ) = − 1 2 ( ∑ i = 1 k ( ( a i − μ i ) 2 σ i 2 + 2 log ⁡ σ i ) + k log ⁡ 2 π ) \log \pi_{\theta}(a|s) = -\frac{1}{2}\left(\sum_{i=1}^k \left(\frac{(a_i - \mu_i)^2}{\sigma_i^2} + 2 \log \sigma_i \right) + k \log 2\pi \right) logπθ(as)=21(i=1k(σi2(aiμi)2+2logσi)+klog2π)

        def log_likelihood(self, x, param):
            '''
            parm同样是一个字典,记录了模型产生的均值和方差
            '''
            means = param["mean"]
            log_stds = param["log_std"]
            assert means.shape == log_stds.shape
            zs = (x - means) / tf.exp(log_stds)
            return - tf.reduce_sum(log_stds, axis=-1) \
                   - 0.5 * tf.reduce_sum(tf.square(zs), axis=-1) \
                   - 0.5 * self.dim * tf.math.log(2 * np.pi)
    

    至此PPO算法的主体模型已经搭建完成,剩下的部分由于篇幅问题会放在下一篇中讲解,代码参考tf2rl开源强化学习库,ubuntu的小伙伴可以直接pip安装即可,window下想要安装可以参考这篇文章


    如果觉得ok,点个赞,点个关注,也欢迎给个打赏支持一下编者的工作
    展开全文
  • 通过使用纸张近端策略优化算法推出近端政策优化(PPO算法。 供您参考,PPO是OpenAI提出的算法,用于训练OpenAI Five,这是第一款在电竞游戏中击败世界冠军的AI。 具体来说,OpenAI五人队在2018年8月派出了一支由...
  • PPO算法

    千次阅读 2020-05-23 15:02:56
    近端策略优化(Proximal Policy Optimization,PPO算法背景: 策略梯度虽然在有一定难度的问题中取得了一些成效,但是这类方法对于迭代步骤的数量非常敏感;如果选的太小,训练的结果就会令人绝望;如果选的过大...

    近端策略优化(Proximal Policy Optimization,PPO)

    算法背景

    1. 策略梯度虽然在有一定难度的问题中取得了一些成效,但是这类方法对于迭代步骤的数量非常敏感;如果选的太小,训练的结果就会令人绝望;如果选的过大,反馈信号就会淹没在噪声中,甚至有可能使训练模型呈现雪崩式的下降。这类方法的采样效率也是非常低,学习简单的任务就需要百万级甚至以上的总迭代次数。
    2. 策略梯度方法的缺点是数据效率和鲁棒性不好。对于TRPO算法比较复杂,且不兼容dropout和参数共享(策略和价值网络间),且使用约束的方法参数难以调节,同时算法实现的复杂性。故不建议使用TRPO算法,而使用PPO算法

    PPO算法的提出:旨在借鉴TRPO算法,使用一阶优化,在采样效率、算法表现,以及实现和调试的复杂度之间取得了新的平衡。这是因为PPO会在每一次迭代中尝试计算新的策略,让损失函数最小化,并且保证每一次新计算出的策略能够和原策略相差不大,目前,openai已经把PPO作为自己RL中的首选算法。

    展开全文
  • 通过使用纸张近端策略优化算法推出近端政策优化(PPO算法。 说到性能,我经过PPO培训的代理可以完成29/32个级别,这比我一开始的预期要好得多。 供您参考,PPO是OpenAI提出的算法,用于训练OpenAI Five,这是第...
  • PPO算法详解

    千次阅读 2020-05-27 23:13:50
    1. PPO算法思想 PPO算法是一种新型的Policy Gradient算法,Policy Gradient算法对步长十分敏感,但是又难以选择合适的步长,在训练过程中新旧策略的的变化差异如果过大则不利于学习。PPO提出了新的目标函数可以再多...

    原文:Proximal Policy Optimization Algorithms [arXiv] [GitHub]

    1. PPO算法思想

    PPO算法是一种新型的Policy Gradient算法,Policy Gradient算法对步长十分敏感,但是又难以选择合适的步长,在训练过程中新旧策略的的变化差异如果过大则不利于学习。PPO提出了新的目标函数可以再多个训练步骤实现小批量的更新,解决了Policy Gradient算法中步长难以确定的问题。其实TRPO也是为了解决这个思想但是相比于TRPO算法PPO算法更容易求解。

    2. Policy Gradient回顾

    重新回顾一下Policy Gradient算法,Policy Gradient不通过误差反向传播,它通过观测信息选出一个行为直接进行反向传播,当然出人意料的是他并没有误差,而是利用reward奖励直接对选择行为的可能性进行增强和减弱,好的行为会被增加下一次被选中的概率,不好的行为会被减弱下次被选中的概率。

    策略 τ \tau τ的回报期望: ∇ R = E τ p θ ( τ ) [ R ( τ ) ∇ log ⁡ p θ ( τ ) ] \nabla R=E_{\tau p_\theta(\tau)}[R(\tau)\nabla\log p_\theta(\tau)] R=Eτpθ(τ)[R(τ)logpθ(τ)]

    重要性采样(Importance Sampling):

    E x p E_{xp} Exp

    参考资源

    [1] 【强化学习】PPO(Proximal Policy Optimization)近端策略优化算法
    [2] Proximal Policy Optimization

    展开全文
  • 通过使用近端策略优化算法论文中介绍的近端策略优化(PPO算法。 [PYTORCH]用于玩超级马里奥兄弟的近战策略优化(PPO)简介这是我的python源代码,用于训练特工玩超级马里奥兄弟。 通过使用近端策略优化算法论文中...
  • PPO算法实战

    千次阅读 2021-03-23 15:36:43
    PPO是一种off-policy算法,具有较好的性能,其前身是TRPO算法,也是policy gradient算法的一种,它是现在 OpenAI 默认的强化学习算法,具体原理可参考PPO算法讲解。PPO算法主要有两个变种,一个是结合KL penalty的,...
  • 本文主要讲解有关 TRPO算法PPO 算法PPO2算法以及 DPPO 算法的相关内容。 一、PPO 算法 PPO(Proximal Policy Optimization) 是一种解决 PG 算法中学习率不好确定的问题的算法,因为如果学习率过大,则学出来的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,925
精华内容 770
关键字:

ppo2算法