学习文件engine/engine.py (209 行), engine/graph.py (145 行), engine/sample.py (76 行), models/base.py (15 行), models/llama.py (86 行)


1. Engine 的核心职责

Engine 是整个推理系统的数据平面,负责实际执行 GPU 计算:

1.1 职责定位

  • Scheduler:控制平面,决定"做什么"(调度哪些请求)
  • Engine:数据平面,执行"怎么做"(前向传播 + 采样)

1.2 核心功能

  1. 模型加载和权重管理
  2. KV Cache 内存分配
  3. 前向传播执行
  4. Token 采样
  5. CUDA Graph 优化
  6. 跨 Stream 异步同步

2. Engine 初始化流程

2.1 初始化的 6 个步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, config: EngineConfig):
# 1. 设置设备和通信
self.device = torch.device(f"cuda:{config.tp_info.rank}")
torch.cuda.set_device(self.device)
self.stream = torch.cuda.Stream()
self.tp_cpu_group = self._init_communication(config)

# 2. 获取初始内存
init_free_memory = self._sync_get_memory()[1]

# 3. 在 meta 设备上创建模型
with torch.device("meta"), torch_dtype(config.dtype):
self.model = create_model(config.model_path, config.model_config)

# 4. 加载权重到 GPU
self.model.load_state_dict(self._load_weight_state_dict(config))

# 5. 计算 KV Cache 页数
self.num_pages = self._determine_num_pages(init_free_memory, config)

# 6. 初始化 KV Cache、Attention Backend、Context
self.kv_cache = create_kvcache(...)
self.attn_backend = create_attention_backend(...)
self.ctx = Context(...)

# 7. 初始化 Sampler 和 CUDA Graph Runner
self.sampler = Sampler(...)
self.graph_runner = GraphRunner(...)

2.2 为什么用 "meta" 设备?

传统方式(不用 meta):

1
CPU 上创建模型 → 分配 CPU 内存 → 加载权重到 CPU → 拷贝到 GPU → 释放 CPU 内存

使用 meta 设备

1
只创建模型结构(无内存分配)→ 直接加载权重到 GPU

好处

  1. 节省 CPU 内存
  2. 避免 CPU→GPU 拷贝时间
  3. 加载速度更快

3. KV Cache 页数计算

3.1 计算公式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def _determine_num_pages(self, old_free_memory: int, config: EngineConfig) -> int:
new_free_memory = self._sync_get_memory()[1]

# 1. 计算单页 KV Cache 大小
cache_per_page = (
2 # key + value
* self.model_config.head_dim
* divide_even(self.model_config.num_kv_heads, config.tp_info.size)
* config.page_size
* self.dtype.itemsize
* self.model_config.num_layers
)

# 2. 计算模型占用内存
model_memory = old_free_memory - new_free_memory

# 3. 计算可用内存
available_memory = int(config.memory_ratio * old_free_memory) - model_memory

# 4. 计算页数
num_pages = available_memory // cache_per_page

return num_pages

3.2 关键点

为什么用 old_free_memory - new_free_memory

  • old_free_memory:加载模型之前的空闲内存
  • new_free_memory:加载模型之后的空闲内存
  • 差值就是模型占用的内存

为什么用 memory_ratio

  • 预留内存给中间计算(如 Attention 的临时 Tensor)
  • 防止内存碎片导致分配失败
  • 典型值:0.85-0.90

为什么用 max_free_memory 而不是 min_free_memory

  • 计算 model_memory 时,用最乐观估计(模型占用最少)
  • 这样 available_memory 不会高估
  • 保证所有 TP rank 都有足够内存

4. forward_batch 流程

4.1 完整流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def forward_batch(self, batch: Batch, args: BatchSamplingArgs) -> ForwardOutput:
# 1. 确保在正确的 Stream 上
assert torch.cuda.current_stream() == self.stream

# 2. 执行前向传播
with self.ctx.forward_batch(batch):
if self.graph_runner.can_use_cuda_graph(batch):
logits = self.graph_runner.replay(batch) # 使用 CUDA Graph
else:
logits = self.model.forward() # 普通前向传播

# 3. 更新请求状态
for req in batch.reqs:
req.complete_one() # output_len -= 1

# 4. 采样得到 next_token
next_tokens_gpu = self.sampler.sample(logits[: batch.size], args).to(torch.int32)

# 5. 异步拷贝到 CPU
next_tokens_cpu = next_tokens_gpu.to("cpu", non_blocking=True)

# 6. 记录 Event 用于跨 Stream 同步
copy_done_event = torch.cuda.Event()
copy_done_event.record(self.stream)

return ForwardOutput(next_tokens_gpu, next_tokens_cpu, copy_done_event)

4.2 为什么 complete_one() 在采样之前?

时序关系

1
前向传播 → complete_one() 更新状态 → 采样 → 返回给 Scheduler → filter_reqs() → 下一轮调度

原因

  • complete_one() 更新 req.output_len -= 1
  • Scheduler 在下一轮调度时,调用 decode_manager.filter_reqs() 过滤已完成请求
  • 如果在采样之后调用,Scheduler 会误判请求未完成,继续调度

核心:及时更新请求状态,让 Scheduler 正确判断是否继续生成。


5. 跨 Stream 异步同步

5.1 为什么需要 Event?

如果直接在 engine.stream 上等待拷贝

1
2
engine.stream:    [前向传播] [等待拷贝] [空闲] [下一批次前向传播]
scheduler.stream: [调度] [空闲] [处理结果] [调度]

engine.stream 被阻塞,无法开始下一批次

使用 Event 跨 Stream 同步

1
2
3
engine.stream:    [前向传播] [异步拷贝] [下一批次前向传播] ...
↓ record event
scheduler.stream: [调度] [wait event] [处理结果] [调度] ...

→ 两个 Stream 并行工作,实现 Overlap Scheduling

5.2 Event 的使用者

1
2
3
4
5
6
7
8
9
# Engine 端(engine.stream)
copy_done_event = torch.cuda.Event()
copy_done_event.record(self.stream) # 记录拷贝完成时间点

# Scheduler 端(scheduler.stream)
def _process_last_data(self, last_data):
copy_done_event.wait() # 等待拷贝完成
# 现在可以安全读取 next_tokens_cpu
self.detokenizer.process(next_tokens_cpu)

关键engine.stream 不等待拷贝,立即开始下一批次;scheduler.stream 等待拷贝完成后处理结果。


6. _sync_get_memory() 的巧妙优化

6.1 问题

在 TP 场景下,需要获取所有 rank 的:

  • 最小空闲内存(min_free_memory
  • 最大空闲内存(max_free_memory

如果分开做,需要 2 次 all_reduce(一次 MIN,一次 MAX)。

6.2 优化方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _sync_get_memory(self) -> Tuple[int, int]:
free_memory = get_free_memory(self.device)

# 巧妙:用负数转换 MAX 为 MIN
free_mem_tensor = torch.tensor([free_memory, -free_memory], device="cpu", dtype=torch.int64)

# 只需要 1 次 all_reduce
torch.distributed.all_reduce(
free_mem_tensor, op=torch.distributed.ReduceOp.MIN, group=self.tp_cpu_group
)

min_free_memory = int(free_mem_tensor[0].item()) # 直接是 min
max_free_memory = -int(free_mem_tensor[1].item()) # 负负得正,得到 max

return min_free_memory, max_free_memory

原理

  • MIN(free_memory) → 得到最小值
  • MIN(-free_memory) → 得到负的最大值
  • 再取负 → 得到最大值

好处:只需要 1 次 all_reduce,减少通信开销。


7. dummy_req 和 dummy_page

7.1 为什么需要 dummy_req?

CUDA Graph 要求固定的 batch size,但实际请求数量是动态的。

解决方案

1
2
3
4
5
6
7
8
9
self.dummy_req = Req(
input_ids=torch.tensor([0], dtype=torch.int32, device="cpu"),
table_idx=config.max_running_req, # 最后一个 slot
cached_len=0,
output_len=1,
uid=-1,
sampling_params=None,
cache_handle=None,
)

当 batch size < CUDA Graph 的固定 size 时,用 dummy_req 填充。

7.2 为什么需要 dummy_page?

1
self.page_table[self.dummy_req.table_idx].fill_(self.dummy_page)

原因

  • dummy_req 在前向传播时也会访问 page_table
  • 如果 page_table 里是无效索引,会访问非法内存导致崩溃
  • dummy_page 指向一个有效的 KV Cache 页(虽然内容无意义)

核心:保证 dummy_req 的内存访问安全。


8. 总结

8.1 Engine 的核心设计

设计点 目的 实现
"meta" 设备 加速模型加载 只创建结构,直接加载权重到 GPU
memory_ratio 预留中间计算内存 只用 85%-90% 内存做 KV Cache
complete_one() 时机 及时更新请求状态 在采样之前调用
copy_done_event 跨 Stream 同步 实现 Overlap Scheduling
_sync_get_memory() 减少通信开销 1 次 all_reduce 同时获取 min 和 mammy_req`
dummy_page 内存访问安全 避免 dummy_req 访问非法内存

8.2 Engine 和 Scheduler 的关系

1
2
3
4
5
Scheduler (控制平面)
↓ 调用 forward_batch
Engine (数据平面)
↓ 返回 ForwardOutput
Scheduler (处理结果)

分工

  • Scheduler:决定"调度哪些请求"
  • Engine:执行"前向传播 + 采样"

9. 费曼挑战

问题:用简单的话解释"为什么需要跨 Stream 异步同步?"

答案
如果 Engine 等待拷贝完成,就无法立即开始下一批次的计算,GPU 会空闲。使用 Event 跨 Stream 同步后,Engine 可以立即开始下一批次,Scheduler 在另一个 Stream 上等待拷贝完成后处理结果。两个 Stream 并行工作,隐藏了拷贝延迟,提升吞吐量。



10. CUDA Graph 优化原理

10.1 CUDA Graph 解决的问题

传统方式的开销

1
2
3
Python → PyTorch → CUDA Driver → GPU
↓ ↓ ↓ ↓
调用 构建计算图 提交kernel 执行kernel

每次 forward() 都要经过这 4 个步骤,CPU 开销 ~100μs。

对于 Decode 阶段

  • 单个 token 的计算量小(~1ms)
  • 但 kernel 数量多(Attention、MLP、LayerNorm 等)
  • CPU 开销占比大(~10%)

CUDA Graph 的优化

1
2
3
4
5
Capture 阶段(只做一次):
记录所有 kernel 调用序列 → 保存为 Graph

Replay 阶段(每次 forward):
直接提交整个 Graph → GPU 执行

效果

  • 跳过 Python/PyTorch/CUDA Driver 层
  • CPU 开销从 ~100μs 降到 ~1μs(100 倍)
  • 对 Decode 阶段加速明显

10.2 Capture 流程详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def _capture_graphs(self, max_seq_len: int, vocab_size: int, model: BaseLLMModel):
# 1. 预分配输出 Tensor(固定地址)
self.logits = torch.empty(
(self.max_graph_bs, vocab_size),
dtype=torch.float32,
device=self.device,
)

# 2. 从大到小 capture
pool = None
for bs in sorted(self.graph_bs_list, reverse=True):
graph = torch.cuda.CUDAGraph()
batch = Batch(reqs=[self.dummy_req] * bs, phase="decode")

# 3. Warmup:预分配所有中间 Tensor
with get_global_ctx().forward_batch(batch):
self.logits[:bs] = model.forward()

# 4. Capture:记录 kernel 调用序列
h torch.cuda.graph(graph, pool=pool, stream=self.stream):
self.logits[:bs] = model.forward()

# 5. 共享内存池
if pool is None:
pool = graph.pool()
graph_map[bs] = graph

return graph_map

10.3 为什么需要 Warmup?

CUDA Graph 的限制:Capture 时不允许内存分配。

如果没有 Warmup

1
2
3
4
5
# 第一次运行 model.forward()
with torch.cuda.graph(graph, ...):
self.logits[:bs] = model.forward()
# PyTorch 需要分配中间 Tensor(Q、K、V 等)
# 但 CUDA Graph 不允许 → 报错!

有了 Warmup

1
2
3
4
5
6
7
8
9
10
# Warmup:第一次运行,分配所有中间 Tensor
with get_global_ctx().forward_batch(batch):
self.logits[:bs] = model.forward()
# PyTorch 分配并缓存所有需要的内存

# Capture:第二次运行,复用已分配的内存
with torch.cuda.graph(graph, ...):
self.logits[:bs] = model.forward()
# 复用缓存的内存,不需要新分配
# CUDA Graph 成功记录所有 kernel 调用

核心:Warmup 预分配内存 → Capture 时复用内存 → 避免分配错误。


10.4 为什么从大到小 Capture?

3 个原因

1. 内存分配策略

  • 大 batch size 需要的内存多
  • 先 capture 大的,PyTorch 分配大块内存
  • 后续小的复用这些内存(通过 pool

2. 避免内存碎片

  • 如果从小到大:先分配小块,后续大的需要新分配 → 内存碎片
  • 从大到小:先分配大块,后续复用 → 无碎片

3. 优雅降级

  • 如果内存不够,至少保证大 batch size 可用
  • 小 batch size 不用 CUDA Graph 影响不大(本身就快)

核心:从大到小 + 共享 pool = 最优内存利用 + 优雅降级。


10.5 为什么预分配 self.logits

CUDA Graph 记录的是 Tensor 的 GPU 地址

1
2
3
4
5
6
7
8
9
10
# Capture 时
self.logits = torch.empty(...) # 地址:0x7f8a12340000
with torch.cuda.graph(graph, ...):
self.logits[:bs] = model.forward()
# CUDA Graph 记录:kernel_output_address = 0x7f8a12340000

# Replay 时
g.replay()
# GPU 直接写入 0x7f8a12340000
# 不经过 Python/PyTorch,无法重新分配

如果每次 replay 时重新分配

1
2
3
4
5
self.logits = torch.empty(...)  # 新地址:0x7f8a99999000
g.replay()
# GPU 还是写入旧地址 0x7f8a12340000
# self.logits 指向新地址 0x7f8a99999000
# 结果:读到垃圾数据!

核心self.logits 必须在整个生命周期内保持固定地址。


10.6 batch size 列表的设计

1
2
3
def _determine_cuda_graph_bs(...) -> List[int]:
return [1, 2, 4] + list(range(8, cuda_graph_max_bs + 1, 8))
# 例如:[1, 2, 4, 8, 16, 24, 32, 40, ..., 256]

为什么是 8 的倍数?

1. GPU 内存对齐

  • GPU 的内存访问以 128 字节为单位最高效
  • 对于 Llama(hidden_size=409=fp16):
    • batch_size * 4096 * 2 = batch_size * 8192
    • 8 的倍数 → 8192 的倍数 → 对齐良好

2. 减少 Graph 数量

  • 如果是连续的 [1, 2, 3, ..., 256],需要 256 个 graph
  • 使用 8 的倍数,只需要 ~30 个 graph
  • 节省内存和 capture 时间

3. Padding 开销可接受

  • 如果 batch.size = 10,pad 到 16,浪费 6 个 dummy_req
  • 但 Decode 计算量小,padding 开销 < 不用 CUDA Graph 的开销

核心:8 的倍数 = 内存对齐 + 减少 graph 数量 + padding 开销可接受。


10.7 Pad Batch 的选择策略

1
2
3
4
def pad_batch(self, batch: Batch) -> int:
padded_size = next(bs for bs in self.graph_bs_list if bs >= batch.size)
batch.padded_reqs = batch.reqs + [self.dummy_req] * (padded_size - batch.size)
return batch.padded_size - batch.size

选择逻辑

  • next():返回第一个满足 bs >= batch.size 的元素
  • 如果 batch.size = 10graph_bs_list = [1, 2, 4, 8, 16, 24, ...]
  • 选择 16(第一个 >= 10 的)

为什么不选 8?

  • 8 < 10,无法容纳所有请求

为什么不选 24?

  • 虽然 24 也满足,但 next() 返回第一个
  • 选最小的满足条件的,减少 padding 开销

核心:选择 >= batch.size 的最小 graph,平衡 padding 开销和加速效果。


10.8 为什么只有 Decode 能用 CUDA Graph?

1
2
def can_use_cuda_graph(self, batch: Batch) -> bool:
return batch.is_decode and batch.size <= self.max_graph_bs

CUDA Graph 的要求

  • 固定的输入形状(batch size、sequence length)
  • 固定的计算图(不能有条件分支)

Prefill 阶段

  • 每个请求的 input_len 不同(10、100、1000)
  • Attention 的计算形状:(batch_size, input_len, hidden_size)
  • 输入形状不固定,无法用 CUDA Graph

Decode 阶段

  • 每个请求都只生成 1 个 token
  • Attention 的计算形状:(batch_size, 1, hidden_size)
  • 输入形状固定(除了 batch_size),可以用 CUDA Graph

核心:Decode 的输入形状固定 + 计算非密集 = CUDA Graph 的最佳场景。


10.9 共享内存池的机制

1
2
3
4
5
6
7
8
9
10
pool = None
for bs in [256, 248, 240, ..., 8, 4, 2, 1]: # 从大到小
graph = torch.cuda.CUDAGraph()

with torch.cuda.graph(graph, pool=pool, stream=self.stream):
self.logits[:bs] = model.forward()

if pool is None:
pool = graph.pool() # 第一个 graph 创建 pool
graph_map[bs] = graph

内存分配

  1. 第一个 graph(bs=256)

    • 分配所有需要的内存(最大)
    • 创建 pool,记录这些内存
  2. 后续 graph(bs=248, 240, …)

    • 复用 pool 里的内存
    • 不需要新分配
    • 只是用其中一部分

好处

  • 所有 graph 共享同一块内存
  • 总内存占用 = 最大 batch size 的内存
  • 而不是 sum(所有 batch size 的内存)

例子

  • 不用 pool:256 + 248 + 240 + … = 巨大内存
  • 用 pool:max(256, 248, 240, …) = 256 的内存

10.10 Replay 流程

1
2
3
4
5
def replay(self, batch: Batch) -> torch.Tensor:
g = self.graph_map[batch.padded_size]
self.attn_backend.prepare_for_replay(batch)
g.replay() # 重放 graph
return self.logits[: batch.size]

Replay 时

  1. 调用 prepare_for_replay(batch):更新输入数据(input_idspage_table
  2. 调用 g.replay():直接提交所有 kernel 到 GPU
  3. GPU 执行所有 kernel,结果写入 self.logits

关键

  • self.logits 的地址在 Capture 时已记录
  • Replay 时,kernel 直接写入这个地址
  • 不需要重新构建计算图,不需要 Python 调用

11. 总结

11.1 Engine 的核心设计

设计点 目的 实现
"meta" 设备 加速模型加载 只创建结构,直接加载权重到 GPU
memory_ratio 预留中间计算内存 只用 85%-90% 内存做 KV Cache
complete_one() 时机 及时更新请求状态 在采样之前调用
copy_done_event 跨 Stream 同步 实现 Overlap Scheduling
_sync_get_memory() 减少通信开销 1 次 all_reduce 同时获取 min 和 max
dummy_req 满足 CUDA Graph 填充 batch 到固定 size
dummy_page 内存访问安全 避免 dummy_req 访问非法内存

11.2 CUDA Graph 的核心设计

设计点 目的 实现
Warmup + Capture 避免 Capture 时分配内存 先运行一次分配内存,再 Capture
从大到小 Capture 最优内存利用 + 优雅降级 大的先分配,小的复用 pool
8 的倍数 batch size 内存对齐 + 减少 graph 数量 [1,2,4,8,16,24,...]
预分配 self.logits 保持固定地址 __init__ 时分配
共享 pool 减少内存占用 所有 graph 共享同一块内存
只用于 Decode 输入形状固定 Prefill 的 input_len 不固定

11.3 性能提升

CUDA Graph 的效果

  • CPU 开销:~100μs → ~1μs(100 倍)
  • 适用场景:Decode 阶段(计算非密集,kernel 多)
  • Trade-off:少量 padding 换取 CUDA Graph 加速

12. 费曼挑战

问题 1:用简单的话解释"为什么需要跨 Stream 异步同步?"

答案
如果 Engine 等待拷贝完成,就无法立即开始下一批次的计算,GPU 会空闲。使用 Event 跨 Stream 同步后,Engine 可以立即开始下一批次,Scheduler 在另一个 Stream 上等待拷贝完成后处理结果。两个 Stream 并行工作,隐藏了拷贝延迟,提升吞吐量。

问题 2:用简单的话解释"CUDA Graph 的核心原理和为什么适合 Decode?"

答案
CUDA Graph 预先记录所有 kernel 调用序列和参数(包括 Tensor 地址),Replay 时直接提交整个 Graph 到 GPU,跳过 Python/PyTorch/CUDA Driver 层,将 CPU 开销从 ~100μs 降到 ~1μs。Decode 阶段每次只生成 1 个 token,输入形状固定,且计算量小、kernel 多,CPU 开销占比大,CUDA Graph 的加速效果最明显。



14. Token 采样机制

14.1 Sampler 在推理流程中的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# engine.py 的 forward_batch
def forward_batch(self, batch: Batch, args: BatchSamplingArgs) -> ForwardOutput:
# 1. 前向传播得到 Logits
logits = self.model.forward() # shape: (batch_size, vocab_size)

# 2. 更新请求状态
for req in batch.reqs:
req.complete_one()

# 3. 采样得到 next_token
next_tokens_gpu = self.sampler.sample(logits[: batch.size], args).to(torch.int32)
# shape: (batch_size,),每个元素是 token_id

return ForwardOutput(next_tokens_gpu, ...)

Sampler 的作用

  • 输入:Logits(每个 token 的概率分布)
  • 输出:next_token(具体的 token_id)
  • 核心:从概率分布中采样出下一个 token

14.2 prepare 方法:统一批次采样参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def prepare(self, batch: Batch) -> BatchSamplingArgs:
params = [r.sampling_params for r in batch.reqs]

# 1. 检查是否全部是 greedy
if all(p.is_greedy for p in params):
return BatchSamplingArgs(temperatures=None) # 特殊处理

# 2. 收集所有请求的采样参数
MIN_P = MIN_T = 1e-6
ts = [max(0.0 if p.is_greedy else p.temperature, MIN_T) for p in params]
top_ks = [p.top_k if p.top_k >= 1 else self.vocab_size for p in params]
top_ps = [min(max(p.top_p, MIN_P), 1.0) for p in params]

# 3. 转换为 GPU Tensor
temperatures = make_device_tensor(ts, torch.float32, self.device)
top_k = make_device_tensor(top_ks, torch.int32, self.device) if any(k != self.vocab_size for k in top_ks) else None
top_p = make_device_tensor(top_ps, torch.float32, self.device) if any(p < 1.0 for p in top_ps) else None

return BatchSamplingArgs(temperatures, top_k=top_k, top_p=top_p)

关键点

  1. 批次内每个请求的采样参数可能不同(有的 greedy,有的 top_k,有的 top_p)
  2. 统一转换为 GPU Tensor,方便批量采样
  3. 优化:如果所有请求都不需要某个参数,就不创建对应的 Tensor(None

14.3 Greedy Sampling 的特殊处理

1
2
3
4
def sample(self, logits: torch.Tensor, args: BatchSamplingArgs) -> torch.Tensor:
if args.temperatures is None: # 全部是 greedy
return torch.argmax(logits, dim=-1) # 直接取最大值
return sample_impl(logits.float(), args.temperatures, args.top_k, args.top_p)

为什么特殊处理?

采样方式 流程 速度
Greedy 直接 argmax(logits) 快(1 个 kernel)
其他(top_k/top_p) softmax → 过滤 → 随机采样 慢(多个 kernel)

核心:Greedy 是确定性的,可以直接 argmax;其他采样是随机的,需要完整流程。


14.4 采样流程详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def sample_impl(
logits: torch.Tensor,
temperatures: torch.Tensor,
top_k: torch.Tensor | int | None,
top_p: torch.Tensor | float | None,
) -> torch.Tensor:
# 1. Softmax + Temperature
probs = sampling.softmax(logits, temperatures, enable_pdl=is_sm90_supported())

# 2. 根据参数选择采样策略
if top_k is None and top_p is None:
return sampling.sampling_from_probs(probs)

if top_p is None:
return sampling.top_k_sampling_from_probs(probs, top_k)

if top_k is None:
return sampling.top_p_sampling_from_probs(probs, top_p)

return sampling.top_k_top_p_sampling_from_probs(probs, top_k, top_p)

步骤 1:Softmax + Temperature

1
probs = softmax(logits / temperature)

Temperature 的作用

Temperature 效果 多样性
< 1.0(如 0.5) 概率分布更尖锐,倾向高概率 token 低(更确定)
= 1.0 标准 softmax 中等
> 1.0(如 2.0) 概率分布更平滑,更随机 高(更创造性)

例子

1
2
3
4
5
6
7
8
9
10
11
12
logits = [2.0, 1.0, 0.5]

temperature = 1.0:
probs = [0.66, 0.24, 0.10] # 标准分布

temperature = 0.5:
logits / 0.5 = [4.0, 2.0, 1.0]
probs = [0.84, 0.14, 0.02] # 更尖锐,更倾向第一个

temperature = 2.0:
logits / 2.0 = [1.0, 0.5, 0.25]
probs = [0.52, 0.29, 0.19] # 更平滑,更随机

步骤 2:Top-K 过滤

原理:只保留概率最大的 K 个 token,其他设为 0,重新归一化。

例子(top_k = 2):

1
2
3
probs = [0.66, 0.24, 0.10]
top_k_probs = [0.66, 0.24, 0.00] # 过滤掉第 3 个
normalized = [0.73, 0.27, 0.00] # 重新归一化

多样性

  • 小 K(如 10):只考虑前 10 个 → 低多样性
  • 大 K(如 100):考虑前 100 个 → 高多样性

步骤 3:Top-P 过滤(Nucleus Sampling)

原理:累积概率达到 P 时停止,只保留累积概率 <= P 的 token。

例子(top_p = 0.9):

1
2
3
4
5
6
probs = [0.66, 0.24, 0.10]
累积: 0.66 0.90 1.00
↑ ↑ ↑
保留 保留 丢弃(累积已达 0.9)
top_p_probs = [0.66, 0.24, 0.00]
normalized = [0.73, 0.27, 0.00]

多样性

  • 小 P(如 0.5):只保留累积 50% → 低多样性
  • 大 P(如 0.95):保留累积 95% → 高多样性

步骤 4:随机采样

从过滤后的概率分布中随机采样,使用 GPU 的随机数生成器。


14.5 防御性编程:MIN_PMIN_T

1
2
3
MIN_P = MIN_T = 1e-6
ts = [max(0.0 if p.is_greedy else p.temperature, MIN_T) for p in params]
top_ps = [min(max(p.top_p, MIN_P), 1.0) for p in params]

防止的问题

非法参数 问题 解决
temperature = 0 logits / 0infnan max(temperature, 1e-6)
top_p = 0 所有 token 被过滤,无法采样 max(top_p, 1e-6)
top_p > 1.0 概率范围应该是 [0, 1] min(top_p, 1.0)

核心:防御性编程,防止用户传入非法参数导致采样失败。


14.6 异步拷贝优化

1
2
def make_device_tensor(data: List, dtype: torch.dtype, device: torch.device) -> torch.Tensor:
return torch.tensor(data, dtype=dtype, pin_memory=True).to(device, non_blocking=True)

为什么用 pin_memory + non_blocking

Pinned Memory(Page-Locked Memory)

  • 操作系统保证不会换出到磁盘
  • 地址固定,GPU 可以直接通过 DMA 访问
  • CPU→GPU 拷贝速度快 2-3 倍

异步拷贝

1
2
3
4
5
6
7
8
# 同步拷贝(阻塞)
temperatures = torch.tensor([1.0, 0.8, 1.2], device="cuda")
# CPU 等待拷贝完成才继续

# 异步拷贝(不阻塞)
temperatures = torch.tensor([1.0, 0.8, 1.2], pin_memory=True).to("cuda", non_blocking=True)
# CPU 立即继续,拷贝在后台进行
# 后续 GPU kernel 会自动等待拷贝完成

核心pin_memory + non_blocking = 异步拷贝 + 更快速度 = 更好的 CPU-GPU 并行。


14.7 条件创建 Tensor

1
2
3
4
5
top_k, top_p = None, None
if any(k != self.vocab_size for k in top_ks):
top_k = make_device_tensor(top_ks, torch.int32, self.device)
if any(p < 1.0 for p in top_ps):
top_p = make_device_tensor(top_ps, torch.float32, self.device)

优化逻辑

场景 1:所有请求都没设置 top_k

1
2
3
4
5
6
7
params = [
SamplingParams(temperature=1.0), # 没设置 top_k
SamplingParams(temperature=0.8), # 没设置 top_k
]
top_ks = [vocab_size, vocab_size] # 都是 vocab_size
any(k != vocab_size for k in top_ks) # False
top_k = None # 不创建 Tensor

好处

  • 不需要创建 GPU Tensor
  • 不需要拷贝数据
  • sample_impl 跳过 top_k 过滤

场景 2:至少有一个请求设置了 top_k

1
2
3
4
5
6
7
params = [
SamplingParams(temperature=1.0, top_k=50), # 设置了 top_k
SamplingParams(temperature=0.8), # 没设置 top_k
]
top_ks = [50, vocab_size] # 第一个是 50,第二个是 vocab_size
any(k != vocab_size for k in top_ks) # True
top_k = make_device_tensor([50, vocab_size], ...) # 创建 Tensor

关键

  • 第一个请求做 top_k 过滤(k=50)
  • 第二个请求不过滤(k=vocab_size)
  • 批量处理,一次 kernel 调用

核心:只有当至少有一个请求需要时,才创建对应的 Tensor,减少不必要的开销。


15. 总结

15.1 Engine 的核心设计

设计点 目的 实现
"meta" 设备 加速模型加载 只创建结构,直接加载权重到 GPU
memory_ratio 预留中间计算内存 只用 85%-90% 内存做 KV Cache
complete_one() 时机 及时更新请求状态 在采样之前调用
copy_done_event 跨 Stream 同步 实现 Overlap Scheduling
_sync_get_memory() 减少通信开销 1 次 all_reduce 同时获取 min 和 max
dummy_req 满足 CUDA Graph 填充 batch 到固定 size
dummy_page 内存访问安全 避免 dummy_req 访问非法内存

15.2 CUDA Graph 的核心设计

设计点 目的 实现
Warmup + Capture 避免 Capture 时分配内存 先运行一次分配内存,再 Capture
从大到小 Capture 最优内存利用 + 优雅降级 大的先分配,小的复用 pool
8 的倍数 batch size 内存对齐 + 减少 graph 数量 [1,2,4,8,16,24,...]
预分配 self.logits 保持固定地址 __init__ 时分配
共享 pool 减少内存占用 所有 graph 共享同一块内存
只用于 Decode 输入形状固定 Prefill 的 input_len 不固定

15.3 Sampler 的核心设计

设计点 目的 实现
prepare 方法 统一批次采样参数 转换为 GPU Tensor
Greedy 特殊处理 加速确定性采样 直接 argmax,跳过 softmax
MIN_PMIN_T 防御性编程 防止非法参数导致采样失败
pin_memory + non_blocking 异步拷贝 实现 CPU-GPU 并行
条件创建 Tensor 减少开销 只有需要时才创建 top_k/top_p

15.4 采样参数对多样性的影响

参数 作用 低多样性 高多样性
Temperature 控制概率分布形状 < 1.0(更确定) > 1.0(更随机)
Top-K 限制候选 token 数量 小 K(如 10) 大 K(如 100)
Top-P 限制累积概率 小 P(如 0.5) 大 P(如 0.95)

核心:三个参数共同控制"从多大的候选集中采样",候选集越小 → 多样性越低 → 生成越确定。


16. 费曼挑战

问题 1:用简单的话解释"为什么需要跨 Stream 异步同步?"

答案
如果 Engine 等待拷贝完成,就无法立即开始下一批次的计算,GPU 会空闲。使用 Event 跨 Stream 同步后,Engine 可以立即开始下一批次,Scheduler 在另一个 Stream 上等待拷贝完成后处理结果。两个 Stream 并行工作,隐藏了拷贝延迟,提升吞吐量。

问题 2:用简单的话解释"CUDA Graph 的核心原理和为什么适合 Decode?"

答案
CUDA Graph 预先记录所有 kernel 调用序列和参数(包括 Tensor 地址),Replay 时直接提交整个 Graph 到 GPU,跳过 Python/PyTorch/CUDA Driver 层,将 CPU 开销从 ~100μs 降到 ~1μs。Decode 阶段每次只生成 1 个 token,输入形状固定,且计算量小、kernel 多,CPU 开销占比大,CUDA Graph 的加速效果最明显。

问题 3:用简单的话解释"Temperature、Top-K、Top-P 如何控制生成多样性?"

答案
Temperature 控制概率分布的形状(低温更确定,高温更随机)。Top-K 限制候选 token 数量(只考虑前 K 个)。Top-P 限制累积概率(只保留累积概率 <= P 的 token)。三个参数共同控制"从多大的候选集中采样",候选集越小,多样性越低,生成越确定。



18. Llama 模型结构

18.1 BaseLLMModel 抽象基类

1
2
3
class BaseLLMModel(ABC, BaseOP):
@abstractmethod
def forward(self) -> torch.Tensor: ...

作用:定义统一接口,Engine 不需要知道具体模型类型。

为什么需要?

Mini-SGLang 支持多种模型(Llama、Qwen、Mistral 等),但 Engine 只需要调用 model.forward()

1
2
3
4
5
6
7
# engine.py
class Engine:
def __init__(self, config):
self.model: BaseLLMModel = create_model(...) # 可能是 Llama、Qwen 等

def forward_batch(self, batch):
logits = self.model.forward() # 统一接口,不关心具体模型

核心:抽象基类 = 统一接口 + 多态。


18.2 Llama 模型的层次结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
LlamaForCausalLM (最外层,用于因果语言建模)
├── LlamaModel (Transformer 主体)
│ ├── embed_tokens (Embedding 层)
│ ├── layers (32 个 LlamaDecoderLayer)
│ │ ├── LlamaDecoderLayer 0
│ │ │ ├── input_layernorm
│ │ │ ├── self_attn (Attention)
│ │ │ ├── post_attention_layernorm
│ │ │ └── mlp (MLP)
│ │ ├── LlamaDecoderLayer 1
│ │ ├── ...
│ │ └── LlamaDecoderLayer 31
│ └── norm (最后的 LayerNorm)
└── lm_head (输出层,vocab_size)

数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
input_ids (token IDs)

embed_tokens (转换为 embeddings)

LlamaDecoderLayer 0 (Attention + MLP)

LlamaDecoderLayer 1

...

LlamaDecoderLayer 31

norm (最后的 LayerNorm)

lm_head (投影到 vocab_size)

logits (每个 token 的概率分布)

18.3 LlamaDecoderLayer 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
class LlamaDecoderLayer(BaseOP):
def __init__(self, config: ModelConfig, layer_id: int):
self.self_attn = LlamaAttn(config, layer_id)
self.mlp = LlamaMLP(config)
self.input_layernorm = RMSNormFused(...)
self.post_attention_layernorm = RMSNormFused(...)

def forward(self, x: torch.Tensor, residual: torch.Tensor | None) -> Tuple[torch.Tensor, torch.Tensor]:
x, residual = self.input_layernorm.forward(x, residual)
x = self.self_attn.forward(x)
x, residual = self.post_attention_layernorm.forward(x, residual)
x = self.mlp.forward(x)
return x, residual

为什么有两个 LayerNorm?

这是 Pre-Norm Transformer 的标准结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
输入 x

input_layernorm (归一化)

self_attn (Attention)

+ residual (残差连接,在 post_attention_layernorm 内部)

post_attention_layernorm (归一化)

mlp (MLP)

+ residual (残差连接,在下一层的 input_layernorm 内部)

输出

两个 LayerNorm 的位置

  1. input_layernorm:在 Attention 之前
  2. post_attention_layernorm:在 MLP 之前

为什么要归一化?

  • 稳定训练(防止梯度爆炸/消失)
  • 加速收敛

Residual Connection(残差连接)

residual 的作用:提供梯度的"高速公路",防止梯度消失。

没有残差连接的问题

1
2
3
4
5
6
x → Layer1 → Layer2 → ... → Layer32 → Loss

反向传播:
Loss → ∂Layer32 → ∂Layer31 → ... → ∂Layer1
↓ ↓ ↓
梯度 * 0.9 梯度 * 0.9 * 0.9 梯度 * 0.9^31 ≈ 0

每经过一层,梯度乘以一个小于 1 的数,32 层后梯度几乎为 0(梯度消失)。

有残差连接

1
2
3
4
5
6
7
8
x → Layer1 → Layer2 → ... → Layer32 → Loss
↘ ↓ ↗ ↓ ↗ ↓ ↗
残差连接 残差连接 残差连接

反向传播:
Loss → ∂Layer32 → ∂Layer31 → ... → ∂Layer1
↓ ↓ ↓
梯度 梯度 + 直接路径 梯度 + 直接路径

数学上

1
2
3
y = F(x) + x  # 残差连接

∂y/∂x = ∂F(x)/∂x + 1 # 梯度至少是 1,不会消失

核心:残差连接让梯度可以直接传播,防止梯度消失,加速训练。


residual 的实现细节

RMSNormFused.forward 的逻辑(推断):

1
2
3
4
5
6
def forward(self, x: torch.Tensor, residual: torch.Tensor | None) -> Tuple[torch.Tensor, torch.Tensor]:
if residual is not None:
x = x + residual # 先加残差
residual = x # 保存当前值作为下一次的残差
x = rms_norm(x) # 归一化
return x, residual

完整流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 初始
x = embeddings
residual = None

# Layer 0
x, residual = input_layernorm(x, residual)
# residual is None,跳过加法
# residual = embeddings
# x = norm(embeddings)

x = self_attn(x)
# x = attn_output

x, residual = post_attention_layernorm(x, residual)
# x = attn_output + embeddings (残差连接!)
# residual = attn_output + embeddings
# x = norm(attn_output + embeddings)

x = mlp(x)
# x = mlp_output

# 返回 (mlp_output, attn_output + embeddings)
# 下一层会用 residual 做残差连接

核心residual 保存上一次的输出,用于下一次的残差连接。


18.4 LlamaModel 详解

1
2
3
4
5
6
7
8
9
10
11
12
class LlamaModel(BaseOP):
def __init__(self, config: ModelConfig):
self.embed_tokens = VocabParallelEmbedding(...)
self.layers = OPList([LlamaDecoderLayer(config, layer_id) for layer_id in range(config.num_layers)])
self.norm = RMSNormFused(...)

def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
x = self.embed_tokens.forward(input_ids)
residual: torch.Tensor | None = None
for layer in self.layers.op_list:
x, residual = layer.forward(x, residual)
return self.norm.forward(x, residual)[0]

为什么 residual 初始化为 None

第一层的特殊性

1
2
3
4
5
6
7
8
9
10
# 第一层
x = embeddings
residual = None

x, residual = layer_0.forward(x, residual)
# 在 input_layernorm 中:
# if residual is not None: # False,跳过
# x = x + residual
# re x # residual = embeddings
# x = norm(x) # x = norm(embeddings)

第一层不需要残差连接,因为没有"上一层的输出"可以加。

后续层

1
2
3
# 第二层
x, residual = layer_1.forward(x, residual)
# residual 不是 None,会做残差连接

核心residual = None 表示"第一层不做残差连接"。


18.5 LlamaForCausalLM 详解

1
2
3
4
5
6
7
8
9
class LlamaForCausalLM(BaseLLMModel):
def __init__(self, config: ModelConfig):
self.model = LlamaModel(config)
self.lm_head = ParallelLMHead(...)

def forward(self) -> torch.Tensor:
output = self.model.forward(get_global_ctx().batch.input_ids)
logits = self.lm_head.forward(output)
return logits

为什么 forward() 没有参数?

答案:通过 全局上下文 get_global_ctx() 获取 input_ids

完整流程

1
2
3
4
# engine.py
def forward_batch(self, batch: Batch, args: BatchSamplingArgs):
with self.ctx.forward_batch(batch): # 设置全局 batch
logits = self.model.forward() # 模型从全局上下文获取 input_ids

为什么这样设计?

  1. 简化接口

    • 不需要每次都传递 batch 参数
    • model.forward() 接口更简洁
  2. 支持 CUDA Graph

    • CUDA Graph 要求固定的函数签名
    • 如果 forward(batch) 每次 batch 不同,无法 Capture
    • 使用全局上下文,forward() 无参数,函数签名固定
  3. 动态数据

    • model.forward()函数签名固定(无参数)
    • 全局上下文的内容可以变化
    • CUDA Graph 记录的是"读取全局上下文"这个操作,不是具体的 batch

核心:全局上下文 = 固定函数签名 + 动态数据 = 满足 CUDA Graph 要求。


18.6 完整前向传播流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 1. Engine 设置全局上下文
with self.ctx.forward_batch(batch):
# 2. 调用模型 forward
logits = self.model.forward()

# 3. LlamaForCausalLM.forward
def forward(self):
# 从全局上下文获取 input_ids
input_ids = get_global_ctx().batch.input_ids

# 4. LlamaModel.forward
x = self.embed_tokens(input_ids) # Embedding
residual = None
for layer in self.layers:
# 5. LlamaDecoderLayer.forward
x, residual = self.input_layernorm(x, residual)
x = self.self_attn(x) # Attention
x, residual = self.post_attention_layernorm(x, residual)
x = self.mlp(x) # MLP

# 6. 最后的 LayerNorm
x = self.norm(x, residual)[0]

# 7. 投影到 vocab_size
logits = self.lm_head(x)
return logits

18.7 Shape 变化

假设:

  • batch_size = 8
  • seq_len = 1(Decode 阶段)
  • hidden_size = 4096
  • vocab_size = 32000
  • num_layers = 32
阶段 Shape 说明
input_ids (8, 1) 8 个请求,每个 1 个 token
Embedding (8, 1, 4096)(8, 4096) 转换为向量,squeeze seq_len
Transformer Layers (8, 4096) 32 层,shape 不变
LM Head (8, 32000) 投影到词表
Logits (8, 32000) 每个请求对应 32000 个词的概率

Prefill 阶段seq_len > 1):

  • Shape 变化:(batch_size, seq_len, hidden_size)(batch_size, seq_len, vocab_size)
  • 每个 token 都有对应的 logits

19. 总结

19.1 Engine 的核心设计

设计点 目的 实现
"meta" 设备 加速模型加载 只创建结构,直接加载权重到 GPU
memory_ratio 预留中间计算内存 只用 85%-90% 内存做 KV Cache
complete_one() 时机 及时更新请求状态 在采样之前调用
copy_done_event 跨 Stream 同步 实现 Overlap Scheduling
_sync_get_memory() 减少通信开销 1 次 all_reduce 同时获取 min 和 max
dummy_req 满足 CUDA Graph 填充 batch 到固定 size
dummy_page 内存访问安全 避免 dummy_req 访问非法内存

19.2 CUDA Graph 的核心设计

设计点 目的 实现
Warmup + Capture 避免 Capture 时分配内存 先运行一次分配内存,再 Capture
从大到小 Capture 最优内存利用 + 优雅降级 大的先分配,小的复用 pool
8 的倍数 batch size 内存对齐 + 减少 graph 数量 [1,2,4,8,16,24,...]
预分配 self.logits 保持固定地址 __init__ 时分配
共享 pool 减少内存占用 所有 graph 共享同一块内存
只用于 Decode 输入形状固定 Prefill 的 input_len 不固定

19.3 Sampler 的核心设计

设计点 目的 实现
prepare 方法 统一批次采样参数 转换为 GPU Tensor
Greedy 特殊处理 加速确定性采样 直接 argmax,跳过 softmax
MIN_PMIN_T 防御性编程 防止非法参数导致采样失败
pin_memory + non_blocking 异步拷贝 实现 CPU-GPU 并行
条件创建 Tensor 减少开销 只有需要时才创建 top_k/top_p

19.4 Llama 模型的核心设计

设计点 目的 实现
BaseLLMModel 统一接口 抽象基类,支持多种模型
Pre-Norm 稳定训练 每个子层之前 LayerNorm
Residual Connection 防止梯度消失 提供梯度的"高速公路"
residual = None 第一层特殊处理 第一层不做残差连接
全局上下文 支持 CUDA Graph 固定函数签名 + 动态数据

20. 费曼挑战

问题 1:用简单的话解释"为什么需要跨 Stream 异步同步?"

答案
如果 Engine 等待拷贝完成,就无法立即开始下一批次的计算,GPU 会空闲。使用 Event 跨 Stream 同步后,Engine 可以立即开始下一批次,Scheduler 在另一个 Stream 上等待拷贝完成后处理结果。两个 Stream 并行工作,隐藏了拷贝延迟,提升吞吐量。

问题 2:用简单的话解释"CUDA Graph 的核心原理和为什么适合 Decode?"

答案
CUDA Graph 预先记录所有 kernel 调用序列和参数(包括 Tensor 地址),Replay 时直接提交整个 Graph 到 GPU,跳过 Python/PyTorch/CUDA Driver 层,将 CPU 开销从 ~100μs 降到 ~1μs。Decode 阶段每次只生成 1 个 token,输入形状固定,且计算量小、kernel 多,CPU 开销占比大,CUDA Graph 的加速效果最明显。

问题 3:用简单的话解释"Temperature、Top-K、Top-P 如何控制生成多样性?"

答案
Temperature 控制概率分布的形状(低温更确定,高温更随机)。Top-K 限制候选 token 数量(只考虑前 K 个)。Top-P 限制累积概率(只保留累积概率 <= P 的 token)。三个参数共同控制"从多大的候选集中采样",候选集越小,多样性越低,生成越确定。

问题 4:用简单的话解释"残差连接如何防止梯度消失?"

答案
没有残差连接时,梯度需要经过 32 层反向传播,每层都会乘以一个小于 1 的数,最后梯度几乎为 0。残差连接提供了一条"高速公路",让梯度可以直接传播到前面的层,不会消失。数学上,y = F(x) + x,所以 ∂y/∂x = ∂F(x)/∂x + 1,梯度至少是 1,不会消失。

问题 5:用简单的话解释"为什么 forward() 没有参数可以支持 CUDA Graph?"

答案
CUDA Graph 要求固定的函数签名。如果 forward(batch) 有参数,每次 batch 不同,CUDA Graph 无法处理。使用全局上下文后,forward() 无参数,函数签名固定,但全局上下文的内容可以变化。CUDA Graph 记录的是"读取全局上下文"这个操作,不是具体的 batch,所以可以支持动态数据。