REINFORCE算法实现“CartPole-v1”游戏

“CartPole-v1”是 Gym 中一个控制类的游戏,如图 1 左所示:一根杆子的一端连接在一个小车上,由于重力的原因,杆子会发生倾斜,当杆子倾斜到一定程度后就会倒下。游戏的任务就是通过左右移动底部的小车来保持杆子竖直向上,一旦杆子倒下,游戏就结束了。
“CartPole-v1”游戏示意图
图 1:“CartPole-v1”游戏示意图

“CartPole-v1”游戏的环境状态由四部分组成,如图 1 右侧表格所示。游戏中小车只有两个动作可以做,分别是向左移动和向右移动(其值分别为 0 和 1)。每做一次动作,如果杆子没有倒下,则得到奖励值 1。

根据蒙特卡罗策略梯度(Monte Carlo Policy Gradient)算法,我们需要一个可微分的策略函数 πθ(a|s)。“CartPole-v1”游戏只有两个动作,可以简单地把它看成一个二分类问题,因此我们选择最简单的 Logistic 函数作为策略函数:

S(x)=1/(1+e-x


Logistic 函数只有一个变量 x,“CartPole-v1”的游戏环境由 4 个因素组成,如果我们全部考虑在内的话,那么策略函数应当至少有 4 个参数,分别定义为 θ:p1,p2,p3,p4,假设环境的 4 个因素分别为 λ1,λ2,λ3,λ4,则令 Logistic 函数中的变量 x 为

x=p1×λ1+p2×λ2+p3×λ3+p4×λ4


选定策略函数之后,就可以实现使用策略来做动作选择的代码:
def policy_function(observation, theta):
    #根据当前的状态值和策略函数的参数值计算策略函数的输出
    x = np.dot(theta, observation)
    s = 1 / (1 + np.exp(-x))
    #根据策略函数的输出进行动作选择
    if s > 0.5:
        action = 1
    else :
        action = 0
    return s, action

在第 3 行和第 4 行代码中,根据环境信息(小车的位置、小车移动速度、杆子的倾斜角度及杆子顶端的运动速度)和策略函数的参数值计算得到了策略函数的输出值(由于选择了 Logistic 函数作为策略函数,所以其值域为 (0, 1)),当策略函数的输出值大于 0.5 时,选择向右移动的动作,否则向左移动。

现在我们有了策略函数,也实现了由策略函数选择动作的代码,接下来需要考虑如何使用梯度的方式对策略函数的参数进行更新。根据蒙特卡罗策略梯度算法的描述,要实现对参数的更新,我们需要有一个完整的游戏“情节”(Episode)。

定义一个获取“episode”的函数:
def generate_an_episode(env,theta):
#定义一个数组用来保存整段“情节”
    episode =[]
    #重置游戏环境
    pre_observation = env.reset()
    #定义一个变量,用来统计采取动作的次数
    count_action = 0
    #产生一个Episode
    while True:
        #根据策略选择动作
        s, action = policy_function(theta, pre_observation)
        #执行选择的动作并得到反馈信息(新的环境状态、奖励等)
        observation, reward, done, info = env.step(action)
        #保存这一步的信息
        episode.append([pre_observation, action, s, reward])
        #更新环境状态
        pre_observation = observation
        #累计执行的动作,如果智能体执行了超过5000次动作后,杆子还没倒下的话,则主动结束游戏
        count_action += 1
        if done or count_action > 5000:
            break
    return episode

第 3 行代码定义了一个数组“episode”,用来保存智能体在玩游戏时产生的整段“episode”中的每一步的信息。从第 13 行代码可以看到,“episode”数组的每一个元素又是一个长度为 4 的数组,其元素分别为当前的环境状态信息、当前环境状态下所采取的动作、策略函数的输出值,以及当前环境状态下执行了根据策略选择的动作后得到的奖励值。除了动作,其余的数据我们在后面更新策略函数的参数的时候都会用到。

在第 7 行代码中,我们定义了一个变量用来累积执行动作的次数,目的是让游戏能在有限的时间内结束。在第 20 行代码中,我们将这个值确定为 5000,即如果智能体执行了 5000 次动作后,杆子还没有倒下,那么就主动结束游戏,因为这时候智能体已经能够把“CartPole-v1”游戏玩得很好了。

第 11 行代码是将当前的环境状态以当前的策略函数的参数代入策略函数中,由策略来选择下一个要执行的动作。接着第 13 行代码执行了这个动作,并在第 17 行代码更新了环境的状态信息,继续游戏。

接下来实现算法的主体部分,即使用梯度上升的方法优化策略函数的参数 θ,使得智能体得到的累积奖励值更大。
def monte_carlo_policy_gradient(env):
#初始化参数(学习率和折扣因子)
    learning_rate = -0.0002
    discount_factor = 0.95

    #随机初始化策略函数的参数theta
    theta = np.random.rand(4)

    #让智能体玩2000个回合
    for i in range(2000):
        #生成一条完整的游戏情节Episode
        episode = generate_an_episode(env, theta)
        #使用梯度上升的方法优化策略函数的参数
        for t in range(len(episode)):
            observation, action, s, reward = episode[t]
            #根据蒙特卡罗策略梯度算法中的公式更新参数theta
            theta += learning_rate * discount_factor ** t * reward *\
                s * (1 - s) * (-observation)
    #测试策略的性能
    reward = test(env, theta)
    print('Total reward: ' , reward)

首先初始化学习率和折扣因子,然后随机初始化了策略函数的参数 θ。这里我们一共让智能体玩了 2000 个回合的游戏,每一个回合的游戏都会生成一条完整的游戏情节 Episode。从第 14 行到第 18 行代码,利用这 2000 个 Episode 对策略函数的参数进行优化。核心在于第 18 行代码中的“ s*(1− s) * ( −observation) ”是策略函数对参数 θ 的导数的计算结果。

到这里,实现蒙特卡罗策略梯度算法玩“CartPole-v1”游戏的主要代码都写完了,我们想要知道实际的效果怎么样,所以在第 20 行代码中,我们调用了一个测试函数:
def test(env, theta):
    #重置游戏环境
    observation = env.reset()
    total_reward = 0.

    #智能体最多执行3000个动作(即奖励值达到3000后就结束游戏)
    for i in range(3000):
        #可视化游戏画面(重绘一帧画面)
        env.render()
        #使用策略函数选择下一个动作
        s, action = policy_function(theta, observation)
        #执行动作
        observation, reward, done, info = env.step(action)
        #计算累积奖励
        total_reward += reward

        if done:
            break
    return total reward

最后定义一个程序的入口“main”函数:
if __name__ == "__main__":
    #注册游戏环境
    game_env = gym.make(* CartPole-vl')
    #取消限制
    game_env = game_env.unwrapped
    #让智能体开始学习玩"CArtPole-vl”游戏
    monte_carlo_policy_gradient(game_env)
第 5 行代码中“unwrapped”的目的是取消 Gym 中对于游戏的各种限制,例如,在“CartPole-v1”游戏中,如果不加这一行代码,那么智能体最多只能执行 500 个动作(即奖励值最高为 500),超过这个值后,游戏就自动结束。