Flappy Bird是一款简单操作的手机游戏,在游戏中有一只飞翔的小鸟,在飞行中会遇到管道障碍物,玩家需要操控小鸟往上飞,飞行过程中不能坠地也不能触碰障碍物,不断的实行动作会飞的越来越高;如果不采取飞行动作,则会快速下降。因此玩家要使用合适的策略控制小鸟飞行,使小鸟不会坠地同时能顺利地穿越障碍物。本案例使用强化学习算法DQN训练智能体,使其最终学会玩Flappy Bird游戏。
Flappy Bird是一款火热一时的手机游戏,游戏中玩家需要操控一只小鸟飞行,使小鸟跨越管道所组成的障碍,执行Flappy会使小鸟飞得越来越高,不进行任何动作,则会快速下降。游戏的目标是使小鸟顺利地通过管道,不触碰管道或地面。
通过定义四元组即可将上述问题转化为MDP问题,MDP四元组包括:状态空间$S$,动作空间$A$,奖励$R$以及转移关系$P$。在Flappy Bird中四元组的具体含义如下:
$80\times80\times4$的RGB图像,因为状态空间过大,用Q表决策会占用大量空间,因此需要采用值函数近似法。
小鸟可执行的动作有两个,即向上飞或不操作,因此可定义动作空间为2维,[1,0]表示不进行向上飞动作,小鸟自由下坠;[0,1]表示小鸟向上运动。
DQN(Deep Q Network)是一种将深度学习和Q-learning相结合的强化学习算法,它适合状态数量很大,动作数量较少的问题(例如Atari游戏),它的核心思想是使用深度神经网络近似Q值。
DQN中的值函数模型为卷积神经网络,为提高计算效率,卷积神经网络的输入为状态$𝑠$,输出为$𝑠$下每一个动作$𝑎$对应的Q值。
2013年的论文《Playing Atari with Deep Reinforcement Learning》中,研究者训练DQN算法玩Atari游戏,结果发现DQN算法在6个游戏上的表现超过了之前的所有方法(例如Sarsa),在3个游戏上的表现超过了人类玩家,这表明DQN算法有很高的应用价值。
但是在使用神经网络近似Q值时,强化学习过程并不稳定甚至会发散,主要原因有:
针对上述问题,后续学者对DQN做出了一些改进,具体改进包括:
经验回放方法引入样本缓存区(replay buffer)的概念,其含义是与环境交互时,将产生的数据存储在缓存区中,然后均匀地随机采样一批数据提供给神经网络进行训练,若缓存区数据已满,用新数据覆盖最旧的数据。
目标网络的核心思想是减轻与预测Q值的参数联系。由于目标Q值和预测Q值之间存在参数联系,每次更新的目标都是固定上次更新的参数得来的,优化目标跟着优化过程一直在变,迭代难以收敛:
引入一个与原始网络结构一样的模型,称为目标网络(target network),原模型称为表现网络(behavior network),目标网络计算Q-learning中的目标Q值,每当完成一定轮数迭代,表现网络的参数同步给目标网络。
DQN经过不断的发展,不断取得进步,2015年在学术杂志《Nature》上发表的一篇论文中《Human-level control through deep reinforcement learning》,研究者训练DQN玩了50多种Atari游戏,DQN几乎在所有游戏的表现上都高于之前的强化学习方法,并且对于半数以上的游戏,DQN的表现已经超越了人类。
2.for episode = 1,……,M:
(2) for t = 1,……,T:
end for
# 导入相关的库
import pygame
import sys
import numpy as np
import random
import pygame.surfarray as surfarray
from pygame.locals import *
from itertools import cycle
import tensorflow as tf
import cv2
设置加载函数,将所需要的资源,包括小鸟、管道、背景的图片加载到程序中,方便后续使用。
def load():
# 加载三种小鸟的动作图片——上中下
# 应用的图片可在网页中下载
PLAYER_PATH = (
'redbird-upflap.jpg',
'redbird-midflap.jpg',
'redbird-downflap.jpg'
)
# 加载整体背景为黑色,此处为最底端的背景板
BACKGROUND_PATH = 'background-black.jpg'
# 加载管道图片
PIPE_PATH = 'pipe-green.jpg'
#图像,声音,撞击的
IMAGES, SOUNDS, HITMASKS = {}, {}, {}
# 显示分数的数字图片
IMAGES['numbers'] = (
pygame.image.load('0.jpg').convert_alpha(), # convert_alpha()方法会使用透明方法绘制前景对象
pygame.image.load('1.jpg').convert_alpha(),
pygame.image.load('2.jpg').convert_alpha(),
pygame.image.load('3.jpg').convert_alpha(),
pygame.image.load('4.jpg').convert_alpha(),
pygame.image.load('5.jpg').convert_alpha(),
pygame.image.load('6.jpg').convert_alpha(),
pygame.image.load('7.jpg').convert_alpha(),
pygame.image.load('8.jpg').convert_alpha(),
pygame.image.load('9.jpg').convert_alpha()
)
# 游戏的基础背景
IMAGES['base'] = pygame.image.load('base.jpg').convert_alpha()
# 加载背景
IMAGES['background'] = pygame.image.load(BACKGROUND_PATH).convert()
# 加载小鸟动作图片
IMAGES['player'] = (
pygame.image.load(PLAYER_PATH[0]).convert_alpha(),
pygame.image.load(PLAYER_PATH[1]).convert_alpha(),
pygame.image.load(PLAYER_PATH[2]).convert_alpha(),
)
# 加载管道,一对相对的上下管道
IMAGES['pipe'] = (
pygame.transform.rotate(pygame.image.load(PIPE_PATH).convert_alpha(), 180),
pygame.image.load(PIPE_PATH).convert_alpha(),
)
# 得到管道边界的坐标
HITMASKS['pipe'] = (
getHitmask(IMAGES['pipe'][0]),
getHitmask(IMAGES['pipe'][1]),
)
# 得到小鸟边界的坐标
HITMASKS['player'] = (
getHitmask(IMAGES['player'][0]),
getHitmask(IMAGES['player'][1]),
getHitmask(IMAGES['player'][2]),
)
return IMAGES, SOUNDS, HITMASKS
定义获得图片边界的函数,获得的边界均为横纵坐标组成的列表,将这些值保存下来,方便判定小鸟是否撞击到管道或者边界。
def getHitmask(image):
"""返回图片的边界坐标值"""
# mask用来存储图片的边界坐标列表
mask = []
# 从左到右,从上到下将坐标依次保存到mask中
for x in range(image.get_width()):
mask.append([])
for y in range(image.get_height()):
mask[x].append(bool(image.get_at((x,y))[3]))
# 返回边界坐标列表
return mask
# 游戏帧数为30
FPS = 30
# 游戏屏幕宽度
SCREENWIDTH = 288
# 游戏屏幕高度
SCREENHEIGHT = 512
# 设置一对上下管道之间的间隔
PIPEGAPSIZE = 100
# 游戏基础背景的高度,以左上角为坐标起始点
BASEY = SCREENHEIGHT * 0.79
# #初始化pygame
pygame.init()
# pygame.time.Clock对象帮助确定程序要以多大的帧速率运行,这个对象在游戏每一次迭代都会设置一个暂停,以防止程序运行过快,
# 有时候计算机的速度过快,就可以利用这个对象来让计算机在一个固定的速度运行
FPSCLOCK = pygame.time.Clock()
# 在没有屏幕的情况下训练需要显示的程序
import os
os.environ['SDL_VIDEODRIVER'] = 'dummy'
# 初始化屏幕
SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
# 设置窗口的标题
pygame.display.set_caption('Flappy Bird')
# 调用load函数,加载所需要的图片、边界
IMAGES, SOUNDS, HITMASKS = load()
# 记录小鸟的宽度与高度
PLAYER_WIDTH = IMAGES['player'][0].get_width()
PLAYER_HEIGHT = IMAGES['player'][0].get_height()
# 记录管道的宽度和高度
PIPE_WIDTH = IMAGES['pipe'][0].get_width()
PIPE_HEIGHT = IMAGES['pipe'][0].get_height()
# 记录背景的宽度和宽度
BACKGROUND_WIDTH = IMAGES['background'].get_width()
BACKGROUND_HEIGTH = IMAGES['background'].get_height()
# 创建迭代对象,小鸟状态的循环,0表示下落,1表示平飞,2表示向上
PLAYER_INDEX_GEN = cycle([0, 1, 2, 1])
def getRandomPipe():
# 设置上下管道的间距差
gapYs = [20, 30, 40, 50, 60, 70, 80, 90]
# 随机抽取一个间距差设置为新出现管道的基本参数
index = random.randint(0, len(gapYs)-1)
gapY = gapYs[index]
# 计算管道高度
gapY += int(BASEY * 0.2)
# 将新建管道的横坐标设置为屏幕宽度之后,在下一次刷新屏幕即可出现
pipeX = SCREENWIDTH + 10
# 返回一对新建管道
return [
{'x': pipeX, 'y': gapY - PIPE_HEIGHT}, # 上管道
{'x': pipeX, 'y': gapY + PIPEGAPSIZE}, # 下管道
]
def checkCrash(player, upperPipes, lowerPipes):
# 小鸟图片的宽度和高度
pi = player['index']
player['w'] = IMAGES['player'][0].get_width()
player['h'] = IMAGES['player'][0].get_height()
# 判断是否坠落到地板
if player['y'] + player['h'] >= BASEY - 1:
return True
else:
playerRect = pygame.Rect(player['x'], player['y'],
player['w'], player['h'])
# 对于所有的上、下管道都进行判断
for uPipe, lPipe in zip(upperPipes, lowerPipes):
# 上、下管道的基本数值的赋值
uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
# 小鸟和管道的边界设置
pHitMask = HITMASKS['player'][pi]
uHitmask = HITMASKS['pipe'][0]
lHitmask = HITMASKS['pipe'][1]
# 检查是否碰撞到上管道或者下管道
uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask)
lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask)
# 若uCollide或者lCollide为真,则为碰撞
if uCollide or lCollide:
return True
return False
def pixelCollision(rect1, rect2, hitmask1, hitmask2):
rect = rect1.clip(rect2)
# 判断是否传入准确数据
if rect.width == 0 or rect.height == 0:
return False
# 判断两个边界是否有碰撞
x1, y1 = rect.x - rect1.x, rect.y - rect1.y
x2, y2 = rect.x - rect2.x, rect.y - rect2.y
for x in range(rect.width):
for y in range(rect.height):
if hitmask1[x1+x][y1+y] and hitmask2[x2+x][y2+y]:
return True
return False
class GameState:
# 初始化游戏状态
def __init__(self):
# 初始化分数,index与迭代次数均为0
self.score = 0
self.playerIndex = 0
self.loopIter = 0
# 初始化玩家坐标,每次重新生成的坐标均为固定值
self.playerx = int(SCREENWIDTH * 0.2)
self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2)
# 初始化基础背景的横坐标
self.basex = 0
self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH
# 随机生成两个管道
newPipe1 = getRandomPipe()
newPipe2 = getRandomPipe()
# 设置上、下管道的坐标
self.upperPipes = [
{'x': SCREENWIDTH, 'y': newPipe1[0]['y']},
{'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']},
]
self.lowerPipes = [
{'x': SCREENWIDTH, 'y': newPipe1[1]['y']},
{'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']},
]
# 设置一些基本量,包括玩家速度、最大速度,下降加速度以及上升加速度
# 注意向下速度为正
# 游戏屏幕横移速度
self.pipeVelX = -4
# 初始速度
self.playerVelY = 0
# 下降最大速度
self.playerMaxVelY = 10
# 上升最大速度
self.playerMinVelY = -8
# 自然下降的加速度
self.playerAccY = 1
# 上升加速度
self.playerFlapAcc = -9
# 判断是否摆动翅膀(flap)
self.playerFlapped = False
# 设置进行一步游戏,与gym库中的step作用相似
def frame_step(self, input_actions):
pygame.event.pump()
# 每一帧的基础奖励为0.1
reward = 0.1
# 游戏终止状态设置为False
terminal = False
# 输入应符合游戏标准,不能一次输入多个动作
if sum(input_actions) != 1:
raise ValueError('Multiple input actions!')
# input_actions[0] == 1: 什么都不做
# input_actions[1] == 1: flap
if input_actions[1] == 1:
# 小鸟的位置不能超出边界
if self.playery > -2 * PLAYER_HEIGHT:
self.playerVelY = self.playerFlapAcc
self.playerFlapped = True
# 检查得分情况
playerMidPos = self.playerx + PLAYER_WIDTH / 2
for pipe in self.upperPipes:
# 判断小鸟是否通过管道
pipeMidPos = pipe['x'] + PIPE_WIDTH / 2
# 若通过管道,则额外获得1奖励
if pipeMidPos <= playerMidPos < pipeMidPos + 4:
self.score += 1
# SOUNDS['point'].play()
reward = 1
# 小鸟的坐标变化
if (self.loopIter + 1) % 3 == 0:
self.playerIndex = next(PLAYER_INDEX_GEN)
self.loopIter = (self.loopIter + 1) % 30
# 基础背景横坐标的变化
self.basex = -((-self.basex + 100) % self.baseShift)
# 玩家的动作
if self.playerVelY < self.playerMaxVelY and not self.playerFlapped:
self.playerVelY += self.playerAccY
if self.playerFlapped:
self.playerFlapped = False
self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT)
if self.playery < 0:
self.playery = 0
# 使游戏屏幕左移
for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
uPipe['x'] += self.pipeVelX
lPipe['x'] += self.pipeVelX
# 增加新的管道障碍
if 0 < self.upperPipes[0]['x'] < 5:
newPipe = getRandomPipe()
self.upperPipes.append(newPipe[0])
self.lowerPipes.append(newPipe[1])
# 假如最左边的管道出了屏幕,则移除
if self.upperPipes[0]['x'] < -PIPE_WIDTH:
self.upperPipes.pop(0)
self.lowerPipes.pop(0)
# 检查是否碰撞
isCrash= checkCrash({'x': self.playerx, 'y': self.playery,
'index': self.playerIndex},
self.upperPipes, self.lowerPipes)
# 假如撞到管道
if isCrash:
#SOUNDS['hit'].play()
#SOUNDS['die'].play()
# 终止游戏
terminal = True
#重新初始化
self.__init__()
reward = -1
# 渲染背景
SCREEN.blit(IMAGES['background'], (0,0))
for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
SCREEN.blit(IMAGES['pipe'][0], (uPipe['x'], uPipe['y']))
SCREEN.blit(IMAGES['pipe'][1], (lPipe['x'], lPipe['y']))
SCREEN.blit(IMAGES['base'], (self.basex, BASEY))
SCREEN.blit(IMAGES['player'][self.playerIndex],
(self.playerx, self.playery))
# 更新游戏画面
image_data = pygame.surfarray.array3d(pygame.display.get_surface())
pygame.display.update()
FPSCLOCK.tick(FPS)
return image_data, reward, terminal
# 游戏名
GAME = 'flappy bird'
# 有效动作数目
ACTIONS = 2
# 折扣因子
GAMMA = 0.99
# 训练前观察的步长
OBSERVE = 100.
# 随机探索的时间
EXPLORE = 3.0e6
# 最终的探索率
FINAL_EPSILON = 1.0e-4
# 初始探索率
INITIAL_EPSILON = 0.1
# 经验池的大小
REPLAY_MEMORY = 50000
# batch的大小
BATCH = 32
# 跳帧
FRAME_PER_ACTION = 1
定义经验回报类即算法中的经验池,用来保存已经采样结束的数据。类似于队列的形式,有最大容量限制。在达到最大限制之后,采取先进先出的更新方法来更新经验回放中的采样数据。
class Experience_Buffer():
# 初始化一个经验回放池,包括一个列表和最大容纳量
def __init__(self,buffer_size = REPLAY_MEMORY):
# 用来存放采样的数据
self.buffer = []
# 设置可以存放的数据条数
self.buffer_size = buffer_size
# 将采样后的数据添加到经验池中,定义添加规则为先进先出
def add_experience(self, experience):
# 将一条数据添加到经验池中,若超过最大容量,则更新掉最旧的采样信息
if len(self.buffer)+len(experience) >= self.buffer_size:
self.buffer[0:len(self.buffer)+len(experience)-self.buffer_size]=[]
# 若小于最大容量,则将该条数据直接添加到经验池最后
self.buffer.extend(experience)
# 采样函数,定义采样规则
def sample(self,samples_num):
# 随机采样
sample_data = random.sample(self.buffer, samples_num)
# 获取采样数据,包括当前状态s,采取的动作a,获得的奖励r,下一状态s_,是否为终止状态
train_s = [d[0] for d in sample_data ]
train_a = [d[1] for d in sample_data]
train_r = [d[2] for d in sample_data]
train_s_=[d[3] for d in sample_data]
train_terminal = [d[4] for d in sample_data]
return train_s, train_a, train_r,train_s_,train_terminal
下面我们开始进行训练,首先定义一个类,接着定义初始化方法,包括创建两个网络、分离两个网络的参数、定义参数替换操作、定义损失函数和优化器、初始化变量以及保存读取模型等:
class Deep_Q_N():
# 初始化网络,设置网络参数以及其它常量
def __init__(self, lr=1.0e-6, model_file=None):
# 设置折扣因子
self.gamma = GAMMA
# 设置更新目标网络参数的步长
self.tau = 0.01
# 创建会话
self.sess = tf.Session()
# 设置学习率
self.learning_rate = lr
# 输入层
# 分别定义表现网络、目标网络的输入和输出
self.obs = tf.placeholder(tf.float32,shape=[None, 80,80,4])
self.obs_=tf.placeholder(tf.float32, shape=[None, 80,80,4])
self.action = tf.placeholder(tf.float32, shape=[None,ACTIONS])
self.action_=tf.placeholder(tf.float32, shape=[None, ACTIONS])
# 创建表现网络
self.Q = self.build_q_net(self.obs, scope='eval',trainable=True)
# 创建目标网络
self.Q_= self.build_q_net(self.obs_, scope='target', trainable=False)
# 分离两套网络参数
self.qe_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='eval')
self.qt_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='target')
# 定义新旧参数的替换操作
self.update_oldq_op = [oldq.assign((1 - self.tau) * oldq + self.tau * p) for p, oldq in zip(self.qe_params, self.qt_params)]
# 构建损失函数
self.Q_target =tf.placeholder(tf.float32, [None])
readout_q = tf.reduce_sum(tf.multiply(self.Q, self.action), reduction_indices=1)
self.q_loss = tf.losses.mean_squared_error(labels=self.Q_target, predictions=readout_q)
# 定义优化器,采用Adam方法,学习率为1.0e-6
self.q_train_op = tf.train.AdamOptimizer(lr).minimize(self.q_loss,var_list=self.qe_params)
# 初始化图中的变量
self.sess.run(tf.global_variables_initializer())
# 定义保存和恢复模型
self.saver = tf.train.Saver()
if model_file is not None:
self.restore_model(model_file)
接着定义保存和读取模型的具体方法以及神经网络的具体结构,本案例使用卷积神经网络近似状态-动作值函数,网络结构如下:
卷积层:32个8×8的卷积核,步长为4,激活函数为ReLU
最大池化层:窗口大小为2×2,步长为2
卷积层:64个4×4的卷积核,步长为2,激活函数为ReLU
卷积层:64个3×3的卷积核,步长为1,激活函数为ReLU
全连接层:512个神经元
# 定义存储模型函数
def save_model(self, model_path,global_step):
self.saver.save(self.sess, model_path,global_step=global_step)
# 定义读取模型函数
def restore_model(self, model_path):
self.saver.restore(self.sess, model_path)
# 建立表现网络
def build_q_net(self, obs, scope, trainable):
with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
# 卷积层,32个8×8卷积,步长为4,全零填充,激活函数为ReLU
h_conv1 = tf.keras.layers.Conv2D(filters=32, kernel_size=[8,8],strides=4,padding="same",activation=tf.nn.relu,kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(obs)
# 池化层,最大池化,窗口大小为2×2,步长为2
h_pool1 = tf.keras.layers.MaxPooling2D(pool_size=[2,2],strides=2, padding="SAME")(h_conv1)
# 卷积层,64个4×4卷积,步长为2,全零填充,激活函数为ReLU
h_conv2 = tf.keras.layers.Conv2D(filters=64, kernel_size=[4,4],strides=2, padding="same", activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_pool1)
# 卷积层,64个3×3卷积,步长为1,全零填充,激活函数为ReLU
h_conv3 = tf.keras.layers.Conv2D(filters=64, kernel_size=[3,3],strides=1,padding="same",activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0,stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_conv2)
# 扁平化
h_conv3_flat = tf.keras.layers.Flatten()(h_conv3)
# 全连接层,512个神经元,激活函数为ReLU
h_fc1 = tf.keras.layers.Dense(units=512,activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(0,stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_conv3_flat)
# 输出层,2个神经元,表示两个动作分别对应的预测Q值,没有激活函数
qout = tf.keras.layers.Dense(units=2,kernel_initializer=tf.random_normal_initializer(0,stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_fc1)
return qout
然后定义采样数据的策略:
# 采用ε-greedy策略来进行采样
def epsilon_greedy(self,s_t,epsilon):
a_t = np.zeros([ACTIONS])
# 获得最大Q值对应的动作
amax = np.argmax(self.sess.run(self.Q,{self.obs:[s_t]})[0])
# 判断执行贪婪策略还是进行探索
if np.random.uniform() < 1 - epsilon:
# 执行贪婪策略
a_t[amax] = 1
else:
# 进行探索
a_t[random.randrange(ACTIONS)]=1
return a_t
最后定义网络的训练,首先初始化状态,与游戏环境交互产生图像并将图像进行预处理;接着采取$\epsilon$-greedy策略采样动作并执行动作与游戏环境交互产生奖励和下一状态;然后将数据存储在经验池中;最后进行训练,首先从经验池中采样数据,接着运行目标网络得到预测Q值,然后训练一次,最后缓慢更新目标网络的参数。
# 训练网络
def train_Network(self,experience_buffer):
#打开游戏状态与模拟器进行通信
game_state = GameState()
# 获得第一个状态
do_nothing = np.zeros(ACTIONS)
# 初始动作
do_nothing[0]=1
# 与游戏交互一次,并将图像进行预处理
x_t, r_0, terminal = game_state.frame_step(do_nothing)
x_t = cv2.cvtColor(cv2.resize(x_t, (80,80)),cv2.COLOR_BGR2GRAY)
ret, x_t = cv2.threshold(x_t,1,255,cv2.THRESH_BINARY)
s_t = np.stack((x_t, x_t, x_t, x_t),axis=2)
#开始训练
epsilon = INITIAL_EPSILON
t= 0
# 由于训练时间较长,这里只展现前10000轮训练过程
while t<=10000:
# 采取ε-greedy策略来进行动作的选取
a_t = self.epsilon_greedy(s_t,epsilon=epsilon)
# ε递减,要求探索率是不断衰减地来保证最后可以收敛到一个确定的理想状态
if epsilon > FINAL_EPSILON and t>OBSERVE:
epsilon -= (INITIAL_EPSILON-FINAL_EPSILON)/EXPLORE
# 执行动作,与游戏环境交互一次,并获得返回值
x_t1_colored, r_t,terminal = game_state.frame_step(a_t)
x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
x_t1 =np.reshape(x_t1,(80,80,1))
s_t1 = np.append(x_t1, s_t[:,:,:3],axis=2)
# 将数据存储到经验池中
experience = np.reshape(np.array([s_t,a_t,r_t,s_t1,terminal]),[1,5])
experience_buffer.add_experience(experience)
# 进行训练
if t>OBSERVE:
# 由经验池中采集样本,获取信息
train_s, train_a, train_r,train_s_,train_terminal = experience_buffer.sample(BATCH)
target_q=[]
# 运行目标网络
read_target_Q = self.sess.run(self.Q_,{self.obs_:train_s_})
for i in range(len(train_r)):
# 判断是否为终止状态,若是则直接获得累积奖励
if train_terminal[i]:
target_q.append(train_r[i])
# 否则需要计算下一状态的最大Q值并添加到累积奖励中
else:
target_q.append(train_r[i]+GAMMA*np.max(read_target_Q[i]))
#训练一次
self.sess.run(self.q_train_op, feed_dict={self.obs:train_s, self.action:train_a, self.Q_target:target_q})
#更新旧的目标网络
self.sess.run(self.update_oldq_op)
# 往前推进一步
s_t = s_t1
t += 1
# 每10000次迭代保存一次
if t%10000 == 0:
self.save_model('saved_networks/',global_step=t)
else:
if t%1000 == 0:
# 打印迭代轮数,探索率,动作和奖励
print("train, steps",t,"/epsilon", epsilon,"/action_index",a_t, "/reward",r_t)
创建类的完整代码如下,若想得到良好的训练效果,需要训练非常多的轮数和花费较长的训练时间,所以这里的代码我们只展示前10000轮的训练过程:
class Deep_Q_N():
# 初始化网络,设置网络参数以及其它常量
def __init__(self, lr=1.0e-6, model_file=None):
# 设置折扣因子
self.gamma = GAMMA
# 设置更新目标网络参数的步长
self.tau = 0.01
# 创建会话
self.sess = tf.Session()
# 设置学习率
self.learning_rate = lr
# 输入层
# 分别定义表现网络、目标网络的输入和输出
self.obs = tf.placeholder(tf.float32,shape=[None, 80,80,4])
self.obs_=tf.placeholder(tf.float32, shape=[None, 80,80,4])
self.action = tf.placeholder(tf.float32, shape=[None,ACTIONS])
self.action_=tf.placeholder(tf.float32, shape=[None, ACTIONS])
# 创建表现网络
self.Q = self.build_q_net(self.obs, scope='eval',trainable=True)
# 创建目标网络
self.Q_= self.build_q_net(self.obs_, scope='target', trainable=False)
# 分离两套网络参数
self.qe_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='eval')
self.qt_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='target')
# 定义新旧参数的替换操作
self.update_oldq_op = [oldq.assign((1 - self.tau) * oldq + self.tau * p) for p, oldq in zip(self.qe_params, self.qt_params)]
# 构建损失函数
self.Q_target =tf.placeholder(tf.float32, [None])
readout_q = tf.reduce_sum(tf.multiply(self.Q, self.action), reduction_indices=1)
self.q_loss = tf.losses.mean_squared_error(labels=self.Q_target, predictions=readout_q)
# 定义优化器,采用Adam方法,学习率为1.0e-6
self.q_train_op = tf.train.AdamOptimizer(lr).minimize(self.q_loss,var_list=self.qe_params)
# 初始化图中的变量
self.sess.run(tf.global_variables_initializer())
#6.定义保存和读取模型
self.saver = tf.train.Saver()
if model_file is not None:
self.restore_model(model_file)
# 定义存储模型函数
def save_model(self, model_path,global_step):
self.saver.save(self.sess, model_path,global_step=global_step)
# 定义读取模型函数
def restore_model(self, model_path):
self.saver.restore(self.sess, model_path)
# 建立表现网络
def build_q_net(self, obs, scope, trainable):
with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
# 卷积层,32个8×8卷积,步长为4,全零填充,激活函数为ReLU
h_conv1 = tf.keras.layers.Conv2D(filters=32, kernel_size=[8,8],strides=4,padding="same",activation=tf.nn.relu,kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(obs)
# 池化层,最大池化,窗口大小为2×2,步长为2
h_pool1 = tf.keras.layers.MaxPooling2D(pool_size=[2,2],strides=2, padding="SAME")(h_conv1)
# 卷积层,64个4×4卷积,步长为2,全零填充,激活函数为ReLU
h_conv2 = tf.keras.layers.Conv2D(filters=64, kernel_size=[4,4],strides=2, padding="same", activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_pool1)
# 卷积层,64个3×3卷积,步长为1,全零填充,激活函数为ReLU
h_conv3 = tf.keras.layers.Conv2D(filters=64, kernel_size=[3,3],strides=1,padding="same",activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0,stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_conv2)
# 扁平化
h_conv3_flat = tf.keras.layers.Flatten()(h_conv3)
# 全连接层,512个神经元,激活函数为ReLU
h_fc1 = tf.keras.layers.Dense(units=512,activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(0,stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_conv3_flat)
# 输出层,2个神经元,表示两个动作分别对应的预测Q值,没有激活函数
qout = tf.keras.layers.Dense(units=2,kernel_initializer=tf.random_normal_initializer(0,stddev=0.01),\
bias_initializer=tf.constant_initializer(0.01),trainable=trainable)(h_fc1)
return qout
# 采用ε-greedy策略来进行采样
def epsilon_greedy(self,s_t,epsilon):
a_t = np.zeros([ACTIONS])
# 获得最大Q值对应的动作
amax = np.argmax(self.sess.run(self.Q,{self.obs:[s_t]})[0])
# 判断执行贪婪策略还是进行探索
if np.random.uniform() < 1 - epsilon:
# 执行贪婪策略
a_t[amax] = 1
else:
# 进行探索
a_t[random.randrange(ACTIONS)]=1
return a_t
# 训练网络
def train_Network(self,experience_buffer):
#打开游戏状态与模拟器进行通信
game_state = GameState()
# 获得第一个状态
do_nothing = np.zeros(ACTIONS)
# 初始动作
do_nothing[0]=1
# 与游戏交互一次,并将图像进行预处理
x_t, r_0, terminal = game_state.frame_step(do_nothing)
x_t = cv2.cvtColor(cv2.resize(x_t, (80,80)),cv2.COLOR_BGR2GRAY)
ret, x_t = cv2.threshold(x_t,1,255,cv2.THRESH_BINARY)
s_t = np.stack((x_t, x_t, x_t, x_t),axis=2)
#开始训练
epsilon = INITIAL_EPSILON
t= 0
# 由于训练时间较长,这里只展现前10000轮训练过程
while t<=10000:
# 采取ε-greedy策略来进行动作的选取
a_t = self.epsilon_greedy(s_t,epsilon=epsilon)
# ε递减,要求探索率是不断衰减地来保证最后可以收敛到一个确定的理想状态
if epsilon > FINAL_EPSILON and t>OBSERVE:
epsilon -= (INITIAL_EPSILON-FINAL_EPSILON)/EXPLORE
# 执行动作,与游戏环境交互一次,并获得返回值
x_t1_colored, r_t,terminal = game_state.frame_step(a_t)
x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
x_t1 =np.reshape(x_t1,(80,80,1))
s_t1 = np.append(x_t1, s_t[:,:,:3],axis=2)
# 将数据存储到经验池中
experience = np.reshape(np.array([s_t,a_t,r_t,s_t1,terminal]),[1,5])
experience_buffer.add_experience(experience)
# 进行训练
if t>OBSERVE:
# 由经验池中采集样本,获取信息
train_s, train_a, train_r,train_s_,train_terminal = experience_buffer.sample(BATCH)
target_q=[]
# 运行目标网络
read_target_Q = self.sess.run(self.Q_,{self.obs_:train_s_})
for i in range(len(train_r)):
# 判断是否为终止状态,若是则直接获得累积奖励
if train_terminal[i]:
target_q.append(train_r[i])
# 否则需要计算下一状态的最大Q值并添加到累积奖励中
else:
target_q.append(train_r[i]+GAMMA*np.max(read_target_Q[i]))
#训练一次
self.sess.run(self.q_train_op, feed_dict={self.obs:train_s, self.action:train_a, self.Q_target:target_q})
#更新旧的目标网络
self.sess.run(self.update_oldq_op)
# 往前推进一步
s_t = s_t1
t += 1
# 每10000次迭代保存一次
if t%10000 == 0:
self.save_model('saved_networks/',global_step=t)
else:
if t%1000 == 0:
# 打印迭代轮数,探索率,动作和奖励
print("train, steps",t,"/epsilon", epsilon,"/action_index",a_t, "/reward",r_t)
# 使用GPU进行训练
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
if __name__=="__main__":
# 实例化经验池
buffer = Experience_Buffer()
# 实例化深度Q网络
brain= Deep_Q_N()
# 对于网络进行训练
brain.train_Network(buffer)
最终的运行结果如下:
在本案例中,我们首先将Flappy Bird游戏形式化为一个MDP问题,接着利用Pygame建立了游戏环境,最后使用DQN算法训练智能体玩了Flappy Bird游戏。由于DQN算法使用了深度神经网络以及经验回放等技巧,因此需要大量的内存空间来运行代码,但是经过训练后的结果可以达到近乎完美的水平,几乎可以媲美人类玩家。
在案例运行中,需要的游戏配置图片如下:
(1)10张数字图片,用来显示玩家的得分
(2)基础背景,设置为纯黑
(3)地板图片
(4)管道图片
(5)小鸟的三种状态,依次为向下飞、平飞和向上飞