DQN算法实现“CartPole”游戏—DRL实战项目

DQN 算法在前面已经介绍过,这里使用 DQN 算法来玩“CartPole”游戏。

首先实现 DQN 中的基本网络模型:
import gym
import time
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import optimizers

class Model(tf.keras.Model):
    def __init__(self, num_actions):
        super().__init__(name='q_network')
        self. fc1=layers.Dense(32, activation= 'relu', kernel_initializer='he_uniform')
        self. fc2=layers.Dense(32, activation= 'relu', kernel_initializer='he_uniform')
        self.logits=layers.Dense(num_actions, name= 'q_values')

    def call(self, inputs):
        x = self.fc1(inputs)
        x = self.fc2(x)
        x = self.logits(x)
        return x

    def action_value(self, obs):
        q_values = self.predict(obs)
        best_action = np.argmax(q_values, axis=-1)
        return best_action[0], q_values[0]
前面介绍过,DQN 中的 Q 网络和 Target-Q 网络必须使用相同的网络结构,这里使用的是一个简单的三层全连接神经网络:输出层的神经元个数为“num_actions”,即游戏环境中的总动作数,“CartPole”游戏中只有“向左”和“向右”两个动作。模型输出的是 Q(s, a)  值。

接下来实现 Agent 部分:
class DQNAgent:
    def  __init__(self, model, target_model, env, buffer_size=100,
                  learning_rate=.00001,epsilon=.1,
                  epsilon_dacay=0.995,min_epsilon=.01, 
                  gamma=.95, batch_size=4, target_update_iter=400,
                  train_nums=20000, start_learning=10):
        self.model = model
        self.target_model =target_model
        self.model.compile(optimizer=optimizers.Adam(), loss='mse')
        self.env = env         # gym环境
        self.lr = learning_rate   #学习率
        self.epsilon = epsilon   # epsilon-greedy
        self.epsilon_decay = epsilon_dacay      # epsilon衰减因子
        self.min_epsilon = min_epsilon      # epsilon最小值
        self.gamma = gamma     #折扣因子
        self.batch_size = batch_size    # batch_size
        self.target_update_iter = target_update_iter  #目标网络参数的更新周期
        self.train_nums = train_nums   #总训练步数
        self.num_in_buffer = 0     #经验池中已经保存的经验数
        self.buffer_size = buffer_size  #经验池的大小
        self.start_learning = start_learning #开始训练之前要先确保经验池中有一定量数据

        # 经验池参数[(s, a, r, ns, done),...]
        self.obs=np.empty((self.buffer_size,)+self.env.reset().shape)
        self.actions = np.empty(self.buffer_size, dtype=np.int8)
        self.rewards=np.empty(self.buffer_size,dtype=np.float32)
        self.dones = np.empty(self.buffer_size, dtype=np.bool)
        self.next_states=np.empty((self.buffer_size,)+self.env.reset().shape)
        self.next_idx = 0

“__init__”方法主要用于初始化模型的参数,接下来实现模型的训练部分:
def train(self):
    """模型训练"""
    #初始化环境信息
    obs = self.env.reset()
    for t in range(1, self.train_nums):
    best_action, q_values=self.model.action_value(obs[None])
        #采取ε贪心策略对环境进行探索,得到最终执行的动作
        action = self.get_action(best_action)
        #执行动作,获取反馈信息
        next_obs, reward, done, info = self.env.step(action)
        #将经验保存到经验池
        self.store_transition(obs,action,reward,next_obs,done)
        self.num_in_buffer = min(self.num_in_buffer + 1, self.buffer_size)

        #开始学习
        if t > self.start_learning:
            losses = self.train_step()
            if t % 1000 == 0:
                print('losses each 1000 steps:', losses)

        if t % self.targe_update_iter == 0 :
            #更新目标网络的参数
            self.update_target_model()
        if done:
            obs = self.env.reset()
        else:
            obs =next_obs 
第 5 行的“train_nums”是我们总共要训练的游戏步数,训练开始后首先基于 Q 网络在环境中采样,这里我们使用了 ε 贪心策略对环境进行探索。采样得到的结果保存到经验池中,然后通过经验回放来训练模型。

在第 6 行代码中,“obs[None]”会将一维的状态向量 obs 转换为二维(None,4)。在第 21 行代码中,每隔“target_update_iter”步后用 Q 网络的参数更新 Target-Q 网络。

经验回放的过程如下:
def train_step(self):
    """逐步训练,经验回放"""
    idxes = self.replay_transition(self.batch_size)
    s_batch = self.obs[idxes]
    a_batch = self.actions[idxes]
    r_batch = self.rewards[idxes]
    ns_batch = self.next_states[idxes]
    done_batch = self.dones[idxes]

    #使用Target-Q网络计算目标值
    target_q=r_batch+self.gamma*np.amax(self.get_target_value(ns_batch), axis=l) * (1 - done_batch)
    #使用Q网络产生预测值
    q_value = self.model.predict(s_batch)
    for i, val in enumerate(a_batch):
        q_value [i][val] = target_q[i]

    #使用train_on_batch方法进行训练
    losses = self.model.train_on_batch(s_batch, q_value)

    return losses
首先从经验池中随机挑选“batch_size”个样本,然后第 11 行代码是根据算法计算目标值 yj 的。因为我们的目标是让 Q 网络逼近 Target-Q 值,所以将 Target-Q 值作为训练样本的类标。

接下来实现经验的存储和回放:
def store_transition(self, obs, action, reward, next_state, done):
    """存储经验"""
    n_idx = self.next_idx % self.buffer_size
    self.obs[n_idx] = obs
    self.actions[n_idx] =action
    self.rewards[n_idx] = reward
    self.next_states[n_idx] = next_state
    self.dones[n_idx] = done
    self.next_idx =(self.next_idx + 1) % self.buffer_size

def replay_transition(self, n):
    """经验回放"""
    assert n < self.num_in_buffer
    res =[]
    while True:
        num = np.random.randint(0, self.num_in_buffer)
        if num not in res:
            res.append(num)
        if len(res) == n:
            break
    return res

最后是辅助函数:
def get_action(self, best_action):
    """epsilon-greedy"""
    if np.random.rand() < self.epsilon:
        return self.env.action_space.sample()
    return best_action

def update_target_model(self):
    #将Q网络的参数复制给目标Q网络
    self.target_model.set_weights(self.model.get_weights())

def get_target_value(self, obs):
    return self.target_model.predict(obs)

def e_decay(self):
    self.epsilon *= self.epsilon_decay
“get_action”采用 ε 贪心策略对环境进行探索,选择最终要执行的动作。“update_target_model”函数用 Q 网络的参数来更新 Target-Q 网络。“get_target_value”函数是使用 Target-Q 网络计算 Q 值的。“e_decay”函数用来动态地减小 ε 贪心策略中“ε”的值,随着模型的训练,逐渐降低对环境探索的概率。