Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions custom_ops/gpu_ops/append_attn/template_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"IsDynamicC8"
],
"dispatch_params": {
"GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16, 24],
"GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16, 24],
"HEAD_DIM": [128],
"BLOCK_SIZE": [64],
"CAUSAL": [0, 1],
Expand Down Expand Up @@ -54,7 +54,7 @@
"ENABLE_PREFILL"
],
"dispatch_params": {
"GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16, 24],
"GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16, 24],
"HEAD_DIM": [128],
"BLOCK_SIZE": [64],
"CAUSAL": [0, 1],
Expand Down Expand Up @@ -89,7 +89,7 @@
"ENABLE_PREFILL"
],
"dispatch_params": {
"GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16, 24],
"GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16, 24],
"HEAD_DIM": [64,128,192],
"BLOCK_SIZE": [64],
"CAUSAL": [0, 1],
Expand Down
80 changes: 49 additions & 31 deletions fastdeploy/cache_manager/v1/cache_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import threading
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Dict, List, Optional

Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self, config: "FDConfig", local_rank: int, device_id: int):

# Attention backend
self.attn_backend = None
self.attn_backends = None

@property
def write_policy(self) -> Optional[str]:
Expand Down Expand Up @@ -285,7 +287,9 @@ def initialize_kv_cache(
Returns:
cache_kvs_list: Flat list of allocated tensors in layer/role order.
"""
self.attn_backend = attn_backend
attn_backends = self._normalize_attn_backends(attn_backend)
self.attn_backend = attn_backends[0]
self.attn_backends = attn_backends

kv_cache_quant_type = self._get_kv_cache_quant_type()
cache_dtype = "uint8" if kv_cache_quant_type is not None else self.model_config.dtype
Expand All @@ -295,18 +299,19 @@ def initialize_kv_cache(
f"backend={type(self.attn_backend).__name__}, kv_cache_quant_type={kv_cache_quant_type}"
)

caches = self.attn_backend.create_kv_cache(
num_layers=self._num_layers,
num_blocks=num_gpu_blocks,
cache_dtype=cache_dtype,
kv_cache_quant_type=kv_cache_quant_type,
)

cache_kvs_list: List[Any] = []
for (role, layer_idx), tensor in caches.items():
name = self._format_cache_name(role, layer_idx)
self.cache_kvs_map[name] = tensor
cache_kvs_list.append(tensor)
for layer_idx, layer_attn_backend in enumerate(attn_backends):
caches = layer_attn_backend.create_kv_cache(
num_layers=1,
num_blocks=num_gpu_blocks,
cache_dtype=cache_dtype,
kv_cache_quant_type=kv_cache_quant_type,
layer_offset=layer_idx,
)
for (role, cache_layer_idx), tensor in caches.items():
name = self._format_cache_name(role, cache_layer_idx)
self.cache_kvs_map[name] = tensor
cache_kvs_list.append(tensor)

paddle.device.cuda.empty_cache()
logger.info("kv cache is initialized!")
Expand All @@ -315,7 +320,7 @@ def initialize_kv_cache(
self._transfer_manager.set_cache_kvs_map(self.cache_kvs_map)

# Initialize host cache
self.initialize_host_cache(self.attn_backend)
self.initialize_host_cache(self.attn_backends)

return cache_kvs_list

Expand Down Expand Up @@ -498,6 +503,15 @@ def _bind_to_closest_numa_node(self) -> bool:
logger.warning(f"[CacheController] NUMA binding failed: {e}")
return False

def _normalize_attn_backends(self, attn_backend: Any) -> Sequence[Any]:
if isinstance(attn_backend, Sequence) and not isinstance(attn_backend, (str, bytes)):
if len(attn_backend) != self._num_layers:
raise ValueError(
f"attn_backend length {len(attn_backend)} does not match num_layers {self._num_layers}"
)
return attn_backend
return [attn_backend] * self._num_layers

def initialize_host_cache(
self,
attn_backend: Any,
Expand Down Expand Up @@ -533,35 +547,39 @@ def initialize_host_cache(
cache_dtype = self.cache_config.cache_dtype
cache_item_bytes = self.cache_config.get_cache_bytes(cache_dtype)
num_layers = self._num_layers + self.config.speculative_config.num_extra_cache_layer
attn_backends = self._normalize_attn_backends(attn_backend)

logger.info(
f"[CacheController] Initializing swap space (Host cache) for {num_layers} layers "
f"(num_host_blocks={num_host_blocks}, backend={type(attn_backend).__name__}, "
f"(num_host_blocks={num_host_blocks}, backend={type(attn_backends[0]).__name__}, "
f"kv_cache_quant_type={kv_cache_quant_type})."
)

try:
host_caches = attn_backend.create_host_kv_cache(
num_layers=num_layers,
num_blocks=num_host_blocks,
cache_item_bytes=cache_item_bytes,
kv_cache_quant_type=kv_cache_quant_type,
)
except NotImplementedError as e:
logger.warning(
f"[CacheController] Host kv cache offload not supported by "
f"{type(attn_backend).__name__}: {e}. Skipping swap space setup."
)
return
for layer_idx in range(num_layers):
layer_attn_backend = attn_backends[layer_idx] if layer_idx < len(attn_backends) else attn_backends[-1]
try:
host_caches = layer_attn_backend.create_host_kv_cache(
num_layers=1,
num_blocks=num_host_blocks,
cache_item_bytes=cache_item_bytes,
kv_cache_quant_type=kv_cache_quant_type,
layer_offset=layer_idx,
)
except NotImplementedError as e:
logger.warning(
f"[CacheController] Host kv cache offload not supported by "
f"{type(layer_attn_backend).__name__}: {e}. Skipping swap space setup."
)
return

for (role, layer_idx), ptr in host_caches.items():
name = self._format_cache_name(role, layer_idx)
self.host_cache_kvs_map[name] = ptr
for (role, cache_layer_idx), ptr in host_caches.items():
name = self._format_cache_name(role, cache_layer_idx)
self.host_cache_kvs_map[name] = ptr

logger.info(f"[CacheController] Swap space (Host cache) is ready for {num_layers} layers!")

# Preserve the shape/num_blocks bookkeeping that downstream code may read.
key_cache_shape, value_cache_shape = attn_backend.get_kv_cache_shape(
key_cache_shape, value_cache_shape = attn_backends[0].get_kv_cache_shape(
max_num_blocks=num_host_blocks, kv_cache_quant_type=kv_cache_quant_type
)
self._host_key_cache_shape = [num_host_blocks] + list(key_cache_shape[1:])
Expand Down
2 changes: 2 additions & 0 deletions fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ def __init__(

if not hasattr(self, "head_dim"):
self.head_dim = self.hidden_size // self.num_attention_heads
if not hasattr(self, "v_head_dim"):
self.v_head_dim = self.head_dim

if hasattr(self, "vision_config"):
self.vision_config = PretrainedConfig.from_dict(self.vision_config)
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/model_executor/forward_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class ForwardMeta:

# Attention backend object
attn_backend: AttentionBackend = None
attn_backends: Optional[list[AttentionBackend]] = None
# Forward mode used during attention
forward_mode: ForwardMode = ForwardMode.MIXED
# Attention mask
Expand Down
79 changes: 48 additions & 31 deletions fastdeploy/model_executor/layers/attention/append_attn_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ def __init__(
self.num_heads: int = num_heads
self.group_size: int = self.num_heads // self.kv_num_heads
self.head_dim: int = fd_config.model_config.head_dim
self.v_head_dim: int = getattr(fd_config.model_config, "v_head_dim", self.head_dim)
self.external_norm_rope: bool = True if self.v_head_dim != self.head_dim else False
self.num_layers: int = fd_config.model_config.num_hidden_layers

# head wise sliding window attention
Expand Down Expand Up @@ -290,7 +292,9 @@ def get_kv_cache_shape(
key_cache_shape = [max_num_blocks, self.kv_num_heads, self.block_size, self.head_dim]
if kv_cache_quant_type is not None and kv_cache_quant_type == "int4_zp":
key_cache_shape[-1] = self.head_dim // 2
value_cache_shape = key_cache_shape
value_cache_shape = [max_num_blocks, self.kv_num_heads, self.block_size, self.v_head_dim]
if kv_cache_quant_type is not None and kv_cache_quant_type == "int4_zp":
value_cache_shape[-1] = self.v_head_dim // 2
return key_cache_shape, value_cache_shape

def forward_mixed(
Expand All @@ -308,33 +312,6 @@ def forward_mixed(
forward_mixed
"""

cache_k = forward_meta.caches[2 * layer.layer_id]
cache_v = forward_meta.caches[2 * layer.layer_id + 1]

from fastdeploy.model_executor.ops.triton_ops import do_rope, write_cache

if getattr(layer, "only_do_attn", False):
do_rope(
qkv,
forward_meta.rotary_embs[0],
forward_meta.rotary_embs[1],
forward_meta.cu_seqlens_q,
forward_meta.seq_lens_decoder,
forward_meta.batch_id_per_token,
cache_k,
cache_v,
)

write_cache(
qkv,
cache_k,
cache_v,
forward_meta.cu_seqlens_q,
forward_meta.seq_lens_decoder,
forward_meta.batch_id_per_token,
forward_meta.block_tables,
)

metadata = self.attention_metadata

# - PaddleFormers fallback: rope_already_applied=True -> use identity RoPE (cos=1, sin=0)
Expand Down Expand Up @@ -376,7 +353,8 @@ def forward_mixed(
cache_k_scales = getattr(layer, "cache_k_scale", None)
cache_v_scales = getattr(layer, "cache_v_scale", None)

if layer.layer_id == 0:
self.num_key_value_heads_list = getattr(self.fd_config.model_config, "num_key_value_heads_list", None)
if layer.layer_id == 0 or self.num_key_value_heads_list is not None:
get_block_shape_and_split_kv_block(
forward_meta.seq_lens_encoder,
forward_meta.seq_lens_decoder,
Expand All @@ -399,6 +377,45 @@ def forward_mixed(
self.block_size,
)

from fastdeploy.model_executor.ops.triton_ops import (
do_rope,
qk_rmsnorm_fused,
write_cache,
)

if self.external_norm_rope:
if q_norm_weight is not None and k_norm_weight is not None:
qk_rmsnorm_fused(
qkv,
q_norm_weight,
k_norm_weight,
getattr(layer, "rms_norm_eps", 1e-6),
layer.num_heads * layer.head_dim,
layer.kv_num_heads * layer.head_dim,
cache_k.shape[-1],
cache_v.shape[-1],
)
do_rope(
qkv,
forward_meta.rotary_embs[0],
forward_meta.rotary_embs[1],
forward_meta.cu_seqlens_q,
forward_meta.seq_lens_decoder,
forward_meta.batch_id_per_token,
cache_k,
cache_v,
)

write_cache(
qkv,
cache_k,
cache_v,
forward_meta.cu_seqlens_q,
forward_meta.seq_lens_decoder,
forward_meta.batch_id_per_token,
forward_meta.block_tables,
)

if self.use_output:
quant_max_bound = getattr(layer, "quant_max_bound", 0.0)
cache_quant_type = getattr(layer, "cache_quant_type_str", "none")
Expand Down Expand Up @@ -547,7 +564,7 @@ def forward_mixed(
sliding_window,

This comment was marked as outdated.

self.sink_size,
self.head_wise_full_hidden if self.head_wise_swa_ratio > 0 else 0,
getattr(layer, "only_do_attn", False),
self.external_norm_rope, # if True is means only_do_attn
)
return res

Expand All @@ -569,7 +586,7 @@ def forward_unitest(
cache_k = forward_meta.caches[2 * layer.layer_id]
cache_v = forward_meta.caches[2 * layer.layer_id + 1]

head_dim_q = 192
head_dim_q = 128
head_dim_v = 128

forward_meta.caches[2 * layer.layer_id] = paddle.randn(cache_k.shape[:3] + [head_dim_q])
Expand Down
15 changes: 12 additions & 3 deletions fastdeploy/model_executor/layers/attention/attention.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,16 @@ def __init__(
fd_config.model_config.num_attention_heads // fd_config.parallel_config.tensor_parallel_size
)
self.head_dim: int = fd_config.model_config.head_dim
self.layer_id: int = layer_id
num_key_value_heads = getattr(fd_config.model_config, "num_key_value_heads_list", None)
if num_key_value_heads is None:
num_key_value_heads = fd_config.model_config.num_key_value_heads
else:
num_key_value_heads = num_key_value_heads[self.layer_id]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug num_key_value_heads_list 只改变了 Attention/backend 的 KV head 数,但 QKV projection/loader 仍按全局 model_config.num_key_value_heads 生成 Q/K/V 宽度。

在这个分支里第 i 层 Attention.kv_num_heads 会使用 num_key_value_heads_list[i];但 QKVParallelLinear.__init__ 仍使用 fd_config.model_config.num_key_value_heads,现有调用点(如 qwen3.py:66gpt_oss.py:59)也没有传 per-layer kv_num_heads。当某层 list 值和全局值不同,qkv 输出宽度、权重切片和该层 cache/backend 期望的 KV head 数不一致,后续 do_rope / write_cache / append_attention 的 Q/K/V 切分会错。

建议把 layer_id 或解析后的 kv_num_heads 传入对应的 QKVParallelLinear / QKVGateParallelLinear,并让 output_size、loader offset 和 Attention.kv_num_heads 使用同一层的 KV head 数;同时补一个 list 中至少两个不同 KV head 的加载和 forward 单测。

self.kv_num_heads: int = max(
1,
fd_config.model_config.num_key_value_heads // fd_config.parallel_config.tensor_parallel_size,
int(num_key_value_heads) // fd_config.parallel_config.tensor_parallel_size,
)
self.layer_id: int = layer_id
self.v_head_dim: int = v_head_dim if v_head_dim > 0 else self.head_dim
self.rope_type: str = rope_type
self.qk_head_dim: int = self.head_dim
Expand Down Expand Up @@ -277,7 +282,11 @@ def forward(
if forward_meta.layer_done_counter is not None:
forward_meta.layer_done_counter.wait_for_layer(self.layer_id)

return forward_meta.attn_backend.forward(
attn_backend = forward_meta.attn_backend
if forward_meta.attn_backends is not None:
attn_backend = forward_meta.attn_backends[self.layer_id]

return attn_backend.forward(
q,
k,
v,
Expand Down
Loading
Loading