
Commit 35d5222

author K11OntheBoat committed

support Qwen-MoE PD/EP Prefill launch
1 parent fa8a315 commit 35d5222

File tree

7 files changed, +78 -52 lines changed:

fastdeploy/config.py
fastdeploy/model_executor/layers/moe/moe.py
fastdeploy/model_executor/models/qwen3moe.py
fastdeploy/model_executor/utils.py
fastdeploy/stop.sh
fastdeploy/worker/gpu_model_runner.py
fastdeploy/worker/worker_process.py

fastdeploy/config.py

Lines changed: 3 additions & 1 deletion

@@ -212,7 +212,6 @@ def __init__(
         # set attribute from pretrained_config
         for key, value in pretrained_config.items():
             setattr(self, key, value)
-
         # we need set default value when not exist
         for key, value in PRETRAINED_INIT_CONFIGURATION.items():
             if not hasattr(self, key):
@@ -300,6 +299,9 @@ def override_name_from_config(self):
         if not hasattr(self, "mla_use_absorb"):
             self.mla_use_absorb = False

+        if hasattr(self, "num_experts") and getattr(self, "moe_num_experts") is None:
+            self.moe_num_experts = self.num_experts
+
     def read_from_env(self):
         """
         Read configuration information from environment variables and update the object's attributes.
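Note: the added override gives moe_num_experts a fallback for checkpoints (e.g. Qwen-MoE style configs) that only declare num_experts. A minimal sketch of the same fallback on a stand-alone object; SimpleNamespace here is a hypothetical stand-in for the pretrained config, not FastDeploy's actual class:

from types import SimpleNamespace

# Hypothetical config object; the real code sets these attributes on the model config.
cfg = SimpleNamespace(num_experts=64, moe_num_experts=None)

# Same fallback as the added lines: copy num_experts only when moe_num_experts is unset.
if hasattr(cfg, "num_experts") and getattr(cfg, "moe_num_experts") is None:
    cfg.moe_num_experts = cfg.num_experts

print(cfg.moe_num_experts)  # 64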

fastdeploy/model_executor/layers/moe/moe.py

Lines changed: 46 additions & 39 deletions

@@ -219,45 +219,52 @@ def weight_loader(self, param, loaded_weight, expert_id, shard_id: Optional[str]
             # MoE experts has been fused in disk
             self._load_fused_experts_weight(param, loaded_weight)
             return
-        if hasattr(param, "SHARD_ID_TO_SHARDED_DIM"):
-            SHARD_ID_TO_SHARDED_DIM = param.SHARD_ID_TO_SHARDED_DIM
-        elif current_platform.is_cuda():
-            SHARD_ID_TO_SHARDED_DIM = {"gate": 1, "down": 0, "up": 1}
-        else:
-            SHARD_ID_TO_SHARDED_DIM = {"gate": 0, "down": 1, "up": 0}
-
-        if not param._is_initialized():
-            param.initialize()
-
-        if shard_id is None:
-            # 1.gate up fused in disk
-            weight_need_transpose = getattr(param, "weight_need_transpose", False)
-            output_size = param[expert_id - self.expert_id_offset].shape[SHARD_ID_TO_SHARDED_DIM["gate"]]
-            per_rank = output_size // 2
-            start = self.tp_rank * per_rank
-            loaded_weight_shard_gate = slice_fn(
-                loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["gate"], start, start + per_rank
-            )
-            self._load_gate_up_weight(
-                param, expert_id, loaded_weight_shard_gate, "gate", SHARD_ID_TO_SHARDED_DIM["gate"], is_sharded=True
-            )
-            start_up = output_size // 2 * self.tp_size + self.tp_rank * per_rank
-            loaded_weight_shard_up = slice_fn(
-                loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["up"], start_up, start_up + per_rank
-            )
-            self._load_gate_up_weight(
-                param, expert_id, loaded_weight_shard_up, "up", SHARD_ID_TO_SHARDED_DIM["up"], is_sharded=True
-            )
-        else:
-            # 2.gate up splited in disk
-            assert shard_id in ["gate", "down", "up"]
-            self._load_expert_weight(
-                param=param,
-                expert_id=expert_id,
-                loaded_weight=loaded_weight,
-                shard_id=shard_id,
-                shard_dim=SHARD_ID_TO_SHARDED_DIM[shard_id],
-            )
+
+        if expert_id - self.expert_id_offset >= 0 and expert_id - self.expert_id_offset < self.num_local_experts:
+            if hasattr(param, "SHARD_ID_TO_SHARDED_DIM"):
+                SHARD_ID_TO_SHARDED_DIM = param.SHARD_ID_TO_SHARDED_DIM
+            elif current_platform.is_cuda():
+                SHARD_ID_TO_SHARDED_DIM = {"gate": 1, "down": 0, "up": 1}
+            else:
+                SHARD_ID_TO_SHARDED_DIM = {"gate": 0, "down": 1, "up": 0}
+
+            if not param._is_initialized():
+                param.initialize()
+
+            if shard_id is None:
+                # 1.gate up fused in disk
+                weight_need_transpose = getattr(param, "weight_need_transpose", False)
+                output_size = param[expert_id - self.expert_id_offset].shape[SHARD_ID_TO_SHARDED_DIM["gate"]]
+                per_rank = output_size // 2
+                start = self.tp_rank * per_rank
+                loaded_weight_shard_gate = slice_fn(
+                    loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["gate"], start, start + per_rank
+                )
+                self._load_gate_up_weight(
+                    param,
+                    expert_id,
+                    loaded_weight_shard_gate,
+                    "gate",
+                    SHARD_ID_TO_SHARDED_DIM["gate"],
+                    is_sharded=True,
+                )
+                start_up = output_size // 2 * self.tp_size + self.tp_rank * per_rank
+                loaded_weight_shard_up = slice_fn(
+                    loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["up"], start_up, start_up + per_rank
+                )
+                self._load_gate_up_weight(
+                    param, expert_id, loaded_weight_shard_up, "up", SHARD_ID_TO_SHARDED_DIM["up"], is_sharded=True
+                )
+            else:
+                # 2.gate up splited in disk
+                assert shard_id in ["gate", "down", "up"]
+                self._load_expert_weight(
+                    param=param,
+                    expert_id=expert_id,
+                    loaded_weight=loaded_weight,
+                    shard_id=shard_id,
+                    shard_dim=SHARD_ID_TO_SHARDED_DIM[shard_id],
+                )

     def _load_gate_up_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None, is_sharded=False):
         weight_need_transpose = getattr(param, "weight_need_transpose", False)
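Note: the loading body is now wrapped in a local-expert range check, so under expert parallelism (EP) each rank only materializes the experts it owns and skips shards that belong to other ranks. A rough sketch of that gating, assuming the attribute names from the diff (expert_id_offset, num_local_experts); the helper below is illustrative, not FastDeploy API:

def owns_expert(expert_id: int, expert_id_offset: int, num_local_experts: int) -> bool:
    # Mirrors the added condition:
    # 0 <= expert_id - expert_id_offset < num_local_experts
    local_id = expert_id - expert_id_offset
    return 0 <= local_id < num_local_experts

# Example: 64 routed experts over 4 EP ranks -> 16 local experts per rank.
# Rank 1 (offset 16) loads global experts 16..31 and skips everything else.
assert owns_expert(20, expert_id_offset=16, num_local_experts=16)
assert not owns_expert(40, expert_id_offset=16, num_local_experts=16)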

fastdeploy/model_executor/models/qwen3moe.py

Lines changed: 14 additions & 0 deletions

@@ -451,6 +451,20 @@ def compute_logits(self, hidden_states: paddle.Tensor):

         return logits

+    def empty_input_forward(self):
+        """
+        empty_input_forward
+        """
+        fake_hidden_states = paddle.empty(
+            shape=[1, self.fd_config.model_config.hidden_size],
+            dtype=paddle.get_default_dtype(),
+        )
+        for i in range(
+            self.fd_config.model_config.moe_layer_start_index,
+            self.fd_config.model_config.num_hidden_layers,
+        ):
+            self.model.layers[i].mlp.experts(fake_hidden_states, self.model.layers[i].mlp.gate)
+
     def forward(
         self,
         ids_remove_padding: paddle.Tensor,
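Note: empty_input_forward pushes a single dummy hidden state of shape [1, hidden_size] through the expert path of every MoE layer. The apparent intent is to let an EP prefill rank with no real tokens still participate in each layer's expert dispatch. A stand-alone sketch of the loop structure, with MockExperts as a hypothetical stand-in for model.layers[i].mlp.experts:

class MockExperts:
    # Hypothetical stand-in that only records that the expert path was exercised.
    def __init__(self):
        self.calls = 0

    def __call__(self, hidden_states, gate):
        self.calls += 1
        return hidden_states

moe_layer_start_index, num_hidden_layers = 1, 4
experts_per_layer = {i: MockExperts() for i in range(num_hidden_layers)}
fake_hidden_states = [[0.0] * 8]  # stands in for a [1, hidden_size] tensor

for i in range(moe_layer_start_index, num_hidden_layers):
    experts_per_layer[i](fake_hidden_states, gate=None)

assert all(experts_per_layer[i].calls == 1 for i in range(moe_layer_start_index, num_hidden_layers))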

fastdeploy/model_executor/utils.py

Lines changed: 2 additions & 4 deletions

@@ -145,6 +145,8 @@ def fn(model_sublayer_name: str, param=None):
         quant_method = getattr(model_sublayer, "quant_method", None)
         if not hasattr(quant_method, "process_weights_after_loading"):
             return
+        if param is not None and hasattr(param, "tensor_track") and param.tensor_track is None:
+            return
         if param is not None and hasattr(param, "tensor_track") and not param.tensor_track.is_fully_copied():
             return
         quant_method.process_weights_after_loading(model_sublayer)
@@ -269,10 +271,6 @@ def _err_msg(msg: str) -> str:
         _err_msg("v1 loader currently does not support pre-sliced weights")
         return False

-    if fd_config.parallel_config.use_ep:
-        _err_msg("v1 loader currently does not support expert parallelism")
-        return False
-
     if envs.FD_MOE_BACKEND.lower() == "marlin":
         _err_msg("v1 loader currently does not support marlin backend")
         return False
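Note: two EP-related relaxations here. The v1 loader no longer rejects expert parallelism outright, and process_weights_after_loading is skipped when a parameter's tensor_track is still None, which is the state of expert weights a rank never loads under EP. A small sketch of why the new None-guard must sit before the is_fully_copied() check (the class names below are illustrative only):

class _Track:
    def is_fully_copied(self):
        return True

class _Param:
    def __init__(self, track):
        self.tensor_track = track

def should_skip(param) -> bool:
    if param is not None and hasattr(param, "tensor_track") and param.tensor_track is None:
        return True   # new guard: weight was never touched (e.g. a non-local expert under EP)
    if param is not None and hasattr(param, "tensor_track") and not param.tensor_track.is_fully_copied():
        return True   # still waiting for the remaining shards
    return False

assert should_skip(_Param(None))          # without the new guard this would raise AttributeError
assert not should_skip(_Param(_Track()))  # fully copied weights proceed to post-processing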

fastdeploy/stop.sh

Lines changed: 8 additions & 6 deletions

@@ -1,10 +1,4 @@

-fastdeploy_inferernce_pids=$(ps auxww | grep "fastdeploy" | grep -v grep | awk '{print $2}')
-echo $fastdeploy_inferernce_pids
-for in_pid in ${fastdeploy_inferernce_pids[@]}; do
-    kill -9 ${in_pid}
-done
-echo 'end fastDeploy inference pids'

 api_server_pids=$(ps auxww | grep "api_server" | grep -v grep | awk '{print $2}')
 echo 'end api server pids:'
@@ -18,3 +12,11 @@ for pid in $api_server_pids; do
     done
     echo 'end uvicorn multi workers'
 done
+
+
+fastdeploy_inferernce_pids=$(ps auxww | grep "fastdeploy" | grep -v grep | awk '{print $2}')
+echo $fastdeploy_inferernce_pids
+for in_pid in ${fastdeploy_inferernce_pids[@]}; do
+    kill -9 ${in_pid}
+done
+echo 'end fastDeploy inference pids'

fastdeploy/worker/gpu_model_runner.py

Lines changed: 5 additions & 1 deletion

@@ -2387,7 +2387,11 @@ def profile_run(self) -> None:

         # 2. Dummy run
         self._dummy_run(
-            num_tokens=self.scheduler_config.max_num_batched_tokens,
+            num_tokens=(
+                self.scheduler_config.max_num_seqs
+                if self.scheduler_config.splitwise_role == "decode"
+                else self.scheduler_config.max_num_batched_tokens
+            ),
             batch_size=self.scheduler_config.max_num_seqs,
         )
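Note: profile_run now sizes the dummy run by splitwise role: a decode instance profiles with max_num_seqs tokens (at most one new token per sequence), while prefill/mixed instances keep max_num_batched_tokens. A hedged sketch of the selection with illustrative values:

# Illustrative scheduler settings; attribute names follow the diff.
max_num_seqs = 128
max_num_batched_tokens = 8192

def profile_num_tokens(splitwise_role: str) -> int:
    # A decode-only instance never processes more than one new token per sequence,
    # so profiling it with max_num_batched_tokens would overstate its peak workload.
    return max_num_seqs if splitwise_role == "decode" else max_num_batched_tokens

assert profile_num_tokens("decode") == 128
assert profile_num_tokens("prefill") == 8192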

fastdeploy/worker/worker_process.py

Lines changed: 0 additions & 1 deletion

@@ -853,7 +853,6 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
         num_experts = model_config.moe_num_experts[0]
     else:
         num_experts = model_config.moe_num_experts
-
     num_experts_per_rank = num_experts // parallel_config.expert_parallel_size
     num_experts_start_offset = expert_parallel_rank * num_experts_per_rank
     max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
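Note: the context lines show how experts are partitioned across EP ranks: num_experts // expert_parallel_size experts per rank, with each rank starting at rank * per_rank. A worked example with illustrative values (64 routed experts, 4 EP ranks):

num_experts = 64
expert_parallel_size = 4

num_experts_per_rank = num_experts // expert_parallel_size  # 16
for expert_parallel_rank in range(expert_parallel_size):
    num_experts_start_offset = expert_parallel_rank * num_experts_per_rank
    # rank 0 -> experts [0, 16), rank 1 -> [16, 32), rank 2 -> [32, 48), rank 3 -> [48, 64)
    print(expert_parallel_rank, num_experts_start_offset, num_experts_start_offset + num_experts_per_rank)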
