精华内容
下载资源
问答
  • 强化学习经典算法笔记(二):策略迭代算法Policy Iteration
    千次阅读
    2019-04-08 17:09:16

    强化学习经典算法笔记——策略迭代算法

      上一篇讲了价值迭代算法,这一篇介绍另一个动态规划算法——策略迭代算法(Policy Iteration)。

    简单介绍

      Value Iteration的思路是:先迭代找出一个最优的Value Function,然后再根据Value Function迭代出一个最优策略。

      Policy Iteration的思路是反着的,首先给定一个初始化的策略函数,一般是随机策略。给予这个策略,可以得到每个状态下采取的动作,进而得到reward和下一状态,利用更新法则,更新value function。

      然后,根据更新后的value function更新之前的随机策略。如此,就完成了value-policy的交替更新,直至收敛。

      

    编程实现

      还是以FrozenLake游戏为例,实现Policy Iteration算法。

    import gym
    import numpy as np
    
    env = gym.make('FrozenLake-v0')
    
    def compute_value_function(policy, gamma=1.0, threshold = 1e-20):
        '''
        计算value function
        '''
        # len=16
        value_table = np.zeros(env.nS)
        
        while True:
            updated_value_table = np.copy(value_table)
            for state in range(env.nS):
                
                # 根据策略函数,给定一个状态,输出该状态下的应该采取的动作
                action = policy[state]
                
                # 采取动作后,遍历所有可能的转移状态,计算状态价值
                # 和value iteration的区别是:
                # value iteration的策略是从value function中根据greedy原则选出来的
                # policy iteration的策略是事先给定的,value是根据policy得出的,这个action的价值代表了当前状态的价值?
                value_table[state] = sum([ trans_prob * (reward_prob + gamma * updated_value_table[next_state])
                    for trans_prob, next_state, reward_prob,_ in env.P[state][action] ])
                
            if (np.sum((np.fabs(updated_value_table - value_table))) <= threshold):
                break
        return value_table
    
    def extract_policy(value_table, gamma = 1.0):
        '''
        
        '''
        policy = np.zeros(env.observation_space.n)
        for state in range(env.observation_space.n):
            Q_table = np.zeros(env.action_space.n)
            for action in range(env.action_space.n):
                for next_sr in env.P[state][action]:
                    trans_prob, next_state, reward_prob,_= next_sr
                    Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state]))
            policy[state] = np.argmax(Q_table)
    
        return policy
    
    def policy_iteration(env,gamma = 1.0, no_of_iterations = 200000):
        '''
        状态值估计和策略函数的优化是交替进行的,从随机策略出发,估计状态价值
        再从收敛的状态值函数出发,优化之前的随机策略。由此往复,直至收敛
        '''
        gamma = 1.0
        
        # 随机策略
        random_policy = np.zeros(env.observation_space.n)
        
        
        for i in range(no_of_iterations):
            new_value_function = compute_value_function(random_policy, gamma)
            new_policy = extract_policy(new_value_function, gamma)
            if (np.all(random_policy == new_policy)):
                print('Policy-Iteration converged at step %d.'%(i+1))
                break
            random_policy = new_policy
        
        return new_policy
    
    print (policy_iteration(env))
    

      最后结果

    Policy-Iteration converged at step 7.
    [0. 3. 3. 3. 0. 0. 0. 0. 3. 1. 0. 0. 0. 2. 1. 0.]
    
    更多相关内容
  • 众包质量控制策略评估算法研究
  • 结合在人脸检测、检索算法上的测试探索、实践的过程,本文将从以下几个方面介绍人工智能(AI)算法测试策略。1.算法功能测试2.算法性能测试3.算法效果测试(模型评估指标)4.算法指标结果分析5.算法测试报告我们将算法...
  • 计算机网络信息安全防护策略评估算法研究.pdf
  • 策略迭代算法和值函数迭代算法

    千次阅读 2021-03-24 22:40:16
    策略评估算法3. 策略优化算法4. 策略迭代算法和值函数迭代算法5. 代码实现6. 强化学习与最优控制 1. 回顾与引言 上一章中介绍了马尔科夫决策过程(MDP),也介绍了状态值函数和行为-状态值函数的计算方法。由此我们...

    策略迭代算法和值函数迭代算法

    1. 回顾与引言

    大家如果不了解马尔科夫决策过程可以先阅读这篇文章:https://blog.csdn.net/qq_33302004/article/details/115027798
    上一篇文章中介绍了马尔科夫决策过程(MDP),也介绍了状态值函数和行为-状态值函数的计算方法。由此我们已经完成了对强化学习问题的建模过程,我们知道强化学习就是寻找一个最优策略 π \pi π,保证一个已知的MDP ( S , A , P , r , γ ) (S, A, P, r, \gamma) (S,A,P,r,γ)的累计回报期望最大,也就是:
    π = arg max ⁡ π ∫ R ( τ ) p π ( τ ) d τ \pi = \argmax_\pi \int {R(\tau)p_\pi(\tau)} d\tau π=πargmaxR(τ)pπ(τ)dτ
    我们把已知状态转移概率 P P P的问题有模型问题,把未知 P P P的问题叫做无模型问题,由此最优化MDP的方法可分为基于模型的动态规划方法和基于无模型的强化学习方法,如下图所示:
    请添加图片描述
    由图中可知,这两种方法都包括策略迭代算法、值函数迭代算法、策略搜索算法。本文将介绍基于模型的策略迭代算法值函数迭代算法

    2. 思路介绍

    先不考虑策略迭代或者值函数迭代的概念,来回顾一下我们要解决的问题。在序贯决策问题中,我们知道全部的状态S、可以采用的全部动作A,还知道在状态S下采用动作A会转移到什么状态S‘(P),以及对应的反馈R和损失因子 γ \gamma γ。我们现在我们需要考虑两个问题:

    1. 如何产生一个策略 π \pi π,也就是: a = π ( s ) a = \pi(s) a=π(s) => 策略优化
    2. 如何评价一个策略 π \pi π。 => 策略评估

    我们手上有两个武器,状态值函数和行为-状态值函数:
    ν π ( s ) = ∑ a ∈ A π ( a ∣ s ) ( R s a + γ ∑ s ′ ∈ S P s s ′ a ν π ( s ′ ) ) \nu_\pi(s) = \sum_{a\in A} \pi(a|s) \left( R_s^a + \gamma\sum_{s' \in S}P_{ss'}^a\nu_\pi(s') \right) νπ(s)=aAπ(as)(Rsa+γsSPssaνπ(s))
    q π ( s , a ) = R s a + γ ∑ s ′ ∈ S P s s ′ a ν π ( s ′ ) q_\pi(s,a) = R_s^a + \gamma\sum_{s' \in S}P_{ss'}^a\nu_\pi(s') qπ(s,a)=Rsa+γsSPssaνπ(s)
    通过状态值函数,我们可以评估策略 π \pi π当前的状态价值分布是如何的,通过行为-状态值函数,我们可以知道在当前状态下采用哪种动作更好,从而优化策略 π \pi π

    3. 策略评估算法

    策略评估算法就是在已知一个策略 π \pi π的情况下,如何计算出每个状态的值函数。解决方案是采用“高斯-赛尔德迭代算法”求解状态值函数的迭代公式:
    ν π ( s ) = ∑ a ∈ A π ( a ∣ s ) ( R s a + γ ∑ s ′ ∈ S P s s ′ a ν π ( s ′ ) ) \nu_\pi(s) = \sum_{a\in A} \pi(a|s) \left( R_s^a + \gamma\sum_{s' \in S}P_{ss'}^a\nu_\pi(s') \right) νπ(s)=aAπ(as)(Rsa+γsSPssaνπ(s))
    在上面这个式子中出了状态值函数外,所有变量都是已知的,所以本质上就是一个解方程的过程。不理解“高斯-赛尔德迭代算法”的话,可以简单理解为: x = f ( x ′ ) x=f(x') x=f(x),本次的 x x x可以用上一次计算的 x ′ x' x求得,并且通过无限迭代最终会收敛,收敛处的 x x x值就是方程组的解。

    策略评估算法的伪代码如下:
    请添加图片描述

    我们来举个例子,如下图所示为网格世界,其状态空间为S={1,2,…,14},动作空间为A={东,南,⻄,北},回报函数为r≡-1,需要评估的策略为均匀随机策略,也就是东南西北选择的概率都为0.25。
    请添加图片描述

    迭代0次、1次、2次、3次后每个状态的值函数的值如下图所示:
    请添加图片描述

    状态1,第二次迭代的值函数的值计算过程如下:
    v 2 ( 1 ) = 0.25 ( − 1 − 1 ) + 0.25 ( − 1 − 1 ) + 0.25 ( − 1 − 1 ) + 0.25 ( − 1 − 0 ) = − 1.75 ≈ − 1.7 \begin{aligned} v_2(1) &= 0.25(-1-1)+ 0.25(-1-1)+ 0.25(-1-1)+ 0.25(-1-0) \\ &= -1.75\approx-1.7 \end{aligned} v2(1)=0.25(11)+0.25(11)+0.25(11)+0.25(10)=1.751.7
    状态1,第三次迭代的值函数的值计算过程如下:
    v 3 ( 1 ) = 0.25 ( − 1 − 1.7 ) + 0.25 ( − 1 − 2 ) + 0.25 ( − 1 − 2 ) + 0.25 ( − 1 − 0 ) = − 2.425 ≈ − 2.4 \begin{aligned} v_3(1) &= 0.25(-1-1.7)+ 0.25(-1-2)+ 0.25(-1-2)+ 0.25(-1-0) \\ &= -2.425\approx-2.4 \end{aligned} v3(1)=0.25(11.7)+0.25(12)+0.25(12)+0.25(10)=2.4252.4

    3. 策略优化算法

    现在我们来解决如何改善策略的问题。很自然的解决方案是采用贪心的策略来解决这个问题,也就是在每个状态都采用行为-状态值函数最大的动作,如下图所示:
    请添加图片描述

    4. 策略迭代算法和值函数迭代算法

    所谓策略迭代是指先初始化策略、然后在每个策略下计算出收敛的状态值(采用策略评估算法),而后根据计算出的值使用策略优化算法修改策略,如此迭代,直到策略收敛,就作为最优策略,伪代码如下:
    请添加图片描述

    值函数迭代算法也是先初始化策略,而后在每次更新状态的值的时候,直接采用当前状态下能够产生的最大值更新值(而不是向策略迭代算法中那样采用均值),直到值收敛后,采用一次策略优化获得最终的策略,伪代码如下:
    请添加图片描述

    5. 代码实现

    请添加图片描述请添加图片描述
    请添加图片描述
    请添加图片描述

    6. 强化学习与最优控制

    最优控制领域经过几十年的发展有许多的优秀成果,在基于模型的强化学习算法中可以利用这些优秀成果。基于模型的算法中,会先根据数据拟合出一个模型,根据已知的模型,最优控制领域中有很多好的方法可以计算出最优策略解,智能体再根据这些最优控制策略与环境交互,完成进一步优化。

    结合最优控制的强化学习算法最典型的应用就是引导策略搜索算法。

    展开全文
  • 强化学习笔记(二)---- 策略迭代算法

    万次阅读 多人点赞 2018-08-15 23:39:40
    强化学习有两种常见迭代训练算法策略迭代算法和值迭代算法。本文中主要讲述策略迭代算法。 先从一个简答的问题开始,下图为一个四方格子,每个位置的状态空间分别为{1, 2, 3, 4}, 其中 3 的位置是个陷阱, 4的...

    强化学习有两种常见迭代训练算法:策略迭代算法和值迭代算法。本文中主要讲述策略迭代算法。

    先从一个简答的问题开始,下图为一个四方格子,每个位置的状态空间分别为{1, 2, 3, 4}, 其中 3 的位置是个陷阱, 4的位置有个金币。有一个机器人从状态1的位置开始寻找金币。落入陷阱的回报为-1,找到金币的回报为1,在其他位置间移动回报为0,可选的动作空间为{上,下,左,右}, 通过这个简单的问题,来学习强化学习的学习原理。

    这里写图片描述

    强化学习的学习过程,个人理解就是通过不断的尝试,去更新每个状态的值函数(每个状态的值代表了当前状态的优劣,如果状态值很大,从其他状态选择一个动作,转移到该状态便是一个正确的选择),然后通过更新后的值函数去动态的调整策略,在调整策略后,又去更新值函数,不断的迭代更新,最后训练完成一个满足要求的策略。在这个过程中,抽象出两个主要的过程,第一个叫策略评估,第二个叫策略改善。

    针对上面给出的简单问题,先说明一些简单的概念:

    每个状态的值函数:

    代表机器人处于该状态时的优劣值。

    针对问题的当前策略:

    代表机器人处于某状态时,选择的下一步动作。对于选择的下一步动作,可以是确定式的,比如当机器人处于1位置的时候,确定的只选择往右走。也可以是概率式的,可以0.5的概率选择往右走, 0.5的概率选择往下走。当然确定式策略选择是概率式的策略选择的一种特例。下文中采用确定式策略进行描述

    策略评估:

    策略评估就是通过某种方式,计算状态空间中每个状态的值函数。由于状态空间之间存在很多转移关系,要直接计算某个状态的值函数,是很困难的,一般采用
    迭代方法。

    策略改善:

    对策略的改善,即通过当前拥有的信息,对当前策略进行优化,修改当前策略。

    ############################## 策略评估的过程

    初始化的策略和值函数。

    这里写图片描述

    对于这个简单的例子,通过一步计算便得到了稳定的值函数,但是对于大多数的问题,都需要通过多步的迭代,才能得到稳定的值函数。

    ############################## 策略改善的过程

    对于这个简单的例子,采用贪心的方式对策略进行改善,通过上一步策略评估过程计算出的稳定的值函数,让每个状态在选择下一步动作的时候,选择使动作收益最大的动作。
    这里写图片描述

    总结

    强化学习策略迭代算法的过程就是不断的重复 策略评估 和 策略改善的过程,直到整个策略收敛(值函数和策略不再发生大的变化)

    gym的样例代码展示

    上述的简单示例,简单描述了强化学习的策略迭代算法的过程,下面将问题搞复杂一点,通过对该问题进行编程,加深对策略迭代算法的理解。新问题的空间状态如下图所示。

    这里写图片描述

    该图的状态空间由下置上,从左到右,分别为1 – 36

    import numpy as np
    import random
    from gym import spaces
    import gym
    from gym.envs.classic_control import rendering
    
    #模拟环境类
    class GridWorldEnv(gym.Env):
        #相关的全局配置
        metadata = {
            'render.modes':['human', 'rgb_array'],
            'video.frames_per_second': 2
        }
    
        def __init__(self):
            self.states = [i for i in range(1, 37)] #初始化状态
            self.terminate_states = [3, 7, 11, 15, 19, 20, 23, 30,  33, 34] #终结态
            self.actions = ['up', 'down', 'left', 'right'] #动作空间
    
            self.v_states = dict() #状态的值空间
            for state in self.states:
                self.v_states[state] = 0.0
    
            for state in self.terminate_states: #先将所有陷阱和黄金的值函数初始化为-1.0
                self.v_states[state] = -1.0
    
            self.v_states[34] = 1.0  #黄金的位置值函数初始化为 1
    
            self.initStateAction() #初始化每个状态的可行动作空间
            self.initStatePolicyAction() #随机初始化当前策略
    
            self.gamma = 0.8 #计算值函数用的折扣因子
            self.viewer = None #视图对象
            self.current_state = None #当前状态
            return
    
        def translateStateToRowCol(self, state):
            """
            将状态转化为行列坐标返回
            """
            row = (state - 1) // 6
            col = (state - 1) %  6
            return row, col
    
        def translateRowColToState(self, row, col):
            """
            将行列坐标转化为状态值
            """
            return row * 6 + col + 1
    
        def actionRowCol(self, row, col, action):
            """
            对行列坐标执行动作action并返回坐标
            """
            if action == "up":
                row = row - 1
            if action == "down":
                row = row + 1
            if action == "left":
                col = col - 1
            if action == "right":
                col = col + 1
            return row, col
    
        def canUp(self, row, col):
            row = row - 1
            return 0 <= row <= 5
    
        def canDown(self, row, col):
            row = row + 1
            return 0 <= row <= 5
    
        def canLeft(self, row, col):
            col = col - 1
            return 0 <= col <= 5
    
        def canRight(self, row, col):
            col = col + 1
            return 0 <= col <= 5
    
        def initStateAction(self):
            """
            初始化每个状态可行动作空间
            """
            self.states_actions = dict()
            for state in self.states:
                self.states_actions[state] = []
                if state in self.terminate_states:
                    continue
                row, col = self.translateStateToRowCol(state)
                if self.canUp(row, col):
                    self.states_actions[state].append("up")
                if self.canDown(row, col):
                    self.states_actions[state].append("down")
                if self.canLeft(row, col):
                    self.states_actions[state].append('left')
                if self.canRight(row, col):
                    self.states_actions[state].append('right')
            return
    
    
        def initStatePolicyAction(self):
            """
            初始化每个状态的当前策略动作
            """
            self.states_policy_action = dict()
            for state in self.states:
                if state in self.terminate_states:
                    self.states_policy_action[state] = None
                else:
                    self.states_policy_action[state] = random.sample(self.states_actions[state], 1)[0]
            return
    
    
        def seed(self, seed = None):
            random.seed(seed)
            return [seed]
    
        def reset(self):
            """
            重置原始状态
            """
            self.current_state = random.sample(self.states, 1)[0]
    
        def step(self, action):
            """
            动作迭代函数
            """
            cur_state = self.current_state
            if cur_state in self.terminate_states:
                return cur_state, 0, True, {}
            row, col = self.translateStateToRowCol(cur_state)
            n_row, n_col = self.actionRowCol(row, col, action)
            next_state = self.translateRowColToState(n_row, n_col)
            self.current_state = next_state
            if next_state in self.terminate_states:
                return next_state, 0, True, {}
            else:
                return next_state, 0, False, {}
    
        def policy_evaluate(self):
            """
            策略评估过程 
            """
            error = 0.000001 #误差率
            for _ in range(1000):
                max_error = 0.0 #初始化最大误差
                for state in self.states:
                    if state in self.terminate_states:
                        continue
                    action = self.states_policy_action[state]
                    self.current_state = state
                    next_state, reward, isTerminate, info = self.step(action)
                    old_value = self.v_states[state]
                    self.v_states[state] = reward + self.gamma * self.v_states[next_state]
                    abs_error = abs(self.v_states[state] - old_value)
                    max_error = abs_error if abs_error > max_error else max_error #更新最大值
                if max_error < error:
                    break
    
    
        def policy_improve(self):
            """
            根据策略评估的结果,进行策略更新,并返回每个状态的当前策略是否发生了变化
            """
            changed = False
            for state in self.states:
                if state in self.terminate_states:
                    continue
                max_value_action = self.states_actions[state][0] #当前最大值行为
                max_value = -1000000000000.0 #当前最大回报 
                for action in self.states_actions[state]:
                    self.current_state = state
                    next_state, reward, isTerminate, info = self.step(action)
                    q_reward = reward + self.gamma * self.v_states[next_state]
                    if q_reward > max_value:
                        max_value_action = action
                        max_value = q_reward
                if self.states_policy_action[state] != max_value_action:
                    changed = True
                self.states_policy_action[state] = max_value_action
            return changed
    
    
    
    
    
        def createGrids(self):
            """
            创建网格
            """
            start_x = 40
            start_y = 40
            line_length = 40
            for state in self.states:
                row, col = self.translateStateToRowCol(state)
                x = start_x + col * line_length
                y = start_y + row * line_length
                line = rendering.Line((x, y), (x + line_length, y))
                line.set_color(0, 0, 0)
                self.viewer.add_onetime(line)
                line = rendering.Line((x, y), (x, y  + line_length))
                line.set_color(0, 0, 0)
                self.viewer.add_onetime(line)
                line = rendering.Line((x + line_length, y), (x + line_length, y  + line_length))
                line.set_color(0, 0, 0)
                self.viewer.add_onetime(line)
                line = rendering.Line((x, y + line_length), (x + line_length, y  + line_length))
                line.set_color(0, 0, 0)
                self.viewer.add_onetime(line)
    
        def createTraps(self):
            """
            创建陷阱,将黄金的位置也先绘制成陷阱,后面覆盖画成黄金
            """
            start_x = 40 
            start_y = 40
            line_length = 40
            for state in self.terminate_states:
                row, col = self.translateStateToRowCol(state)
                trap = rendering.make_circle(20)
                trans = rendering.Transform()
                trap.add_attr(trans)
                trap.set_color(0, 0, 0)
                trans.set_translation(start_x + line_length * col + 20, start_y + line_length * row + 20)
                self.viewer.add_onetime(trap)
    
        def createGold(self):
            """
            创建黄金
            """
            start_x = 40 
            start_y = 40
            line_length = 40
            state = 34
            row, col = self.translateStateToRowCol(state)
            gold = rendering.make_circle(20)
            trans = rendering.Transform()
            gold.add_attr(trans)
            gold.set_color(1, 0.9, 0)
            trans.set_translation(start_x + line_length * col + 20, start_y + line_length * row + 20)
            self.viewer.add_onetime(gold)
    
        def createRobot(self):
            """
            创建机器人
            """
            start_x = 40 
            start_y = 40
            line_length = 40
            row, col = self.translateStateToRowCol(self.current_state)
            robot = rendering.make_circle(15)
            trans = rendering.Transform()
            robot.add_attr(trans)
            robot.set_color(1, 0, 1)
            trans.set_translation(start_x + line_length * col + 20, start_y + line_length * row + 20)
            self.viewer.add_onetime(robot)
    
        def render(self, mode="human", close=False):
            """
            渲染整个场景
            """
            #关闭视图
            if close:
                if self.viewer is not None:
                    self.viewer.close()
                    self.viewer = None
    
            #视图的大小
            screen_width = 320
            screen_height = 320
    
    
            if self.viewer is None:
                self.viewer = rendering.Viewer(screen_width, screen_height)
    
            #创建网格
            self.createGrids()
            #创建陷阱
            self.createTraps()
            #创建黄金
            self.createGold()
            #创建机器人
            self.createRobot()
            return self.viewer.render(return_rgb_array= mode == 'rgb_array')
    注册环境模拟类到gym
    from gym.envs.registration import register
    try:
        register(id = "GridWorld-v3", entry_point=GridWorldEnv, max_episode_steps = 200, reward_threshold=100.0)
    except:
        pass
    进行策略迭代算法的过程和模拟动画的代码
    from time import sleep
    env = gym.make('GridWorld-v3')
    env.reset()
    
    #策略评估和策略改善 
    not_changed_count = 0
    for _ in range(10000):
        env.env.policy_evaluate()
        changed = env.env.policy_improve()
        if changed:
            not_changed_count = 0
        else:
            not_changed_count += 1
        if not_changed_count == 10: #超过10次策略没有再更新,说明策略已经稳定了
            break
    
    
    #观察env到底是个什么东西的打印信息。
    print(isinstance(env, GridWorldEnv))
    print(type(env))
    print(env.__dict__)
    print(isinstance(env.env, GridWorldEnv))
    
    env.reset()
    
    for _ in range(1000):
        env.render()
        if env.env.states_policy_action[env.env.current_state] is not None:
            observation,reward,done,info = env.step(env.env.states_policy_action[env.env.current_state])
        else:
            done = True
        print(_)
        if done:
            sleep(0.5)
            env.render()
            env.reset()
            print("reset")
        sleep(0.5)
    env.close()
    动画效果

    这里写图片描述

    展开全文
  • 在这篇文章中,我们比较了多种参数学习算法,采用了忽略或者近似partition function等策略。此外,我们还提出了一种估计partition function的算法。我们的实验表明:1)我们的学习算法比之前的隐依存森林模型学习算法...
  • 针对目前电力系统评估结果误差较大的问题,文中提出了一种基于PSO算法的电力系统状 态评估分析方法。首先对现有的电力系统状态评估分析方法进行分析,然后建立区域电力系统状 态评估分析模型,确定相应的评估原则。...
  • 2.1 策略评估 2.2 策略改进 2.3策略迭代 2.4值迭代 前言 在之前的学习中我们学习了有限MDP问题的基于模型的DP解法,也就是策略迭代法,但是很多东西不与实践代码相结合,总会觉得没有成就感,那么我们就将...

    提示:转载请注明出处,若文章无意侵犯到您的合法权益,请及时与作者联系。

    前言

    在之前的学习中我们学习了有限MDP问题的基于模型的DP解法,也就是策略迭代法,但是很多东西不与实践代码相结合,总会觉得没有成就感,那么我们就将之前介绍的算法通过举例并进行代码分析,注意本文并不会给出全部的可实现的代码,只会根据需要截取相应的部分进行讲解,但是本文会将自己参考的原文链接及代码挂出,供有需要的同学学习。


    本文参考的资料文章主要来源:强化学习基础篇: 策略迭代 (Policy Iteration)

    一、典型的方格世界问题说明

    1.1 强化学习的问题定义

           一个Agent环境不断进行交互,在每一个时间步长t中,环境提供当前状态s_{t}给Agent,Agent根据这个当前状态做出决策,这时Agent可能存在多个动作可选,Agent按照一定的策略采取动作a_{t},之后会得到环境反馈的立即回报R_{t},环境此时按照状态转移的概率分布进入了下一个时刻的后继状态,这个后继状态可能是自身也可能是其他状态。

    在以上过程中注意存在2个概率分布:

    一个是当前状态下要采取哪个动作,即以什么样的策略来决策,随机选择还是统一固定的,本质是一个概率分布问题。

    一个是转移到的后继状态将会是什么,即当前状态可以转移到哪些状态,转移到每个状态的概率是多少,本质上就是状态转移的概率分布。

    注意:有时还会提到初始状态分布,即初始时刻选择哪一个初始状态的问题。

    1.2 方格世界的经典MDP问题

    接下来我们给出一个标准的马尔科夫决策过程(MDP)问题来进行研究。假设我们有一个3 x 3的方格世界(GridWorld)作为环境,

    存在一个单元格叫做超级玛丽,它每个时间步长只可以往上、下、左、右四个方向移动,在方格世界中存在一个固定的单元格是宝藏,当超级玛丽找到宝藏则游戏结束,我们的目标是让超级玛丽以最快的速度找到宝藏

    我们将上面的描述进行马尔科夫决策过程(MDP)的描述:

    • 状态空间State:超级玛丽当前的单元格序号,按照从左到右、从上到下分别是0-8的9个离散值。
    • 动作空间Action: 上、下、左、右共计4个离散值
    • 动作概率\large \pi默认初始策略为在任何状态下都选择向下的动作
    • 状态转移概率P:默认采取某个动作后一定百分百转移到该方向的相邻单元格,如果相邻单元格不存在则转移到自身。
    • 折扣因子\large \gamma默认为0.9。
    • 立即回报Reward:本文认为该关系是已知的
      • 超级玛丽每移动一步,Reward = -1
      • 超级玛丽得到宝箱,Reward = 0并且游戏终止

    1.3 环境与Agent对象

    通过分析以上强化学习的基本定义,我们可以归纳出环境Agent所具备的基本特点:

    环境的特点:

    1. 环境要确定Agent的观测空间和动作空间。Agent采取的动作和观测到的状态都是离散的,这里我们可以使用一个两个数组来表示,一个是0-8,一个是0-3.
    2. 环境要根据状态转移概率分布确定后继状态。Agent采取动作后环境可以自己根据状态概率分布切换到相应的后继状态。这里的概率分布比较简单,例如只要Agent当前状态不是在最上层采取向上动作,则状态序号-3即可,依次类推。
    3. 环境要确定Agent获得的立即回报Reward。Agent采取某个动作进入后继状态后环境可以给出相应的立即回报Reward
    4. 环境要确定Agent终止交互的终止条件。Agent什么时候停止与环境的交互,这里只要判断Agent的后继状态是不是宝藏所在单元格即可。

    Agent的特点:

    1. Agent应当具备观测有限状态的能力。Agent并不一定掌握全部的环境状态信息,它只能通过观测掌握可以获得的有限的观测信息,能观测到的信息的程度取决于强化学习问题的难度。这里我们是研究一个已知模型问题,所以Agent可以观察到的状态的观测空间就是状态空间。
    2. Agent应当具备采取有限动作的能力。Agent只能采取有限的动作与环境进行交互且具备采取这些动作的能力,这些动作由环境决定。这里Agent采取动作后仅仅影响环境的后继状态,可以交给环境去完成,
    3. Agent应当具备进行自主决策的能力。它可以决定在当前状态下来判断下一时刻应该采取什么样的动作,采取有限的动作与环境进行交互。Agent选择每个动作的概率分布是会变化的,只有这样我们才能找到最佳策略,这里我们默认初始都采取向下的动作。这种让动作概率分布发生变化的过程就是我们通过策略迭代来完成的。
    4. Agent应当具备获取环境反馈的能力。Agent采取动作后应当可以获取环境响应的反馈,从而自己更新策略来学习新的知识。Agent进入下一个状态可以获取的环境反馈有状态转移概率,后继状态,立即回报和是否终止等信息。这些都是由环境决定的,Agent只需要访问环境的接口来获取这个信息即可。

    二、策略迭代分析

    在本文中我们将会把主要精力集中在研究Agent对象上,特别是它的策略迭代法,其他更多深入研究可参考来源资料中的讲解或者我的实践阶段的代码分析。

    2.1 策略评估

    直接给出之前的策略评估算法流程:

     

    上述评估算法输入一个策略policy,输出一个存储状态价值的数组v,算法中包含大循环来保证每个状态价值可以不断迭代到响应的精确度,在大循环里面存在一个遍历所有状态的循环、一个遍历所有动作的循环和一个遍历所有后继状态的循环,一共是一个四层嵌套循环结构。

    下面是其Python代码实现,这个方法是Agent类中的一个方法,env是其内部的环境类对象的映射。

    def policy_evaluation(self, policy):
            """策略评估"""
            V = np.zeros(self.env.nS)  # 使用一个含有9个0.的数组来存储9个状态的价值
            THETA = 0.0001 # 策略评估的精度
            delta = float("inf") # 当前迭代的差距量,初始化为正无穷
            
            while delta > THETA:
                delta = 0 # 当前迭代的差距量初始化为0
                for s in range(self.env.nS):# 遍历环境给出的所有的状态(即0-8)
                    expected_value = 0 #初始化当前状态价值为0
                    for action, action_prob in enumerate(policy[s]): # 获取当前状态可采取的所有动作(上下左右)及其采取对应动作的概率
                        # 遍历采取动作后的所有可能转移的后继状态
                        # 在当前状态采取当前动作后得到环境反馈的后继状态转移概率,后继状态,立即回报,是否终止
                        for prob, next_state, reward, done in self.env.P[s][action]: 
                            expected_value += action_prob * prob * (reward + DISCOUNT_FACTOR * V[next_state])#贝尔曼期望方程
                    delta = max(delta, np.abs(V[s] - expected_value)) # 本次迭代后价值变化的差异
                    V[s] = expected_value  #保存当前状态的价值
            
            return V

    以上需要特别说明的是输入的policy是一个9行4列的状态*动作的矩阵,这个矩阵初始状态中第2列都为1,其余都为0,表示向下的动作概率为1,其余动作的概率为0。

    另外其中调用的self.env.P[s][a]是一个二维列表,它可以返回指定状态和动作对应的所有后继状态及其被选中的概率、立即回报和是否终结等信息,这个功能由环境类实现,我们这里不做分析。

    2.2 策略改进

    策略改进这里就是根据状态价值计算出当前动作价值最大的动作:

    def next_best_action(self, s, V):
            """下一个最好的动作"""
            action_values = np.zeros(env.nA)# 动作价值
            for a in range(env.nA):
                for prob, next_state, reward, done in self.env.P[s][a]:
                    action_values[a] += prob * (reward + DISCOUNT_FACTOR * V[next_state])
            return np.argmax(action_values), np.max(action_values) #返回最大动作状态的动作索引及其动作价值

    2.3 策略迭代

    下面直接给出策略迭代的算法流程:

    相应的代码实现如下:

    def optimize(self):
            """策略优化"""
            policy = np.tile(np.eye(self.env.nA)[1], (self.env.nS, 1)) #9行4列的矩阵,状态*动作,向下的动作都为1,其余为0
    
            is_stable = False
            
            round_num = 0 #循环次数
            
            while not is_stable:
                is_stable = True
                
                print("\nRound Number:" + str(round_num))
                round_num += 1
                
                print("Current Policy")
                print(np.reshape([env.get_action_name(entry) for entry in [np.argmax(policy[s]) for s in range(self.env.nS)]], self.env.shape))
                # 进行策略评估
                V = self.policy_evaluation(policy)
                print("Expected Value accoridng to Policy Evaluation")
                print(np.reshape(V, self.env.shape))
                
                # 遍历所有的状态
                for s in range(self.env.nS):
                    action_by_policy = np.argmax(policy[s]) #获取动作概率最大的动作序号
                    best_action, best_action_value = self.next_best_action(s, V) # 获取当前状态中动作价值最大的动作序号及其动作价值
                    # print("\nstate=" + str(s) + " action=" + str(best_action))
                    policy[s] = np.eye(self.env.nA)[best_action] #替换原先的动作选取策略
                    if action_by_policy != best_action: #如果当前动作不是最优的就继续迭代
                        is_stable = False
                
            policy = [np.argmax(policy[s]) for s in range(self.env.nS)] #返回最优策略:含有9个表示最优动作序号的数组
            return policy

    在上述代码中可以看到,在每一轮策略迭代中都分为策略评估和策略改进两部分。

    策略评估计算出每个状态的价值后,策略改进部分再遍历所有的状态,计算每个状态下,动作价值最好的动作,然后按照贪婪的策略,将当前最好的策略替换掉原先的动作策略,指导所有状态的动作策略不再变化时,策略迭代就结束了。

    2.4 值迭代

    下面直接给出值迭代的算法流程:

     相应的代码实现如下:

    def optimize(self):
            THETA = 0.0001
            delta = float("inf")
            round_num = 0
    
            while delta > THETA:
                delta = 0
                print("\nValue Iteration: Round " + str(round_num))
                print(np.reshape(self.V, env.shape))
                for s in range(env.nS):
                    best_action, best_action_value = self.next_best_action(s, self.V)
                    delta = max(delta, np.abs(best_action_value - self.V[s]))
                    self.V[s] = best_action_value
                round_num += 1
    
            policy = np.zeros(env.nS)
            for s in range(env.nS):
                best_action, best_action_value = self.next_best_action(s, self.V)
                policy[s] = best_action
    
            return policy

     上述代码中直接使用当前状态下的动作价值最大的动作来代替状态价值,这样子的好处就是可以减少遍历状态空间的次数,但是相应的收敛速度就会变慢。


    展开全文
  • 同时在策略评估方面提出XACML合并算法的优先级及主体的规则索引表, 优先选择符合匹配条件的策略和规则来提高匹配速度。仿真实验验证了采取这些措施后, 缩短了PDP 进行评估的时间, 提高了评估效率。
  • 针对常见分类算法在全局和局部区域性能不一致的问题,提出了双层分类策略及其实现算法。双层分类策略的思想是离线地建立全局分类器,当全局分类器决策信用度低于指定阈值时,在线生成局部分类器进行决策修正。实现...
  • 弹道导弹防御系统中制定防御计划,确定最佳部署策略都需要实时、准确地评估TBM的威胁程度,依据攻防双方的能力及武器的特性,采用了多属性决策理论中的TOPSIS方法,提出了战术弹道导弹威胁等级评估算法,建立了导弹...
  • 为了以非常快的收敛速度在整个搜索空间中搜索全局最优,我们提出了一种多策略自适应粒子群优化算法(MAPSO)。 MAPSO开发了一种创新的多样性测量策略,以评估人口分布,并执行实时交替策略,以在每次迭代中确定两个...
  • 算法设计了对成熟检测器群体进行有效性评估的方法,由检测器的有效性确定克隆规模。理论证明了该算法的收敛性。实验表明,与传统的动态克隆选择算法相比,该算法提高了检测率,有效抑制了误报率,改善了算法的适应性...
  • 统计学习的三个招式:模型、策略算法 https://mp.weixin.qq.com/s/12yhAZ79i_ENAdtyOX63lQ  李航老师在统计学习方法中讲到:方法=模型+策略+算法  可以说模型、策略算法是统计学习的三个重要因素,确定了三...
  • 首先,利用高斯变异和存储机制对传统萤火虫算法进行优化,高斯变异可以有效控制算法搜索空间中解的散射程度,使算法避免陷入局部最优,存储机制有利于评估并存储每只萤火虫的历史状态,防止信息丢失。然后,将改进后...
  • 针对聚类过程中,由于类心选取的随机性导致所选类心偏离数据集,或者类心过于集中而带来的错误聚类...与其他三种现有的自动聚类算法的比较结果表明,所提算法能够获得更好的聚类结果,从而验证了算法所提策略的有效性。
  • 策略评估算法: 最小二乘政策评估/ LSPE(λ)(Nedic和Bertsekas,2003年) 最小二乘时差学习/ LSTD(λ)(Nedic&Bertsekas,2003; Boyan,2002) 环境: 具有标准功能的Tetris基准(请参阅Bertsekas
  • 为改善遗传算法的优化性能,保持种群的多样性,提出了基因丢失检测及修复策略和基于海明距离的动态交叉、变异算子。经标准测试函数评估,与标准遗传算法相比,改进算法在收敛速度和收敛率上都有较大幅度的提升。在...
  • 针对传统免疫算法在网络故障检测中存在的稳定性低...给出依据评估结果对记忆检测器实施分级的策略,对各级别的检测器子群体采用不同的进化策略。实验结果表明,与传统算法相比,该算法的稳定性和检测性能都有一定改善。
  • 算法(逻辑)优劣评估方法

    千次阅读 2019-05-28 17:24:21
    下面列举几个通常衡量算法性能的维度,并逐一分析实现的策略。其实所有的有逻辑的代码都可以认为是算法,并不单单指狭义的数学概念中的“算法”,我这里称之为侠义的代码质量。其实代码质量还要包含编程风格等因素,...
  • RL(十四)深度确定性策略梯度算法

    千次阅读 2019-08-26 22:42:19
    前面我们讲了深度强化学习,虽然强化学习和深度学习得到了一个很好的结合,但是他们在实际的使用中还是有一些...在2014年首次提出了确定性策略梯度算法,并证明了该算法对连续动作任务的有效性。该算法策略梯度...
  • 数学建模国赛优秀论文集锦-2020D:基于梯度下降的决策树算法与非线性规划的信贷风险评估与信贷策略模型
  • 在保障本地任务QoS需求及尊重网格节点的本地调度策略的条件下,提出了一种资源动态性能评估方法,并得出了2个资源动态性能的评估尺度。基于网格资源可用性的相关结果,利用概率法得出网格节点服务于网格任务的平均...
  • 现有的负载均衡策略均是以 CPU、内存、进程等参数的占用率来评估服务器当前的负载情况,但服务器负载情况的复杂性往往使其难以得到准确评估。针对该问题,提出了一种基于排队论综合指标评估的动态负载均衡算法,首先...
  • 用OptorSim进行了实验,结果表明,与遗传算法(GA),蚁群优化(ACO)和粒子群优化相比,基于QEA的副本创建策略可以有效地减少作业响应时间和网络带宽消耗。 (PSO)算法。 特别地,随着工作数量的增加,其性能变得...
  • 其灵感来源于世界传播的灾难性的新冠病毒,模仿了群体免疫策略和社会距离概念。三种类型的个体病例用于群体免疫:易感、感染和免疫。 本文研究了CHIO对其参数的敏感性。然后,CHIO使用23个著名的基准测试函数进行...
  • EM 迭代在执行期望 (E) 步骤和最大化 (M) 步骤之间交替执行,该步骤为使用参数的当前估计评估的对数似然的期望创建函数,以及计算最大化预期对数的参数 -在 E 步骤中找到的可能性。 然后使用这些参数估计来确定下一...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 78,411
精华内容 31,364
关键字:

策略评估算法