@@ -19,6 +19,7 @@ package llmdinferencesim
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -49,22 +50,31 @@ func (w *worker) waitForRequests() {
 			w.logger.V(4).Info("worker done", "id", w.id)
 			return
 		case req := <-w.reqChan:
-			w.processor.processRequest(req)
+			w.processor.processRequest(req, nil)
 			w.finishedChan <- &requestCompleted{worker: w, model: req.CompletionReq.GetModel()}
 		}
+
 	}
 }
 
 type requestProcessor interface {
-	processRequest(reqCtx *openaiserverapi.CompletionReqCtx)
+	processRequest(reqCtx *openaiserverapi.CompletionReqCtx, wg *sync.WaitGroup)
 }
 
-func (s *VllmSimulator) processRequest(reqCtx *openaiserverapi.CompletionReqCtx) {
-	start := time.Now()
-	defer func() {
-		common.WriteToChannel(s.metrics.reqInferenceTimeChan, time.Since(start).Seconds(), s.logger, "metrics.reqInferenceTimeChan")
-	}()
+func (s *VllmSimulator) processRequest(reqCtx *openaiserverapi.CompletionReqCtx, _ *sync.WaitGroup) {
+	startTime := time.Now()
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	go s.processRequestAsync(reqCtx, &wg)
+
+	wg.Wait()
+	// record inference time and finish the e2e latency measurement only once request processing has fully completed, including for streaming requests
+	common.WriteToChannel(s.metrics.e2eReqLatencyChan, time.Since(reqCtx.StartProcessing).Seconds(), s.logger, "metrics.e2eReqLatencyChan")
+	common.WriteToChannel(s.metrics.reqInferenceTimeChan, time.Since(startTime).Seconds(), s.logger, "metrics.reqInferenceTimeChan")
+}
 
+func (s *VllmSimulator) processRequestAsync(reqCtx *openaiserverapi.CompletionReqCtx, wg *sync.WaitGroup) {
 	req := reqCtx.CompletionReq
 	model := req.GetModel()
 	displayModel := s.getDisplayedModelName(model)
@@ -138,14 +148,15 @@ func (s *VllmSimulator) processRequest(reqCtx *openaiserverapi.CompletionReqCtx)
 				// Logprobs configuration
 				logprobs: req.GetLogprobs(),
 			},
-			responseTokens, toolCalls, finishReason, usageDataToSend,
+			responseTokens, toolCalls, finishReason, usageDataToSend, wg,
 		)
 	} else {
 		if req.IsDoRemoteDecode() {
 			// in case this is prefill pod processing, return special finish reason
 			finishReason = dataset.RemoteDecodeFinishReason
 		}
 		s.sendResponse(reqCtx, responseTokens, toolCalls, displayModel, finishReason, &usageData)
+		wg.Done()
 	}
 
 	common.WriteToChannel(s.metrics.requestSuccessChan,
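
The heart of this change is a WaitGroup handoff: processRequest now spawns processRequestAsync, passes the WaitGroup down to whichever branch actually sends the response (the streaming sender for streamed requests, an immediate Done() otherwise), and blocks until it is signalled before recording e2e latency and inference time. Below is a minimal, runnable sketch of that pattern; the names and the Sleep stand-ins are illustrative only, not the simulator's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// processRequest mirrors the new synchronous wrapper: it spawns the real
// work, then blocks on the WaitGroup before recording latency metrics.
func processRequest(id int, streaming bool) {
	start := time.Now()
	wg := sync.WaitGroup{}
	wg.Add(1)

	go processRequestAsync(id, streaming, &wg)

	wg.Wait()
	// only reached once the response (streamed or not) has been fully sent
	fmt.Printf("request %d (streaming=%v) finished in %v\n", id, streaming, time.Since(start))
}

// processRequestAsync mirrors the renamed worker: the streaming path hands
// the WaitGroup to the sender, the non-streaming path signals Done itself.
func processRequestAsync(id int, streaming bool, wg *sync.WaitGroup) {
	if streaming {
		go func() {
			defer wg.Done()                   // the streaming sender signals completion
			time.Sleep(10 * time.Millisecond) // stands in for emitting chunks
		}()
		return
	}
	time.Sleep(2 * time.Millisecond) // stands in for building the full response
	wg.Done()
}

func main() {
	processRequest(1, false)
	processRequest(2, true)
}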