1.PPO算法与动作空间类型概览
1.PPO(Proximal Policy Optimization)简介
PPO(近端策略优化)是OpenAI于2017年提出的强化学习算法,通过创新的”剪切目标函数”设计,在保证训练稳定性的同时实现高效策略优化。其核心思想是通过约束策略更新幅度,防止策略突变导致的性能崩溃,解决了传统策略梯度方法(如TRPO)的工程实现复杂性问题。
1.剪切比值机制(Clip Objective)
$$L^{\text{CLIP}}(\theta) = \mathbb{E}_t \left[ \min\left(r_t(\theta) \hat{A}_t,; \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \cdot \hat{A}_t\right) \right]$$
其中 $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)}$ 是新旧策略的概率比值。该设计的目的是:
- 当策略更新幅度在阈值 ϵ(通常取0.1-0.3)内时正常优化
- 当策略更新幅度过大或过小时进行截断,避免策略突变
2.广义优势估计(GAE)
广义优势估计(Generalized Advantage Estimation,简称 GAE)是策略梯度方法中一种用于减少策略训练时方差、提高稳定性的技巧。GAE 的目标是 结合多个 n-step Advantage 的加权平均,在方差和偏差之间取得更好的平衡。
GAE 定义如下:
$$\hat{A}t^{\mathrm{GAE}(\gamma, \lambda)} = \sum{l=0}^{\infty} (\gamma \lambda)^l \delta_{t+l}$$
其中:
- $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$:一阶 TD 残差
- $\lambda \in [0, 1]$:控制偏差与方差的权衡参数
- $\lambda = 0$:使用 1-step TD,偏差大但方差小
- $\lambda = 1$:类似 Monte Carlo 返回,偏差小但方差大
- 介于中间时,效果通常更好(如 PPO 默认 $\lambda = 0.95$)
可以把 GAE 看成是 对未来奖励序列的指数加权平均,越靠近当前步的 TD 残差权重越大,远期的残差权重越小。通过调整 $\lambda$ 来控制对未来的“信任度”。
一个形象的比喻是:
- $\lambda$ 小 → “短视”:更相信当前的反馈
- $\lambda$ 大 → “长视”:更相信未来回报的总体趋势
3.双网络架构与数据复用
- Actor-Critic框架:策略网络(Actor)生成动作,价值网络(Critic)评估状态
- 多轮优化机制:单次采样数据支持3-10轮策略更新,提升样本效率
2.强化学习中的两类动作空间
在强化学习中,“动作”就是智能体(agent)在每个时刻可以“做什么”。而动作空间,就是所有可能动作的集合——就像游戏角色能做哪些操作:走、跳、射击、转身……都在动作空间里。
1.离散动作空间(Discrete Action Space)
动作空间是有限个、不连续的动作选项。每个动作就像一个编号。像“菜单点菜”:选一个选项,编号是离散的,不能选半个动作或者1.7号动作。举例:
场景 |
动作空间 |
含义 |
游戏角色控制 |
[0, 1, 2] |
0=前进,1=后退,2=跳跃 |
自动贩售机 |
[0, 1, 2, 3] |
选择四种饮料中的一种 |
黑白棋 |
所有合法落子位置(最多 64 个) |
每一步落子对应一个动作 |
2.连续动作空间(Continuous Action Space)
动作是实数向量,可以是任意值,甚至是多个维度组成的动作向量。像“调音台旋钮”:你可以随意旋转旋钮,控制值是连续的,可以精调到任意数值。举例:
场景 |
动作空间 |
含义 |
机械臂控制 |
[θ₁, θ₂, θ₃] ∈ ℝ³ |
每个关节旋转角度是一个连续值 |
飞船推进 |
[f₁, f₂] ∈ ℝ² |
左右引擎推力,值从 0~1 |
自动驾驶 |
[加速度, 方向盘角度] ∈ ℝ² |
任意控制强度和方向 |
2.LunarLander-v3(离散动作空间案例)

1.环境与任务
LunarLander-v3 是 OpenAI Gym/Gymnasium 中的一个经典强化学习环境,模拟了一个飞船在月球上着陆的任务。任务目标是让飞船安全、平稳地降落在地面中央的着陆平台上。
1.状态空间(state)
一个长度为 8 的浮点向量,表示飞船当前的物理状态:
- x:水平方向的位置
- y:垂直方向的位置
- vx:水平方向速度
- vy:垂直方向速度
- angle:飞船的旋转角度
- angle_vel:飞船的角速度
- left_leg_contact:左腿是否接触地面
- right_leg_contact:右腿是否接触地面
2.动作空间(action space)
动作是离散的,共有 4 个动作:
- 0:不作为
- 2:主引擎向下喷气
- 3:左方向喷气
- 1:右方向喷气
3.奖励机制(reward)
- +100~140:成功降落在平台中央
- -100:摔坏飞船
- -0.3 / 每次喷气:惩罚使用燃料(节省能量)
- +10:每条腿接触地面
所以整个着陆任务的最佳策略是如何平稳、省油地降落在中央区域(两个小旗子中间)。
4.成功/终止条件
成功条件:飞船平稳降落在中央的目标平台,并且两个着陆腿都接触地面
终止条件:飞船成功着陆或坠毁或者飞船飞出屏幕外或者任务时间超限
5.相关参数
1 2 3 4 5 6 7 8 9
| env = gym.make("LunarLander-v3", render_mode="human") action = 3
obs, reward, terminated, truncated, info = env.step(action) x, y, vx, vy, angle, angle_vel, left_leg, right_leg = obs print(f"横向:{x}, 纵向:{y}, 水平速度:{vx}, 垂直速度:{vy}, 朝向角度:{angle}, \n角速度:{angle_vel}, 左支架:{left_leg}, 右支架:{right_leg},action:{action},得分:{reward}") """ 横向:-0.18395118415355682, 纵向:0.003272805130109191, 水平速度:-0.005371610634028912, 垂直速度:-0.001628089346922934, 朝向角度:-1.9359147548675537, 角速度:0.017528165131807327, 左支架:1.0, 右支架:0.0,action:3,得分:-100 """
|
2.经验缓存
Memory 类的作用是收集和存储智能体与环境交互过程中的经验数据,这也是为后续的策略更新(训练)提供数据支持。主要功能如下:
1. 主要用途
- 存储一批采样数据:包括每一步的状态、动作、动作概率、奖励、是否终止等信息
- 支持多轮优化:采集到一批数据后,可以多次利用这些数据进行策略网络的更新,提高样本利用率
- 便于批量处理:将采集到的数据整理为张量,方便后续神经网络的批量训练
2.代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class Memory: def __init__(self): self.actions = [] self.states = [] self.logprobs = [] self.rewards = [] self.is_terminals = [] def clear_memory(self): del self.actions[:] del self.states[:] del self.logprobs[:] del self.rewards[:] del self.is_terminals[:]
|
3.工作流程
- 采样阶段:智能体与环境交互,每一步都把当前的状态、动作、动作概率、奖励、是否终止等信息存入Memory
- 更新阶段:达到一定步数后(如update_timestep),用Memory中存储的数据进行多轮策略更新
- 清空阶段:更新完毕后,调用clear_memory()清空缓存,为下一批采样做准备
3.策略网络结构(Actor)
在PPO中,策略网络(Actor)的作用是:输入当前环境状态,输出每个可选动作的概率分布。
对于LunarLander-v3(离散动作空间),策略网络输出4个动作的概率。
1.代码实现
1 2 3 4 5 6 7 8 9 10 11
|
self.action_layer = nn.Sequential( nn.Linear(state_dim, n_latent_var), nn.Tanh(), nn.Linear(n_latent_var, n_latent_var), nn.Tanh(), nn.Linear(n_latent_var, action_dim), nn.Softmax(dim=-1) )
|
策略网络输入8维状态特征,输出4维动作概率分布
2.前向推理与动作采样
先输入状态,经过上述网络,得到4个动作的概率分布,再用Categorical分布根据概率采样动作,实现探索,最后将采样的动作、对应的对数概率等信息存入Memory,用于后续PPO更新。
1 2 3 4 5 6 7 8 9 10 11 12 13
| def act(self, state, memory): state = torch.from_numpy(state).float().to(device) action_probs = self.action_layer(state) dist = Categorical(action_probs) action = dist.sample() memory.states.append(state) memory.actions.append(action) memory.logprobs.append(dist.log_prob(action)) return action.item()
|
Categorical 策略是:
“当前状态下,每个动作的概率是多少”,然后按这个分布随机选择一个动作。
torch.distributions.Categorical
是 PyTorch 中的一个离散概率分布类,用于处理一维离散随机变量。举例:
1 2 3
| probs = torch.tensor([0.1, 0.7, 0.2]) dist = Categorical(probs) action = dist.sample()
|
当执行 dist.sample(),我们就“按概率抽奖”选出一个动作编号。例如抽到action为1的概率是0.7.
3.设计逻辑与优点
- 多层感知机的结构,适合处理中等复杂度的状态特征
- 使用Tanh激活函数,有助于稳定训练
- 采用Softmax输出,天然适配离散动作空间
- 使用的概率采样,支持策略的探索性
LunarLander-v3的策略网络本质是一个两层隐藏层的全连接神经网络,输入状态,输出每个动作的概率分布。
4.价值网络结构(Critic)
输入当前环境状态,输出该状态的“价值估计”(state value),即智能体在该状态下能获得的期望累计回报。这个数值用于计算优势函数(Advantage),指导策略(Actor)如何去优化。
1.代码实现
1 2 3 4 5 6 7 8 9 10 11
|
self.value_layer = nn.Sequential( nn.Linear(state_dim, n_latent_var), nn.Tanh(), nn.Linear(n_latent_var, n_latent_var), nn.Tanh(), nn.Linear(n_latent_var, 1) )
|
输入一个状态向量,经过上述网络,输出一个标量,表示该状态的价值估计。这个值用于PPO损失函数中的优势计算和价值损失部分。
2.评估旧策略
评估旧策略在给定状态下选出某个动作的概率,以及当前状态的价值估计。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| def evaluate(self, state, action): action_probs = self.action_layer(state)
dist = Categorical(action_probs)
action_logprobs = dist.log_prob(action)
dist_entropy = dist.entropy()
state_value = self.value_layer(state)
return action_logprobs, torch.squeeze(state_value), dist_entropy
|
evaluate() 是 PPO 算法在 训练更新阶段的核心评估函数,它用于:
- 计算 log概率(用于 PPO 的 clip ratio)
- 计算 state value(用于 advantage)
- 计算 entropy(用于策略的探索奖励)
这些值都将直接进入 PPO 的损失函数,指导策略网络和价值网络进行反向传播和参数更新。
5.PPO算法实现
1.代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| class PPO: def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, k_epochs, eps_clip): self.lr = lr self.betas = betas self.gamma = gamma self.eps_clip = eps_clip self.K_epochs = k_epochs self.policy = ActorCritic(state_dim, action_dim, n_latent_var).to(device) self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas) self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var).to(device) self.policy_old.load_state_dict(self.policy.state_dict()) self.MseLoss = nn.MSELoss() def update(self, memory): rewards = [] discounted_reward = 0 for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)): if is_terminal: discounted_reward = 0 discounted_reward = reward + (self.gamma * discounted_reward) rewards.insert(0, discounted_reward) rewards = torch.tensor(rewards, dtype=torch.float32).to(device) rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5) old_states = torch.stack(memory.states).to(device).detach() old_actions = torch.stack(memory.actions).to(device).detach() old_logprobs = torch.stack(memory.logprobs).to(device).detach() for _ in range(self.K_epochs): logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions) ratios = torch.exp(logprobs - old_logprobs.detach()) advantages = rewards - state_values.detach()
surr1 = ratios * advantages surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy self.optimizer.zero_grad() loss.mean().backward() self.optimizer.step() self.policy_old.load_state_dict(self.policy.state_dict())
|
PPO.update()
代码的核心是:用采样轨迹计算 advantage,构造 clipped surrogate loss,在保证稳定更新的前提下优化 actor-critic 网络。
2.举例说说 PPO 的ratio 剪切策略
在PPO算法中,策略更新的核心是限制新旧策略之间的变化幅度,以避免剧烈更新导致策略崩溃。这一策略的核心公式之一为:
$$\text{ratio} = \frac{\pi_{\text{new}}(a|s)}{\pi_{\text{old}}(a|s)}$$
该比率表示新策略与旧策略在相同状态下选择某个动作的概率比值。
在 LunarLander-v3 环境中,飞船需要根据当前的状态(如高度、角度、速度)做出决策,选择激活哪些推进器以实现平稳着陆。策略网络 π 就是用于在每个状态下输出相应动作分布的模型。
在一次回合中,旧策略 π_old 对某个状态 s 采样了一个动作 a,比如“点燃主推进器”。随后,训练过程根据实际的环境反馈计算出该动作的优势(Advantage),即该动作相对于当前状态下平均动作的好坏程度。
策略更新不宜过快
如果该动作的优势为正(说明该动作是“好”的),策略应当提高该动作的概率以鼓励重复选择。然而,如果直接按照策略梯度进行无约束更新,可能会导致该动作的概率被显著放大,从而造成策略过拟合于当前经验,降低策略在未见过情形下的泛化能力,甚至使策略陷入崩溃。
PPO 中的剪切机制
为了抑制策略剧烈更新,PPO 引入了一个“剪切”机制,即限制 ratio 的值在一个范围内(例如 [0.8, 1.2])。当更新导致的概率比率超过该范围时,会使用边界值替代,以实现“保守更新”。
这种剪切机制可类比为飞船的姿态控制系统:
情形 |
含义 |
PPO 策略行为 |
$\text{ratio} \approx 1$ |
当前策略与旧策略差异不大,动作选择变化平稳 |
正常更新,鼓励优势动作 |
$\text{ratio} \gg 1$ |
策略剧烈放大某一动作的概率,容易造成策略偏移或不稳定 |
剪切 ratio,抑制剧烈变化 |
$\text{ratio} \ll 1$ |
策略对某一动作严重抑制,可能过度惩罚 |
剪切后保持稳定下降,避免过度惩罚 |
6.训练参数与控制变量
1.训练参数
1 2 3 4 5 6 7 8 9
| state_dim = env.observation_space.shape[0] action_dim = 4 n_latent_var = 64 lr = 0.002 betas = (0.9, 0.999) gamma = 0.99 k_epochs = 4 eps_clip = 0.2 random_seed = None
|
2.控制变量
1 2 3 4 5 6
| render = False solved_reward = 230 log_interval = 20 max_episodes = 50000 max_timesteps = 300 update_timestep = 2000
|
7.训练流程
1.代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| def main():
if random_seed is not None: torch.manual_seed(random_seed) env.reset(seed=random_seed) print(f"PyTorch随机数:{torch.rand(1).item()}, 环境初始状态:{env.reset()[0][:2]}") memory = Memory() ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, k_epochs, eps_clip) running_reward = 0 total_length = 0 timestep = 0 finished_step = 0 for i_episode in range(1, max_episodes + 1): state, _ = env.reset() for t in range(max_timesteps): finished_step = t timestep += 1 action = ppo.policy_old.act(state, memory) next_state, reward, terminated, truncated, _ = env.step(action) done = terminated or truncated state = next_state memory.rewards.append(reward) memory.is_terminals.append(done) if timestep % update_timestep == 0: ppo.update(memory) memory.clear_memory() timestep = 0 running_reward += reward if render: env.render() if done: break total_length += finished_step if running_reward > (log_interval * solved_reward): print("########## Solved! ##########") torch.save(ppo.policy.state_dict(), './PPO_{}.pth'.format(env_name)) break if i_episode % log_interval == 0: avg_length = int(total_length / log_interval) running_reward = int((running_reward / log_interval)) print('Episode {} \t avg length: {} \t reward: {}'.format(i_episode, avg_length, running_reward)) running_reward = 0 total_length = 0
|
2.训练日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Episode 20 avg length: 95 reward: -264 Episode 40 avg length: 92 reward: -173 Episode 60 avg length: 90 reward: -147 Episode 80 avg length: 93 reward: -134 ... Episode 420 avg length: 99 reward: -70 Episode 440 avg length: 102 reward: -68 Episode 460 avg length: 113 reward: -70 ... Episode 600 avg length: 216 reward: 6 Episode 620 avg length: 237 reward: 68 Episode 640 avg length: 261 reward: 100 ... Episode 820 avg length: 238 reward: 101 Episode 840 avg length: 230 reward: 103 Episode 860 avg length: 250 reward: 104 ... Episode 1360 avg length: 213 reward: 164 Episode 1380 avg length: 232 reward: 174 Episode 1400 avg length: 239 reward: 214 #
|
8.模型验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| def start_lunar_lander():
env_name = "LunarLander-v3" env = gym.make(env_name, render_mode="human") state_dim = env.observation_space.shape[0] action_dim = 4 n_latent_var = 64 lr = 0.0007 betas = (0.9, 0.999) gamma = 0.99 K_epochs = 4 eps_clip = 0.2
n_episodes = 3 max_timesteps = 300 filename = f"PPO_{env_name}.pth" directory = "./" memory = Memory() ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip) ppo.policy_old.load_state_dict(torch.load(os.path.join(directory, filename)), strict=False) for ep in range(1, n_episodes + 1): ep_reward = 0 state, _ = env.reset()
for t in range(max_timesteps): action = ppo.policy_old.act(state, memory) state, reward, terminated, truncated, _ = env.step(action) ep_reward += reward try: env.render() pygame.event.pump() except pygame.error: print("Render window was closed unexpectedly.") break
if terminated or truncated: break print(f'Episode: {ep}\tReward: {int(ep_reward)}') env.close()
|
日志输出:
1 2 3
| Episode: 1 Reward: 223 Episode: 2 Reward: 39 Episode: 3 Reward: 261
|
测试飞行三次,有两次可以安全降落在指定位置。
3.BipedalWalker-v3(连续动作空间案例)

1.环境与任务
BipedalWalker-v3 是 OpenAI Gym 提供的一个经典 连续控制 强化学习环境,任务目标是:控制一个双足机器人在崎岖地形上行走而不摔倒,并尽可能走得远。这是一个对智能体控制能力要求较高的环境。
1.状态空间(state space)
BipedalWalker-v3的状态空间是一个 24 维的浮点向量,包含:
索引 |
含义 |
说明 |
0 |
机器人躯干角度 |
单位:弧度,表示身体的旋转角 |
1 |
机器人躯干角速度 |
躯干旋转的速度 |
2 |
水平方向速度(vx) |
躯干的水平移动速度 |
3 |
垂直方向速度(vy) |
躯干的垂直移动速度 |
4 |
躯干到地面的水平距离 |
和目标区域的距离偏移 |
5 |
躯干到地面的垂直距离 |
着陆高度 |
6-13 |
4 个腿关节的角度(4 个关节) |
分别表示前后腿的上/下段关节 |
14-17 |
4 个腿关节的角速度 |
对应角度的变化速度 |
18-21 |
4 个地面接触传感器(布尔值) |
是否接触地面(每条腿有两个脚趾) |
22-23 |
最近 2 帧的躯干高度差 |
有些实现中添加,用于平滑动作判断(非标准) |
2.动作空间(action space)
BipedalWalker-v3的动作空间是一个 4维连续动作空间,范围是 [-1, 1],分别控制:
- 左髋关节的推力
- 左膝关节的推力
- 右髋关节的推力
- 右膝关节的推力
每个数值控制一个电机的激活强度(可正可负),表示关节的力和方向。
3.奖励函数(reward)
正奖励:右移动的距离(越走越远奖励越多),控制越稳,步态越自然,奖励越高
负奖励:每一步都会有小的惩罚(惩罚能量浪费),摔倒或行为不稳定将被惩罚或直接终止
4.成功/终止条件
- 成功:机器人走完整个地形
- 失败:摔倒或身体某部分触地
5.难点
- 动作是连续的:不像“前进/后退”这种离散选择,而是需要精确调控关节的力,这样容错率就低
- 地形复杂:每次生成的地图都不同,有坑、坡、凸起,机器人需要学会灵活适应
- 需要协调多个关节动作:形成合理步态(如:迈出一条腿的同时另一条支撑)
- 平衡控制:摔倒即结束,类似倒立摆任务,强调稳定性与反馈控制
6.相关参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| env = gym.make("BipedalWalker-v3", render_mode="human")
action = [0.9765114, -0.280038, 0.5163014, -1.10301] state, reward, terminated, truncated, _ = env.step(action) print(f"state:{state} reward:{reward} terminated:{terminated} truncated:{truncated}") """ state:[-0.02104758 -0.03126067 -0.02977715 -0.01362359 0.47771385 1.0004591 0.07091689 -1.0004667 1. 0.37952623 0.9994885 0.07574213 -0.99983853 1. 0.44616896 0.4512359 0.46702808 0.49549717 0.540591 0.60977966 0.71776354 0.896694 1. 1. ] reward:-0.24856666951812803 terminated:False truncated:False """
|
2.经验缓存
同LunarLander-v3案例的Memory类,也是收集和存储智能体与环境交互过程中的经验数据,为后续的策略更新(训练)提供数据支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class Memory: def __init__(self): self.actions = [] self.states = [] self.logprobs = [] self.rewards = [] self.is_terminals = [] def clear_memory(self): del self.actions[:] del self.states[:] del self.logprobs[:] del self.rewards[:] del self.is_terminals[:]
|
3.策略网络结构(Actor)
1 2 3 4 5 6 7 8
| self.actor = nn.Sequential( nn.Linear(state_dim, 64), nn.Tanh(), nn.Linear(64, 32), nn.Tanh(), nn.Linear(32, action_dim), nn.Tanh() )
|
与LunarLander-v3
的策略网络结构最大的不同是,前者输出的是输出动作概率分布,后者输出动作均值,范围[-1,1]
特征 |
PPO_discrete |
PPO_continuous |
输出激活 |
Softmax |
Tanh |
概率分布 |
Categorical |
MultivariateNormal |
探索方式 |
概率采样 |
方差控制 |
输出范围 |
[0,1]概率 |
[-1,1]均值 |
动作类型 |
离散整数 |
连续向量 |
1 2 3 4 5 6 7 8 9 10 11 12 13
| def act(self, state, memory): action_mean = self.actor(state) cov_mat = torch.diag(self.action_var).to(device) dist = MultivariateNormal(action_mean, cov_mat) action = dist.sample() action_logprob = dist.log_prob(action) memory.states.append(state) memory.actions.append(action) memory.logprobs.append(action_logprob) return action.detach()
|
与LunarLander-v3
的概率分布类型也不同,前者是用的Categorical
分类分布(从有限个离散动作中采样)是离散的,后者用的是MultivariateNormal分布(从连续动作空间中采样),是连续的。
4.价值网络结构(Critic)
1 2 3 4 5 6 7 8
| self.critic = nn.Sequential( nn.Linear(state_dim, 64), nn.Tanh(), nn.Linear(64, 32), nn.Tanh(), nn.Linear(32, 1) ) self.action_var = torch.full((action_dim,), action_std * action_std).to(device)
|
LunarLander-v3
中是通过Softmax输出的概率分布自然实现探索,不同动作有不同的采样概率。而这个例子中通过固定的方差矩阵控制探索,网络输出动作均值,方差固定(如0.5²)。
特征 |
PPO_discrete |
PPO_continuous |
网络结构 |
基本相同 |
基本相同 |
隐藏层 |
64→64 |
64→32 |
输出 |
状态价值(标量) |
状态价值(标量) |
功能 |
完全相同 |
完全相同 |
PPO 中策略评估(而非采样)阶段的核心,通常在更新策略前,对旧轨迹重新评估当前策略 π 的行为,以便计算 PPO 的剪切损失函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| def evaluate(self, state, action): action_mean = self.actor(state) action_var = self.action_var.expand_as(action_mean) cov_mat = torch.diag_embed(action_var).to(device) dist = MultivariateNormal(action_mean, cov_mat) action_logprobs = dist.log_prob(action) dist_entropy = dist.entropy() state_value = self.critic(state)
return action_logprobs, torch.squeeze(state_value), dist_entropy
|
5.PPO算法实现
1.代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| class PPO: def __init__(self, state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip): self.lr = lr self.betas = betas self.gamma = gamma self.eps_clip = eps_clip self.K_epochs = K_epochs
self.policy = ActorCritic(state_dim, action_dim, action_std).to(device) self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas) self.policy_old = ActorCritic(state_dim, action_dim, action_std).to(device) self.policy_old.load_state_dict(self.policy.state_dict()) self.MseLoss = nn.MSELoss() def select_action(self, state, memory): if isinstance(state, tuple): state = state[0] state = torch.FloatTensor(np.array(state).reshape(1, -1)).to(device) return self.policy_old.act(state, memory).cpu().data.numpy().flatten()
def update(self, memory): rewards = [] discounted_reward = 0 for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)): if is_terminal: discounted_reward = 0 discounted_reward = reward + (self.gamma * discounted_reward) rewards.insert(0, discounted_reward) rewards = torch.tensor(rewards, dtype=torch.float).to(device) rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5) old_states = torch.squeeze(torch.stack(memory.states).to(device), 1).detach() old_actions = torch.squeeze(torch.stack(memory.actions).to(device), 1).detach() old_logprobs = torch.squeeze(torch.stack(memory.logprobs), 1).to(device).detach() for _ in range(self.K_epochs): logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions) ratios = torch.exp(logprobs - old_logprobs.detach()) advantages = rewards - state_values.detach() surr1 = ratios * advantages surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy self.optimizer.zero_grad()
loss.mean().backward() self.optimizer.step() self.policy_old.load_state_dict(self.policy.state_dict())
|
2.与离散动作 PPO的区别对比
项目 |
连续动作版 |
离散动作版 |
动作空间 |
连续(多维向量) |
离散(索引,如 0/1/2/3) |
策略分布 |
MultivariateNormal |
Categorical |
动作采样 |
高斯分布采样:dist.sample() |
概率分布采样:dist.sample() |
log_prob |
dist.log_prob(action)(多维) |
dist.log_prob(action)(离散) |
策略输出 |
动作的均值(actor 输出) |
动作的概率分布(softmax) |
方差来源 |
固定或可学习的标准差 action_std |
不需要方差 |
evaluate() |
构建多维高斯分布(协方差矩阵) |
构建离散分布(Categorical) |
适用环境 |
如 LunarLanderContinuous-v2, BipedalWalker-v3 |
如 CartPole-v1, LunarLander-v3 |
6.训练参数与控制变量
1.参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| render = False solved_reward = 300 log_interval = 20 max_episodes = 10000 max_timesteps = 1500 update_timestep = 4000 action_std = 0.5 K_epochs = 80 eps_clip = 0.2 gamma = 0.99 lr = 0.0003 betas = (0.9, 0.999) random_seed = None
|
2.区别
连续动作(如 BipedalWalker) |
离散动作(如 LunarLander) |
区别说明 |
解释 |
action_std |
有,设定高斯分布方差 |
无 |
离散动作不需要方差 |
K_epochs |
通常较大(如 80) |
较小(如 4) |
连续动作难度大,更新次数多 |
update_timestep |
大(如 4000) |
小(如 2000) |
连续动作需要积累更多经验再训练 |
action_dim |
实数向量维度 |
整数类别数 |
连续动作需要输出一个动作向量 |
策略分布 |
MultivariateNormal |
Categorical |
前者生成连续动作,后者选择概率最大的动作 |
Actor 输出 |
动作的均值(连续向量) |
动作的概率(Softmax) |
连续 vs 离散策略网络结构不同 |
7.训练流程
1.代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| def main(): env = gym.make(env_name) state_dim = env.observation_space.shape[0] action_dim = env.action_space.shape[0]
if random_seed: print("Random Seed: {}".format(random_seed)) torch.manual_seed(random_seed) env.seed(random_seed) np.random.seed(random_seed)
memory = Memory() ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip) running_reward = 0 avg_length = 0 time_step = 0 for i_episode in range(1, max_episodes + 1): state = env.reset() for t in range(max_timesteps): time_step += 1 action = ppo.select_action(state, memory) next_state, reward, done, truncated, _ = env.step(action) memory.rewards.append(reward) memory.is_terminals.append(done) if time_step % update_timestep == 0: ppo.update(memory) memory.clear_memory() time_step = 0 running_reward += reward
if render: env.render() if done: break avg_length += t if running_reward > (log_interval * solved_reward): print("########## Solved! ##########") torch.save(ppo.policy.state_dict(), './PPO_continuous_solved_{}.pth'.format(env_name)) break if i_episode % 500 == 0: torch.save(ppo.policy.state_dict(), './PPO_continuous_{}.pth'.format(env_name))
if i_episode % log_interval == 0: avg_length = int(avg_length / log_interval) running_reward = int((running_reward / log_interval)) print('Episode {} \t Avg length: {} \t Avg reward: {}'.format(i_episode, avg_length, running_reward)) running_reward = 0 avg_length = 0
|
2.训练日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Episode 20 Avg length: 299 Avg reward: -100 Episode 40 Avg length: 448 Avg reward: -96 Episode 60 Avg length: 503 Avg reward: -94 Episode 80 Avg length: 436 Avg reward: -95 Episode 100 Avg length: 516 Avg reward: -91 Episode 120 Avg length: 716 Avg reward: -84 Episode 140 Avg length: 791 Avg reward: -81 Episode 160 Avg length: 1069 Avg reward: -71 Episode 180 Avg length: 1427 Avg reward: -61 Episode 200 Avg length: 1357 Avg reward: -57 Episode 220 Avg length: 1427 Avg reward: -57 Episode 240 Avg length: 1285 Avg reward: -64 Episode 260 Avg length: 1357 Avg reward: -56 Episode 280 Avg length: 1004 Avg reward: -69 Episode 300 Avg length: 1123 Avg reward: -75 Episode 320 Avg length: 744 Avg reward: -90 ...
|
3.与离散动作版本相比
方面 |
连续动作 main() |
离散动作 main() |
区别说明 |
动作类型 |
实数向量(np.array) |
整数(类别编号) |
连续动作使用多元高斯策略,离散使用分类策略 |
action_dim |
env.action_space.shape[0] |
手动设为 4(动作数量) |
离散动作为固定离散集合,连续动作为多维实数 |
策略网络输出 |
动作均值 μ |
动作概率分布 |
连续输出用于高斯分布采样,离散输出用于 softmax |
action_std |
需要设定(探索强度) |
不使用 |
连续策略的探索来自方差,离散策略来自随机采样概率 |
ppo.select_action() |
返回实数向量 |
返回动作索引 |
两者行为一致,输出形式不同 |
策略更新频率 |
通常设较大 update_timestep=4000 |
通常较小(如 2000) |
连续策略收敛慢,需要更多经验和更新步 |
K_epochs |
大(如 80) |
小(如 4) |
连续动作策略更新更频繁以稳定训练 |
8.模型验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| def render_bipedal_walker(): env_name = "BipedalWalker-v3" env = gym.make(env_name, render_mode="human") state_dim = env.observation_space.shape[0] action_dim = env.action_space.shape[0] n_episodes = 3 max_timesteps = 1500 filename = f"PPO_continuous_{env_name}.pth" directory = "./" action_std = 0.5 K_epochs = 80 eps_clip = 0.2 gamma = 0.99 lr = 0.0003 betas = (0.9, 0.999) memory = Memory()
ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip) ppo.policy_old.load_state_dict(torch.load(directory + filename, weights_only=True))
for ep in range(1, n_episodes + 1): ep_reward = 0 state, _ = env.reset()
for t in range(max_timesteps): action = ppo.select_action(state, memory) state, reward, terminated, truncated, _ = env.step(action) ep_reward += reward try: env.render() pygame.event.pump() except pygame.error: print("⚠️ Pygame window was closed. Skipping rendering.") break
if terminated or truncated: break
print(f'Episode: {ep}\tReward: {int(ep_reward)}') env.close()
|
日志输出:
1 2 3
| Episode: 1 Reward: 37 Episode: 2 Reward: 259 Episode: 3 Reward: 263
|
4.总结
1.PPO核心优势
- 创新剪切机制:通过约束策略更新幅度(概率比剪切),解决策略突变问题
- 高效稳定:GAE平衡偏差/方差,双网络架构实现多轮数据复用
- 工程友好:相比TRPO更易实现
2.动作空间关键差异
特性 |
离散动作空间 |
连续动作空间 |
策略输出 |
Softmax概率分布 |
Tanh均值向量 |
采样分布 |
Categorical(分类分布) |
MultivariateNormal(多元正态) |
探索机制 |
概率采样 |
固定/可学习方差 |
环境举例 |
LunarLander(飞船着陆) |
BipedalWalker(双足行走) |
训练参数差异 |
K_epochs小(≈4) |
K_epochs大(≈80) |
3. 工程实现要点
- 离散动作:输出层Softmax → 动作概率 → Categorical采样 → 动作索引
- 连续动作:输出层Tanh → 动作均值 + 固定方差 → 多维正态分布采样 → 动作向量
- 关键技巧:通过GAE优化优势估计;通过回报标准化加速收敛;熵奖励项促进探索;通过旧策略冻结保证采样一致性
4. 环境对比
- LunarLander:8维状态/4离散动作,300步内收敛
- BipedalWalker:24维状态/4连续动作,需>1500步更复杂训练
PPO通过统一框架适配两类动作空间,其剪切机制和双网络设计在保证稳定性的同时,为机器人控制、游戏AI等领域提供了高效解决方案。
离散/连续实现的差异主要在于策略表征和采样机制。
5.备注
环境:
- mac: 15.2
- python: 3.12.4
- pytorch: 2.5.1
- numpy: 1.26.4
- gym: 0.26.2
- box2d-py: 2.3.8
完整代码:
https://github.com/keychankc/dl_code_for_blog/tree/main/026_PPO_code