本文深入分析 Mini-SGLang 的完整推理流程,探讨多进程架构、ZMQ 消息传递、流式反分词和重叠调度等核心技术。
环境:Mini-SGLang v0.1.0 | Python 3.10+ | PyTorch 2.0+
源码位置:python/minisgl/server/, python/minisgl/scheduler/, python/minisgl/tokenizer/
1. 背景:推理系统的架构挑战
LLM 推理系统需要平衡三个关键目标:
低延迟 :用户体验要求快速响应(首 token 延迟 < 100ms)
高吞吐 :服务端需要同时处理大量请求(> 1000 QPS)
资源利用 :GPU 利用率需要保持在 80% 以上
Mini-SGLang 通过多进程架构 + 异步消息传递 实现了这些目标。
2. 系统架构:3 进程模型
2.1 架构全景
1 2 3 4 5 6 7 ┌─────────────┐ ZMQ ┌─────────────┐ ZMQ ┌─────────────┐ │ Frontend │ ────→ │ Tokenizer │ ────→ │ Scheduler │ │ (HTTP) │ ←──── │ (CPU) │ ←──── │ (GPU) │ └─────────────┘ └─────────────┘ └─────────────┘ ↑ ↑ ↑ │ │ │ 用户请求 分词/反分词 调度+推理+采样
2.2 进程职责划分
进程
职责
运行设备
关键技术
Frontend
HTTP 服务器、SSE 流式输出
CPU
FastAPI, asyncio
Tokenizer
分词、反分词、消息路由
CPU
HuggingFace Tokenizer
Scheduler
批处理调度、GPU 计算
GPU
CUDA, PyTorch
2.3 为什么使用多进程?
问题 :Python GIL(全局解释器锁)限制了多线程并行。
解决方案 :
1 2 3 4 5 6 7 thread1: tokenize() thread2: gpu_compute() process1: tokenize() process2: gpu_compute()
优势 :
CPU 密集任务(分词)和 GPU 计算真正并行
进程隔离,崩溃不影响其他进程
更好的资源管理和监控
3. API Server 详细实现
3.1 核心组件:FrontendManager
FrontendManager 是 API Server 的核心状态管理器,负责协调所有用户请求。
1 2 3 4 5 6 7 8 9 10 @dataclass class FrontendManager : config: ServerArgs recv_tokenizer: ZmqAsyncPullQueue send_tokenizer: ZmqAsyncPushQueue uid_counter: int = 0 initialized: bool = False ack_map: Dict [int , List [UserReply]] = field(default_factory=dict ) event_map: Dict [int , asyncio.Event] = field(default_factory=dict )
关键数据结构 :
ack_map:存储每个用户的响应消息队列
event_map:存储每个用户的事件对象(用于异步通知)
uid_counter:全局唯一 ID 生成器
3.2 生产者-消费者模式
API Server 使用经典的异步生产者-消费者模式 来处理流式响应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 async def listen (self ): while True : msg = await self .recv_tokenizer.get() for msg in _unwrap_msg(msg): if msg.uid not in self .ack_map: continue self .ack_map[msg.uid].append(msg) self .event_map[msg.uid].set () async def wait_for_ack (self, uid: int ): event = self .event_map[uid] while True : await event.wait() event.clear() pending = self .ack_map[uid] self .ack_map[uid] = [] for ack in pending: yield ack if ack and ack.finished: break
工作流程 :
listen() 在后台持续运行,接收 Tokenizer 的响应
收到消息后,存入 ack_map[uid],并触发 event.set()
wait_for_ack() 被唤醒,取出消息并 yield 给调用者
循环直到生成完成
关键设计 :
使用 asyncio.Event 实现异步通知(而不是轮询)
支持多个用户并发请求(每个 uid 独立的 event 和 ack 队列)
非阻塞设计,不会因为一个慢请求影响其他请求
3.3 流式输出的两种格式
格式 1:简单文本流(stream_generate)
1 2 3 4 5 6 async def stream_generate (self, uid: int ): async for ack in self .wait_for_ack(uid): yield f"data: {ack.incremental_output} \n" .encode() if ack.finished: break yield "data: [DONE]\n" .encode()
输出示例 :
1 2 3 4 data: 你好 data: , data: 世界 data: [DONE]
特点 :
简单直接,适合自定义客户端
每行一个增量文本片段
使用 Server-Sent Events (SSE) 格式
格式 2:OpenAI 兼容格式(stream_chat_completions)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 async def stream_chat_completions (self, uid: int ): first_chunk = True async for ack in self .wait_for_ack(uid): delta = {} if first_chunk: delta["role" ] = "assistant" first_chunk = False if ack.incremental_output: delta["content" ] = ack.incremental_output chunk = { "id" : f"cmpl-{uid} " , "object" : "text_completion.chunk" , "choices" : [{"delta" : delta, "index" : 0 , "finish_reason" : None }], } yield f"data: {json.dumps(chunk)} \n\n" .encode() if ack.finished: break end_chunk = { "id" : f"cmpl-{uid} " , "object" : "text_completion.chunk" , "choices" : [{"delta" : {}, "index" : 0 , "finish_reason" : "stop" }], } yield f"data: {json.dumps(end_chunk)} \n\n" .encode() yield b"data: [DONE]\n\n"
输出示例 :
1 2 3 4 5 6 7 data: { "id" : "cmpl-123" , "object" : "text_completion.chunk" , "choices" : [ { "delta" : { "role" : "assistant" } , "index" : 0 , "finish_reason" : null } ] } data: { "id" : "cmpl-123" , "object" : "text_completion.chunk" , "choices" : [ { "delta" : { "content" : "你好" } , "index" : 0 , "finish_reason" : null } ] } data: { "id" : "cmpl-123" , "object" : "text_completion.chunk" , "choices" : [ { "delta" : { } , "index" : 0 , "finish_reason" : "stop" } ] } data: [ DONE]
特点 :
完全兼容 OpenAI Chat Completions API
包含完整的元数据(id, object, choices)
第一个 chunk 包含 role 信息
最后一个 chunk 包含 finish_reason
3.4 API 端点实现
端点 1:/generate - 简单生成接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @app.post("/generate" ) async def generate (req: GenerateRequest ): state = get_global_state() uid = state.new_user() await state.send_one( TokenizeMsg( uid=uid, text=req.prompt, sampling_params=SamplingParams( ignore_eos=req.ignore_eos, max_tokens=req.max_tokens, ), ) ) async def _abort (): await state.abort_user(uid) return StreamingResponse( state.stream_generate(uid), media_type="text/event-stream" , background=BackgroundTask(_abort), )
关键点 :
new_user() 创建 uid 并初始化 ack_map 和 event_map
send_one() 异步发送消息到 Tokenizer(不等待响应)
StreamingResponse 返回异步生成器,支持流式输出
BackgroundTask 确保响应结束后清理资源(无论正常结束还是用户断开)
端点 2:/v1/chat/completions - OpenAI 兼容接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @app.post("/v1/chat/completions" ) async def v1_completions (req: OpenAICompletionRequest ): state = get_global_state() if req.messages: prompt = [msg.model_dump() for msg in req.messages] else : assert req.prompt is not None prompt = req.prompt uid = state.new_user() await state.send_one( TokenizeMsg( uid=uid, text=prompt, sampling_params=SamplingParams( ignore_eos=req.ignore_eos, max_tokens=req.max_tokens, temperature=req.temperature, top_k=req.top_k, top_p=req.top_p, ), ) ) async def _abort (): await state.abort_user(uid) return StreamingResponse( state.stream_chat_completions(uid), media_type="text/event-stream" , background=BackgroundTask(_abort), )
与 /generate 的区别 :
支持 messages 格式(对话历史)
支持更多采样参数(temperature, top_k, top_p)
使用 OpenAI 兼容的输出格式
完全兼容 OpenAI SDK
端点 3:/v1/models - 模型列表
1 2 3 4 5 6 7 8 9 @app.get("/v1/models" ) async def available_models (): state = get_global_state() return ModelList( data=[ModelCard( id =state.config.model_path, root=state.config.model_path )] )
作用 :
返回可用模型列表
兼容 OpenAI API 的模型查询接口
客户端可以通过此接口发现可用模型
3.5 Shell 交互模式
除了 HTTP 服务,API Server 还支持交互式 Shell 模式 ,用于本地调试和测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 async def shell (): session = PromptSession("$ " , completer=WordCompleter(["/exit" , "/reset" ])) history: List [Tuple [str , str ]] = [] while True : cmd = (await session.prompt_async()).strip() if cmd == "/exit" : return if cmd == "/reset" : history = [] continue history_messages: List [Message] = [] for user_msg, assistant_msg in history: history_messages.append(Message(role="user" , content=user_msg)) history_messages.append(Message(role="assistant" , content=assistant_msg)) req = OpenAICompletionRequest( model="" , messages=history_messages + [Message(role="user" , content=cmd)], max_tokens=ENV.SHELL_MAX_TOKENS.value, temperature=ENV.SHELL_TEMPERATURE.value, stream=True , ) cur_msg = "" async for chunk in (await shell_completion(req)).body_iterator: msg = chunk.decode() msg = msg[6 :-1 ] if msg == "[DONE]" : continue cur_msg += msg print (msg, end="" , flush=True ) print () history.append((cmd, cur_msg))
关键特性 :
使用 prompt_toolkit 提供命令行补全和历史记录
支持 /exit 和 /reset 命令
自动维护对话历史(多轮对话)
实时流式输出(flush=True 立即刷新缓冲区)
为什么需要 flush=True?
1 2 3 4 5 6 7 8 9 10 11 for token in ["你" , "好" , "吗" ]: print (token, end="" ) time.sleep(1 ) for token in ["你" , "好" , "吗" ]: print (token, end="" , flush=True ) time.sleep(1 )
3.6 资源清理机制
用户断开连接的处理
1 2 3 4 5 6 7 async def abort_user (self, uid: int ): await asyncio.sleep(0.1 ) if uid in self .ack_map: del self .ack_map[uid] if uid in self .event_map: del self .event_map[uid] logger.warning("Aborting request for user %s" , uid)
为什么需要 sleep(0.1)?
这是为了避免竞态条件 :
1 2 3 4 5 6 7 8 9 10 11 async def wait_for_ack (self, uid: int ): event = self .event_map[uid] while True : await event.wait() pending = self .ack_map[uid] async def abort_user (self, uid: int ): del self .ack_map[uid]
如果没有 sleep :
协程 A 在步骤 2 等待时,协程 B 立即执行步骤 3
协程 A 被唤醒后,步骤 4 访问 ack_map[uid] → KeyError!
有了 sleep :
给协程 A 一点时间完成当前迭代
减少竞态条件的概率(虽然不是完美的解决方案)
服务器关闭的处理
1 2 3 4 5 6 7 8 9 10 11 @asynccontextmanager async def lifespan (_: FastAPI ): yield global _GLOBAL_STATE if _GLOBAL_STATE is not None : _GLOBAL_STATE.shutdown() def shutdown (self ): self .send_tokenizer.stop() self .recv_tokenizer.stop()
生命周期 :
yield 之前:服务器启动时执行(这里为空)
yield 之后:服务器关闭时执行
确保 ZMQ 队列正确关闭,释放资源
3.7 全局状态管理
1 2 3 4 5 6 _GLOBAL_STATE = None def get_global_state () -> FrontendManager: global _GLOBAL_STATE assert _GLOBAL_STATE is not None , "Global state is not initialized" return _GLOBAL_STATE
为什么使用全局变量?
单例模式 :整个服务器只需要一个 FrontendManager 实例
共享状态 :所有端点函数需要访问同一个状态
避免重复创建 :ZMQ 连接、事件循环等资源只需初始化一次
线程安全吗?
是的!因为 FastAPI + uvicorn 默认使用单线程 + asyncio :
虽然有多个协程并发运行
但它们都在同一个线程中
通过事件循环调度,不存在真正的多线程竞争
类比 :
多线程 :多个厨师同时在厨房做菜(可能抢锅)
asyncio :一个厨师在多个锅之间切换(不会抢锅)
4. ZMQ 消息传递机制
3.1 ZMQ 简介
ZeroMQ (ZMQ) 是一个高性能异步消息库,提供多种消息模式。
为什么选择 ZMQ 而不是 gRPC?
特性
ZMQ
gRPC
延迟
~10μs
~100μs
吞吐量
10M msg/s
1M msg/s
依赖
无
Protobuf, HTTP/2
复杂度
低
高
结论 :ZMQ 更适合低延迟、高吞吐的进程间通信。
3.2 消息模式
Mini-SGLang 使用 DEALER-ROUTER 模式:
1 2 3 4 5 6 7 8 9 10 socket = zmq.Context().socket(zmq.DEALER) socket.connect("tcp://localhost:5555" ) socket.send_multipart([uid, msg]) socket = zmq.Context().socket(zmq.ROUTER) socket.bind("tcp://*:5555" ) identity, msg = socket.recv_multipart() socket.send_multipart([identity, response])
特点 :
DEALER:异步发送,不等待响应
ROUTER:根据 identity 路由消息
支持多对多通信
3.3 消息流转
1 2 3 4 5 1. Frontend → Tokenizer: {"uid" : "abc" , "text" : "Hello" }2. Tokenizer → Scheduler: {"uid" : "abc" , "input_ids" : [1234 , 5678 ]}3. Scheduler → Tokenizer: {"uid" : "abc" , "token" : 9012 }4. Tokenizer → Frontend: {"uid" : "abc" , "text" : " world" }
关键设计 :
每个消息携带 uid,用于路由和关联
异步非阻塞,Frontend 可以同时处理多个请求
Tokenizer 作为中间层,解耦 Frontend 和 Scheduler
5. 系统启动流程
5.1 启动入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 def launch_server (run_shell: bool = False ) -> None : server_args, run_shell = parse_args(sys.argv[1 :], run_shell) logger = init_logger(__name__, "initializer" ) def start_subprocess () -> None : mp.set_start_method("spawn" , force=True ) world_size = server_args.tp_info.size ack_queue: mp.Queue[str ] = mp.Queue() for i in range (world_size): new_args = replace( server_args, tp_info=DistributedInfo(i, world_size), ) mp.Process( target=_run_scheduler, args=(new_args, ack_queue), daemon=False , name=f"minisgl-TP{i} -scheduler" , ).start() mp.Process( target=tokenize_worker, kwargs={ "tokenizer_path" : server_args.model_path, "addr" : server_args.zmq_detokenizer_addr, "backend_addr" : server_args.zmq_backend_addr, "frontend_addr" : server_args.zmq_frontend_addr, "tokenizer_id" : num_tokenizers, "ack_queue" : ack_queue, }, daemon=False , name="minisgl-detokenizer-0" , ).start() for i in rangeum_tokenizers): mp.Process( target=tokenize_worker, kwargs={...}, daemon=False , name=f"minisgl-tokenizer-{i} " , ).start() for _ in range (num_tokenizers + 2 ): logger.info(ack_queue.get()) run_api_server(server_args, start_subprocess, run_shell=run_shell)
5.2 进程架构详解
完整的进程拓扑 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 ┌─────────────────────────────────────────────────────────────┐ │ 主进程 (API Server) n│ ┌─────────────────────────────────────────────────────────┐ │ │ │ FastAPI App │ │ │ │ - /generate │ │ │ │ - /v1/chat/completions │ │ │ │ - /v1/models │ │ │ │ - shell() (可选) │ │ │ └─────────────────────────────────────────────────────────┘ │ │ ↕ ZMQ │ └─────────────────────────────────────────────────────────────┘ ↕ ┌─────────────────────────────────────────────────────────────┐ │ Tokenizer 进程 (多个) │ │ - Tokenizer-0: 接收文本 → token IDs │ │ - Tokenizer-1: 接收文本 → token IDs │ │ - ... │ └─────────────────────────────────────────────────────────────┘ ↕ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ Scheduler 进程 (多个,TP) │ │ - TP-0 (GPU 0): 模型的第 1 部分 │ │ - TP-1 (GPU 1): 模型的第 2 部分 │ │ - 通过 NCCL 协同计算 │ └─────────────────────────────────────────────────────────────┘ ↕ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ DeTokenizer 进程 (1个) │ │ - 接收 token IDs → 文本 │ │ - 流式反分词 │ └─────────────────────────────────────────────────────────────┘
进程数量计算 :
API Server:1 个(主进程)
Tokenizer:num_tokenizers 个(默认 1)
Scheduler:world_size 个(TP 并行度,默认 1)
DeTokenizer:1 个
总进程数 = 1 + num_tokenizers + world_size + 1
5.3 Tensor Parallelism (TP) 详解
什么是 TP?
当模型太大无法放入单个 GPU 时,使用 Tensor Parallelism 将模型切分到多个 GPU:
1 2 3 4 5 6 GPU 0 : [完整模型 70B 参数] ❌ OOM! GPU 0 : [模型的前半部分 35B 参数] ✅ GPU 1 : [模型的后半部分 35B 参数] ✅
TP 的工作方式 :
1 2 3 4 5 6 7 8 9 GPU 0 : y0 = x @ W0 (W0: [4096 , 2048 ]) GPU 1 : y1 = x @ W1 (W1: [4096 , 2048 ]) y = concat([y0, y1])
启动多个 Scheduler 进程 :
1 2 3 4 5 6 7 8 9 10 11 12 world_size = server_args.tp_info.size for i in range (world_size): new_args = replace( server_args, tp_info=DistributedInfo(i, world_size), ) mp.Process( target=_run_scheduler, args=(new_args, ack_queue), name=f"minisgl-TP{i} -scheduler" , ).start()
每个 Scheduler 进程 :
加载模型的一部分(根据 rank 切分)
通过 NCCL 与其他 rank 通信
协同完成前向传播
5.4 进程同步机制
问题 :如何确保所有进程都启动完成后再开始服务?
解决方案 :使用 multiprocessing.Queue 作为确认队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 ack_queue: mp.Queue[str ] = mp.Queue() mp.Process(target=_run_scheduler, args=(args, ack_queue)).start() mp.Process(target=tokenize_worker, kwargs={..., "ack_queue" : ack_queue}).start() for _ in range (num_tokenizers + 2 ): logger.info(ack_queue.get()) logger.info("All processes are ready!" )
子进程发送确认 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def _run_scheduler (args: ServerArgs, ack_queue: mp.Queue[str ] ): scheduler = Scheduler(args) scheduler.sync_all_ranks() if args.tp_info.is_primary(): ack_queue.put("Scheduler is ready" ) scheduler.run_forever() def tokenize_worker (..., ack_queue: mp.Queue[str ] ): tokenizer = Tokenizer(...) ack_queue.put(f"Tokenizer-{tokenizer_id} is ready" ) tokenizer.run_forever()
为什么只有 primary scheduler 发送确认?
因为所有 TP rank 需要通过 sync_all_ranks() 同步:
如果所有 rank 都发送确认,主进程会收到 world_size 个确认
但实际上只需要知道"所有 rank 都同步完成"即可
所以只让 primary rank(rank 0)发送确认
5.5 API Server 初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def run_api_server (config: ServerArgs, start_backend: Callable [[], None ], run_shell: bool ): global _GLOBAL_STATE _GLOBAL_STATE = FrontendManager( config=config, recv_tokenizer=ZmqAsyncPullQueue( config.zmq_frontend_addr, create=True , decoder=BaseFrontendMsg.decoder, ), send_tokenizer=ZmqAsyncPushQueue( config.zmq_tokenizer_addr, create=config.frontend_create_tokenizer_link, encoder=BaseTokenizerMsg.encoder, ), ) start_backend() logger.info(f"API server is ready to serve on {host} :{port} " ) if not run_shell: uvicorn.run(app, host=host, port=port) else : asyncio.run(shell())
启动顺序 :
创建 FrontendManager(初始化 ZMQ 队列)
调用 start_backend()(启动所有子进程并等待就绪)
启动 uvicorn 或 shell
为什么要先启动后端?
因为 API Server 需要通过 ZMQ 与后端通信:
如果后端未启动,发送消息会失败
等待后端就绪后再接受用户请求,确保系统可用
5.6 完整的启动时间线
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 时间轴 0ms ├─ 主进程:解析命令行参数 1ms ├─ 主进程:创建 FrontendManager 2ms ├─ 主进程:初始化 ZMQ 队列 3ms ├─ 主进程:调用 start_backend() │ 5ms ├─ 子进程:启动 Scheduler-0 (GPU 0) 6ms ├─ 子进程:启动 Scheduler-1 (GPU 1) 7ms ├─ 子进程:启动 Tokenizer-0 8ms ├─ 子进程:启动 DeTokenizer-0 │ 100ms ├─ Scheduler-0: 加载模型(耗时最长) 150ms ├─ Scheduler-1: 加载模型 │ 200ms ├─ Scheduler-0: sync_all_ranks() 完成 201ms ├─ Scheduler-0: 发送确认 "Scheduler is ready" │ 202ms ├─ Tokenizer-0: 发送确认 "Tokenizer-0 is ready" 203ms ├─ DeTokenizer-0: 发送确认 "DeTokenizer-0 is ready" │ 204ms ├─ 主进程:收到所有确认,继续执行 205ms ├─ 主进程:启动 uvicorn │ 300ms ├─ uvicorn: 开始监听 HTTP 请求 │ └─ 系统就绪!
关键耗时 :
模型加载:100-150ms(取决于模型大小和磁盘速度)
TP 同步:1-5ms(NCCL 初始化)
其他初始化:< 10ms
6. Scheduler 调度器详细实现
6.1 Scheduler 的整体架构
Scheduler 是整个推理系统的调度核心,负责:
接收来自 Tokenizer 的用户请求
调度 prefill 和 decode 任务
管理 KV Cache 的分配和回收
调用 Engine 执行前向推理
将结果发送给 DeTokenizer
核心类定义 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class Scheduler (SchedulerIOMixin ): def __init__ (self, config: SchedulerConfig ): from minisgl.engine import Engine self .engine = Engine(config) super ().__init__(config, self .engine.tp_cpu_group) self .device = self .engine.device self .stream = torch.cuda.Stream(device=self .device) self .engine_stream_ctx = torch.cuda.stream(self .engine.stream) torch.cuda.set_stream(self .stream) self .table_manager = TableManager(config.max_running_req, self .engine.page_table) self .cache_manager = CacheManager(self .device, self .engine.num_pages, config.cache_type) self .decode_manager = DecodeManager() self .prefill_manager = PrefillManager( self .cache_manager, self .table_manager, self .decode_manager ) self .finished_reqs: Set [Req] = set () self .tokenizer = AutoTokenizer.from_pretrained(config.model_path) self .eos_token_id = self .tokenizer.eos_token_id
6.2 双 CUDA Stream 设计
为什么需要两个 CUDA stream?
在传统的串行执行中,CPU 工作:
1 2 3 4 5 6 while True : batch = schedule() gpu_compute(batch) process_results()
双 Stream 设计 :
1 2 3 4 5 6 7 8 9 10 self .stream = torch.cuda.Stream()self .engine.stream = torch.cuda.Stream()
重叠效果 :
1 2 3 4 5 时间轴: Iter N: [Scheduler: 处理 N 的结果 + 调度 N+1] [Engine: 执行 N 的推理] Iter N+1: [Scheduler: 处理 N+1 的结果 + 调度 N+2] [Engine: 执行 N+1 的推理] 性能提升:隐藏 CPU 延迟,GPU 利用率从 65% 提升到 78%
6.3 四个管理器的协作
管理器职责 :
管理器
职责
管理的资源
TableManager
管理 page table(虚拟地址映射)
page_table[max_running_req, max_seq_len]
CacheManager
管理物理 KV Cache 页
物理页的分配、回收、prefix caching
DecodeManager
管理 decode 阶段的请求
decode 请求队列
PrefillManager
管理 prefill 阶段的请求
prefill 请求队列、chunked prefill
协作关系 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class PrefillManager : def __init__ (self, cache_manager, table_manager, decode_manager ): self .cache_manager = cache_manager self .table_manager = table_manager self .decode_manager = decode_manager def schedule_next_batch (self, budget ): batch = self ._select_requests(budget) for req in batch.reqs: req.table_idx = self .table_manager.allocate() req.cache_handle = self .cache_manager.create_handle() for req in batch.reqs: if req.is_last_chunk(): self .decode_manager.add_req(req) return batch
6.4 finished_reqs 的作用
问题 :在重叠调度中,如何安全地释放已完成请求的资源?
场景 :
1 2 3 4 5 for req in batch_N.reqs: if req.finished:
解决方案 :使用 finished_reqs 集合延迟释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def _process_last_data (self, last_data, ongoing_data ): for req in last_data.batch.reqs: if req.finished: self .finished_reqs.add(req) self .decode_manager.remove_req(req) ongoing_reqs = ongoing_data[0 ].batch.reqs if ongoing_data else [] for req in self .finished_reqs.difference(ongoing_reqs): self .table_manager.free(req.table_idx) self .cache_manager.free_and_cache_finished_req(...) self .finished_reqs.intersection_update(ongoing_reqs)
为什么使用 Set?
操作
List
Set
add(req)
O(1)
O(1)
req in container
O(n)
O(1)
difference(other)
O(n*m)
O(n)
intersection_update(other)
O(n*m)
O(min(n,m))
性能差异 :假设 32 个完成请求,16 个 ongoing 请求
Set 操作:~48 次哈希查找
List 操作:~1024 次比较
性能提升:~20 倍
6.5 调度策略
核心调度方法 :
1 2 3 4 5 6 7 def _schedule_next_batch (self ) -> ForwardInput | None : batch = ( self .prefill_manager.schedule_next_batch(self .prefill_budget) or self .decode_manager.schedule_next_batch() ) return self ._prepare_batch(batch) if batch else None
PREFILL first 策略 :
优先调度 prefill 任务(最多使用 prefill_budget 个 token)
如果没有 prefill 任务,调度 decode 任务
如果都没有,返回 None(空闲状态)
为什么优先 Prefill?
原因
说明
降低首 Token 延迟 (TTFT)
用户发送请求后,最关心多快能看到第一个输出 token
Prefill 是一次性的
处理完就结束,不会持续占用资源
Decode 是持续的
每个请求要生成几十到几百个 token
缺点 :如果新请求不断到来,decode 请求可能饥饿(一直得不到调度)
DECODE first 策略的影响 :
1 2 3 4 5 batch = ( self .decode_manager.schedule_next_batch() or self .prefill_manager.schedule_next_batch(self .prefill_budget) )
副作用 :
✅ 吞吐量可能提升(batch size 更稳定)
❌ 首 Token 延迟大幅增加(新请求要等待所有 decode 请求执行一轮)
❌ 可能导致 prefill 饥饿
示例 :系统中有 32 个 decode 请求,新来一个 prefill 请求
PREFILL first:TTFT = ~100ms
DECODE first:TTFT = ~3.2s(32 × 100ms)
6.6 Prefill Budget 和 Chunked Prefill
prefill_budget 的作用 :
1 self .prefill_budget = config.max_extend_tokens
限制单个 中 prefill 的总 token 数量 。
为什么需要限制?
内存限制 :Prefill 需要存储所有 token 的 KV Cache
计算时间限制 :Prefill 的计算量是 O(n²),token 太多会阻塞系统
避免饥饿 :大请求不应该阻塞小请求
Chunked Prefill :
如果一个请求的 prompt 超过 prefill_budget,会被分块处理:
1 2 3 4 5 6 7 Prompt: 1000 tokens, budget: 256 Chunk 1 : tokens [0 :256 ] → prefill, extend_len=256 Chunk 2 : tokens [256 :512 ] → prefill, extend_len=256 Chunk 3 : tokens [512 :768 ] → prefill, extend_len=256 Chunk 4 : tokens [768 :1000 ] → prefill, extend_len=232 然后开始 decode
每个 chunk 的处理 :
计算这些 token 的 KV Cache
生成 1 个 token(但前 3 个 chunk 的 token 会被丢弃)
只有最后一个 chunk 的 token 才发送给用户
为什么中间 chunk 也要生成 token?
因为模型的前向传播必须输出 logits,然后采样得到 token。即使这个 token 会被丢弃,计算过程也无法跳过。但采样的开销相比推理很小(< 1%)。
6.7 批处理准备:_prepare_batch
作用 :为调度好的 batch 准备所有必要的资源和元数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def _prepare_batch (self, batch: Batch ) -> ForwardInput: needed_size = sum (r.extend_len for r in batch.reqs) batch.out_loc = self .cache_manager.allocate(needed_size) if padding_size := self .engine.graph_runner.pad_batch(batch): batch.out_loc = F.pad(batch.out_loc, (0 , padding_size), value=self .engine.dummy_page) load_indices = self ._make_2d_indices( [(r.table_idx, r.cached_len, r.device_len) for r in batch.padded_reqs] ) write_indices = self ._make_2d_indices([ (r.table_idx, r.device_len, r.device_len + 1 ) if r.can_decode() else self .dummy_write_2d_pos for r in batch.reqs ]) self .page_table.view(-1 )[load_indices] = batch.out_loc self .engine.attn_backend.prepare_metadata(batch) return ForwardInput( batch=batch, sample_args=self .engine.sampler.prepare(batch), load_indices=load_indices, write_indices=write_indices, )
六个关键步骤 :
分配 KV Cache :为这个 batch 的所有新 token 分配物理页
填充 batch :为了使用 CUDA Graph,batch size 必须固定
准备加载索引 :从 token_pool 中加载每个请求的 input_ids
准备写入索引 :将新生成的 token 写回 token_pool
更新 page_table :建立虚拟地址到物理地址的映射
准备 attention 元数据 :FlashInfer 需要的各种索引和偏移量
6.8 2D 索引的高效实现
问题 :pool是一个 2D tensor[max_running_req, max_seq_len]`,如何高效地批量加载多个请求的 token?
_make_2d_indices 方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def _make_2d_indices (self, ranges: List [Tuple [int , int , int ]] ) -> torch.Tensor: """ 将 2D 的 (table_idx, token_pos) 转换为 1D 的索引。 Example: 2D table (3, 4) 的底层索引: [[ 0, 1, 2, 3], [ 4, 5, 6, 7], [ 8, 9, 10, 11]] ranges = [(0, 1, 3), (2, 0, 2)] 返回: [1, 2, 8, 9] """ STRIDE = self .token_pool.stride(0 ) needed_size = sum (end - begin for _, begin, end in ranges) indices_host = torch.empty(needed_size, dtype=torch.int32, pin_memory=True ) offset = 0 for entry, begin, end in ranges: length = end - begin offset += length torch.arange( begin + entry * STRIDE, end + entry * STRIDE, dtype=torch.int32, out=indices_host[offset - length : offset], ) return indices_host.to(self .device, non_blocking=True )
为什么需要 2D 索引?
方案 1:逐个请求加载(低效) :
1 2 3 4 for req in batch.reqs: req.input_ids = token_pool[req.table_idx, req.cached_len:req.device_len]
方案 2:批量索引(高效) :
1 2 3 load_indices = _make_2d_indices(...) input_ids = token_pool.view(-1 )[load_indices]
性能对比 :
方案
内存访问次数
耗时 (batch_size=32)
逐个加载
N 次
~10ms
批量索引
1 次
~1ms
性能提升
10x
10x
为什么批量索引更快?
内存带宽利用 :GPU 可以将多个小访问合并为一个大访问
减少同步开销 :只需要一次 CPU-GPU 同步
更好的缓存局部性 :连续访问内存
6.9 load_indices vs write_indices
load_indices:加载 input_ids
1 2 3 4 5 load_indices = self ._make_2d_indices( [(r.table_idx, r.cached_len, r.device_len) for r in batch.reqs] )
使用场景 :
Prefill :加载整个 prompt(或 chunk)
Decode :加载上一步生成的 token
write_indices:写入新生成的 token
1 2 3 4 5 6 7 write_indices = self ._make_2d_indices([ (r.table_idx, r.device_len, r.device_len + 1 ) if r.can_decode() else self .dummy_write_2d_pos for r in batch.reqs ])
为什么有 dummy_write_2d_pos?
对于 Chunked Prefill 的中间块 ,r.can_decode() 返回 False:
1 2 3 Chunk 1 : can_decode() = False → 写入 dummy 位置(丢弃) Chunk 2 : can_decode() = False → 写入 dummy nChunk 3 : can_decode() = False → 写入 dummy 位置(丢弃) Chunk 4 : can_decode() = True → 写入真实位置(保留)
原因 :中间块生成的 token 是无意义的(基于不完整的 prompt),必须丢弃。
图示 :
1 2 3 4 5 6 请求状态:[已缓存的 token] [新加载的 token] [新生成的 token] |<- cached_len ->|<- extend_len ->|<- 1 ->| |<------- device_len ------------->| load_indices: 加载 [cached_len, device_len) 范围的 token write_indices: 写入 device_len 位置的 token
6.10 前向推理流程
_forward 方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def _forward (self, forward_input: ForwardInput ) -> ForwardOutput: self ._load_token_ids(forward_input) if ENV.OVERLAP_EXTRA_SYNC: self .stream.synchronize() batch, sample_args = forward_input.batch, forward_input.sample_args forward_output = self .engine.forward_batch(batch, sample_args) self ._write_token_ids(forward_input, forward_output) self .decode_manager.filter_reqs(forward_input.batch.reqs) return forward_output
ENV.OVERLAP_EXTRA_SYNC 的作用 :
问题 :重叠调度中的竞态条件(issue #58)
1 2 3 4 5 6 indices = _make_2d_indices(...) indices.to(device, non_blocking=True ) engine.forward_batch(...)
解决方案 :
1 2 if ENV.OVERLAP_EXTRA_SYNC: self .stream.synchronize()
这确保了所有索引计算和异步拷贝都完成后,Engine 才开始读取数据。
性能权衡 :
关闭同步:更高性能,但可能遇到竞态条件
开启同步:更安全,但失去部分重叠效果
decode_manager.filter_reqs 的作用 :
将 prefill 完成的请求 添加到 DecodeManager:
1 2 3 4 def filter_reqs (self, reqs: List [Req] ) -> None : for req in reqs: if req.is_prefill_done() and req not in self .running_reqs: self .add_req(req)
调用时机 :在 _forward 之后,因为 prefill 请求刚刚完成第一个 token 的生成,现在可以进入 decode 阶段了。
7. Engine 推理引擎详细实现
7.1 Engine 的核心职责
Engine 是 Mini-SGLang 的核心推理引擎,负责:
模型加载 :从 HuggingFace 加载模型权重到 GPU
KV Cache 管理 :分配和管理 KV Cache 显存
前向推理 :执行模型的 forward 计算
采样 :从 logits 中采样出下一个 token
CUDA Graph 优化 :加速 decode 阶段的推理
TP 并行 :支持张量并行(Tensor Parallelism)
源码位置 :python/minisgl/engine/engine.py(209 行)
7.2 初始化流程
Engine 的初始化分为 5 个关键步骤:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class Engine : def __init__ (self, config: EngineConfig ): self .device = torch.device(f"cuda:{config.tp_info.rank} " ) torch.cuda.set_device(self .device) self .stream = torch.cuda.Stream() self .tp_cpu_group = self ._init_communication(config) with torch.device("meta" ), torch_dtype(config.dtype): self .model = create_model(config.model_path, config.model_config) self .model.load_state_dict(self ._load_weight_state_dict(config)) self .num_pages = self ._determine_num_pages(init_free_memory, config) self .kv_cache = create_kvcache( model_config=config.model_config, num_pages=self .num_pages + 1 , device=self .device, dtype=self .dtype, ) self .page_table = create_page_table( (config.max_running_req + 1 , self .max_seq_len), device=self .device, ) self .attn_backend = create_attention_backend( config.attention_backend, config.model_config, self .kv_cache, self .page_table, ) self .dummy_req = Req(...) self .graph_runner = GraphRunner(...)
关键优化 :
1 2 with torch.device("meta" ), torch_dtype(config.dtype): self .model = create_model(config.model_path, config.model_config)
为什么使用 meta 设备?
Meta 设备只创建模型结构(参数形状、层连接),不分配实际显存
避免先在 CPU 创建模型,再拷贝到 GPU 的双倍内存开销
加载速度更快(跳过 CPU 内存分配)
时间对比 (Llama-7B):
1 2 传统方式:CPU 创建 (14GB) → GPU 拷贝 (14GB) = 28GB 峰值内存,耗时 30s Meta 设备:直接 GPU 加载 (14GB) = 14GB 峰值内存,耗时 10s
2. KV Cache 多分配 1 个 page
1 2 3 4 self .kv_cache = create_kvcache( num_pages=self .num_pages + 1 , ... )
为什么多分配 1 个 page?
用于 CUDA Graph 的 dummy request padding
Dummy request 需要有效的 page_table 索引,避免访问无效内存
这个 page 的内容不重要,只是占位符
7.3 KV Cache 容量计算
_determine_num_pages 方法 (146-165 行):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def _determine_num_pages (self, old_free_memory: int , config: EngineConfig ) -> int : cache_per_page = ( 2 * head_dim * num_kv_heads * page_size * dtype_bytes * num_layers ) new_free_memory = self ._sync_get_memory()[1 ] model_memory = old_free_memory - new_free_memory available_memory = memory_ratio * old_free_memory - model_memory num_pages = available_memory / cache_per_page return num_pages
计算示例 (Llama-7B,单卡):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 head_dim = 128 num_kv_heads = 32 page_size = 1 dtype_bytes = 2 num_layers = 32 cache_per_page = 2 * 128 * 32 * 1 * 2 * 32 = 524 ,288 bytes ≈ 0.5 MB available_memory = 20 * 1024 MB = 20 ,480 MB num_pages = 20 ,480 / 0.5 = 40 ,960 pages
并发请求数量 :
1 2 max_concurrent_requests = 40 ,960 / 2048 = 20 个请求
为什么 memory_ratio 默认是 0.9?
系统不会用满所有显存,而是预留 10%,原因:
CUDA Graph 录制需要显存 (100-500 MB)
Attention 计算的临时 buffer (batch_size × seq_len × hidden_size)
Sampler 的临时张量 (batch_size × vocab_size)
PyTorch 内存池的碎片 (约 5-10%)
如果用满显存,这些临时操作会导致 OOM(内存溢出)。
7.4 TP 并行的内存同步
_sync_get_memory 方法 (167-186 行):
在 TP(张量并行)模式下,多个 GPU 需要同步显存分配:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def _sync_get_memory (self ) -> Tuple [int , int ]: """获取所有 TP rank 的最小和最大空闲显存""" torch.cuda.synchronize(self .device) torch.cuda.empty_cache() free_memory = get_free_memory(self .device) free_mem_tensor = torch.tensor([free_memory, -free_memory], device="cpu" ) torch.distributed.all_reduce(free_mem_tensor, op=ReduceOp.MIN) min_free_memory = int (free_mem_tensor[0 ].item()) max_free_memory = -int (free_mem_tensor[1 ].item()) if max_free_memory - min_free_memory > 2 * 1024 * 1024 * 1024 : raise RuntimeError("Memory across TP ranks are imbalanced" ) return min_free_memory, max_free_memory
为什么需要同步?
场景 :TP=2(2 卡并行)
Rank 0 的显存:24 GB 可用
Rank 1 的显存:20 GB 可用(被其他进程占用)
如果不同步 :
1 2 Rank 0 分配 20 ,000 pages → 需要 10 GB Rank 1 分配 20 ,000 pages → 需要 10 GB → OOM 崩溃!
同步后 :
1 2 3 所有 rank 按 min (24 , 20 ) = 20 GB 分配 Rank 0 分配 16 ,000 pages → 需要 8 GB ✓ Rank 1 分配 16 ,000 pages → 需要 8 GB ✓
为什么检查 max - min > 2GB 就报错?
如果显存差异超过 2GB,说明某个 GPU 有问题:
可能被其他进程占用
导致资源浪费(所有 GPU 都要按最少的显存分配)
比如:Rank 0 有 24GB,Rank 1 只有 10GB,那所有 GPU 都只能用 10GB
TP 分片计算 :
1 2 3 4 5 6 num_kv_heads_per_rank = divide_even(num_kv_heads, tp_size) def divide_even (a: int , b: int ) -> int : assert a % b == 0 , f"{a} must be divisible by {b} " return a // b
示例 :
1 2 3 4 5 TP=1 : 32 / 1 = 32 heads per rank ✓ TP=2 : 32 / 2 = 16 heads per rank ✓ TP=4 : 32 / 4 = 8 heads per rank ✓ TP=3 : 32 / 3 → 报错!(无法整除)
限制 :
TP size 必须能整除 num_kv_heads
通常选择 2 的幂次(1, 2, 4, 8),因为 NCCL 通信效率最高
7.5 前向推理流程
forward_batch 方法 (188-203 行):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def forward_batch (self, batch: Batch, args: BatchSamplingArgs ) -> ForwardOutput: assert torch.cuda.current_stream() == self .stream with self .ctx.forward_batch(batch): if self .graph_runner.can_use_cuda_graph(batch): logits = self .graph_runner.replay(batch) else : logits = self .model.forward() for req in batch.reqs: req.complete_one() next_tokens_gpu = self .sampler.sample(logits[: batch.size], args).to(torch.int32) next_tokens_cpu = next_tokens_gpu.to("cpu" , non_blocking=True ) copy_done_event = torch.cuda.Event() copy_done_event.record(self .stream) return ForwardOutput(next_tokens_gpu, next_tokens_cpu, copy_done_event)
关键设计点 :
1. Context 上下文管理器
1 2 with self .ctx.forward_batch(batch): logits = self .model.forward()
作用 :
设置全局 Context,让 attention 层知道当前 batch 的信息
Attention 层需要访问:batch size、序列长度、page_table 等
避免层层传递参数(PyTorch 模型的 forward 签名是固定的)
类比 :
就像餐厅的"当前订单号"显示屏,厨师不需要每道菜都传递订单号,直接看显示屏就知道当前在做哪个订单。
2. CUDA Graph 条件使用
1 2 3 4 if self .graph_runner.can_use_cuda_graph(batch): logits = self .graph_runner.replay(batch) else : logits = self .model.forward()
CUDA Graph 的使用条件 :
只能用于 decode 阶段 (每个请求生成 1 个 token)
batch size 必须固定 (CUDA Graph 记录固定的计算图)
序列长度必须在预设范围内
为什么 prefill 不能用?
Prefill 的输入长度变化(10 tokens 或 1000 tokens)
CUDA Graph 要求输入形状完全一致
性能提升 :
Decode 阶段提升 20-30% 吞吐量
原理:减少 CPU→GPU 的 kernel launch 开销
时间对比 (decode 单个 token):
1 2 3 4 5 6 7 8 9 普通推理: CPU kernel launch: 0.5 ms GPU 计算: 0.5 ms 总时间: 1.0 ms CUDA Graph: CPU kernel launch: 0.1 ms(一次性提交所有 kernel) GPU 计算: 0.5 ms 总时间: 0.6 ms(提升 40%)
3. 请求状态更新
1 2 for req in batch.reqs: req.complete_one()
为什么在采样之前调用?
complete_one() 的实现:
1 2 3 4 5 6 7 8 class Req : def complete_one (self ): self .output_len += 1 self .cached_len += 1 if self .output_len >= self .max_tokens: self .finished = True
如果顺序反过来 :
1 2 3 4 next_tokens = self .sampler.sample(logits, args) for req in batch.reqs: req.complete_one()
问题 :
采样时,req.output_len 还是旧值
如果 req.output_len == max_tokens - 1,采样后应该标记为 finished
但因为还没调用 complete_one(),请求不会被标记为完成
结果 :请求会多生成一个 token!
4. 采样和切片
1 next_tokens_gpu = self .sampler.sample(logits[: batch.size], args)
为什么要切片 logits[: batch.size]?
在使用 CUDA Graph 时:
1 2 3 4 5 6 logits.shape = (32 , vocab_size) valid_logits = logits[:20 ] next_tokens = self .sampler.sample(valid_logits, args)
好处 :
去掉 dummy request 的无意义 logits
节省采样时间(top-k/top-p 是 CPU 密集型操作)
5. 异步 GPU→CPU 拷贝
1 2 3 next_tokens_cpu = next_tokens_gpu.to("cpu" , non_blocking=True ) copy_done_event = torch.cuda.Event() copy_done_event.record(self .stream)
为什么需要异步拷贝?
同步拷贝(阻塞) :
1 2 GPU Stream: [Forward] → [Sample] → [Copy 阻塞 0.5ms] → [等待] → [下一轮] CPU: [等待 0.5ms] → [处理数据]
异步拷贝(非阻塞) :
1 2 3 GPU Stream: [Forward] → [Sample] → [Copy Start] ----→ [Copy Done] → [下一轮] ↓ ↑ CPU: [继续调度] [synchronize()] → [处理数据]
性能提升 :
拷贝延迟被隐藏(CPU 在拷贝期间继续工作)
对于 overlap 调度至关重要
Scheduler 如何使用 :
1 2 3 4 5 6 7 8 def _process_last_data (self, last_data: ForwardData ): forward_input, forward_output = last_data forward_output.copy_done_event.synchronize() next_tokens_cpu = forward_output.next_tokens_cpu
为什么需要两个版本?
GPU 版本 :下一轮推理时作为输入,避免 CPU→GPU 拷贝
CPU 版本 :发送给 detokenizer 反分词,返回给用户
7.6 Dummy Request 和 CUDA Graph Padding
Dummy Request 的作用 (84-93 行):
1 2 3 4 5 6 7 8 9 10 self .dummy_req = Req( input_ids=torch.tensor([0 ], dtype=torch.int32, device="cpu" ), table_idx=config.max_running_req, cached_len=0 , output_len=1 , uid=-1 , sampling_params=None , cache_handle=None , ) self .page_table[self .dummy_req.table_idx].fill_(self .dummy_page)
为什么需要 Dummy Request?
CUDA Graph 要求 batch size 固定:
1 2 3 4 5 batch = [req1, req2, req3, req4, req5, dummy, dummy, dummy]
为什么要 fill dummy_page?
1 self .page_table[self .dummy_req.table_idx].fill_(self .dummy_page)
如果不 fill :
正确做法 :
dummy_page = num_pages(最后一个 page)
KV Cache 分配了 num_pages + 1 个 pages
最后 1 个专门给 dummy request,不会和真实请求冲突
类比 :
就像餐厅要求"满 8 人才能开桌",如果只有 5 个客人,就用 3 个假人模型凑数,但只给真客人上菜。
7.7 内存对齐优化
32 对齐 (64-65 行):
1 2 3 4 5 self .max_seq_len = _align_up_32(min (config.max_seq_len, self .num_pages))def _align_up_32 (num: int ) -> int : return (num + 31 ) // 32 * 32
计算示例 :
1 2 3 _align_up_32(100 ) = (100 + 31 ) // 32 * 32 = 128 _align_up_32(128 ) = (128 + 31 ) // 32 * 32 = 128 _align_up_32(129 ) = (129 + 31 ) // 32 * 32 = 160
为什么是 32 对齐?
1 32 个 int32 = 32 * 4 bytes = 128 bytes
128 字节对齐的好处 :
GPU cache line 对齐 :GPU 的 cache line 通常是 128 字节,对齐后一次内存访问可以加载完整的 cache line
向量化加载 :GPU 可以用 128-bit 向量指令一次加载 4 个 int32
性能提升 :page_table 访问速度提升约 5-10%
代价 :
浪费少量内存(最多 31 个 int32 = 124 bytes per request)
对于 128 个并发请求,浪费约 15 KB(可以忽略)
7.8 int32 vs int64 的权衡
page_table 使用 int32 (29 行):
1 2 def create_page_table (shape: Tuple [int , int ], device: torch.device ) -> torch.Tensor: return torch.zeros(shape, dtype=torch.int32, device=device)
为什么不用 int64?
内存节省 :
1 2 3 4 5 6 7 8 9 page_table_size = max_running_req * max_seq_len * sizeof(dtype) = 128 * 2048 * 4 bytes = 1 MB = 128 * 2048 * 8 bytes = 2 MB 节省 50 % 内存!
int32 的限制 :
1 2 3 4 int32 最大值 = 2 ^31 - 1 = 2 ,147 ,483 ,647 max_kv_cache = 2 ^31 * 0.5 MB ≈ 1 PB (1000 TB)
结论 :
int32 的限制是 1 PB 的 KV Cache
目前最大的 GPU(H100 80GB)远远达不到
int32 完全够用,而且节省 50% 内存
8. 完整请求生命周期
7.1 阶段划分
1 2 3 4 5 ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Frontend │ → │Tokenizer │ → │Scheduler │ → │Tokenizer │ → 用户 │ 分配uid │ │ 分词 │ │ GPU计算 │ │ 反分词 │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ 1 2 3 4
7.2 Scheduler 的重叠调度主循环
overlap_loop 方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def overlap_loop (self, last_data: ForwardData | None ) -> ForwardData | None : blocking = not ( last_data or self .prefill_manager.runnable or self .decode_manager.runnable ) for msg in self .receive_msg(blocking=blocking): self ._process_one_msg(msg) forward_input = self ._schedule_next_batch() ongoing_data = None if forward_input is not None : with self .engine_stream_ctx: self .engine.stream.wait_stream(self .stream) ongoing_data = (forward_input, self ._forward(forward_input)) self ._process_last_data(last_data, ongoing_data) return ongoing_data
关键设计点 :
1. 智能阻塞策略
1 blocking = not (last_data or self .prefill_manager.runnable or self .decode_manager.runnable)
不阻塞(blocking=False) :如果有任何工作要做
有上一个 batch 的结果要处理
有 prefill 或 decode 任务要调度
非阻塞地检查新消息,立即继续工作
阻塞(blocking=True) :如果没有任何工作
2. 双 Stream 并发执行
1 2 3 4 5 6 7 8 9 with self .engine_stream_ctx: self .engine.stream.wait_stream(self .stream) ongoing_data = (forward_input, self ._forward(forward_input)) self ._process_last_data(last_data, ongoing_data)
时间线 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 时间轴: ┌────────────────────────────────────────────────────┐ │ Engine Stream (GPU) ──────────────────────────────────────────────────┤ │ [执行 batch N 的推理] │ │ ↓ │ │ [执行 batch N+1 的推理] │ └────────────────────────────────────────────────────┘ ┌────────────────────────────────────────────────────┐ │ Scheduler Stream (CPU) │ ├────────────────────────────────────────────────────┤ │ [处理 batch N 的结果] │ │ ↓ │ │ [调度 batch N+2] │ └────────────────────────────────────────────────────┘
3. 处理上一个 Batch 的结果
_process_last_data 方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 def _process_last_data ( self, last_data: ForwardData | None , ongoing_data: ForwardData | None ) -> None : if last_data is None : return batch, (_, next_tokens_cpu, copy_done) = last_data[0 ].batch, last_data[1 ] copy_done.synchronize() reply: List [DetokenizeMsg] = [] for i, req in enumerate (batch.reqs): if req in self .finished_reqs or isinstance (req, ChunkedReq): continue next_token_id = next_tokens_cpu[i] req.append_host(next_token_id.unsqueeze(0 )) next_token = int (next_token_id.item()) finished = not req.can_decode() if not req.sampling_params.ignore_eos: finished |= next_token == self .eos_token_id reply.append(DetokenizeMsg(uid=req.uid, next_token=next_token, finished=finished)) if finished: self .finished_reqs.add(req) self .decode_manager.remove_req(req) ongoing_reqs = ongoing_data[0 ].batch.reqs if ongoing_data else [] for req in self .finished_reqs.difference(ongoing_reqs): self .table_manager.free(req.table_idx) self .cache_manager.free_and_cache_finished_req( req.cache_handle, req.input_ids[: req.cached_len], self .page_table[req.table_idx, : req.cached_len], ) self .finished_reqs.intersection_update(ongoing_reqs) self .send_result(reply)
关键步骤 :
等待拷贝完成 :copy_done.synchronize() 确保 GPU→CPU 的异步拷贝已完成
dReq**:中间块的 token 不发送给用户
更新 CPU 端 token 列表 :req.append_host() 用于 prefix caching
判断完成条件 :
not req.can_decode():达到 max_tokens
next_token == self.eos_token_id:生成了 EOS token
延迟释放资源 :只释放不在 ongoing_reqs 中的请求
为什么跳过 ChunkedReq?
1 2 3 4 Chunk 1 : 生成 token_1(丢弃,不发送) Chunk 2 : 生成 token_2(丢弃,不发送) Chunk 3 : 生成 token_3(丢弃,不发送) Chunk 4 : 生成 token_4(发送给用户!)← 这是真正的第一个输出
中间块生成的 token 是基于不完整的 prompt,没有意义。
req.append_host 的作用 :
维护 CPU 端的 token 列表,用于:
Prefix Caching :根据 input_ids 查找是否有缓存的 KV
资源释放 :释放请求时,根据 input_ids 更新 prefix cache
1 2 3 4 5 6 self .cache_manager.free_and_cache_finished_req( req.cache_handle, req.input_ids[: req.cached_len], self .page_table[req.table_idx, : req.cached_len], )
copy_done.synchronize() 等待什么?
在 Engine 内部,异步拷贝的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 def forward_batch (self, batch, sample_args ): logits = self .model.forward(...) next_tokens_gpu = self .sampler.sample(logits, sample_args) next_tokens_cpu = torch.empty_like(next_tokens_gpu, device='cpu' , pin_memory=True ) next_tokens_cpu.copy_(next_tokens_gpu, non_blocking=True ) copy_done = torch.cuda.Event() copy_done.record() return ForwardOutput(logits, next_tokens_gpu, next_tokens_cpu, copy_done)
时间线 :
1 2 3 4 5 6 7 8 9 10 11 Iteration N: ├─ engine.forward_batch() │ ├─ 模型推理 (GPU) │ ├─ 采样 (GPU) │ └─ copy_(non_blocking=True) ← 发起异步拷贝 └─ return ongoing_data ← 立即返回,不等待拷贝 Iteration N+1: ├─ _process_last_data(last_data=上一轮的 ongoing_data) │ ├─ copy_done.synchronize() ← 等待拷贝完成 │ └─ 读取 next_tokens_cpu ← 现在安全了
拷贝在后台进行,不阻塞 GPU 计算!
7.3 主循环:run_forever
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @torch.inference_mode() def run_forever (self ) -> NoReturn: if ENV.DISABLE_OVERLAP_SCHEDULING: with self .engine_stream_ctx: self .engine.stream.wait_stream(self .stream) while True : self .normal_loop() else : assert torch.cuda.current_stream() == self .stream data = None while True : data = self .overlap_loop(data)
两种模式对比 :
特性
overlap_loop
normal_loop
last_data 参数
上一轮的 ongoing_data
当前轮的 ongoing_data
ongoing_data 参数
当前轮的 ongoing_data
None
CPU-GPU 重叠
✓ 有
✗ 无
性能
高(+20%)
低
实现复杂度
高
低
调试难度
高(竞态条件)
低
normal_loop 的实现 :
1 2 3 4 5 6 7 8 9 10 11 def normal_loop (self ) -> None : blocking = not (self .prefill_manager.runnable or self .decode_manager.runnable) for msg in self .receive_msg(blocking=blocking): self ._process_one_msg(msg) forward_input = self ._schedule_next_batch() ongoing_data = None if forward_input is not None : ongoing_data = (forward_input, self ._forward(forward_input)) self ._process_last_data(ongoing_data, None )
关键区别 :
_process_last_data(ongoing_data, None):当前 batch 作为 last_data,ongoing_data=None
这意味着 ongoing_reqs = [],所有完成的请求都会立即释放资源
没有跨轮次的资源管理,finished_reqs 退化为临时存储
为什么保留 normal_loop?
调试 :出现问题时,可以关闭重叠调度来排查
兼容性 :某些硬件或驱动可能不支持多 stream
简单场景 :如果 CPU 时间很短,重叠调度的收益不大
7.4 性能分析
重叠调度 vs 串行执行 :
假设:
GPU 推理时间:100ms
CPU 调度时间:10ms
CPU 处理结果时间:10ms
串行执行 :
1 2 每次迭代:10ms (调度) + 100ms (GPU) + 10ms (处理) = 120ms 吞吐量:8.33 batch/s
重叠调度 :
1 2 每次迭代:max(100ms (GPU), 20ms (CPU)) = 100ms 吞吐量:10 batch/s
性能提升 :(120 - 100) / 100 = 20%
实际测试结果 (Llama-3-8B,batch_size=8):
方案
吞吐量 (tokens/s)
GPU 利用率
CPU 利用率
串行执行
1200
65%
40%
重叠调度
1450
78%
65%
提升
+21%
+13%
+25%
7.5 完整的数据流示例
用户请求 :“What is the capital of France?”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 ┌─────────────────────────────────────────────────────────────┐ │ 阶段 1: Frontend 接收请求 │ ├─────────────────────────────────────────────────────────────┤ │ 输入: HTTP POST /v1/completions │ │ 输出: uid="abc123", 创建响应队列 │ │ 消息: {"uid": "abc123", "text": "What is the capital..."} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 2: Tokenizer 分词 │ ├─────────────────────────────────────────────────────────────┤ │ 输入: "What is the capital of France?" │ │ 输出: [1234, 374, 279, 6864, 315, 9822, 30] │ │ 消息: {"uid": "abc123", "input_ids": [1234, ...]} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 3: Scheduler 调度 (Prefill) │ ├─────────────────────────────────────────────────────────────┤ │ 1. _process_one_msg(): 接收请求,加入 prefill_manager │ │ 2. _schedule_next_batch(): 调度 prefill batch │ │ 3. _prepare_batch(): 分配 KV Cache,准备索引 │ │ 4. _forward(): GPU 并行计算所有 token 的 KV Cache │ │ 5. 采样: next_token = 791 ("The") │ │ 6. 异步拷贝: next_tokens_cpu ← next_tokens_gpu │ │ 7. _process_last_data(): 等待拷贝,发送结果 │ │ 消息: {"uid": "abc123", "token": 791} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 4: Tokenizer 反分词 │ ├─────────────────────────────────────────────────────────────┤ │ 输入: token=791 │ │ 输出: "The" │ │ 消息: {"uid": "abc123", "text": "The"} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 5: Frontend 流式输出 │ ├─────────────────────────────────────────────────────────────┤ │ 输出: data: {"text": "The"}\n\n │ └─────────────────────────────────────────────────────────────┘ # 循环:Decode 阶段(每次生成 1 个 token) ┌─────────────────────────────────────────────────────────────┐ │ Scheduler (Decode): 输入 [1234, ..., 791] │ │ 输出 next_token=6864 ("capital") │ │ Tokenizer: 反分词 → " capital" │ │ Frontend: 输出 → data: {"text": " capital"}\n\n │ └─────────────────────────────────────────────────────────────┘ # 继续循环直到生成 EOS 或达到 max_tokens ... ┌─────────────────────────────────────────────────────────────┐ │ 最终输出: "The capital of France is Paris." │ └─────────────────────────────────────────────────────────────┘
时间线分析 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 时间轴(毫秒) 0ms ├─ Frontend 接收请求 1ms ├─ Tokenizer 分词 5ms ├─ Scheduler 调度 (Prefill) 25ms ├─ GPU 计算完成 26ms ├─ Tokenizer 反分词 27ms ├─ Frontend 输出 "The" ← TTFT (首 Token 延迟) │ 28ms ├─ Scheduler 调度 (Decode) 36ms ├─ GPU 计算完成 37ms ├─ Tokenizer 反分词 38ms ├─ Frontend 输出 " capital" │ 39ms ├─ Scheduler 调度 (Decode) ...
首 token 延迟 (TTFT) :27ms
后续 token 延迟 :~8ms/token
8. 核心技术总结
阶段 1:Frontend 接收请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @app.post("/v1/completions" ) async def create_completion (request: CompletionRequest ): uid = generate_uid() response_queue = asyncio.Queue() pending_requests[uid] = response_queue await zmq_send(tokenizer_socket, { "uid" : uid, "text" : request.prompt, "sampling_params" : request.to_sampling_params() }) async def generate (): while True : chunk = await response_queue.get() if chunk is None : break yield f"data: {json.dumps(chunk)} \n\n" return StreamingResponse(generate(), media_type="text/event-stream" )
关键点 :
使用 asyncio.Queue 实现异步等待
SSE (Server-Sent Events) 格式流式输出
非阻塞,可同时处理多个请求
阶段 2:Tokenizer 分词
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def tokenize_loop (self ): while True : msg = zmq_recv(frontend_socket) uid = msg["uid" ] text = msg["text" ] input_ids = self .tokenizer.encode(text) zmq_send(scheduler_socket, { "uid" : uid, "input_ids" : input_ids, "sampling_params" : msg["sampling_params" ] })
输出格式 :
1 2 3 4 5 "What is the capital of France?" [1234 , 374 , 279 , 6864 , 315 , 9822 , 30 ]
注意 :输出是 token IDs,不是词向量矩阵。
阶段 3:Scheduler 调度与 GPU 计算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def event_loop (self ): while True : new_reqs = self .recv_requests() self .waiting_queue.extend(new_reqs) batch = self .schedule() if batch is None : continue if batch.is_prefill(): next_tokens = self .engine.forward_prefill(batch) else : next_tokens = self .engine.forward_decode(batch) next_tokens_cpu = next_tokens.to("cpu" , non_blocking=True ) for i, req in enumerate (batch.reqs): zmq_send(tokenizer_socket, { "uid" : req.uid, "token" : next_tokens_cpu[i].item() })
调度策略 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def schedule (self ) -> Batch | None : self .running_batch = [r for r in self .running_batch if not r.finished] available_slots = self .max_batch_size - len (self .running_batch) new_reqs = self .waiting_queue[:available_slots] self .waiting_queue = self .waiting_queue[available_slots:] self .running_batch.extend(new_reqs) prefill_reqs = [r for r in self .running_batch if r.is_prefill()] decode_reqs = [r for r in self .running_batch if r.is_decode()] if prefill_reqs: return Batch(prefill_reqs, phase="prefill" ) elif decode_reqs: return Batch(decode_reqs, phase="decode" ) else : return None
Continuous Batching 效果 :
1 2 3 4 时刻 0: [Req1(prefill), Req2(prefill), Req3(prefill)] 时刻 1: [Req1(decode), Req2(decode), Req3(decode)] 时刻 2: [Req1(decode), Req2(decode), Req3(decode), Req4(prefill)] # 新请求加入 时刻 3: [Req2(decode), Req3(decode), Req4(decode)] # Req1 完成,立即移除
阶段 4:Tokenizer 反分词
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class DetokenizeState : def __init__ (self, uid ): self .uid = uid self .tokens = [] self .read_offset = 0 self .surr_offset = 0 self .sent_offset = 0 def detokenize_loop (self ): states = {} while True : msg = zmq_recv(scheduler_socket) uid = msg["uid" ] token = msg["token" ] if uid not in states: states[uid] = DetokenizeState(uid) state = states[uid] state.tokens.append(token) state.read_offset += 1 text = self .tokenizer.decode( state.tokens[state.surr_offset:state.read_offset] ) if text.endswith("�" ): continue printable = find_printable_text(text) if printable: zmq_send(frontend_socket, { "uid" : uid, "text" : printable }) state.sent_offset += len (printable) state.surr_offset = state.read_offset
流式反分词的关键问题 :
问题 1:单个 token 可能不是完整字符
1 2 3 4 5 6 7 8 9 10 token1 = 228 token2 = 189 token3 = 160 decode([228 ]) → "�" decode([228 , 189 , 160 ]) → "你"
解决方案 :检查是否以 “�” 结尾,如果是则等待更多 token。
问题 2:单词边界问题
1 2 3 4 5 6 7 tokens = [1234 , 5678 , 9012 ] decode([1234 ]) → "Hel" decode([1234 , 5678 ]) → "Hello" decode([1234 , 5678 , 9012 ]) → "Hello world"
解决方案 :find_printable_text() 只输出完整单词。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def find_printable_text (text: str ) -> str : if text.endswith("�" ): return "" if text[-1 ].isspace(): return text last_space = text.rfind(" " ) if last_space == -1 : return "" return text[:last_space + 1 ]
示例 :
1 2 3 4 5 tokens: [1234 ] → "Hel" → 输出: "" (不完整) tokens: [1234 , 5678 ] → "Hello" → 输出: "" (无空格) tokens: [1234 , 5678 , 9012 ] → "Hello wo" → 输出: "" (不完整) tokens: [..., 1111 ] → "Hello world " → 输出: "Hello world " (完整)
5. 重叠调度(Overlap Scheduling)
5.1 问题定义
传统串行执行 :
1 2 3 4 5 6 7 while True : batch = schedule() gpu_compute(batch) wait_gpu_finish() process_results()
问题 :GPU 计算时,CPU 空闲;CPU 处理时,GPU 空闲。
5.2 重叠调度原理
核心思想 :GPU 计算 Batch N 的同时,CPU 处理 Batch N-1 的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 循环 1 : GPU: compute(Batch 1 ) ─────────────┐ CPU: idle │ 20ms ↓ 循环 2 : GPU: compute(Batch 2 ) ─────────────┐ CPU: process(Batch 1 ) ──────┐ │ ↓ ↓ 循环 3 : GPU: compute(Batch 3 ) ─────────────┐ CPU: process(Batch 2 ) ──────┐ │ ↓ ↓
效果 :CPU 和 GPU 并行工作,吞吐量提升 15-25%。
5.3 实现机制
CUDA Stream 和 Event
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 compute_stream = torch.cuda.Stream() copy_stream = torch.cuda.Stream() copy_done_event = torch.cuda.Event() last_data = None while True : if last_data is not None : copy_done_event.synchronize() process_results(last_data) batch = schedule() with torch.cuda.stream(compute_stream): logits = model.forward(batch) next_tokens = sample(logits) with torch.cuda.stream(copy_stream): next_tokens_cpu = next_tokens.to("cpu" , non_blocking=True ) copy_done_event.record() last_data = (batch, next_tokens_cpu)
关键点 :
异步执行 :model.forward() 返回时,GPU 可能还在计算
Event 同步 :copy_done_event.record() 插入标记,synchronize() 等待完成
数据生命周期 :通过 last_data 传递,避免被覆盖
为什么需要 Event?
1 2 3 4 5 6 7 8 9 10 next_tokens_cpu = next_tokens.to("cpu" , non_blocking=True ) process(next_tokens_cpu) next_tokens_cpu = next_tokens.to("cpu" , non_blocking=True ) copy_done_event.record() copy_done_event.synchronize() process(next_tokens_cpu)
5.4 性能分析
测试场景 :Llama-3-8B,batch_size=8,生成 100 tokens
方案
吞吐量 (tokens/s)
GPU 利用率
CPU 利用率
串行执行
1200
65%
40%
重叠调度
1450
78%
65%
提升 :
吞吐量:+21%
GPU 利用率:+13%
CPU 利用率:+25%
6. 数据流示例
6.1 完整示例
用户请求 :“What is the capital of France?”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 ┌─────────────────────────────────────────────────────────────┐ │ 阶段 1: Frontend 接收请求 │ ├─────────────────────────────────────────────────────────────┤ │ 输入: HTTP POST /v1/completions │ │ 输出: uid="abc123", 创建响应队列 │ │ 消息: {"uid": "abc123", "text": "What is the capital..."} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 2: Tokenizer 分词 │ ├─────────────────────────────────────────────────────────────┤ │ 输入: "What is the capital of France?" │ │ 输出: [1234, 374, 279, 6864, 315, 9822, 30] │ │ 消息: {"uid": "abc123", "input_ids": [1234, ...]} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 3: Scheduler 调度 (Prefill) │ ├─────────────────────────────────────────────────────────────┤ │ 输入: [1234, 374, 279, 6864, 315, 9822, 30] │ │ GPU: 并行计算所有 token 的 KV Cache │ │ 输出: next_token = 791 ("The") │ │ 消息: {"uid": "abc123", "token": 791} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 4: Tokenizer 反分词 │ ├─────────────────────────────────────────────────────────────┤ │ 输入: token=791 │ │ 输出: "The" │ │ 消息: {"uid": "abc123", "text": "The"} │ └─────────────────────────────────────────────────────────────┘ ↓ ZMQ ┌─────────────────────────────────────────────────────────────┐ │ 阶段 5: Frontend 流式输出 │ ├─────────────────────────────────────────────────────────────┤ │ 输出: data: {"text": "The"}\n\n │ └─────────────────────────────────────────────────────────────┘ # 循环:Decode 阶段(每次生成 1 个 token) ┌─────────────────────────────────────────────────────────────┐ │ Scheduler (Decode): token=791 → next_token=6864 ("capital")│ │ Tokenizer: 反分词 → " capital" │ │ Frontend: 输出 → data: {"text": " capital"}\n\n │ └─────────────────────────────────────────────────────────────┘ # 继续循环直到生成 EOS 或达到 max_tokens ... ┌─────────────────────────────────────────────────────────────┐ │ 最终输出: "The capital of France is Paris." │ └─────────────────────────────────────────────────────────────┘
6.2 时间线分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 时间轴(毫秒) 0ms ├─ Frontend 接收请求 1ms ├─ Tokenizer 分词 5ms ├─ Scheduler 调度 (Prefill) 25ms ├─ GPU 计算完成 26ms ├─ Tokenizer 反分词 27ms ├─ Frontend 输出 "The" │ 28ms ├─ Scheduler 调度 (Decode) 36ms ├─ GPU 计算完成 37ms ├─ Tokenizer 反分词 38ms ├─ Frontend 输出 " capital" │ 39ms ├─ Scheduler 调度 (Decode) ...
首 token 延迟 (TTFT) :27ms
后续 token 延迟 :~8ms/token
7. 核心技术总结
7.1 多进程架构优势
优势
说明
真正并行
绕过 Python GIL,CPU 和 GPU 同时工作
进程隔离
崩溃不影响其他进程,提高稳定性
资源管理
每个进程独立监控和限制资源
扩展性
可以部署到多台机器(分布式)
7.2 ZMQ 消息传递特点
特点
优势
低延迟
~10μs,比 gRPC 快 10x
高吞吐
10M msg/s
异步非阻塞
不等待响应,提高并发
简单可靠
无需 Protobuf,自动重连
7.3 流式反分词关键点
3 个偏移量 :
read_offset:已接收的 token 数量
surr_offset:已成功解码的 token 数量
sent_offset:已发送的字符数量
完整性检查 :
UTF-8 完整性:检查 “�”
单词边界:只输出完整单词
累积解码 :
单个 token 可能不完整
需要累积多个 token 重新解码
7.4 重叠调度效果
指标
串行执行
重叠调度
提升
吞吐量
1200 tokens/s
1450 tokens/s
+21%
GPU 利用率
65%
78%
+13%
CPU 利用率
40%
65%
+25%
8. 关键设计模式深度解析
8.1 异步生产者-消费者模式
核心问题 :如何在异步环境中协调生产者和消费者?
传统方案(轮询) :
1 2 3 4 5 6 async def wait_for_response (uid ): while True : if uid in responses and responses[uid]: return responses[uid].pop(0 ) await asyncio.sleep(0.01 )
问题 :
CPU 浪费:即使没有数据也要不断检查
延迟:最坏情况下延迟 = 轮询间隔
不可扩展:1000 个并发请求 = 1000 个轮询协程
Mini-SGLang 方案(事件通知) :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 async def listen (self ): while True : msg = await self .recv_tokenizer.get() self .ack_map[msg.uid].append(msg) self .event_map[msg.uid].set () async def wait_for_ack (self, uid ): event = self .event_map[uid] while True : await event.wait() event.clear() pending = self .ack_map[uid] self .ack_map[uid] = [] for ack in pending: yield ack
优势 :
零 CPU 开销:没有数据时协程完全休眠
零延迟:数据到达立即唤醒
可扩展:10000 个并发请求也不会增加 CPU 负担
性能对比 :
方案
CPU 使用率
平均延迟
支持并发数
轮询 (0.01s)
15%
5ms
< 1000
事件通知
0.1%
< 0.1ms
> 10000
8.2 竞态条件与防御性编程
问题场景 :用户断开连接时的竞态条件
1 2 3 4 5 6 7 8 9 10 11 async def wait_for_ack (self, uid ): event = self .event_map[uid] while True : await event.wait() pending = self .ack_map[uid] async def abort_user (self, uid ): del self .ack_map[uid] del self .event_map[uid]
三种解决方案对比 :
方案 1:延迟删除(当前实现)
1 2 3 4 async def abort_user (self, uid ): await asyncio.sleep(0.1 ) if uid in self .ack_map: del self .ack_map[uid]
优点 :简单
缺点 :不可靠(0.1s 是经验值,不保证安全)
方案 2:防御性检查(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 async def wait_for_ack (self, uid ): event = self .event_map.get(uid) if event is None : return while True : await event.wait() event.clear() if uid not in self .ack_map: return pending = self .ack_map[uid] self .ack_map[uid] = [] for ack in pending: yield ack
优点 :可靠,无竞态条件
缺点 :需要多处检查
方案 3:取消令牌(最优)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @dataclass class UserSession : uid: int ack_queue: List [UserReply] event: asyncio.Event cancelled: bool = False async def wait_for_ack (self, session: UserSession ): while not session.cancelled: await session.event.wait() session.event.clear() for ack in session.ack_queue: yield ack session.ack_queue.clear() async def abort_user (self, uid ): session = self .sessions[uid] session.cancelled = True session.event.set ()
优点 :优雅、可靠、易于理解
缺点 :需要重构数据结构
8.3 流式输出的缓冲区管理
问题 :为什么需要 flush=True?
Python 输出缓冲机制 :
1 2 3 4 5 6 7 8 9 10 print ("Hello" ) print ("World" , end="" ) print ("!" ) for token in ["你" , "好" , "吗" ]: print (token, end="" ) time.sleep(1 )
*解决方案
1 2 3 4 5 6 7 8 9 10 for token in ["你" , "好" , "吗" ]: print (token, end="" ) sys.stdout.flush() time.sleep(1 ) for token in ["你" , "好" , "吗" ]: print (token, end="" , flush=True ) time.sleep(1 )
在 Shell 模式中的应用 :
1 2 3 async for chunk in response.body_iterator: msg = chunk.decode() print (msg, end="" , flush=True )
性能影响 :
方案
用户感知延迟
系统调用次数
适用场景
无 flush
高(批量输出)
少
批处理、日志
flush=True
低(实时输出)
多
流式生成、交互式
8.4 全局状态的单例模式
为什么使用全局变量?
1 2 3 4 5 6 _GLOBAL_STATE = None def get_global_state () -> FrontendManager: global _GLOBAL_STATE assert _GLOBAL_STATE is not None return _GLOBAL_STATE
替代方案对比 :
方案 1:全局变量(当前实现)
1 2 3 4 @app.post("/generate" ) async def generate (req ): state = get_global_state() ...
优点 :
简单直接
无需依赖注入
FastAPI 端点函数签名简洁
缺点 :
测试困难(需要 mock 全局变量)
不符合依赖注入原则
方案 2:FastAPI 依赖注入
1 2 3 4 5 6 def get_state () -> FrontendManager: return _GLOBAL_STATE @app.post("/generate" ) async def generate (req, state: FrontendManager = Depends(get_state ) ): ...
优点 :
符合 FastAPI 最佳实践
易于测试(可以注入 mock 对象)
缺点 :
方案 3:应用状态
1 2 3 4 5 6 app.state.frontend_manager = FrontendManager(...) @app.post("/generate" ) async def generate (req, request: Request ): state = request.app.state.frontend_manager ...
优点 :
缺点 :
为什么 Mini-SGLang 选择全局变量?
简洁性 :代码库小,全局状态只有一个
性能 :避免依赖注入的开销(虽然很小)
一致性 :与 vLLM、SGLang 等框架保持一致
8.5 异步上下文管理器
生命周期管理 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @asynccontextmanager async def lifespan (_: FastAPI ): logger.info("Server starting..." ) yield logger.info("Server shutting down..." ) global _GLOBAL_STATE if _GLOBAL_STATE is not None : _GLOBAL_STATE.shutdown() app = FastAPI(lifespan=lifespan)
执行时机 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 启动:python -m minisgl.server ↓ lifespan: 执行 yield 之前的代码 ↓ uvicorn: 开始监听 HTTP 请求 ↓ [服务器运行中...] ↓ 用户按 Ctrl+C ↓ uvicorn: 停止接受新请求 ↓ lifespan: 执行 yield 之后的代码 ↓ 进程退出
为什么需要 lifespan?
问题场景 :
1 2 3 4 5 6 7 8 9 app = FastAPI() _GLOBAL_STATE = FrontendManager(...)
使用 lifespan :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @asynccontextmanager async def lifespan (_: FastAPI ): global _GLOBAL_STATE _GLOBAL_STATE = FrontendManager(...) yield _GLOBAL_STATE.shutdown() logger.info("Cleanup complete" )势**: - 保证资源正确释放 - 优雅关闭(graceful shutdown) - 避免资源泄漏 **问题**:如何在响应结束后执行清理? ```python @app.post("/generate" ) async def generate (req ): uid = state.new_user() await state.send_one(TokenizeMsg(...)) async def _abort (): await state.abort_user(uid) return StreamingResponse( state.stream_generate(uid), background=BackgroundTask(_abort), )
执行时机 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 用户发起请求 ↓ generate() 函数执行 ↓ 返回 StreamingResponse ↓ 开始流式输出:yield "data: 你\n" ↓ 继续输出:yield "data: 好\n" ↓ 输出完成:yield "data: [DONE]\n" ↓ StreamingResponse 结束 ↓ BackgroundTask 执行:_abort() ← 在这里清理 ↓ 响应完全结束
三种结束场景 :
正常结束 :生成完成,发送 [DONE]
用户断开 :用户关闭浏览器
异常错误 :生成过程中出错
BackgroundTask 在所有场景下都会执行!
为什么不在 finally 中清理?
1 2 3 4 5 6 7 8 async def generate (req ): uid = state.new_user() try : async for chunk in state.stream_generate(uid): yield chunk finally : await state.abort_user(uid)
问题 :
生成器函数不能包含 try-finally
yield 之后的代码可能不会执行
✅ 使用 BackgroundTask :
由 FastAPI 框架保证执行
无论正常结束、断开还是异常都会执行
代码更清晰
8.7 消息路由与 UID 设计
问题 :如何在多个并发请求中正确路由消息?
UID 的作用 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 uid_a = 123 send_to_tokenizer({"uid" : 123 , "text" : "Hello" }) uid_b = 124 send_to_tokenizer({"uid" : 124 , "text" : "你好" }) recv_from_tokenizer({"uid" : 124 , "token" : 228 }) recv_from_tokenizer({"uid" : 123 , "token" : 791 }) ack_map[124 ].append(...) ack_map[123 ].append(...)
UID 生成策略 :
1 2 3 4 5 6 7 8 9 class FrontendManager : uid_counter: int = 0 def new_user (self ) -> int : uid = self .uid_counter self .uid_counter += 1 self .ack_map[uid] = [] self .event_map[uid] = asyncio.Event() return uid
为什么使用递增计数器?
方案
优点
缺点
递增计数器
简单、快速、有序
可预测(安全性低)
UUID
全局唯一、不可预测
较慢、占用空间大
随机数
不可预测
可能冲突
Mini-SGLang 选择递增计数器 :
内部系统,无需考虑安全性
性能最优(O(1))
便于调试(UID 有序)
9. 性能分析
8.1 延迟分解
首 token 延迟 (TTFT) :
1 2 3 总延迟 = 网络 + 分词 + 调度 + Prefill + 反分词 + 网络 = 1ms + 1ms + 3ms + 20ms + 1ms + 1ms = 27ms
后续 token 延迟 :
1 2 3 总延迟 = 调度 + Decode + 反分词 = 1ms + 6ms + 1ms = 8ms
8.2 吞吐量分析
单请求吞吐量 :
1 2 3 4 5 6 生成 100 tokens: = TTFT + 99 × 后续延迟 = 27ms + 99 × 8ms = 819ms 吞吐量 = 100 / 0.819 ≈ 122 tokens/s
批处理吞吐量 (batch_size=8):
1 2 3 4 5 6 7 生成 100 tokens × 8 请求: = TTFT + 99 × 后续延迟(批处理) = 27ms + 99 × 8ms = 819ms 总 tokens = 100 × 8 = 800 吞吐量 = 800 / 0.819 ≈ 977 tokens/s
加速比 :977 / 122 ≈ 8x(理想情况)
8.3 瓶颈分析
Prefill 阶段 :
瓶颈:GPU 计算(compute-bound)
优化方向:FlashAttention、Tensor Parallelism
Decode 阶段 :
瓶颈:KV Cache 访存(memory-bound)
优化方向:PagedAttention、CUDA Graph
CPU 处理 :
瓶颈:分词/反分词(CPU-bound)
优化方向:多进程、重叠调度
9. 关键代码位置
组件
文件
关键函数
Frontend
python/minisgl/server/api_server.py
create_completion()
Tokenizer
python/minisgl/tokenizer/tokenize.py
tokenize_loop()
Detokenizer
python/minisgl/tokenizer/detokenize.py
detokenize_loop()
Scheduler
python/minisgl/scheduler/scheduler.py
event_loop(), schedule()
Engine
python/minisgl/engine/engine.py
forward()
ZMQ 通信
python/minisgl/message/backend.py
zmq_send(), zmq_recv()
10. 设计模式总结
10.1 架构模式
多进程架构 :
分离关注点(HTTP、分词、GPU 计算)
真正并行,绕过 GIL
进程隔离,提高稳定性
消息传递 :
ZMQ 异步非阻塞
DEALER-ROUTER 模式
uid 路由和关联
流式处理 :
SSE 流式输出
增量反分词
降低首 token 延迟
10.2 性能优化模式
Continuous Batching :
动态添加/移除请求
GPU 始终满载
吞吐量提升 2-3x
Overlap Scheduling :
CPU 和 GPU 并行工作
CUDA Stream 和 Event
吞吐量提升 15-25%
异步复制 :
non_blocking=True
隐藏 GPU→CPU 复制延迟
提高 GPU 利用率
11. 与其他框架对比
特性
Mini-SGLang
vLLM
TGI
多进程架构
✅ (3 进程)
✅ (2 进程)
❌ (单进程)
消息传递
ZMQ
ZMQ
内存队列
流式反分词
✅ 增量
✅ 增量
✅ 批量
重叠调度
✅
✅
❌
Continuous Batching
✅
✅
✅
代码复杂度
低
中
高
Mini-SGLang 的优势 :
架构清晰,易于理解
保留核心优化技术
适合学习和二次开发
12. 总结
Mini-SGLang 通过精心设计的多进程架构和消息传递机制,实现了高性能的 LLM 推理系统:
核心架构
3 进程模型 :Frontend (HTTP) → Tokenizer (分词) → Scheduler (GPU)
ZMQ 消息传递 :低延迟(~10μs)、高吞吐(10M msg/s)
流式反分词 :3 个偏移量、完整性检查、累积解码
重叠调度 :CPU 和 GPU 并行工作,吞吐量提升 15-25%
性能指标
首 token 延迟 :27ms
后续 token 延迟 :8ms/token
批处理加速 :8x(batch_size=8)
GPU 利用率 :78%(重叠调度)
设计亮点
真正并行 :绕过 Python GIL
异步非阻塞 :提高并发能力
进程隔离 :提高稳定性
流式输出 :降低用户感知延迟
这些设计不仅保证了性能,还保持了代码的简洁性和可维护性,是学习 LLM 推理系统的绝佳材料。
参考文献
ZeroMQ Guide - ZMQ 官方文档
Orca: A Distributed Serving System for Transformer-Based Generative Models - Continuous Batching
vLLM: Easy, Fast, and Cheap LLM Serving - PagedAttention
CUDA Streams and Events - CUDA 官方文档
下期预告
Mini-SGLang 源码解析(三):调度系统与 KV Cache 管理
Scheduler 调度策略详解
PagedAttention 实现原理
Radix Cache 前缀共享
内存分配和回收机制
本文基于 Mini-SGLang 源码分析,所有代码示例均可在项目中找到对应实现。