From 342bf2d4c1187a609adc441f011492b59ac4b4df Mon Sep 17 00:00:00 2001 From: zhangxiao35 Date: Tue, 2 Jun 2026 13:20:06 +0800 Subject: [PATCH 1/3] Revert blockwise CUDAGraph and support piecewise CUDAGraph in prefill --- fastdeploy/config.py | 7 + fastdeploy/envs.py | 10 - .../graph_optimization/cuda_graph_op.py | 320 ------------------ .../graph_optimization/utils.py | 6 + .../layers/attention/append_attn_backend.py | 11 - fastdeploy/model_executor/layers/linear.py | 4 - fastdeploy/model_executor/layers/moe/ep.py | 5 +- fastdeploy/model_executor/layers/moe/moe.py | 1 - .../model_executor/layers/normalization.py | 26 +- fastdeploy/model_executor/models/glm4_moe.py | 5 +- fastdeploy/worker/gpu_model_runner.py | 52 +-- fastdeploy/worker/gpu_worker.py | 3 - .../test_block_wise_cuda_graph.py | 200 ----------- .../test_cuda_graph_op_unit.py | 181 ---------- 14 files changed, 30 insertions(+), 801 deletions(-) delete mode 100644 fastdeploy/model_executor/graph_optimization/cuda_graph_op.py delete mode 100644 tests/graph_optimization/test_block_wise_cuda_graph.py delete mode 100644 tests/graph_optimization/test_cuda_graph_op_unit.py diff --git a/fastdeploy/config.py b/fastdeploy/config.py index ff926a3d90f..6fd70cdd9ee 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1211,6 +1211,13 @@ def _set_cudagraph_sizes( # Shape [256, 288, ... 992, 1024] draft_capture_sizes += [32 * i for i in range(9, 33)] + # Shape [1024, 1088, ... 2048] step=64 + draft_capture_sizes += [64 * i for i in range(17, 33)] + # Shape [2048, 2176, ... 4096] step=128 + draft_capture_sizes += [128 * i for i in range(17, 33)] + # Shape [4096, 4352, ... 8192] step=256 + draft_capture_sizes += [256 * i for i in range(17, 33)] + draft_capture_sizes_prefill = draft_capture_sizes.copy() draft_capture_sizes.append(max_capture_size) self.cudagraph_capture_sizes = sorted(draft_capture_sizes) diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 5fd73e1962b..ad12f32ff4b 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -263,16 +263,6 @@ def _validate_split_kv_size(value: int) -> int: "FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST": lambda: bool( int(os.getenv("FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST", "1")) ), - # Whether to enable block-wise CUDA Graph capture/replay. - # When enabled, individual layer forward methods decorated with @block_wise_cuda_graph_wrap - # will be captured and replayed as CUDA Graphs for improved performance. - # Set to 1 to enable; defaults to 0 (disabled). - "FD_USE_BLOCK_WISE_CUDA_GRAPH": lambda: bool(int(os.getenv("FD_USE_BLOCK_WISE_CUDA_GRAPH", "0"))), - # Comma-separated list of token counts to pre-capture for block-wise CUDA Graphs. - # Used during the warmup phase to pre-capture graphs for these specific sizes. - # At runtime, token counts not in this list fall back to eager execution. - # Example: "1,2,4,8,16,32,64,128,256,512" - "FD_BLOCK_WISE_CUDA_GRAPH_SIZES": lambda: os.getenv("FD_BLOCK_WISE_CUDA_GRAPH_SIZES", "128,256,512,1024,2048"), # Suspend rollouting routing replay "FD_SUSPEND_ROUTING_REPLAY": lambda: bool(int(os.getenv("FD_SUSPEND_ROUTING_REPLAY", "0"))), # train-infer consistency, used in RL diff --git a/fastdeploy/model_executor/graph_optimization/cuda_graph_op.py b/fastdeploy/model_executor/graph_optimization/cuda_graph_op.py deleted file mode 100644 index 3daecc87f69..00000000000 --- a/fastdeploy/model_executor/graph_optimization/cuda_graph_op.py +++ /dev/null @@ -1,320 +0,0 @@ -""" -# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License" -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" - -import functools -import inspect -from typing import Callable, Optional, Sequence - -import paddle -from paddleformers.utils.log import logger as _LOGGER - -import fastdeploy - -# ---- Module-level state for pre-captured block-wise CUDA graphs ---- - -# When True, the wrapper is in the capture phase (during dummy_run) and -# will capture new graphs. When False, uncached keys fall back to eager. -_BLOCK_WISE_CAPTURING: bool = False - -# Registry of all shared-mode graph caches, for bulk clearing. -_ALL_SHARED_CACHES: list = [] - -# Global counter / registry of all captured block-wise graphs (for logging). -# Each entry: (qualname, key, shared_mode) -_CAPTURED_GRAPH_LOG: list = [] - - -def get_captured_graph_log(): - """Return the list of all captured (qualname, key, shared) triples.""" - return list(_CAPTURED_GRAPH_LOG) - - -def dump_captured_graph_summary(): - """Print a summary of all captured block-wise CUDA graphs.""" - from collections import Counter - - if not fastdeploy.envs.FD_BLOCK_WISE_DEBUG: - return - if not _CAPTURED_GRAPH_LOG: - _LOGGER.info("[block_wise_cuda_graph] no graph captured") - return - counter = Counter(q for q, _, _ in _CAPTURED_GRAPH_LOG) - _LOGGER.info( - f"[block_wise_cuda_graph] total captured graphs={len(_CAPTURED_GRAPH_LOG)} " - f"across {len(counter)} distinct methods:" - ) - for qname, cnt in sorted(counter.items(), key=lambda x: -x[1]): - _LOGGER.info(f" - {qname} : {cnt} graph(s)") - - -def set_block_wise_capturing(capturing: bool): - """Toggle the capture phase flag. Only capture graphs when this is True.""" - global _BLOCK_WISE_CAPTURING - _BLOCK_WISE_CAPTURING = capturing - - -def clear_all_block_wise_graphs(): - """Clear all shared block-wise graph caches (e.g. for RL weight updates).""" - for graphs, cinputs, coutputs in _ALL_SHARED_CACHES: - graphs.clear() - cinputs.clear() - coutputs.clear() - - -def block_wise_cuda_graph_wrap( - inputs: Sequence[str], - self_attrs: Sequence[str] = (), - key_fn: Optional[Callable[..., tuple]] = None, -): - """ - Method decorator that wraps a forward method with CUDA Graph capture/replay. - - On the first call for a given cache key (derived from tensor shapes/dtypes), - the decorated method is captured into a CUDA Graph. Subsequent calls with the - same key will replay the graph after updating input data pointers. - - When ``_BLOCK_WISE_CAPTURING`` is managed via ``set_block_wise_capturing``, - new graphs are only captured during the capture phase (dummy_run). At runtime, - uncached keys fall back to eager execution, avoiding expensive on-the-fly captures. - - When ``self_attrs`` is provided, the named tensor attributes of ``self`` - (e.g. ``weight``) are also tracked for pointer replacement, and the graph - cache is **shared across all instances** (closure-level). This allows layers - with identical computation but different weights to share a single captured - graph, dramatically reducing the total number of graphs from O(num_layers) - to O(num_unique_shapes). - - When ``self_attrs`` is empty (default), graphs are cached per instance. - - Output tensors from the capture phase are reused across replays — the graph - always writes to the same output memory. This avoids per-replay allocation - overhead. Callers must consume the output before the next replay of the same - graph (which is naturally satisfied in sequential layer-by-layer forward). - - Args: - inputs: Names of parameters that are input tensors to be tracked for - CUDA Graph pointer replacement. These must be parameter names of the - decorated method. Only non-None tensor arguments are tracked. - self_attrs: Attribute names on ``self`` that are tensor parameters to be - replaced via pointer replacement (e.g. ``["weight"]``). When non-empty, - enables cross-instance graph sharing. - key_fn: Optional callable to generate the cache key from method arguments. - Signature: key_fn(arg0, arg1, ...) with args in declaration order - (excluding self). Defaults to a key based on tensor shapes/dtypes. - - Example: - class MyNorm(nn.Layer): - @block_wise_cuda_graph_wrap( - inputs=["x", "residual"], - self_attrs=["weight"], # all layers share one graph - ) - def forward(self, x, residual=None): - return rms_norm(x, self.weight), residual - """ - - def decorator(method: Callable) -> Callable: - sig = inspect.signature(method) - params = list(sig.parameters.keys()) # ["self", "x", "residual_input", ...] - _qualname = method.__qualname__ - - for name in inputs: - if name not in params or name == "self": - raise ValueError( - f"cuda_graph_wrap: input '{name}' is not a parameter of " - f"{method.__qualname__}. Available: {[p for p in params if p != 'self']}" - ) - - # ---- Pre-compute at decoration time (runs once) ---- - - _EMPTY = inspect.Parameter.empty - _Tensor = paddle.Tensor - - # For each non-self param: (name, args_index, default_value) - # args_index is position in *args (0-based, since self is consumed by Python) - _param_info = tuple((p, i - 1, sig.parameters[p].default) for i, p in enumerate(params) if p != "self") - - # For each declared input tensor: (name, args_index) - _input_info = tuple((name, params.index(name) - 1) for name in inputs) - - _self_attr_names = tuple(self_attrs) - _shared = len(_self_attr_names) > 0 - - _use_custom_key = key_fn is not None - - # --- Cache storage --- - # When self_attrs is provided: closure-level (shared across all instances) - # When not: per-instance (stored in self.__dict__) - if _shared: - _shared_graphs = {} - _shared_cinputs = {} - _shared_coutputs = {} # stores actual result tensors (reused across replays) - _ALL_SHARED_CACHES.append((_shared_graphs, _shared_cinputs, _shared_coutputs)) - - # Per-instance attribute key names - _g = f"_cg_{method.__name__}_g" - _ci = f"_cg_{method.__name__}_ci" - _co = f"_cg_{method.__name__}_co" - - @functools.wraps(method) - def wrapper(self, *args, **kwargs): - if not fastdeploy.envs.FD_USE_BLOCK_WISE_CUDA_GRAPH: - return method(self, *args, **kwargs) - - nargs = len(args) - - # Skip CUDA graph if any input tensor has a 0 in its shape - for a in args: - if isinstance(a, _Tensor) and 0 in a.shape: - return method(self, *args, **kwargs) - for v in kwargs.values(): - if isinstance(v, _Tensor) and 0 in v.shape: - return method(self, *args, **kwargs) - - # === Key generation: inline, no sig.bind === - if _use_custom_key: - # Resolve all args for custom key_fn - resolved = [] - for pname, aidx, default in _param_info: - if pname in kwargs: - resolved.append(kwargs[pname]) - elif aidx < nargs: - resolved.append(args[aidx]) - elif default is not _EMPTY: - resolved.append(default) - else: - resolved.append(None) - key = key_fn(*resolved) - else: - # Default: fast inline key from shapes/dtypes - _kp = [] - for pname, aidx, default in _param_info: - if pname in kwargs: - v = kwargs[pname] - elif aidx < nargs: - v = args[aidx] - else: - v = default - if isinstance(v, _Tensor): - _kp.append((tuple(v.shape), v.dtype)) - elif v is None: - _kp.append(None) - elif callable(v): - _kp.append(True) - # Include self_attrs shapes/dtypes in key - for attr_name in _self_attr_names: - attr = getattr(self, attr_name, None) - if attr is not None and isinstance(attr, _Tensor): - _kp.append((attr_name, tuple(attr.shape), attr.dtype)) - else: - _kp.append((attr_name, None)) - key = tuple(_kp) - - # === Get cache (shared or per-instance) === - if _shared: - graphs = _shared_graphs - cinputs = _shared_cinputs - coutputs = _shared_coutputs - else: - _d = self.__dict__ - try: - graphs = _d[_g] - cinputs = _d[_ci] - coutputs = _d[_co] - except KeyError: - graphs = {} - cinputs = {} - coutputs = {} - _d[_g] = graphs - _d[_ci] = cinputs - _d[_co] = coutputs - - if key not in graphs: - # === First encounter: only capture during capture phase === - if not _BLOCK_WISE_CAPTURING: - # Not in capture phase -- fall back to eager - return method(self, *args, **kwargs) - - # === Capture === - graph = paddle.device.cuda.graphs.CUDAGraph(enable_replace=True) - graphs[key] = graph - ci = {} - for name, aidx in _input_info: - v = kwargs[name] if name in kwargs else (args[aidx] if aidx < nargs else None) - if v is not None and isinstance(v, _Tensor): - ci[name] = v.data_ptr() - - # Record self_attrs pointers for cross-instance replacement - for attr_name in _self_attr_names: - attr = getattr(self, attr_name, None) - if attr is not None and isinstance(attr, _Tensor): - ci[f"__attr_{attr_name}"] = attr.data_ptr() - - cinputs[key] = ci - - graph.capture_begin() - result = method(self, *args, **kwargs) - graph.capture_end() - - # --- Log which op just entered the CUDA graph --- - _CAPTURED_GRAPH_LOG.append((_qualname, key, _shared)) - if fastdeploy.envs.FD_BLOCK_WISE_DEBUG: - _LOGGER.info( - f"[block_wise_cuda_graph] captured #{len(_CAPTURED_GRAPH_LOG)} " - f"op={_qualname} shared={_shared} key={key}" - ) - - graph.replay() - - # Store the actual result for reuse. The graph always writes to - # the same output memory, so we return the same tensors on replay. - coutputs[key] = result - return result - else: - # === Replay path (HOT PATH) === - old_ptrs = [] - new_ptrs = [] - ci = cinputs[key] - - for name, aidx in _input_info: - v = kwargs[name] if name in kwargs else (args[aidx] if aidx < nargs else None) - if v is not None and name in ci: - old_ptrs.append(ci[name]) - new_ptr = v.data_ptr() - new_ptrs.append(new_ptr) - ci[name] = new_ptr - - # Replace self_attrs pointers (e.g. weight) - for attr_name in _self_attr_names: - attr_key = f"__attr_{attr_name}" - if attr_key in ci: - attr = getattr(self, attr_name, None) - if attr is not None: - old_ptrs.append(ci[attr_key]) - new_ptr = attr.data_ptr() - new_ptrs.append(new_ptr) - ci[attr_key] = new_ptr - - if old_ptrs: - graphs[key].replace_input_ptrs(old_ptrs, new_ptrs) - graphs[key].replay() - - # Reuse the output tensors from capture — graph wrote fresh - # data to the same memory, no allocation needed. - return coutputs[key] - - return wrapper - - return decorator diff --git a/fastdeploy/model_executor/graph_optimization/utils.py b/fastdeploy/model_executor/graph_optimization/utils.py index 2b5241e5aed..8d38df5324d 100644 --- a/fastdeploy/model_executor/graph_optimization/utils.py +++ b/fastdeploy/model_executor/graph_optimization/utils.py @@ -140,3 +140,9 @@ def get_state(): sot_warmup_guard, in_sot_warmup_mode = create_guard(False) profile_run_guard, in_profile_run_mode = create_guard(False) + +# Guard for the prefill SOT warmup + piecewise-CUDAGraph capture phase. +# When active, keep the SOT-compiled graph intact during prefill capture +# by avoiding extra graph break points or graph fragmentation. +# The decode-only CUDAGraph capture runs OUTSIDE this guard. +prefill_cudagraph_guard, in_prefill_cudagraph_mode = create_guard(False) diff --git a/fastdeploy/model_executor/layers/attention/append_attn_backend.py b/fastdeploy/model_executor/layers/attention/append_attn_backend.py index 76de638bce6..ec85434702e 100644 --- a/fastdeploy/model_executor/layers/attention/append_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/append_attn_backend.py @@ -321,17 +321,6 @@ def forward_mixed( q_norm_weight = getattr(layer, "q_norm_weight", None) if norm_after_rope_in_kernel else None k_norm_weight = getattr(layer, "k_norm_weight", None) if norm_after_rope_in_kernel else None - if self.rope_3d: - assert len(forward_meta.rotary_embs.shape) == 6 - else: - assert len(forward_meta.rotary_embs.shape) == 5 - if layer.use_neox_rotary_style: - assert forward_meta.rotary_embs.shape[0:4] == [2, 1, self.max_seq_len, 1] - # 128 is qwen3 - # 32 is glm - # 64 is gpt-oss - assert forward_meta.rotary_embs.shape[4] in [128, 32, 64] - if self.pd_disaggregation_mode == "per_query": metadata.kv_signal_data_list[layer.layer_id] = init_signal_layerwise( metadata.kv_signal_metadata, diff --git a/fastdeploy/model_executor/layers/linear.py b/fastdeploy/model_executor/layers/linear.py index 4f4985173b7..f1fdac31a84 100644 --- a/fastdeploy/model_executor/layers/linear.py +++ b/fastdeploy/model_executor/layers/linear.py @@ -25,9 +25,6 @@ decode_alltoall_transpose, tensor_model_parallel_all_reduce, ) -from fastdeploy.model_executor.graph_optimization.cuda_graph_op import ( - block_wise_cuda_graph_wrap, -) from fastdeploy.model_executor.layers.quantization.quant_base import QuantMethodBase from fastdeploy.model_executor.utils import ( default_weight_loader, @@ -272,7 +269,6 @@ def load_state_dict(self, state_dict: dict): bias_tensor = paddle.to_tensor(get_tensor(state_dict.pop(self.bias_key))) self.bias.set_value(bias_tensor) - @block_wise_cuda_graph_wrap(inputs=["x"], self_attrs=["weight", "weight_scale_inv", "bias"]) def forward_cuda(self, x: paddle.Tensor) -> paddle.Tensor: """ Forward function for Linear. diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 967c2a2fd02..b80563691d1 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -27,6 +27,7 @@ import fastdeploy from fastdeploy import envs from fastdeploy.config import MoEPhase +from fastdeploy.model_executor.layers.moe.moe import get_moe_scores from fastdeploy.platforms import current_platform from fastdeploy.utils import singleton @@ -500,8 +501,6 @@ def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): ) = layer.redundant_table_manger.get_ep_rank_to_expert_id_list_by_layer(layer.layer_idx) if layer.topk_method == "noaux_tc": - from .moe import get_moe_scores - score, topk_weights, topk_idx = get_moe_scores( gate_out, layer.n_group, @@ -530,8 +529,6 @@ def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): ) else: if layer.topk_method == "noaux_tc": - from fastdeploy.model_executor.layers.moe.moe import get_moe_scores - use_fused = ( layer.fd_config.scheduler_config.enable_moe_scores_elementwise_fuse and current_platform.is_cuda() ) diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 1d4f392208f..a9123a9145a 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -106,7 +106,6 @@ def get_moe_scores( """ compute moe scores using e_score_correction_bias. """ - assert e_score_correction_bias is not None, "e_score_correction_bias is none!" if envs.FD_USE_PHI_MOE_TOPK: # calculate renormalize and routed_scaling_factor value outside the noaux_tc original_renormalize = renormalize diff --git a/fastdeploy/model_executor/layers/normalization.py b/fastdeploy/model_executor/layers/normalization.py index 3894978b61f..498351b614d 100644 --- a/fastdeploy/model_executor/layers/normalization.py +++ b/fastdeploy/model_executor/layers/normalization.py @@ -21,9 +21,6 @@ from paddle import nn from fastdeploy.model_executor.forward_meta import ForwardMeta -from fastdeploy.model_executor.graph_optimization.cuda_graph_op import ( - block_wise_cuda_graph_wrap, -) from fastdeploy.platforms import current_platform if current_platform.is_gcu(): @@ -212,7 +209,6 @@ def allgather(self, out, token_num): paddle.distributed.all_gather(multi_outs, out, self.tp_group) return multi_outs[:token_num, :] - @block_wise_cuda_graph_wrap(inputs=["x", "residual_input"], self_attrs=["weight"]) def forward( self, x, @@ -236,24 +232,26 @@ def forward( The `residual_output` is the result of applying the normalization and possibly other operations (like linear transformation) on the `residual_input`. """ + has_residual = residual_input is not None + x_dtype = x.dtype x = x.astype(self.weight.dtype) - if residual_input is not None: + if has_residual: residual_input_dtype = residual_input.dtype residual_input = residual_input.astype(self.weight.dtype) - if residual_input is None: + if not has_residual: residual_out = x use_allreduce_fused = ( self.enable_all_reduce_fusion and self.tp_size > 1 and x.shape[0] <= 2048 - and residual_input is not None + and has_residual and current_platform.is_cuda() ) if proxy_rmsnorm is None: if current_platform.is_gcu(): - if residual_input is None: + if not has_residual: norm_out = rms_norm(x, self.weight, self.eps) return norm_out.astype(x_dtype), residual_out norm_out = self.norm_func(x, residual_input, self.weight, self.eps) @@ -266,7 +264,7 @@ def forward( else: if is_batch_invariant_mode_enabled(): # M-invariant path: per-row Triton kernel, no cross-row reduction - if residual_input is not None: + if has_residual: x = x + residual_input norm_out = rms_norm_batch_invariant(x, self.weight, self.eps), x else: @@ -286,20 +284,16 @@ def forward( else: if use_allreduce_fused: norm_out = flashinfer_allreduce_residual_rmsnorm( - fd_config=self.fd_config, - input_tensor=x, - residual=residual_input, - weight=self.weight, - eps=self.eps, + fd_config=self.fd_config, input_tensor=x, residual=residual_input, weight=self.weight, eps=self.eps ) assert norm_out[0] is not None, "Trtllm-all-reduce fusion failed!" else: - if residual_input is not None: + if has_residual: x = x + residual_input norm_out = proxy_rmsnorm(x, self.weight, self.eps), x out = norm_out[0].astype(x_dtype) - if residual_input is not None: + if has_residual: residual_out = norm_out[1].astype(residual_input_dtype) if self.split_x: diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py index 434f34ceade..00dfc366db0 100644 --- a/fastdeploy/model_executor/models/glm4_moe.py +++ b/fastdeploy/model_executor/models/glm4_moe.py @@ -352,7 +352,10 @@ def forward( # Fully Connected hidden_states, residual = self.post_attention_layernorm(hidden_states, residual, proxy_rmsnorm=proxy_rmsnorm) - hidden_states = self.mlp(hidden_states, forward_meta) + if isinstance(self.mlp, Glm4Moe): + hidden_states = self.mlp(hidden_states, forward_meta) + else: + hidden_states = self.mlp(hidden_states, forward_meta) return hidden_states, residual diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 31c7d491035..a6a761f0730 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -31,6 +31,7 @@ from fastdeploy.engine.pooling_params import PoolingParams from fastdeploy.engine.request import BatchRequest, ImagePosition, Request, RequestType from fastdeploy.model_executor.graph_optimization.utils import ( + prefill_cudagraph_guard, profile_run_guard, sot_warmup_guard, ) @@ -2170,6 +2171,7 @@ def capture_model(self) -> None: time_after_capture = time.perf_counter() logger.info(f"Cuda Graph capturing took {time_after_capture - time_before_capture} seconds") + @prefill_cudagraph_guard(True) @sot_warmup_guard(True) def capture_model_prefill_and_mixed(self) -> None: """ @@ -3072,12 +3074,6 @@ def clear_parameters(self, pid): if self.use_cudagraph: self.model.clear_graph_opt_backend() - if envs.FD_USE_BLOCK_WISE_CUDA_GRAPH: - from fastdeploy.model_executor.graph_optimization.cuda_graph_op import ( - clear_all_block_wise_graphs, - ) - - clear_all_block_wise_graphs() if ( self.speculative_decoding and self.spec_method == SpecMethod.MTP @@ -3521,47 +3517,3 @@ def initialize_routing_replay_manager(self): block_table=self.share_inputs["block_tables"], total_block_num=self.num_gpu_blocks, ) - - def capture_block_wise_graphs(self) -> None: - """ - Independent capture loop for block-wise CUDA graphs. - Pre-captures graphs for designated token counts so that at runtime, - matching sizes replay the graph while other sizes fall back to eager. - """ - if not envs.FD_USE_BLOCK_WISE_CUDA_GRAPH: - return - - from fastdeploy.model_executor.graph_optimization.cuda_graph_op import ( - set_block_wise_capturing, - ) - - # Parse capture sizes from env var - sizes_str = envs.FD_BLOCK_WISE_CUDA_GRAPH_SIZES - capture_sizes = sorted([int(s.strip()) for s in sizes_str.split(",") if s.strip()], reverse=True) - if not capture_sizes: - logger.warning("FD_BLOCK_WISE_CUDA_GRAPH_SIZES is empty, skipping block-wise CUDA graph capture") - return - - logger.info(f"Block-wise CUDA graph capture starting for sizes: {sorted(capture_sizes)}") - time_before_capture = time.perf_counter() - - set_block_wise_capturing(True) - try: - for num_tokens in capture_sizes: - batch_size = min(num_tokens, self.scheduler_config.max_num_seqs) - if batch_size < 1: - batch_size = 1 - self._dummy_run( - num_tokens=num_tokens, - batch_size=batch_size, - in_capturing=False, - ) - logger.info(f"Block-wise CUDA graph captured for num_tokens={num_tokens}") - finally: - set_block_wise_capturing(False) - - time_after_capture = time.perf_counter() - logger.info( - f"Block-wise CUDA graph capturing took {time_after_capture - time_before_capture:.3f} seconds " - f"for {len(capture_sizes)} sizes" - ) diff --git a/fastdeploy/worker/gpu_worker.py b/fastdeploy/worker/gpu_worker.py index c8dd0cca535..a1f75a04e8f 100644 --- a/fastdeploy/worker/gpu_worker.py +++ b/fastdeploy/worker/gpu_worker.py @@ -245,9 +245,6 @@ def graph_optimize_and_warm_up_model(self) -> None: # Capture CUDAGraph for decode phase (all modes) self.model_runner.capture_model() - # Block-wise CUDA graph capture (independent loop) - self.model_runner.capture_block_wise_graphs() - # Deterministic mode: reset RNG and share_inputs after warmup. # Warmup _dummy_run() calls consume CUDA RNG state and leave stale # data (infer_seed, stop_flags, seq_lens, etc.) in share_inputs. diff --git a/tests/graph_optimization/test_block_wise_cuda_graph.py b/tests/graph_optimization/test_block_wise_cuda_graph.py deleted file mode 100644 index 8c39644e3b8..00000000000 --- a/tests/graph_optimization/test_block_wise_cuda_graph.py +++ /dev/null @@ -1,200 +0,0 @@ -# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import shutil -import signal -import subprocess -import sys -import time - -import pytest -import requests - -tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) -sys.path.insert(0, tests_dir) - -from e2e.utils.serving_utils import ( - FD_API_PORT, - FD_CACHE_QUEUE_PORT, - FD_ENGINE_QUEUE_PORT, - FD_METRICS_PORT, - PORTS_TO_CLEAN, - clean_ports, - is_port_open, -) - - -@pytest.fixture(scope="session", autouse=True) -def setup_and_run_server(api_url): - """ - Pytest fixture that runs once per test session: - - Cleans ports before tests - - Starts the API server with block-wise CUDA graph env vars enabled - - Waits for server port to open (up to 300 seconds) - - Tears down server after all tests finish - """ - print("Pre-test port cleanup...") - - ports_to_add = [ - FD_API_PORT + 1, - FD_METRICS_PORT + 1, - FD_CACHE_QUEUE_PORT + 1, - FD_ENGINE_QUEUE_PORT + 1, - ] - - for port in ports_to_add: - if port not in PORTS_TO_CLEAN: - PORTS_TO_CLEAN.append(port) - - clean_ports(PORTS_TO_CLEAN) - - print("log dir clean") - if os.path.exists("log") and os.path.isdir("log"): - shutil.rmtree("log") - - base_path = os.getenv("MODEL_PATH") - if base_path: - model_path = os.path.join(base_path, "ernie-4_5-21b-a3b-bf16-paddle") - else: - model_path = "./ernie-4_5-21b-a3b-bf16-paddle" - - log_path = "server.log" - cmd = [ - sys.executable, - "-m", - "fastdeploy.entrypoints.openai.multi_api_server", - "--num-servers", - "2", - "--ports", - f"{FD_API_PORT},{FD_API_PORT + 1}", - "--metrics-ports", - f"{FD_METRICS_PORT},{FD_METRICS_PORT + 1}", - "--args", - "--model", - model_path, - "--engine-worker-queue-port", - f"{FD_ENGINE_QUEUE_PORT},{FD_ENGINE_QUEUE_PORT + 1}", - "--cache-queue-port", - f"{FD_CACHE_QUEUE_PORT},{FD_CACHE_QUEUE_PORT + 1}", - "--tensor-parallel-size", - "1", - "--data-parallel-size", - "2", - "--max-model-len", - "65536", - "--max-num-seqs", - "32", - "--quantization", - "block_wise_fp8", - "--max-num-batched-tokens", - "128", - ] - - # Build env with block-wise CUDA graph enabled - env = os.environ.copy() - env["FD_USE_BLOCK_WISE_CUDA_GRAPH"] = "1" - env["FD_BLOCK_WISE_CUDA_GRAPH_SIZES"] = "128" - env["FD_USE_PHI_FP8_QUANT"] = "0" - env["CUDA_VISIBLE_DEVICES"] = "0,1" - env["FD_BLOCK_WISE_DEBUG"] = "1" - - if os.path.exists("log"): - shutil.rmtree("log") - - with open(log_path, "w") as logfile: - process = subprocess.Popen( - cmd, - stdout=logfile, - stderr=subprocess.STDOUT, - env=env, - start_new_session=True, - ) - - # Wait up to 300 seconds for API server to be ready - for _ in range(300): - if is_port_open("127.0.0.1", FD_API_PORT): - print(f"Server is up on port {FD_API_PORT}") - break - time.sleep(1) - else: - print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") - try: - os.killpg(process.pid, signal.SIGTERM) - except Exception as e: - print(f"Failed to kill process group: {e}") - raise RuntimeError(f"API server did not start on port {FD_API_PORT}") - - yield # Run tests - - print("\n===== Post-test server cleanup... =====") - try: - os.killpg(process.pid, signal.SIGTERM) - time.sleep(10) - print(f"server (pid={process.pid}) terminated") - except Exception as e: - print(f"Failed to terminate API server: {e}") - - clean_ports(PORTS_TO_CLEAN) - - -@pytest.fixture(scope="session") -def api_url(request): - """ - Returns the API endpoint URL for chat completions. - """ - return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions" - - -@pytest.fixture -def headers(): - """ - Returns common HTTP request headers. - """ - return {"Content-Type": "application/json"} - - -def send_request(url, payload, timeout=60): - """ - Send a POST request to the specified URL with the given payload. - """ - headers = {"Content-Type": "application/json"} - try: - res = requests.post(url, headers=headers, json=payload, timeout=timeout) - return res - except requests.exceptions.Timeout: - print(f"Request timed out (>{timeout} seconds)") - return None - except requests.exceptions.RequestException as e: - print(f"Request failed: {e}") - return None - - -def test_block_wise_cuda_graph_beijing(api_url): - """ - Verify that block-wise CUDA graph produces correct output. - - With FD_USE_BLOCK_WISE_CUDA_GRAPH=1 and FD_BLOCK_WISE_CUDA_GRAPH_SIZES set, - ask about Tiananmen Square in Beijing and verify the response mentions "北京". - """ - payload = { - "stream": False, - "messages": [{"role": "user", "content": "北京天安门在哪里"}], - "max_tokens": 128, - } - - response = send_request(url=api_url, payload=payload) - print("response: ", response) - assert response is not None, "Request returned None (timeout or connection error)" - assert response.status_code == 200, f"Request failed with status {response.status_code}: {response.text}" diff --git a/tests/graph_optimization/test_cuda_graph_op_unit.py b/tests/graph_optimization/test_cuda_graph_op_unit.py deleted file mode 100644 index ad61d08bfc5..00000000000 --- a/tests/graph_optimization/test_cuda_graph_op_unit.py +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -In-process unit tests for -``fastdeploy.model_executor.graph_optimization.cuda_graph_op``. - -These tests exercise helper functions and the non-capturing branches of the -``block_wise_cuda_graph_wrap`` decorator to improve coverage. They intentionally -avoid invoking the actual CUDA Graph capture path so that they can run without -spinning up a serving process or recording on a CUDA stream. -""" - -import paddle -import pytest - -import fastdeploy -from fastdeploy.model_executor.graph_optimization import cuda_graph_op - - -class _ToyLayer(paddle.nn.Layer): - """Minimal layer used to exercise the wrap decorator paths.""" - - def __init__(self): - super().__init__() - self.weight = paddle.ones([2, 2], dtype="float32") - - -def _set_env(monkeypatch, **kwargs): - """Helper to override FD env vars dynamically via monkeypatch.""" - for key, value in kwargs.items(): - monkeypatch.setattr(fastdeploy.envs, key, value, raising=False) - - -def test_get_captured_graph_log_returns_copy(monkeypatch): - """get_captured_graph_log should return a list copy of the registry.""" - monkeypatch.setattr(cuda_graph_op, "_CAPTURED_GRAPH_LOG", [("op", ("k",), False)]) - log = cuda_graph_op.get_captured_graph_log() - assert log == [("op", ("k",), False)] - log.append(("other", (), True)) - assert cuda_graph_op._CAPTURED_GRAPH_LOG == [("op", ("k",), False)] - - -def test_dump_captured_graph_summary_paths(monkeypatch): - """Cover early-returns and the summary-print branch of dump_captured_graph_summary.""" - # Debug disabled: early return. - _set_env(monkeypatch, FD_BLOCK_WISE_DEBUG=False) - monkeypatch.setattr(cuda_graph_op, "_CAPTURED_GRAPH_LOG", [("op", (), False)]) - cuda_graph_op.dump_captured_graph_summary() - - # Debug enabled, empty log. - _set_env(monkeypatch, FD_BLOCK_WISE_DEBUG=True) - monkeypatch.setattr(cuda_graph_op, "_CAPTURED_GRAPH_LOG", []) - cuda_graph_op.dump_captured_graph_summary() - - # Debug enabled, non-empty log. - monkeypatch.setattr( - cuda_graph_op, - "_CAPTURED_GRAPH_LOG", - [("a.fwd", ("k1",), True), ("a.fwd", ("k2",), True), ("b.fwd", ("k1",), False)], - ) - cuda_graph_op.dump_captured_graph_summary() - - -def test_clear_all_block_wise_graphs(): - """clear_all_block_wise_graphs should empty every registered shared cache.""" - g, ci, co = {"k": object()}, {"k": object()}, {"k": object()} - snapshot = list(cuda_graph_op._ALL_SHARED_CACHES) - try: - cuda_graph_op._ALL_SHARED_CACHES.clear() - cuda_graph_op._ALL_SHARED_CACHES.append((g, ci, co)) - cuda_graph_op.clear_all_block_wise_graphs() - assert g == {} and ci == {} and co == {} - finally: - cuda_graph_op._ALL_SHARED_CACHES.clear() - cuda_graph_op._ALL_SHARED_CACHES.extend(snapshot) - - -def test_block_wise_wrap_invalid_input_raises(): - """Decorator should raise ValueError when 'inputs' name is not a parameter.""" - with pytest.raises(ValueError): - - @cuda_graph_op.block_wise_cuda_graph_wrap(inputs=["nonexistent"]) - def forward(self, x): - return x - - -def test_block_wise_wrap_disabled_passthrough(monkeypatch): - """When FD_USE_BLOCK_WISE_CUDA_GRAPH is off, wrapper should call eager.""" - _set_env(monkeypatch, FD_USE_BLOCK_WISE_CUDA_GRAPH=False) - - class M(_ToyLayer): - @cuda_graph_op.block_wise_cuda_graph_wrap(inputs=["x"]) - def forward(self, x, residual=None): - return x + 1 - - m = M() - x = paddle.zeros([2, 2], dtype="float32") - out = m.forward(x) - assert paddle.all(out == 1).item() - - -def test_block_wise_wrap_zero_shape_skips(monkeypatch): - """A zero-dim tensor input (positional or keyword) should bypass capture.""" - _set_env(monkeypatch, FD_USE_BLOCK_WISE_CUDA_GRAPH=True) - cuda_graph_op.set_block_wise_capturing(False) - - class M(_ToyLayer): - @cuda_graph_op.block_wise_cuda_graph_wrap(inputs=["x"]) - def forward(self, x, residual=None): - return x - - m = M() - empty = paddle.zeros([0, 4], dtype="float32") - # Positional zero-shape arg: hits the `for a in args` branch. - assert m.forward(empty).shape == [0, 4] - # Keyword zero-shape arg: hits the `for v in kwargs.values()` branch. - assert m.forward(paddle.ones([2, 2]), residual=empty).shape == [2, 2] - - -def test_block_wise_wrap_per_instance_cache_eager_fallback(monkeypatch): - """Per-instance cache init + eager fallback when not in capture phase.""" - _set_env(monkeypatch, FD_USE_BLOCK_WISE_CUDA_GRAPH=True) - cuda_graph_op.set_block_wise_capturing(False) - - class M(_ToyLayer): - @cuda_graph_op.block_wise_cuda_graph_wrap(inputs=["x"]) - def forward(self, x, hook=None): - return x * 2 - - m = M() - x = paddle.ones([2, 2], dtype="float32") - # First call: initializes per-instance cache dicts on self.__dict__. - out1 = m.forward(x, hook=lambda v: v) # callable arg covers callable-key branch - # Second call reuses the existing per-instance cache (try/except hit path). - out2 = m.forward(x) - assert paddle.all(out1 == 2).item() - assert paddle.all(out2 == 2).item() - # The decorator should have stashed cache attributes on the instance. - assert any(name.startswith("_cg_forward_") for name in m.__dict__) - - -def test_block_wise_wrap_custom_key_fn(monkeypatch): - """Custom key_fn path is used to compute the cache key.""" - _set_env(monkeypatch, FD_USE_BLOCK_WISE_CUDA_GRAPH=True) - cuda_graph_op.set_block_wise_capturing(False) - - seen_keys = [] - - def key_fn(x, residual): - k = ("custom", tuple(x.shape) if x is not None else None) - seen_keys.append(k) - return k - - class M(_ToyLayer): - @cuda_graph_op.block_wise_cuda_graph_wrap(inputs=["x"], key_fn=key_fn) - def forward(self, x, residual=None): - return x - - m = M() - m.forward(paddle.ones([3, 3], dtype="float32")) - assert seen_keys and seen_keys[0][0] == "custom" - - -def test_set_block_wise_capturing_toggle(): - """set_block_wise_capturing should mutate the module-level flag.""" - cuda_graph_op.set_block_wise_capturing(True) - assert cuda_graph_op._BLOCK_WISE_CAPTURING is True - cuda_graph_op.set_block_wise_capturing(False) - assert cuda_graph_op._BLOCK_WISE_CAPTURING is False From 21a35d99104d92c73c8a074a31b724c7f249275f Mon Sep 17 00:00:00 2001 From: zhangxiao35 Date: Tue, 9 Jun 2026 11:14:30 +0800 Subject: [PATCH 2/3] Support piecewise CUDAGraph for MTP execution --- .../cudagraph_piecewise_backend.py | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py b/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py index d2d5a6993a6..1b5e73e4d8f 100644 --- a/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py +++ b/fastdeploy/model_executor/graph_optimization/cudagraph_piecewise_backend.py @@ -193,19 +193,14 @@ def run_static_model(self, entry: ConcreteSizeEntry, is_decode: bool = False, ** def __call__(self, **kwargs) -> List[paddle.Tensor] | paddle.Tensor: # Get real shape (total num tokens) - if ( - self.speculative_decoding - and self.real_bsz_to_captured_size - and all(self.real_bsz_to_captured_size.values()) - ): - seq_lens_this_time: paddle.Tensor = kwargs["forward_meta"].seq_lens_this_time - real_bsz = kwargs["forward_meta"].real_bsz - num_running_requests = real_bsz if real_bsz > 0 else int((seq_lens_this_time.flatten() > 0).sum().item()) - num_running_requests = max(1, num_running_requests) - real_shape = self.real_bsz_to_captured_size[num_running_requests] - else: - ids_remove_padding: paddle.Tensor = kwargs["forward_meta"].ids_remove_padding - real_shape = ids_remove_padding.shape[0] + # For both MTP speculative decoding and regular decode, use ids_remove_padding.shape[0] + # directly as the real_shape key into real_shape_to_captured_size. + # In MTP, cudagraph_capture_sizes are already scaled by (num_speculative_tokens+1), + # so ids_remove_padding.shape[0] == capture_size and the lookup is correct. + # This avoids using real_bsz (a concrete Python int) which causes SOT to specialize + # on each distinct batch size, generating multiple code objects and breaking warmup_impl. + ids_remove_padding: paddle.Tensor = kwargs["forward_meta"].ids_remove_padding + real_shape = ids_remove_padding.shape[0] exist_prefill = kwargs["forward_meta"].exist_prefill # Static split graph mode: use Static + CUDAGraph for prefill/mixed phase static_cudagraph_for_prefill = exist_prefill and not self.full_cuda_graph and self.dy2st From 7bee9e2908ef997a89924d3d217be3e6078b02be Mon Sep 17 00:00:00 2001 From: zhangxiao35 Date: Tue, 23 Jun 2026 16:43:31 +0800 Subject: [PATCH 3/3] support PD separate P worker piecewise cudagraph --- fastdeploy/config.py | 8 ++++++- fastdeploy/worker/gpu_model_runner.py | 14 +++++++++++- fastdeploy/worker/gpu_worker.py | 32 ++++++++++++++++++++------- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 6fd70cdd9ee..90cfe621bb5 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -2230,7 +2230,13 @@ def postprocess(self): # Adjustment GraphOptConfig if self.scheduler_config is not None and self.scheduler_config.splitwise_role == "prefill": - self.graph_opt_config.use_cudagraph = self.graph_opt_config.cudagraph_only_prefill + # Piecewise CUDAGraph for prefill worker: if graph_opt_level >= 1 and not full_cuda_graph, + # reuse the mixed piecewise path (capture_model_prefill_and_mixed) for the prefill worker. + # Otherwise fall back to cudagraph_only_prefill flag (legacy path). + if self.graph_opt_config.graph_opt_level >= 1 and not self.graph_opt_config.full_cuda_graph: + self.graph_opt_config.use_cudagraph = True + else: + self.graph_opt_config.use_cudagraph = self.graph_opt_config.cudagraph_only_prefill if self.load_config is not None and self.load_config.dynamic_load_weight is True: self.graph_opt_config.graph_opt_level = 0 logger.info( diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index a6a761f0730..dd7d09d2d8a 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1509,7 +1509,16 @@ def initialize_forward_meta(self, is_dummy_or_profile_run=False): if self.fd_config.parallel_config.enable_chunked_moe: self.forward_meta.max_moe_num_chunk = dist_status.max_moe_num_chunk - only_decode_use_cudagraph = self.use_cudagraph and if_only_decode + # PD prefill workers with piecewise CUDAGraph (graph_opt_level>=1, not full_cuda_graph) only + # capture prefill/mixed graphs (via capture_model_prefill_and_mixed), never decode graphs. + # Exclude such workers from the decode CUDAGraph path to avoid lazy decode capture at runtime. + is_pd_prefill_piecewise = ( + hasattr(self, "graph_opt_config") + and self.fd_config.scheduler_config.splitwise_role == "prefill" + and self.graph_opt_config.graph_opt_level >= 1 + and not self.graph_opt_config.full_cuda_graph + ) + only_decode_use_cudagraph = self.use_cudagraph and if_only_decode and not is_pd_prefill_piecewise # Update config about moe for better performance # TODO(wanglongzhi):Modifying the config at runtime is not appropriate; it needs to be moved to forward_meta. It will be used in MoEMethodBase.apply() @@ -1534,6 +1543,7 @@ def initialize_forward_meta(self, is_dummy_or_profile_run=False): and self.use_cudagraph and self.graph_opt_config.graph_opt_level > 0 and not self.graph_opt_config.full_cuda_graph + and (self.fd_config.scheduler_config.splitwise_role != "prefill" or self.exist_prefill()) ): self.forward_meta.step_use_cudagraph = True @@ -2181,6 +2191,8 @@ def capture_model_prefill_and_mixed(self) -> None: logger.info("Skipping CUDA graph capture. Please check GraphOptimizationConfig") return time_before_capture = time.perf_counter() + if self.fd_config.parallel_config.use_ep: + self.fd_config.model_config.moe_phase.phase = "prefill" capture_sizes = self.cudagraph_capture_sizes_prefill.copy() for capture_size in sorted(capture_sizes, reverse=True): self._dummy_run( diff --git a/fastdeploy/worker/gpu_worker.py b/fastdeploy/worker/gpu_worker.py index a1f75a04e8f..02144432d6d 100644 --- a/fastdeploy/worker/gpu_worker.py +++ b/fastdeploy/worker/gpu_worker.py @@ -223,27 +223,43 @@ def graph_optimize_and_warm_up_model(self) -> None: """ Perform the warm-up and the graph optimization. - Execution modes: + Execution modes (mixed worker): | Mode | Prefill + Mixed | Decode | |-----------------------------------|--------------------------|--------------------------| | Dynamic (graph_opt_level=0) | Dynamic | Dynamic + CUDAGraph | | Static Full Graph (full=True) | Dynamic | Static + CUDAGraph | | Static Split Graph (full=False) | Static + CUDAGraph | Dynamic + CUDAGraph | + + PD disaggregation: + | Role | graph_opt_level>=1, full=False | Otherwise | + |---------|-------------------------------|------------------------------| + | prefill | Piecewise CUDAGraph (prefill) | Dynamic (cudagraph_only_pref)| + | decode | Dynamic + CUDAGraph | Dynamic + CUDAGraph | """ + splitwise_role = self.fd_config.scheduler_config.splitwise_role + is_pd_prefill = splitwise_role == "prefill" + is_pd_decode = splitwise_role == "decode" + use_piecewise = ( + self.fd_config.graph_opt_config.graph_opt_level >= 1 + and not self.fd_config.graph_opt_config.full_cuda_graph + ) + if self.fd_config.graph_opt_config.graph_opt_level >= 1 and not self.model_runner.use_cudagraph: self.model_runner.sot_warmup() if self.fd_config.graph_opt_config.graph_opt_level >= 1: self.model_runner.vision_encoder_compile() - # Static split graph mode: capture CUDAGraph for prefill/mixed phase - if ( - self.fd_config.graph_opt_config.graph_opt_level >= 1 - and not self.fd_config.graph_opt_config.full_cuda_graph - ): + # Piecewise CUDAGraph capture for prefill/mixed phase. + # In PD disaggregation: only the prefill worker (with piecewise enabled) runs this; + # the decode worker never captures prefill/mixed graphs. + if use_piecewise and not is_pd_decode: self.model_runner.capture_model_prefill_and_mixed() - # Capture CUDAGraph for decode phase (all modes) - self.model_runner.capture_model() + # Decode-phase CUDAGraph capture. + # In PD disaggregation: the prefill worker running piecewise CUDAGraph skips this; + # the decode worker always runs this. + if not (is_pd_prefill and use_piecewise): + self.model_runner.capture_model() # Deterministic mode: reset RNG and share_inputs after warmup. # Warmup _dummy_run() calls consume CUDA RNG state and leave stale