本文转载自莫烦-强化学习-A3C
相关论文:Asynchronous Methods for Deep Reinforcement Learning

今天我们会来说说强化学习中的一种有效利用计算资源, 并且能提升训练效用的算法, Asynchronous Advantage Actor-Critic, 简称 A3C.

什么是 Asynchronous Advantage Actor-Critic (A3C)

平行宇宙
我们先说说没什么关系的,大家知道平行宇宙这回事. 想像现在有三个平行宇宙, 那么就意味着这3个平行宇宙上存在3个你, 而你可能在电脑前呆了很久, 对, 说的就是你! 然后你会被我催促起来做运动~ 接着你 和 你 还有 你, 就无奈地在做着不同的运动, 油~ 我才不想知道你在做什么样的运动呢. 不过这3个你 都开始活动胫骨啦. 假设3个你都能互相通信, 告诉对方, “我这个动作可以有效缓解我的颈椎病”, “我做那个动作后, 腰就不痛了 “, “我活动了手臂, 肩膀就不痛了”. 这样你是不是就同时学到了对身体好的三招. 这样是不是感觉特别有效率. 让你看看更有效率的, 那就想想3个你同时在写作业, 一共3题, 每人做一题, 只用了1/3 的时间就把作业做完了. 感觉棒棒的. 哈, 你看出来了, 如果把这种方法用到强化学习, 岂不是 “牛逼lity”.
在这里插入图片描述
平行训练
这就是传说中的 A3C.
A3C 其实只是这种平行方式的一种而已, 它采用的是我们之前提到的 Actor-Critic 的形式. 为了训练一对 ActorCritic, 我们将它复制多份红色的, 然后同时放在不同的平行宇宙当中, 让他们各自玩各的. 然后每个红色副本都悄悄告诉黑色的 Actor-Critic 自己在那边的世界玩得怎么样, 有哪些经验值得分享. 然后还能从黑色的 Actor-Critic 这边再次获取综合考量所有副本经验后的通关秘籍. 这样一来一回, 形成了一种有效率的强化学习方式.
在这里插入图片描述多核训练
我们知道目前的计算机多半是有双核, 4核, 甚至 6核, 8核. 一般的学习方法, 我们只能让机器人在一个核上面玩耍. 但是如果使用 A3C 的方法, 我们可以给他们安排去不同的核, 并行运算. 实验结果就是, 这样的计算方式往往比传统的方式快上好多倍. 那我们也多用用这样的红利吧.
在这里插入图片描述

A3C

一句话概括 A3C: Google DeepMind 提出的一种解决 Actor-Critic 不收敛问题的算法. 它会创建多个并行的环境, 让多个拥有副结构agent 同时在这些并行环境上更新主结构中的参数. 并行中的 agent 们互不干扰, 而主结构的参数更新受到副结构提交更新的不连续性干扰, 所以更新的相关性被降低, 收敛性提高.

算法

A3C 的算法实际上就是将 Actor-Critic 放在了多个线程中进行同步训练. 可以想象成几个人同时在玩一样的游戏, 而他们玩游戏的经验都会同步上传到一个中央大脑. 然后他们又从中央大脑中获取最新的玩游戏方法.

这样, 对于这几个人, 他们的好处是: 中央大脑汇集了所有人的经验, 是最会玩游戏的一个, 他们能时不时获取到中央大脑的必杀招, 用在自己的场景中.

对于中央大脑的好处是: 中央大脑最怕一个人的连续性更新, 不只基于一个人推送更新这种方式能打消这种连续性. 使中央大脑不必有用像 DQN, DDPG 那样的记忆库也能很好的更新.

为了达到这个目的, 我们要有两套体系, 可以看作中央大脑拥有 global net 和他的参数, 每位玩家有一个 global net 的副本 local net, 可以定时向 global net 推送更新, 然后定时从 global net 那获取综合版的更新.
在这里插入图片描述
如果在 tensorboard 中查看我们今天要建立的体系, 这就是你会看到的.
W_0 就是第0个 worker, 每个 worker 都可以分享 global_net.
在这里插入图片描述
如果我们调用 sync 中的 pull, 这个 worker 就会从 global_net 中获取到最新的参数.
在这里插入图片描述
如果我们调用 sync 中的 push, 这个 worker 就会将自己的个人更新推送去 global_net.
在这里插入图片描述

主结构

我们用 Tensorflow 搭建神经网络, 对于我们的 Actor, tensorboard 中可以看清晰的看到我们是如果搭建的:
在这里插入图片描述
我们使用了 Normal distribution 来选择动作, 所以在搭建神经网络的时候, actor 这边要输出动作的均值和方差. 然后放入 Normal distribution 去选择动作. 计算 actor loss 的时候我们还需要使用到 critic 提供的 TD error 作为 gradient ascent 的导向.

critic 很简单啦, 只需要得到他对于 state 的价值就好了. 用于计算 TD error.
在这里插入图片描述

Actor Critic 网络

其搭建的代码部分在这:
这些只是在创建网络而已, worker 还有属于自己的 class, 用来执行在每个线程里的工作.

# 这个 class 可以被调用生成一个 global net.
# 也能被调用生成一个 worker 的 net, 因为他们的结构是一样的,
# 所以这个 class 可以被重复利用.
class ACNet(object):
def __init__(self, globalAC=None):
# 当创建 worker 网络的时候, 我们传入之前创建的 globalAC 给这个 worker
if 这是 global: # 判断当下建立的网络是 local 还是 global
with tf.variable_scope('Global_Net'):
self._build_net()
else:
with tf.variable_scope('worker'):
self._build_net()

# 接着计算 critic loss 和 actor loss
# 用这两个 loss 计算要推送的 gradients

with tf.name_scope('sync'): # 同步
with tf.name_scope('pull'):
# 更新去 global
with tf.name_scope('push'):
# 获取 global 参数

def _build_net(self):
# 在这里搭建 Actor 和 Critic 的网络
return 均值, 方差, state_value

def update_global(self, feed_dict):
# 进行 push 操作

def pull_global(self):
# 进行 pull 操作

def choose_action(self, s):
# 根据 s 选动作

Worker

每个 worker 有自己的 class, class 里面有他的工作内容 work.

class Worker(object):
def __init__(self, name, globalAC):
self.env = gym.make(GAME).unwrapped # 创建自己的环境
self.name = name # 自己的名字
self.AC = ACNet(name, globalAC) # 自己的 local net, 并绑定上 globalAC

def work(self):
# s, a, r 的缓存, 用于 n_steps 更新
buffer_s, buffer_a, buffer_r = [], [], []
while not COORD.should_stop() and GLOBAL_EP < MAX_GLOBAL_EP:
s = self.env.reset()

for ep_t in range(MAX_EP_STEP):
a = self.AC.choose_action(s)
s_, r, done, info = self.env.step(a)

buffer_s.append(s) # 添加各种缓存
buffer_a.append(a)
buffer_r.append(r)

# 每 UPDATE_GLOBAL_ITER 步 或者回合完了, 进行 sync 操作
if total_step % UPDATE_GLOBAL_ITER == 0 or done:
# 获得用于计算 TD error 的 下一 state 的 value
if done:
v_s_ = 0 # terminal
else:
v_s_ = SESS.run(self.AC.v, {self.AC.s: s_[np.newaxis, :]})[0, 0]

buffer_v_target = [] # 下 state value 的缓存, 用于算 TD
for r in buffer_r[::-1]: # 进行 n_steps forward view
v_s_ = r + GAMMA * v_s_
buffer_v_target.append(v_s_)
buffer_v_target.reverse()

buffer_s, buffer_a, buffer_v_target = np.vstack(buffer_s), np.vstack(buffer_a), np.vstack(buffer_v_target)

feed_dict = {
self.AC.s: buffer_s,
self.AC.a_his: buffer_a,
self.AC.v_target: buffer_v_target,
}

self.AC.update_global(feed_dict) # 推送更新去 globalAC
buffer_s, buffer_a, buffer_r = [], [], [] # 清空缓存
self.AC.pull_global() # 获取 globalAC 的最新参数

s = s_
if done:
GLOBAL_EP += 1 # 加一回合
break # 结束这回合

Worker 并行工作

这里才是真正的重点! Worker 的并行计算.

with tf.device("/cpu:0"):
GLOBAL_AC = ACNet(GLOBAL_NET_SCOPE) # 建立 Global AC
workers = []
for i in range(N_WORKERS): # 创建 worker, 之后在并行
workers.append(Worker(GLOBAL_AC)) # 每个 worker 都有共享这个 global AC

COORD = tf.train.Coordinator() # Tensorflow 用于并行的工具

worker_threads = []
for worker in workers:
job = lambda: worker.work()
t = threading.Thread(target=job) # 添加一个工作线程
t.start()
worker_threads.append(t)
COORD.join(worker_threads) # tf 的线程调度

我的电脑里可以建立 4个 worker, 也就可以把它们放在4个线程中并行探索更新. 最后的学习结果可以用这个获取 moving averagereward 的图来概括.
在这里插入图片描述

multiprocessing + A3C

除此之外, 我心里一直有一个疙瘩, 因为这个 A3C 中, 我用的是 pythonthreading, 懂 python 的朋友知道, threadingGIL, 运算速度是问题, 我的 CPU 都不是满格的. 我一直想把这个 A3C 代码移植去 multiprocessing, 提高效率. 但是 Tensorflow 的 session 就是和 multiprocessing 不兼容, Global Net 做不好. 怎么办?

Distributed Tensorflow 是一个备选方案. 但是这个要求你是在计算机集群上做, 不然速度上还不如这个 threadingA3C. 这时, 我不爽了, 到在知乎上抱怨了一番. 和知友们聊了会, 然后我想出了下面这个方案.

和 Tensorflow 一样, 我做过一些 Pytorch 的教程, pytorch 也是做神经网络的. 但是它是支持 multiprocessing 的. 我专门开了一个 repo, 把 Pytorch + multiprocessing 的代码分享了出来. 这会儿, CPU 满格, 心情舒畅多了~
在这里插入图片描述


Shiroha