PPO算法在连续与离散动作空间中的案例实践

1.PPO算法与动作空间类型概览

1.PPO(Proximal Policy Optimization)简介

PPO(近端策略优化)是OpenAI于2017年提出的强化学习算法,通过创新的”剪切目标函数”设计,在保证训练稳定性的同时实现高效策略优化。其核心思想是通过约束策略更新幅度,防止策略突变导致的性能崩溃,解决了传统策略梯度方法(如TRPO)的工程实现复杂性问题

1.剪切比值机制(Clip Objective)

$$L^{\text{CLIP}}(\theta) = \mathbb{E}_t \left[ \min\left(r_t(\theta) \hat{A}_t,; \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \cdot \hat{A}_t\right) \right]$$

其中  $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)}$ 是新旧策略的概率比值。该设计的目的是:

  • 当策略更新幅度在阈值 ϵ(通常取0.1-0.3)内时正常优化
  • 当策略更新幅度过大或过小时进行截断,避免策略突变

2.​广义优势估计(GAE)​

广义优势估计(Generalized Advantage Estimation,简称 GAE)是策略梯度方法中一种用于减少策略训练时方差、提高稳定性的技巧。GAE 的目标结合多个 n-step Advantage 的加权平均,在方差和偏差之间取得更好的平衡。
GAE 定义如下:
$$\hat{A}t^{\mathrm{GAE}(\gamma, \lambda)} = \sum{l=0}^{\infty} (\gamma \lambda)^l \delta_{t+l}$$
其中:

  • $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$:一阶 TD 残差
  • $\lambda \in [0, 1]$:控制偏差与方差的权衡参数
    • $\lambda = 0$:使用 1-step TD,偏差大但方差小
    • $\lambda = 1$:类似 Monte Carlo 返回,偏差小但方差大
    • 介于中间时,效果通常更好(如 PPO 默认 $\lambda = 0.95$)

可以把 GAE 看成是 对未来奖励序列的指数加权平均,越靠近当前步的 TD 残差权重越大,远期的残差权重越小。通过调整 $\lambda$ 来控制对未来的“信任度”。
一个形象的比喻是:

  • $\lambda$ 小 → “短视”:更相信当前的反馈
  • $\lambda$ 大 → “长视”:更相信未来回报的总体趋势

3.双网络架构与数据复用​

  • Actor-Critic框架​:策略网络(Actor)生成动作,价值网络(Critic)评估状态
  • 多轮优化机制​:单次采样数据支持3-10轮策略更新,提升样本效率

2.强化学习中的两类动作空间

在强化学习中,“动作”就是智能体(agent)在每个时刻可以“做什么”。而动作空间,就是所有可能动作的集合——就像游戏角色能做哪些操作:走、跳、射击、转身……都在动作空间里。

1.离散动作空间(Discrete Action Space)

动作空间是有限个、不连续的动作选项。每个动作就像一个编号。像“菜单点菜”:选一个选项,编号是离散的,不能选半个动作或者1.7号动作。举例:

场景 动作空间 含义
游戏角色控制 [0, 1, 2] 0=前进,1=后退,2=跳跃
自动贩售机 [0, 1, 2, 3] 选择四种饮料中的一种
黑白棋 所有合法落子位置(最多 64 个) 每一步落子对应一个动作

2.连续动作空间(Continuous Action Space)

动作是实数向量,可以是任意值,甚至是多个维度组成的动作向量。像“调音台旋钮”:你可以随意旋转旋钮,控制值是连续的,可以精调到任意数值。举例:

场景 动作空间 含义
机械臂控制 [θ₁, θ₂, θ₃] ∈ ℝ³ 每个关节旋转角度是一个连续值
飞船推进 [f₁, f₂] ∈ ℝ² 左右引擎推力,值从 0~1
自动驾驶 [加速度, 方向盘角度] ∈ ℝ² 任意控制强度和方向

2.LunarLander-v3(离散动作空间案例)

1.环境与任务

LunarLander-v3 是 OpenAI Gym/Gymnasium 中的一个经典强化学习环境,模拟了一个飞船在月球上着陆的任务。任务目标是让飞船安全、平稳地降落在地面中央的着陆平台上。

1.状态空间(state)

一个长度为 8 的浮点向量,表示飞船当前的物理状态:

  • x:水平方向的位置
  • y:垂直方向的位置
  • vx:水平方向速度
  • vy:垂直方向速度
  • angle:飞船的旋转角度
  • angle_vel:飞船的角速度
  • left_leg_contact:左腿是否接触地面
  • right_leg_contact:右腿是否接触地面

2.动作空间(action space)

动作是离散的,共有 4 个动作

  • 0:不作为
  • 2:主引擎向下喷气
  • 3:左方向喷气
  • 1:右方向喷气

3.奖励机制(reward)

  • +100~140:成功降落在平台中央
  • -100:摔坏飞船
  • -0.3 / 每次喷气:惩罚使用燃料(节省能量)
  • +10:每条腿接触地面

所以整个着陆任务的最佳策略是如何平稳、省油地降落在中央区域(两个小旗子中间)。

4.成功/终止条件

成功条件:飞船平稳降落在中央的目标平台,并且两个着陆腿都接触地面
终止条件:飞船成功着陆或坠毁或者飞船飞出屏幕外或者任务时间超限

5.相关参数

1
2
3
4
5
6
7
8
9
env = gym.make("LunarLander-v3", render_mode="human") # 创建环境
action = 3 # 向左推进
# terminated:任务完成 truncated:任务被结束
obs, reward, terminated, truncated, info = env.step(action)
x, y, vx, vy, angle, angle_vel, left_leg, right_leg = obs
print(f"横向:{x}, 纵向:{y}, 水平速度:{vx}, 垂直速度:{vy}, 朝向角度:{angle}, \n角速度:{angle_vel}, 左支架:{left_leg}, 右支架:{right_leg},action:{action},得分:{reward}")
"""
横向:-0.18395118415355682, 纵向:0.003272805130109191, 水平速度:-0.005371610634028912, 垂直速度:-0.001628089346922934, 朝向角度:-1.9359147548675537, 角速度:0.017528165131807327, 左支架:1.0, 右支架:0.0,action:3,得分:-100
"""

2.经验缓存

Memory 类的作用是收集和存储智能体与环境交互过程中的经验数据,这也是为后续的策略更新(训练)提供数据支持。主要功能如下:

1. 主要用途

  • 存储一批采样数据:包括每一步的状态、动作、动作概率、奖励、是否终止等信息
  • 支持多轮优化:采集到一批数据后,可以多次利用这些数据进行策略网络的更新,提高样本利用率
  • 便于批量处理:将采集到的数据整理为张量,方便后续神经网络的批量训练

2.代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Memory:  
# 经验缓存类
def __init__(self):
self.actions = [] # 存储动作
self.states = [] # 存储状态
self.logprobs = [] # 存储旧策略下动作的对数概率
self.rewards = [] # 存储奖励
self.is_terminals = [] # 存储是否为终止状态标志

def clear_memory(self):
# 清空缓存(每次 update 后调用)
del self.actions[:]
del self.states[:]
del self.logprobs[:]
del self.rewards[:]
del self.is_terminals[:]

3.工作流程

  1. 采样阶段:智能体与环境交互,每一步都把当前的状态、动作、动作概率、奖励、是否终止等信息存入Memory
  2. 更新阶段:达到一定步数后(如update_timestep),用Memory中存储的数据进行多轮策略更新
  3. 清空阶段:更新完毕后,调用clear_memory()清空缓存,为下一批采样做准备

3.策略网络结构(Actor)

在PPO中,策略网络(Actor)的作用是:输入当前环境状态,输出每个可选动作的概率分布。
对于LunarLander-v3(离散动作空间),策略网络输出4个动作的概率。

1.代码实现

1
2
3
4
5
6
7
8
9
10
11
# 输入层:输入维度state_dim(LunarLander-v3为8,即8维状态特征)
# 隐藏层:两个全连接层,每层后接Tanh激活函数(增加非线性表达能力),隐藏层宽度:n_latent_var(如64)
# 输出层:输出维度:action_dim(LunarLander-v3为4,即4个离散动作)Softmax激活:将输出转为概率分布,保证所有动作概率之和为1
self.action_layer = nn.Sequential(
nn.Linear(state_dim, n_latent_var),
nn.Tanh(), # Tanh 激活函数,给网络增加非线性表达能力,帮助网络学习更复杂的特征
nn.Linear(n_latent_var, n_latent_var),
nn.Tanh(),
nn.Linear(n_latent_var, action_dim),
nn.Softmax(dim=-1) # 输出动作概率分布
)

策略网络输入8维状态特征,输出4维动作概率分布

2.前向推理与动作采样

先输入状态,经过上述网络,得到4个动作的概率分布,再用Categorical分布根据概率采样动作,实现探索,最后将采样的动作、对应的对数概率等信息存入Memory,用于后续PPO更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
def act(self, state, memory):  
# 用当前策略选择动作
state = torch.from_numpy(state).float().to(device)
action_probs = self.action_layer(state) # 得到4维动作概率
dist = Categorical(action_probs) # 创建了一个按策略概率采样动作的分布
action = dist.sample() # 从这个分布中随机抽取一个动作,以便进行策略执行和训练

# 将动作与 log 概率存入 memory,用于后续更新
memory.states.append(state)
memory.actions.append(action)
memory.logprobs.append(dist.log_prob(action))

return action.item()

Categorical 策略是:

“当前状态下,每个动作的概率是多少”,然后按这个分布随机选择一个动作

torch.distributions.Categorical 是 PyTorch 中的一个离散概率分布类,用于处理一维离散随机变量。举例:

1
2
3
probs = torch.tensor([0.1, 0.7, 0.2])
dist = Categorical(probs) # [0, 0.1] 动作0的概率是0.1 [1, 0.7], [2, 0.2]
action = dist.sample()

当执行 dist.sample(),我们就“按概率抽奖”选出一个动作编号。例如抽到action为1的概率是0.7.

3.设计逻辑与优点

  • 多层感知机的结构,适合处理中等复杂度的状态特征
  • 使用Tanh激活函数,有助于稳定训练
  • 采用Softmax输出,天然适配离散动作空间
  • 使用的概率采样,支持策略的探索性

LunarLander-v3的策略网络本质是一个两层隐藏层的全连接神经网络,输入状态,输出每个动作的概率分布。

4.价值网络结构(Critic)

输入当前环境状态,输出该状态的“价值估计”(state value),即智能体在该状态下能获得的期望累计回报。这个数值用于计算优势函数(Advantage),指导策略(Actor)如何去优化。

1.代码实现

1
2
3
4
5
6
7
8
9
10
11
# Critic网络(价值网络):用于输出当前状态的价值(state value)  
# 输入层 输入维度:state_dim(LunarLander-v3为8,即8维状态特征)
# 隐藏层 两个全连接层,每层后接Tanh激活函数 隐藏层宽度:n_latent_var(如64)
# 输出层 输出维度:1(标量,表示该状态的价值)
self.value_layer = nn.Sequential(
nn.Linear(state_dim, n_latent_var),
nn.Tanh(),
nn.Linear(n_latent_var, n_latent_var),
nn.Tanh(),
nn.Linear(n_latent_var, 1) # 输出状态价值
)

输入一个状态向量,经过上述网络,输出一个标量,表示该状态的价值估计。这个值用于PPO损失函数中的优势计算和价值损失部分。

2.评估旧策略

评估旧策略在给定状态下选出某个动作的概率,以及当前状态的价值估计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 输入:状态和动作
def evaluate(self, state, action):
# 1.当前策略网络的前向传播结果,输出动作概率分布(Softmax输出)
action_probs = self.action_layer(state)

# 2.基于动作概率,构建 PyTorch 的 离散概率分布对象。
# 这个dist可以,计算动作的概率、log概率,计算整个分布的熵
dist = Categorical(action_probs)

# 3.重点:计算旧动作在当前策略下的 log 概率
action_logprobs = dist.log_prob(action)

# 4. 熵衡量当前策略有多“随机”。
# PPO 中通常会加上一个探索激励项:loss=policy_loss-c1*value_loss+c2*entropy
dist_entropy = dist.entropy() # 概率分布熵(鼓励探索)

# 5.这是 Critic 网络的输出,对每个状态估计其“价值”
state_value = self.value_layer(state) # 状态价值

# 6.输出
# action_logprobs:当前策略对旧动作的 log 概率(用于更新)
# torch.squeeze(state_value):当前状态的估值
# dist_entropy:当前策略的分布熵(用于鼓励探索)
return action_logprobs, torch.squeeze(state_value), dist_entropy

evaluate() 是 PPO 算法在 训练更新阶段的核心评估函数,它用于:

  • 计算 log概率(用于 PPO 的 clip ratio)
  • 计算 state value(用于 advantage)
  • 计算 entropy(用于策略的探索奖励)

这些值都将直接进入 PPO 的损失函数,指导策略网络和价值网络进行反向传播和参数更新。

5.PPO算法实现

1.代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class PPO:  
def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, k_epochs, eps_clip):
self.lr = lr
self.betas = betas
self.gamma = gamma
self.eps_clip = eps_clip
self.K_epochs = k_epochs

self.policy = ActorCritic(state_dim, action_dim, n_latent_var).to(device) # 当前策略网络(Actor-Critic结构)
self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas) # 用于更新当前策略的 Adam 优化器
self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var).to(device) # 拷贝旧策略网络,采样时使用它(保证采样策略固定)
self.policy_old.load_state_dict(self.policy.state_dict())
self.MseLoss = nn.MSELoss() # 用于 critic 的价值函数回归

# 策略更新核心函数
def update(self, memory):
# 1. 使用 GAE 估计每个状态的回报
# 如果是终止状态(done),则折扣回报归零重新累计
rewards = []
discounted_reward = 0
for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
if is_terminal:
discounted_reward = 0
discounted_reward = reward + (self.gamma * discounted_reward)
rewards.insert(0, discounted_reward)

# 2.对回报进行标准化(加快收敛):有助于更稳定的训练
rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

# 3.取出旧的数据(detach 防止反向传播)
old_states = torch.stack(memory.states).to(device).detach()
old_actions = torch.stack(memory.actions).to(device).detach()
old_logprobs = torch.stack(memory.logprobs).to(device).detach()

# 4.K次epoch的PPO更新:每次更新循环中,进行一次“策略评估 + 损失计算 + 反向传播”
for _ in range(self.K_epochs):
# 5.策略评估:计算新策略下动作的对数概率、state的value估计和策略的熵(鼓励探索)
logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

# 6.计算概率比率
ratios = torch.exp(logprobs - old_logprobs.detach())

# 7.计算优势项,衡量动作比期望好多少
advantages = rewards - state_values.detach()

# 8.计算PPO剪切目标
# PPO的核心是防止策略更新太快(剪切比率),所以取两者中较小的作为最终目标
surr1 = ratios * advantages
surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages

# 9.构造总损失函数
# -torch.min(surr1, surr2):策略损失(Actor)
# 0.5 * self.MseLoss(state_values, rewards):价值损失(Critic)
# - 0.01 * dist_entropy:熵惩罚项,鼓励策略保持一定随机性(探索)
loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy

# 10.执行梯度更新
self.optimizer.zero_grad()
loss.mean().backward()
self.optimizer.step()

# 11.更新旧策略参数,便于下一轮采样
self.policy_old.load_state_dict(self.policy.state_dict())

PPO.update()代码的核心是:用采样轨迹计算 advantage,构造 clipped surrogate loss,在保证稳定更新的前提下优化 actor-critic 网络。

2.举例说说 PPO 的ratio 剪切策略

在PPO算法中,策略更新的核心是限制新旧策略之间的变化幅度,以避免剧烈更新导致策略崩溃。这一策略的核心公式之一为:
$$\text{ratio} = \frac{\pi_{\text{new}}(a|s)}{\pi_{\text{old}}(a|s)}$$
该比率表示新策略与旧策略在相同状态下选择某个动作的概率比值。

在 LunarLander-v3 环境中,飞船需要根据当前的状态(如高度、角度、速度)做出决策,选择激活哪些推进器以实现平稳着陆。策略网络 π 就是用于在每个状态下输出相应动作分布的模型。
在一次回合中,旧策略 π_old 对某个状态 s 采样了一个动作 a,比如“点燃主推进器”。随后,训练过程根据实际的环境反馈计算出该动作的优势(Advantage),即该动作相对于当前状态下平均动作的好坏程度。

策略更新不宜过快
如果该动作的优势为正(说明该动作是“好”的),策略应当提高该动作的概率以鼓励重复选择。然而,如果直接按照策略梯度进行无约束更新,可能会导致该动作的概率被显著放大,从而造成策略过拟合于当前经验,降低策略在未见过情形下的泛化能力,甚至使策略陷入崩溃。

PPO 中的剪切机制
为了抑制策略剧烈更新,PPO 引入了一个“剪切”机制,即限制 ratio 的值在一个范围内(例如 [0.8, 1.2])。当更新导致的概率比率超过该范围时,会使用边界值替代,以实现“保守更新”。

这种剪切机制可类比为飞船的姿态控制系统:

情形 含义 PPO 策略行为
$\text{ratio} \approx 1$ 当前策略与旧策略差异不大,动作选择变化平稳 正常更新,鼓励优势动作
$\text{ratio} \gg 1$ 策略剧烈放大某一动作的概率,容易造成策略偏移或不稳定 剪切 ratio,抑制剧烈变化
$\text{ratio} \ll 1$ 策略对某一动作严重抑制,可能过度惩罚 剪切后保持稳定下降,避免过度惩罚

6.训练参数与控制变量

1.训练参数

1
2
3
4
5
6
7
8
9
state_dim = env.observation_space.shape[0] # 状态向量长度(LunarLander 为 8)
action_dim = 4 # 动作数量(离散 4 种:无、主推进器、左右推进器)
n_latent_var = 64 # 隐藏层宽度
lr = 0.002 # 学习率 控制每次参数更新的步长,影响训练速度和稳定性
betas = (0.9, 0.999) # Adam 优化器的动量参数,通常为 (0.9, 0.999)
gamma = 0.99 # 奖励折扣因子,控制未来奖励的衰减程度,越接近 1 越重视长期奖励
k_epochs = 4 # 每次更新策略时的训练轮数,每收集一批数据后,策略网络要训练多少次,如 4(离散动作),80(连续动作)
eps_clip = 0.2 # PPO 裁剪参数,限制新旧策略概率比的变化范围,防止策略更新过大,保证训练稳定
random_seed = None # 是否设置随机种子(可复现性)

2.控制变量

1
2
3
4
5
6
render = False  # 是否渲染游戏UI
solved_reward = 230 # 平均奖励大于该值即认为任务完成
log_interval = 20 # 每多少个 episode 打印一次日志
max_episodes = 50000 # 训练的总轮数(每轮为一次游戏)
max_timesteps = 300 # 每轮最多多少步
update_timestep = 2000 # 策略更新步数间隔

7.训练流程

1.代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def main():  
# 训练参数与控制变量
# ...

# 设置随机种子
if random_seed is not None:
torch.manual_seed(random_seed)
env.reset(seed=random_seed)
print(f"PyTorch随机数:{torch.rand(1).item()}, 环境初始状态:{env.reset()[0][:2]}")

# 初始化 PPO、Memory、变量
memory = Memory() # 存储一段时间内的经验轨迹(state、action、reward 等)
ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, k_epochs, eps_clip) # 初始化一个 PPO 对象,内部包含策略网络、优化器等

# logging variables
running_reward = 0
total_length = 0
timestep = 0

finished_step = 0

# 训练循环
for i_episode in range(1, max_episodes + 1): # 每次循环表示飞船从头开始着陆一次
state, _ = env.reset() # 初始化
for t in range(max_timesteps): # 每一局最多能走max_timesteps步
finished_step = t
timestep += 1

# 使用旧策略选择动作
action = ppo.policy_old.act(state, memory)
# next_state:执行动作后的新状态 r
# reward:本步获得的奖励
# terminated:是否因为任务完成/失败而结束
# truncated:是否因为达到最大步数等外部原因而结束
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated # 是否终止
state = next_state # 更新状态

# 存储奖励与终止标记
memory.rewards.append(reward)
memory.is_terminals.append(done)

# 满足 update_timestep 时,更新策略并清空经验
if timestep % update_timestep == 0:
ppo.update(memory)
memory.clear_memory()
timestep = 0

running_reward += reward # 奖励累加

if render:
env.render()
if done:
break

total_length += finished_step # 累计每个 episode(回合)实际运行的步数

# 是否完成任务(Early Stop)
# 如果最近 log_interval 个回合的总奖励超过设定阈值,认为已经训练成功,提前结束并保存模型
if running_reward > (log_interval * solved_reward):
print("########## Solved! ##########")
torch.save(ppo.policy.state_dict(), './PPO_{}.pth'.format(env_name))
break

# 打印训练日志,
# 每隔 log_interval 回合打印一次平均奖励和步数
# 清零统计变量,进入下一轮计算
if i_episode % log_interval == 0:
avg_length = int(total_length / log_interval)
running_reward = int((running_reward / log_interval))
print('Episode {} \t avg length: {} \t reward: {}'.format(i_episode, avg_length, running_reward))
running_reward = 0
total_length = 0

2.训练日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Episode 20 	 avg length: 95 	 reward: -264
Episode 40 avg length: 92 reward: -173
Episode 60 avg length: 90 reward: -147
Episode 80 avg length: 93 reward: -134
...
Episode 420 avg length: 99 reward: -70
Episode 440 avg length: 102 reward: -68
Episode 460 avg length: 113 reward: -70
...
Episode 600 avg length: 216 reward: 6
Episode 620 avg length: 237 reward: 68
Episode 640 avg length: 261 reward: 100
...
Episode 820 avg length: 238 reward: 101
Episode 840 avg length: 230 reward: 103
Episode 860 avg length: 250 reward: 104
...
Episode 1360 avg length: 213 reward: 164
Episode 1380 avg length: 232 reward: 174
Episode 1400 avg length: 239 reward: 214
########## Solved! ##########

8.模型验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def start_lunar_lander():  

env_name = "LunarLander-v3"
env = gym.make(env_name, render_mode="human")

state_dim = env.observation_space.shape[0] # 状态向量长度(=8)
action_dim = 4 # 动作数量(4 个离散动作)

n_latent_var = 64 # 隐藏层宽度
lr = 0.0007 # 学习率(略小于训练时使用的 0.002,测试时不重要)
betas = (0.9, 0.999) # Adam 优化器参数
gamma = 0.99 # 奖励折扣因子
K_epochs = 4  # 每次更新策略训练的轮数(此处无用,因不训练)
eps_clip = 0.2 # PPO 的剪切比例


n_episodes = 3 # 测试回合数(让模型飞 3 次)
max_timesteps = 300 # 每次最多模拟 300 步

filename = f"PPO_{env_name}.pth"
directory = "./"

memory = Memory() # 初始化空记忆(虽然不训练,但 act() 函数需要它)
ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip) # 构建 PPO 模型结构
ppo.policy_old.load_state_dict(torch.load(os.path.join(directory, filename)), strict=False) # 加载训练好的模型参数

for ep in range(1, n_episodes + 1):
ep_reward = 0 # 当前回合累计奖励
state, _ = env.reset()

# 飞行动作循环
for t in range(max_timesteps):
action = ppo.policy_old.act(state, memory) # 旧策略选择动作
# 执行动作,获取新状态
state, reward, terminated, truncated, _ = env.step(action)
ep_reward += reward

try:
env.render()
pygame.event.pump() # 防止窗口卡死
except pygame.error:
print("Render window was closed unexpectedly.")
break

# 飞船坠毁或安全降落或 truncated:超过最大步数等强制终止
if terminated or truncated:
break

print(f'Episode: {ep}\tReward: {int(ep_reward)}')

env.close() # 正确地在最后统一关闭环境

日志输出:

1
2
3
Episode: 1	Reward: 223
Episode: 2 Reward: 39
Episode: 3 Reward: 261

测试飞行三次,有两次可以安全降落在指定位置。

3.BipedalWalker-v3(连续动作空间案例)

1.环境与任务

BipedalWalker-v3 是 OpenAI Gym 提供的一个经典 连续控制 强化学习环境,任务目标是:控制一个双足机器人在崎岖地形上行走而不摔倒,并尽可能走得远。这是一个对智能体控制能力要求较高的环境。

1.状态空间(state space)

BipedalWalker-v3的状态空间是一个 24 维的浮点向量,包含:

索引 含义 说明
0 机器人躯干角度 单位:弧度,表示身体的旋转角
1 机器人躯干角速度 躯干旋转的速度
2 水平方向速度(vx) 躯干的水平移动速度
3 垂直方向速度(vy) 躯干的垂直移动速度
4 躯干到地面的水平距离 和目标区域的距离偏移
5 躯干到地面的垂直距离 着陆高度
6-13 4 个腿关节的角度(4 个关节) 分别表示前后腿的上/下段关节
14-17 4 个腿关节的角速度 对应角度的变化速度
18-21 4 个地面接触传感器(布尔值) 是否接触地面(每条腿有两个脚趾)
22-23 最近 2 帧的躯干高度差 有些实现中添加,用于平滑动作判断(非标准)

2.动作空间(action space)

BipedalWalker-v3的动作空间是一个 4维连续动作空间,范围是 [-1, 1],分别控制:

  • 左髋关节的推力
  • 左膝关节的推力
  • 右髋关节的推力
  • 右膝关节的推力

每个数值控制一个电机的激活强度(可正可负),表示关节的力和方向。

3.奖励函数(reward)

正奖励:右移动的距离(越走越远奖励越多),控制越稳,步态越自然,奖励越高
负奖励:每一步都会有小的惩罚(惩罚能量浪费),摔倒或行为不稳定将被惩罚或直接终止

4.成功/终止条件

  • 成功:机器人走完整个地形
  • 失败:摔倒或身体某部分触地

5.难点

  1. 动作是连续的:不像“前进/后退”这种离散选择,而是需要精确调控关节的力,这样容错率就低
  2. 地形复杂:每次生成的地图都不同,有坑、坡、凸起,机器人需要学会灵活适应
  3. 需要协调多个关节动作:形成合理步态(如:迈出一条腿的同时另一条支撑)
  4. 平衡控制:摔倒即结束,类似倒立摆任务,强调稳定性与反馈控制

6.相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
env = gym.make("BipedalWalker-v3", render_mode="human")
# 分别对应:左髋 左膝 右髋 右膝
action = [0.9765114, -0.280038, 0.5163014, -1.10301]
state, reward, terminated, truncated, _ = env.step(action)
print(f"state:{state} reward:{reward} terminated:{terminated} truncated:{truncated}")
"""
state:[-0.02104758 -0.03126067 -0.02977715 -0.01362359
0.47771385 1.0004591 0.07091689 -1.0004667
1. 0.37952623 0.9994885 0.07574213
-0.99983853 1. 0.44616896 0.4512359
0.46702808 0.49549717 0.540591 0.60977966
0.71776354 0.896694 1. 1. ]
reward:-0.24856666951812803 terminated:False truncated:False
"""

2.经验缓存

同LunarLander-v3案例的Memory类,也是收集和存储智能体与环境交互过程中的经验数据,为后续的策略更新(训练)提供数据支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Memory:  
def __init__(self):
self.actions = []
self.states = []
self.logprobs = []
self.rewards = []
self.is_terminals = []

def clear_memory(self):
del self.actions[:]
del self.states[:]
del self.logprobs[:]
del self.rewards[:]
del self.is_terminals[:]

3.策略网络结构(Actor)

1
2
3
4
5
6
7
8
self.actor = nn.Sequential(  
nn.Linear(state_dim, 64),
nn.Tanh(),
nn.Linear(64, 32),
nn.Tanh(),
nn.Linear(32, action_dim),
nn.Tanh() # 输出动作均值,范围[-1,1]
)

LunarLander-v3的策略网络结构最大的不同是,前者输出的是输出动作概率分布,后者输出动作均值,范围[-1,1]

特征 PPO_discrete PPO_continuous
输出激活 Softmax Tanh
概率分布 Categorical MultivariateNormal
探索方式 概率采样 方差控制
输出范围 [0,1]概率 [-1,1]均值
动作类型 离散整数 连续向量
1
2
3
4
5
6
7
8
9
10
11
12
13
def act(self, state, memory):  
action_mean = self.actor(state)
cov_mat = torch.diag(self.action_var).to(device) # 固定方差矩阵

dist = MultivariateNormal(action_mean, cov_mat) # 多维正态分布
action = dist.sample()
action_logprob = dist.log_prob(action)

memory.states.append(state)
memory.actions.append(action)
memory.logprobs.append(action_logprob)

return action.detach()

LunarLander-v3的概率分布类型也不同,前者是用的Categorical分类分布(从有限个离散动作中采样)是离散的,后者用的是MultivariateNormal分布(从连续动作空间中采样),是连续的。

4.价值网络结构(Critic)

1
2
3
4
5
6
7
8
self.critic = nn.Sequential(  
nn.Linear(state_dim, 64),
nn.Tanh(),
nn.Linear(64, 32),
nn.Tanh(),
nn.Linear(32, 1)
)
self.action_var = torch.full((action_dim,), action_std * action_std).to(device)

LunarLander-v3中是通过Softmax输出的概率分布自然实现探索,不同动作有不同的采样概率。而这个例子中通过固定的方差矩阵控制探索,网络输出动作均值,方差固定(如0.5²)。

特征 PPO_discrete PPO_continuous
网络结构 基本相同 基本相同
隐藏层 64→64 64→32
输出 状态价值(标量) 状态价值(标量)
功能 完全相同 完全相同

PPO 中策略评估(而非采样)阶段的核心,通常在更新策略前,对旧轨迹重新评估当前策略 π 的行为,以便计算 PPO 的剪切损失函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 给定状态 state 和动作 action,评估当前策略对这些动作的 log 概率、状态值估计,以及策略分布的熵(用于鼓励探索)
def evaluate(self, state, action):
# 计算策略均值
action_mean = self.actor(state)
# 预设或可学习的方差张量
action_var = self.action_var.expand_as(action_mean)
# 构建对角协方差矩阵(只有对角线有值,代表各动作维度独立),用于构建多维正态分布
cov_mat = torch.diag_embed(action_var).to(device)
# 创建一个多维正态分布(动作策略分布),均值为 action_mean,协方差为 cov_mat
dist = MultivariateNormal(action_mean, cov_mat) # 多维正态分布
# 计算 log 概率
action_logprobs = dist.log_prob(action)
# 计算分布的熵
dist_entropy = dist.entropy()
# 评估 Critic 值函数
state_value = self.critic(state)

# action_logprobs:当前策略下该动作的 log 概率(用于策略更新)
# torch.squeeze(state_value) Critic 网络输出的状态值估计
# dist_entropy 当前策略分布的熵(用于鼓励探索)
return action_logprobs, torch.squeeze(state_value), dist_entropy

5.PPO算法实现

1.代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class PPO:  
def __init__(self, state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip):
self.lr = lr
self.betas = betas
self.gamma = gamma # 奖励折扣因子
self.eps_clip = eps_clip # PPO 剪切的 ε 参数
self.K_epochs = K_epochs # 每次更新中进行的迭代次数

# 当前策略
self.policy = ActorCritic(state_dim, action_dim, action_std).to(device)
self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)
# 行动时用的旧策略(不更新)
self.policy_old = ActorCritic(state_dim, action_dim, action_std).to(device)
self.policy_old.load_state_dict(self.policy.state_dict())

self.MseLoss = nn.MSELoss()

def select_action(self, state, memory):
if isinstance(state, tuple):
state = state[0]
state = torch.FloatTensor(np.array(state).reshape(1, -1)).to(device)
# 输出的是一个连续动作向量,如 [0.23, -0.44]
return self.policy_old.act(state, memory).cpu().data.numpy().flatten()

# 这部分与离散 PPO 大体一致
def update(self, memory): # PPO 策略更新
rewards = []
discounted_reward = 0
# Monte Carlo 方式计算回报
for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
if is_terminal:
discounted_reward = 0
discounted_reward = reward + (self.gamma * discounted_reward)
rewards.insert(0, discounted_reward)

rewards = torch.tensor(rewards, dtype=torch.float).to(device)
# 回报被标准化,以减少方差
rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

# 转换状态、动作、log_prob为 tensor
old_states = torch.squeeze(torch.stack(memory.states).to(device), 1).detach()
old_actions = torch.squeeze(torch.stack(memory.actions).to(device), 1).detach()
old_logprobs = torch.squeeze(torch.stack(memory.logprobs), 1).to(device).detach()

for _ in range(self.K_epochs):
# 评估当前策略
# logprobs: 当前策略对旧动作的 log π(a|s)
# state_values: Critic 输出的 V(s)
# dist_entropy: 多维高斯分布的熵,鼓励策略多样性
logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

# PPO核心构造surrogate loss+剪切项
# 剪切项控制策略更新幅度,防止新策略偏离旧策略太远,加入 value loss(Critic)和熵项(Actor 探索)
ratios = torch.exp(logprobs - old_logprobs.detach()) # 概率比
advantages = rewards - state_values.detach()
surr1 = ratios * advantages
surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy
self.optimizer.zero_grad()

# 更新策略参数
loss.mean().backward()
self.optimizer.step()

# 最后更新策略,并将当前策略复制给 policy_old
self.policy_old.load_state_dict(self.policy.state_dict())

2.与离散动作 PPO的区别对比

项目 连续动作版 离散动作版
动作空间 连续(多维向量) 离散(索引,如 0/1/2/3)
策略分布 MultivariateNormal Categorical
动作采样 高斯分布采样:dist.sample() 概率分布采样:dist.sample()
log_prob dist.log_prob(action)(多维) dist.log_prob(action)(离散)
策略输出 动作的均值(actor 输出) 动作的概率分布(softmax)
方差来源 固定或可学习的标准差 action_std 不需要方差
evaluate() 构建多维高斯分布(协方差矩阵) 构建离散分布(Categorical)
适用环境 如 LunarLanderContinuous-v2, BipedalWalker-v3 如 CartPole-v1, LunarLander-v3

6.训练参数与控制变量

1.参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
render = False  # 是否显示图形窗口(训练时通常关闭,测试时打开)
solved_reward = 300 # 如果过去若干轮的平均奖励超过 300,则认为任务完成
log_interval = 20 # 每训练多少回合打印一次日志
max_episodes = 10000 # 最大训练轮数(episode),每轮指一局游戏
max_timesteps = 1500 # 每局最多运行多少步,步数用完或 agent 死亡则 episode 结束

update_timestep = 4000 # 每收集多少个环境步数后执行一次策略网络的更新
action_std = 0.5 # 动作标准差(仅用于连续动作),控制策略输出高斯分布的探索强度。
K_epochs = 80 # 每次策略更新时,进行多少轮优化
eps_clip = 0.2 # PPO 的裁剪范围,用于限制新旧策略的变动,保持训练稳定。
gamma = 0.99 # 奖励折扣因子,控制未来奖励的重要性。0.99 表示非常看重长远回报。

lr = 0.0003 # 学习率
betas = (0.9, 0.999) # 动量

random_seed = None # 随机种子,控制训练的可复现性

2.区别

连续动作(如 BipedalWalker) 离散动作(如 LunarLander) 区别说明 解释
action_std 有,设定高斯分布方差 离散动作不需要方差
K_epochs 通常较大(如 80) 较小(如 4) 连续动作难度大,更新次数多
update_timestep 大(如 4000) 小(如 2000) 连续动作需要积累更多经验再训练
action_dim 实数向量维度 整数类别数 连续动作需要输出一个动作向量
策略分布 MultivariateNormal Categorical 前者生成连续动作,后者选择概率最大的动作
Actor 输出 动作的均值(连续向量) 动作的概率(Softmax) 连续 vs 离散策略网络结构不同

7.训练流程

1.代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def main():  
# 训练参数与控制变量
# ...

# 创建环境与状态空间、动作空间维度
env = gym.make(env_name)
# 状态维度,如 24(对 BipedalWalker)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0] # 连续动作维度,如 4(对 BipedalWalker)

# 设置随机种子(可选)
if random_seed:
print("Random Seed: {}".format(random_seed))
torch.manual_seed(random_seed)
env.seed(random_seed)
np.random.seed(random_seed)

# 初始化 PPO、Memory
memory = Memory()
ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip)

# 创建日志变量
running_reward = 0
avg_length = 0
time_step = 0

# 主训练循环(每个 episode)
for i_episode in range(1, max_episodes + 1):
state = env.reset() # 每轮开始时重置环境
for t in range(max_timesteps):
time_step += 1

# 每个 timestep 采样动作并交互
action = ppo.select_action(state, memory)
next_state, reward, done, truncated, _ = env.step(action)

# 存储经验、更新策略
memory.rewards.append(reward)
memory.is_terminals.append(done)
if time_step % update_timestep == 0:
ppo.update(memory)
memory.clear_memory()
time_step = 0

running_reward += reward

# 渲染与终止处理
if render:
env.render()
if done:
break

avg_length += t

# 保存模型
if running_reward > (log_interval * solved_reward):
print("########## Solved! ##########")
torch.save(ppo.policy.state_dict(), './PPO_continuous_solved_{}.pth'.format(env_name))
break
if i_episode % 500 == 0:
torch.save(ppo.policy.state_dict(), './PPO_continuous_{}.pth'.format(env_name))

# 日志输出
if i_episode % log_interval == 0:
avg_length = int(avg_length / log_interval)
running_reward = int((running_reward / log_interval))

print('Episode {} \t Avg length: {} \t Avg reward: {}'.format(i_episode, avg_length, running_reward))
running_reward = 0
avg_length = 0

2.训练日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Episode 20 	 Avg length: 299 	 Avg reward: -100
Episode 40 Avg length: 448 Avg reward: -96
Episode 60 Avg length: 503 Avg reward: -94
Episode 80 Avg length: 436 Avg reward: -95
Episode 100 Avg length: 516 Avg reward: -91
Episode 120 Avg length: 716 Avg reward: -84
Episode 140 Avg length: 791 Avg reward: -81
Episode 160 Avg length: 1069 Avg reward: -71
Episode 180 Avg length: 1427 Avg reward: -61
Episode 200 Avg length: 1357 Avg reward: -57
Episode 220 Avg length: 1427 Avg reward: -57
Episode 240 Avg length: 1285 Avg reward: -64
Episode 260 Avg length: 1357 Avg reward: -56
Episode 280 Avg length: 1004 Avg reward: -69
Episode 300 Avg length: 1123 Avg reward: -75
Episode 320 Avg length: 744 Avg reward: -90
...

3.与离散动作版本相比

方面 连续动作 main() 离散动作 main() 区别说明
动作类型 实数向量(np.array) 整数(类别编号) 连续动作使用多元高斯策略,离散使用分类策略
action_dim env.action_space.shape[0] 手动设为 4(动作数量) 离散动作为固定离散集合,连续动作为多维实数
策略网络输出 动作均值 μ 动作概率分布 连续输出用于高斯分布采样,离散输出用于 softmax
action_std 需要设定(探索强度) 不使用 连续策略的探索来自方差,离散策略来自随机采样概率
ppo.select_action() 返回实数向量 返回动作索引 两者行为一致,输出形式不同
策略更新频率 通常设较大 update_timestep=4000 通常较小(如 2000) 连续策略收敛慢,需要更多经验和更新步
K_epochs 大(如 80) 小(如 4) 连续动作策略更新更频繁以稳定训练

8.模型验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def render_bipedal_walker():  
env_name = "BipedalWalker-v3"
env = gym.make(env_name, render_mode="human")

state_dim = env.observation_space.shape[0] # 获取状态空间维度(如 24)
action_dim = env.action_space.shape[0] # 动作空间维度(如 4)

n_episodes = 3
max_timesteps = 1500

filename = f"PPO_continuous_{env_name}.pth" # 模型文件
directory = "./"

action_std = 0.5
K_epochs = 80
eps_clip = 0.2
gamma = 0.99
lr = 0.0003
betas = (0.9, 0.999)

memory = Memory()

# 加载训练好的策略模型
ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip)
ppo.policy_old.load_state_dict(torch.load(directory + filename, weights_only=True))

# 运行并渲染每个 Episode
for ep in range(1, n_episodes + 1):
ep_reward = 0
state, _ = env.reset() # 重置环境开始新一轮

# 每一步执行 PPO 策略并交互环境
for t in range(max_timesteps):
action = ppo.select_action(state, memory)
state, reward, terminated, truncated, _ = env.step(action)
ep_reward += reward

try:
env.render()
pygame.event.pump() # 防止 Pygame 卡死或被系统关闭
except pygame.error:
print("⚠️ Pygame window was closed. Skipping rendering.")
break

# 检查是否结束
if terminated or truncated:
break

# 输出当前 episode 总奖励
print(f'Episode: {ep}\tReward: {int(ep_reward)}')

env.close() # 在所有 episode 后统一关闭

日志输出:

1
2
3
Episode: 1	Reward: 37
Episode: 2 Reward: 259
Episode: 3 Reward: 263

4.总结

1.PPO核心优势​

  • 创新剪切机制:通过约束策略更新幅度(概率比剪切),解决策略突变问题
  • 高效稳定:GAE平衡偏差/方差,双网络架构实现多轮数据复用
  • 工程友好:相比TRPO更易实现

2.动作空间关键差异​

特性 离散动作空间 连续动作空间
策略输出 Softmax概率分布 Tanh均值向量
采样分布 Categorical(分类分布) MultivariateNormal(多元正态)
探索机制 概率采样 固定/可学习方差
环境举例 LunarLander(飞船着陆) BipedalWalker(双足行走)
训练参数差异 K_epochs小(≈4) K_epochs大(≈80)

3. 工程实现要点​

  • 离散动作​:输出层Softmax → 动作概率 → Categorical采样 → 动作索引
  • 连续动作​:输出层Tanh → 动作均值 + 固定方差 → 多维正态分布采样 → 动作向量
  • 关键技巧​:通过GAE优化优势估计;通过回报标准化加速收敛;熵奖励项促进探索;通过旧策略冻结保证采样一致性

4. 环境对比​

  • LunarLander​:8维状态/4离散动作,300步内收敛
  • BipedalWalker​:24维状态/4连续动作,需>1500步更复杂训练

PPO通过统一框架适配两类动作空间,其剪切机制和双网络设计在保证稳定性的同时,为机器人控制、游戏AI等领域提供了高效解决方案。
离散/连续实现的差异主要在于策略表征和采样机制。

5.备注

环境:

  • mac: 15.2
  • python: 3.12.4
  • pytorch: 2.5.1
  • numpy: 1.26.4
  • gym: 0.26.2
  • box2d-py: 2.3.8

完整代码:
https://github.com/keychankc/dl_code_for_blog/tree/main/026_PPO_code