Commit ceedfeb

add trace log
1 parent ec7746b commit ceedfeb

15 files changed, +820 -7 lines changed

fastdeploy/engine/common_engine.py

Lines changed: 135 additions & 3 deletions

@@ -47,7 +47,7 @@
 from fastdeploy.plugins.token_processor import load_token_processor_plugins
 from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
 from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
-from fastdeploy.utils import EngineError, envs, get_logger, llm_logger
+from fastdeploy.utils import EngineError, envs, get_logger, llm_logger, trace_logger
 
 try:
     TokenProcessor = load_token_processor_plugins()
@@ -384,7 +384,17 @@ def insert_tasks(self, tasks, current_id=-1, allocated=False):
 
         for item in tasks:
             item.schedule_start_time = time.time()
-
+            trace_logger.info(
+                "resource allocate start",
+                extra={
+                    "attributes": {
+                        "request_id": f"{item.request_id}",
+                        "user_id": f"{getattr(item, 'user', '')}",
+                        "event": "RESOURCE_ALLOCATE_START",
+                        "stage": "SCHEDULE",
+                    }
+                },
+            )
         available_batch = np.sum(self.resource_manager.stop_flags)
         if len(tasks) > available_batch:
             self.llm_logger.error(f"Inserting batch:{len(tasks)} exceeds the available batch:{available_batch}.")
@@ -418,6 +428,39 @@ def insert_tasks(self, tasks, current_id=-1, allocated=False):
         self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
         for task in tasks:
             task.inference_start_time = time.time()
+            trace_logger.info(
+                "resource allocate end",
+                extra={
+                    "attributes": {
+                        "request_id": f"{task.request_id}",
+                        "user_id": f"{getattr(task, 'user', '')}",
+                        "event": "RESOURCE_ALLOCATE_END",
+                        "stage": "SCHEDULE",
+                    }
+                },
+            )
+            trace_logger.info(
+                "request schedule end",
+                extra={
+                    "attributes": {
+                        "request_id": f"{task.request_id}",
+                        "user_id": f"{getattr(task, 'user', '')}",
+                        "event": "REQUEST_SCHEDULE_END",
+                        "stage": "SCHEDULE",
+                    }
+                },
+            )
+            trace_logger.info(
+                "request inference start",
+                extra={
+                    "attributes": {
+                        "request_id": f"{task.request_id}",
+                        "user_id": f"{getattr(task, 'user', '')}",
+                        "event": "INFERENCE_START",
+                        "stage": "PREFILL",
+                    }
+                },
+            )
         if not is_prefill:
             if not self.cfg.model_config.enable_mm:
                 self.update_requests_chunk_size(tasks)
@@ -611,7 +654,18 @@ def _insert_task_to_worker(self):
                     max_num_batched_tokens=self.cfg.scheduler_config.max_num_batched_tokens,
                     batch=num_prefill_batch,
                 )
-
+                for task in tasks:
+                    trace_logger.info(
+                        "request queue end",
+                        extra={
+                            "attributes": {
+                                "request_id": f"{task.request_id}",
+                                "user_id": f"{getattr(task, 'user', '')}",
+                                "event": "REQUEST_QUEUE_END",
+                                "stage": "SCHEDULE",
+                            }
+                        },
+                    )
                 if len(tasks) == 0:
                     time.sleep(0.001)
                     continue
@@ -702,6 +756,18 @@ def _fetch_request():
                     time.sleep(0.001)
                 # Fetch requests and add them to the scheduling queue
                 if tasks:
+                    for task in tasks:
+                        trace_logger.info(
+                            "resource allocate start",
+                            extra={
+                                "attributes": {
+                                    "request_id": f"{task.request_id}",
+                                    "user_id": f"{getattr(task, 'user', '')}",
+                                    "event": "RESOURCE_ALLOCATE_START",
+                                    "stage": "SCHEDULE",
+                                }
+                            },
+                        )
                     if self.cfg.scheduler_config.splitwise_role == "prefill":
                         self.resource_manager.add_request_in_p(tasks)
                     else:
@@ -756,6 +822,39 @@ def _fetch_request():
                         ]
                     )
                     self.resource_manager.get_real_bsz()
+                    for task in tasks:
+                        trace_logger.info(
+                            "resource allocate end",
+                            extra={
+                                "attributes": {
+                                    "request_id": f"{task.request_id}",
+                                    "user_id": f"{getattr(task, 'user', '')}",
+                                    "event": "RESOURCE_ALLOCATE_END",
+                                    "stage": "SCHEDULE",
+                                }
+                            },
+                        )
+                        trace_logger.info(
+                            "request schedule end",
+                            extra={
+                                "attributes": {
+                                    "request_id": f"{task.request_id}",
+                                    "user_id": f"{getattr(task, 'user', '')}",
+                                    "event": "REQUEST_SCHEDULE_END",
+                                    "stage": "SCHEDULE",
+                                }
+                            },
+                        )
+                        trace_logger.info(
+                            "request inference start",
+                            extra={
+                                "attributes": {
+                                    "request_id": f"{task.request_id}",
+                                    "user_id": f"{getattr(task, 'user', '')}",
+                                    "event": "INFERENCE_START",
+                                }
+                            },
+                        )
                     self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
                 else:
                     time.sleep(0.005)
@@ -813,6 +912,39 @@ def _insert_zmq_task_to_scheduler(self):
                 start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
                 main_process_metrics.requests_number.inc()
                 self.llm_logger.debug(f"Receive request: {request}")
+                trace_logger.info(
+                    "preprocess end",
+                    extra={
+                        "attributes": {
+                            "request_id": f"{data['request_id']}",
+                            "user_id": f"{data.get('user', '')}",
+                            "event": "PREPROCESSING_END",
+                            "stage": "PREPROCESSING",
+                        }
+                    },
+                )
+                trace_logger.info(
+                    "request schedule start",
+                    extra={
+                        "attributes": {
+                            "request_id": f"{data['request_id']}",
+                            "user_id": f"{data.get('user', '')}",
+                            "event": "REQUEST_SCHEDULE_START",
+                            "stage": "SCHEDULE",
+                        }
+                    },
+                )
+                trace_logger.info(
+                    "request queue start",
+                    extra={
+                        "attributes": {
+                            "request_id": f"{data['request_id']}",
+                            "user_id": f"{data.get('user', '')}",
+                            "event": "REQUEST_QUEUE_START",
+                            "stage": "SCHEDULE",
+                        }
+                    },
+                )
             except Exception as e:
                 self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
                 err_msg = str(e)
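
Note: the commit imports trace_logger from fastdeploy.utils, but the utils change is in one of the other eleven changed files and is not shown in this excerpt. As a minimal sketch only (assuming trace_logger is an ordinary stdlib logger; the class and handler names below are illustrative, not the project's implementation), a logger compatible with the call shape used above could look like this:

import json
import logging
import time


class _TraceFormatter(logging.Formatter):
    """Hypothetical formatter; fastdeploy's real trace_logger may differ."""

    def format(self, record):
        # logging attaches keys passed via extra= directly onto the record,
        # so extra={"attributes": {...}} becomes record.attributes.
        return json.dumps(
            {
                "ts": time.time(),
                "message": record.getMessage(),
                "attributes": getattr(record, "attributes", {}),
            }
        )


trace_logger = logging.getLogger("fastdeploy.trace")
_handler = logging.StreamHandler()
_handler.setFormatter(_TraceFormatter())
trace_logger.addHandler(_handler)
trace_logger.setLevel(logging.INFO)

# Same call shape as the hunks above.
trace_logger.info(
    "request queue end",
    extra={
        "attributes": {
            "request_id": "req-0",
            "user_id": "",
            "event": "REQUEST_QUEUE_END",
            "stage": "SCHEDULE",
        }
    },
)

With such a setup, each trace_logger.info call in the diff emits one structured line per lifecycle event, keyed by request_id.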

fastdeploy/entrypoints/engine_client.py

Lines changed: 12 additions & 0 deletions

@@ -43,6 +43,7 @@
     ParameterError,
     StatefulSemaphore,
     api_server_logger,
+    trace_logger,
 )
 
 
@@ -185,6 +186,17 @@ async def add_requests(self, task):
         """
 
         task["preprocess_start_time"] = time.time()
+        trace_logger.info(
+            "preprocess is started",
+            extra={
+                "attributes": {
+                    "request_id": f"{task['request_id']}",
+                    "user_id": f"{task.get('user', '')}",
+                    "event": "PREPROCESSING_START",
+                    "stage": "PREPROCESSING",
+                }
+            },
+        )
         try:
             chat_template_kwargs = task.get("chat_template_kwargs") or {}
             chat_template_kwargs.update({"chat_template": task.get("chat_template")})

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 23 additions & 0 deletions

@@ -46,6 +46,7 @@
     ParameterError,
     api_server_logger,
     get_host_ip,
+    trace_logger,
 )
 from fastdeploy.worker.output import LogprobsLists
 
@@ -445,6 +446,17 @@ async def chat_completion_stream_generator(
         finally:
             await self.engine_client.connection_manager.cleanup_request(request_id)
             self.engine_client.semaphore.release()
+            trace_logger.info(
+                "request end",
+                extra={
+                    "attributes": {
+                        "request_id": f"{request_id}",
+                        "user_id": f"{getattr(request, 'user', '')}",
+                        "event": "POSTPROCESSING_END",
+                        "stage": "POSTPROCESSING",
+                    }
+                },
+            )
             api_server_logger.info(f"release {request_id} {self.engine_client.semaphore.status()}")
             yield "data: [DONE]\n\n"
 
@@ -598,6 +610,17 @@ async def chat_completion_full_generator(
             choices=choices,
             usage=usage,
         )
+        trace_logger.info(
+            "request end",
+            extra={
+                "attributes": {
+                    "request_id": f"{request_id}",
+                    "user_id": f"{getattr(request, 'user', '')}",
+                    "event": "POSTPROCESSING_END",
+                    "stage": "POSTPROCESSING",
+                }
+            },
+        )
         api_server_logger.info(f"Chat response: {res.model_dump_json()}")
         return res
 

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 23 additions & 0 deletions

@@ -42,6 +42,7 @@
     ParameterError,
     api_server_logger,
     get_host_ip,
+    trace_logger,
 )
 from fastdeploy.worker.output import LogprobsLists
 
@@ -312,6 +313,17 @@ async def completion_full_generator(
         except Exception as e:
             api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
         finally:
+            trace_logger.info(
+                "request end",
+                extra={
+                    "attributes": {
+                        "request_id": f"{request_id}",
+                        "user_id": f"{getattr(request, 'user', '')}",
+                        "event": "POSTPROCESSING_END",
+                        "stage": "POSTPROCESSING",
+                    }
+                },
+            )
             self.engine_client.semaphore.release()
             if dealer is not None:
                 await self.engine_client.connection_manager.cleanup_request(request_id)
@@ -547,6 +559,17 @@ async def completion_stream_generator(
             api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}")
             yield f"data: {ErrorResponse(error=ErrorInfo(message=str(e), code='400', type=ErrorType.INTERNAL_ERROR)).model_dump_json(exclude_unset=True)}\n\n"
         finally:
+            trace_logger.info(
+                "request end",
+                extra={
+                    "attributes": {
+                        "request_id": f"{request_id}",
+                        "user_id": f"{getattr(request, 'user', '')}",
+                        "event": "POSTPROCESSING_END",
+                        "stage": "POSTPROCESSING",
+                    }
+                },
+            )
             del request
             if dealer is not None:
                 await self.engine_client.connection_manager.cleanup_request(request_id)
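
Taken together, the four files shown here emit a per-request lifecycle: PREPROCESSING_START/PREPROCESSING_END in the API layer, REQUEST_SCHEDULE_*, REQUEST_QUEUE_* and RESOURCE_ALLOCATE_* in the scheduler, INFERENCE_START at prefill, and POSTPROCESSING_END when the response is returned. As a hedged sketch, assuming each trace line has been parsed into a dict with a "ts" timestamp plus the "attributes" payload used above (the function and field names here are illustrative, not part of this commit), the *_START/*_END pairs can be joined per request_id to get per-stage latency:

from collections import defaultdict


def stage_latencies(events):
    """events: iterable of {"ts": float, "attributes": {"request_id", "event", ...}}."""
    starts = defaultdict(dict)     # request_id -> {event base name -> start ts}
    durations = defaultdict(dict)  # request_id -> {event base name -> seconds}
    for ev in events:
        attrs = ev["attributes"]
        rid, name = attrs["request_id"], attrs["event"]
        if name.endswith("_START"):
            starts[rid][name[: -len("_START")]] = ev["ts"]
        elif name.endswith("_END"):
            base = name[: -len("_END")]
            if base in starts[rid]:
                durations[rid][base] = ev["ts"] - starts[rid][base]
    return durations

Unpaired events such as INFERENCE_START (no matching END in this commit) are simply skipped by this pairing.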
