@@ -33,15 +33,15 @@ func (group *Group) shouldSpill() bool {
3333 len (group .ctr .result1 .AggList ) > 0 &&
3434 len (group .ctr .result1 .ToPopped ) > 0
3535
36- logutil .Info ("shouldSpill" ,
36+ logutil .Info ("[SPILL] shouldSpill" ,
3737 zap .Any ("threshold" , group .SpillThreshold ),
3838 zap .Any ("current usage" , group .ctr .currentMemUsage ),
3939 zap .Any ("agg list len" , len (group .ctr .result1 .AggList )),
4040 zap .Any ("to popped len" , len (group .ctr .result1 .ToPopped )),
4141 )
4242
4343 if shouldSpill {
44- logutil .Info ("Group operator triggering spill" ,
44+ logutil .Info ("[SPILL] Group operator triggering spill" ,
4545 zap .Int64 ("current_memory_usage" , group .ctr .currentMemUsage ),
4646 zap .Int64 ("spill_threshold" , group .SpillThreshold ),
4747 zap .Int ("agg_count" , len (group .ctr .result1 .AggList )),
@@ -74,7 +74,7 @@ func (group *Group) updateMemoryUsage(proc *process.Process) {
7474 group .ctr .currentMemUsage = usage
7575
7676 if usage > previousUsage && usage > group .SpillThreshold / 2 {
77- logutil .Info ("Group operator memory usage update" ,
77+ logutil .Info ("[SPILL] Group operator memory usage update" ,
7878 zap .Int64 ("previous_usage" , previousUsage ),
7979 zap .Int64 ("current_usage" , usage ),
8080 zap .Int64 ("spill_threshold" , group .SpillThreshold ))
@@ -83,11 +83,11 @@ func (group *Group) updateMemoryUsage(proc *process.Process) {
8383
8484func (group * Group ) spillPartialResults (proc * process.Process ) error {
8585 if len (group .ctr .result1 .AggList ) == 0 || len (group .ctr .result1 .ToPopped ) == 0 {
86- logutil .Info ("Group operator spill called but no data to spill" )
86+ logutil .Info ("[SPILL] Group operator spill called but no data to spill" )
8787 return nil
8888 }
8989
90- logutil .Info ("Group operator starting spill operation" ,
90+ logutil .Info ("[SPILL] Group operator starting spill operation" ,
9191 zap .Int64 ("memory_usage" , group .ctr .currentMemUsage ),
9292 zap .Int64 ("spill_threshold" , group .SpillThreshold ),
9393 zap .Int ("agg_count" , len (group .ctr .result1 .AggList )))
@@ -97,7 +97,7 @@ func (group *Group) spillPartialResults(proc *process.Process) error {
9797 if agg != nil {
9898 marshaledData , err := aggexec .MarshalAggFuncExec (agg )
9999 if err != nil {
100- logutil .Error ("Group operator failed to marshal aggregator" ,
100+ logutil .Error ("[SPILL] Group operator failed to marshal aggregator" ,
101101 zap .Int ("agg_index" , i ), zap .Error (err ))
102102 return err
103103 }
@@ -113,7 +113,7 @@ func (group *Group) spillPartialResults(proc *process.Process) error {
113113 }
114114
115115 if totalGroups == 0 {
116- logutil .Info ("Group operator spill found no groups to spill" )
116+ logutil .Info ("[SPILL] Group operator spill found no groups to spill" )
117117 for _ , agg := range group .ctr .result1 .AggList {
118118 if agg != nil {
119119 agg .Free ()
@@ -144,7 +144,7 @@ func (group *Group) spillPartialResults(proc *process.Process) error {
144144 for i , vec := range bat .Vecs {
145145 if i < len (groupVecs ) && groupVecs [i ] != nil && vec != nil {
146146 if err := groupVecs [i ].UnionBatch (vec , 0 , vec .Length (), nil , proc .Mp ()); err != nil {
147- logutil .Error ("Group operator failed to union batch during spill" ,
147+ logutil .Error ("[SPILL] Group operator failed to union batch during spill" ,
148148 zap .Int ("vec_index" , i ), zap .Error (err ))
149149 for j := range groupVecs {
150150 if groupVecs [j ] != nil {
@@ -168,12 +168,12 @@ func (group *Group) spillPartialResults(proc *process.Process) error {
168168
169169 spillID , err := group .SpillManager .Spill (spillData )
170170 if err != nil {
171- logutil .Error ("Group operator failed to spill data" , zap .Error (err ))
171+ logutil .Error ("[SPILL] Group operator failed to spill data" , zap .Error (err ))
172172 spillData .Free (proc .Mp ())
173173 return err
174174 }
175175
176- logutil .Info ("Group operator successfully spilled data" ,
176+ logutil .Info ("[SPILL] Group operator successfully spilled data" ,
177177 zap .String ("spill_id" , string (spillID )),
178178 zap .Int ("total_groups" , totalGroups ),
179179 zap .Int64 ("estimated_size" , spillData .EstimateSize ()))
@@ -200,7 +200,7 @@ func (group *Group) spillPartialResults(proc *process.Process) error {
200200 }
201201
202202 group .ctr .currentMemUsage = 0
203- logutil .Info ("Group operator completed spill cleanup" ,
203+ logutil .Info ("[SPILL] Group operator completed spill cleanup" ,
204204 zap .Int ("spilled_states_count" , len (group .ctr .spilledStates )))
205205 return nil
206206}
@@ -210,53 +210,53 @@ func (group *Group) mergeSpilledResults(proc *process.Process) error {
210210 return nil
211211 }
212212
213- logutil .Info ("Group operator starting merge of spilled results" ,
213+ logutil .Info ("[SPILL] Group operator starting merge of spilled results" ,
214214 zap .Int ("spilled_states_count" , len (group .ctr .spilledStates )))
215215
216216 for i , spillID := range group .ctr .spilledStates {
217- logutil .Info ("Group operator merging spilled state" ,
217+ logutil .Info ("[SPILL] Group operator merging spilled state" ,
218218 zap .Int ("state_index" , i ),
219219 zap .String ("spill_id" , string (spillID )))
220220
221221 spillData , err := group .SpillManager .Retrieve (spillID , proc .Mp ())
222222 if err != nil {
223- logutil .Error ("Group operator failed to retrieve spilled data" ,
223+ logutil .Error ("[SPILL] Group operator failed to retrieve spilled data" ,
224224 zap .String ("spill_id" , string (spillID )), zap .Error (err ))
225225 return err
226226 }
227227
228228 spillState , ok := spillData .(* SpillableAggState )
229229 if ! ok {
230- logutil .Error ("Group operator retrieved invalid spilled data type" ,
230+ logutil .Error ("[SPILL] Group operator retrieved invalid spilled data type" ,
231231 zap .String ("spill_id" , string (spillID )))
232232 spillData .Free (proc .Mp ())
233233 panic (fmt .Sprintf ("invalid spilled data type" ))
234234 }
235235
236- logutil .Info ("Group operator retrieved spilled state" ,
236+ logutil .Info ("[SPILL] Group operator retrieved spilled state" ,
237237 zap .String ("spill_id" , string (spillID )),
238238 zap .Int ("group_count" , spillState .GroupCount ),
239239 zap .Int64 ("estimated_size" , spillState .EstimateSize ()))
240240
241241 if err = group .restoreAndMergeSpilledAggregators (proc , spillState ); err != nil {
242- logutil .Error ("Group operator failed to restore and merge spilled aggregators" ,
242+ logutil .Error ("[SPILL] Group operator failed to restore and merge spilled aggregators" ,
243243 zap .String ("spill_id" , string (spillID )), zap .Error (err ))
244244 spillState .Free (proc .Mp ())
245245 return err
246246 }
247247
248248 spillState .Free (proc .Mp ())
249249 if err = group .SpillManager .Delete (spillID ); err != nil {
250- logutil .Error ("Group operator failed to delete spilled data" ,
250+ logutil .Error ("[SPILL] Group operator failed to delete spilled data" ,
251251 zap .String ("spill_id" , string (spillID )), zap .Error (err ))
252252 return err
253253 }
254254
255- logutil .Info ("Group operator completed merge of spilled state" ,
255+ logutil .Info ("[SPILL] Group operator completed merge of spilled state" ,
256256 zap .String ("spill_id" , string (spillID )))
257257 }
258258
259- logutil .Info ("Group operator completed merge of all spilled results" ,
259+ logutil .Info ("[SPILL] Group operator completed merge of all spilled results" ,
260260 zap .Int ("merged_states_count" , len (group .ctr .spilledStates )))
261261
262262 group .ctr .spilledStates = nil
@@ -265,16 +265,16 @@ func (group *Group) mergeSpilledResults(proc *process.Process) error {
265265
266266func (group * Group ) restoreAndMergeSpilledAggregators (proc * process.Process , spillState * SpillableAggState ) error {
267267 if len (spillState .MarshaledAggStates ) == 0 {
268- logutil .Info ("Group operator restore found no marshaled aggregator states" )
268+ logutil .Info ("[SPILL] Group operator restore found no marshaled aggregator states" )
269269 return nil
270270 }
271271
272- logutil .Info ("Group operator restoring spilled aggregators" ,
272+ logutil .Info ("[SPILL] Group operator restoring spilled aggregators" ,
273273 zap .Int ("agg_states_count" , len (spillState .MarshaledAggStates )),
274274 zap .Int ("group_count" , spillState .GroupCount ))
275275
276276 if len (group .ctr .result1 .AggList ) == 0 {
277- logutil .Info ("Group operator initializing aggregators from spilled state" )
277+ logutil .Info ("[SPILL] Group operator initializing aggregators from spilled state" )
278278 aggs := make ([]aggexec.AggFuncExec , len (spillState .MarshaledAggStates ))
279279 defer func () {
280280 if group .ctr .result1 .AggList == nil {
@@ -293,7 +293,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
293293
294294 agg , err := aggexec .UnmarshalAggFuncExec (aggexec .NewSimpleAggMemoryManager (proc .Mp ()), marshaledState )
295295 if err != nil {
296- logutil .Error ("Group operator failed to unmarshal aggregator" ,
296+ logutil .Error ("[SPILL] Group operator failed to unmarshal aggregator" ,
297297 zap .Int ("agg_index" , i ), zap .Error (err ))
298298 return err
299299 }
@@ -302,7 +302,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
302302 aggExpr := group .Aggs [i ]
303303 if config := aggExpr .GetExtraConfig (); config != nil {
304304 if err = agg .SetExtraInformation (config , 0 ); err != nil {
305- logutil .Error ("Group operator failed to set extra information for aggregator" ,
305+ logutil .Error ("[SPILL] Group operator failed to set extra information for aggregator" ,
306306 zap .Int ("agg_index" , i ), zap .Error (err ))
307307 agg .Free ()
308308 return err
@@ -318,7 +318,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
318318 group .ctr .result1 .ChunkSize = chunkSize
319319 group .ctr .result1 .AggList = aggs
320320
321- logutil .Info ("Group operator initialized aggregators from spilled state" ,
321+ logutil .Info ("[SPILL] Group operator initialized aggregators from spilled state" ,
322322 zap .Int ("chunk_size" , chunkSize ),
323323 zap .Int ("agg_count" , len (aggs )))
324324
@@ -335,7 +335,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
335335 for i , vec := range spillState .GroupVectors {
336336 if vec != nil && i < len (bat .Vecs ) {
337337 if err := bat .Vecs [i ].UnionBatch (vec , int64 (offset ), size , nil , proc .Mp ()); err != nil {
338- logutil .Error ("Group operator failed to union batch during restore" ,
338+ logutil .Error ("[SPILL] Group operator failed to union batch during restore" ,
339339 zap .Int ("vec_index" , i ), zap .Int ("offset" , offset ), zap .Error (err ))
340340 bat .Clean (proc .Mp ())
341341 for _ , b := range batchesToAdd {
@@ -354,14 +354,14 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
354354 return nil
355355 }
356356
357- logutil .Info ("Group operator merging spilled aggregators with existing ones" ,
357+ logutil .Info ("[SPILL] Group operator merging spilled aggregators with existing ones" ,
358358 zap .Int ("existing_agg_count" , len (group .ctr .result1 .AggList )),
359359 zap .Int ("spilled_group_count" , spillState .GroupCount ))
360360
361361 for _ , currentAgg := range group .ctr .result1 .AggList {
362362 if currentAgg != nil {
363363 if err := currentAgg .GroupGrow (spillState .GroupCount ); err != nil {
364- logutil .Error ("Group operator failed to grow aggregator groups" , zap .Error (err ))
364+ logutil .Error ("[SPILL] Group operator failed to grow aggregator groups" , zap .Error (err ))
365365 return err
366366 }
367367 }
@@ -383,7 +383,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
383383
384384 agg , err := aggexec .UnmarshalAggFuncExec (aggexec .NewSimpleAggMemoryManager (proc .Mp ()), marshaledState )
385385 if err != nil {
386- logutil .Error ("Group operator failed to unmarshal aggregator for merge" ,
386+ logutil .Error ("[SPILL] Group operator failed to unmarshal aggregator for merge" ,
387387 zap .Int ("agg_index" , i ), zap .Error (err ))
388388 return err
389389 }
@@ -392,7 +392,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
392392 aggExpr := group .Aggs [i ]
393393 if config := aggExpr .GetExtraConfig (); config != nil {
394394 if err = agg .SetExtraInformation (config , 0 ); err != nil {
395- logutil .Error ("Group operator failed to set extra information for temp aggregator" ,
395+ logutil .Error ("[SPILL] Group operator failed to set extra information for temp aggregator" ,
396396 zap .Int ("agg_index" , i ), zap .Error (err ))
397397 agg .Free ()
398398 return err
@@ -423,7 +423,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
423423 for spilledGroupIdx := 0 ; spilledGroupIdx < spillState .GroupCount ; spilledGroupIdx ++ {
424424 currentGroupIdx := currentGroupCount + spilledGroupIdx
425425 if err := currentAgg .Merge (tempAgg , currentGroupIdx , spilledGroupIdx ); err != nil {
426- logutil .Error ("Group operator failed to merge aggregator groups" ,
426+ logutil .Error ("[SPILL] Group operator failed to merge aggregator groups" ,
427427 zap .Int ("agg_index" , i ),
428428 zap .Int ("current_group_idx" , currentGroupIdx ),
429429 zap .Int ("spilled_group_idx" , spilledGroupIdx ),
@@ -450,7 +450,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
450450 for i , vec := range spillState .GroupVectors {
451451 if vec != nil && i < len (bat .Vecs ) {
452452 if err := bat .Vecs [i ].UnionBatch (vec , int64 (offset ), size , nil , proc .Mp ()); err != nil {
453- logutil .Error ("Group operator failed to union batch during merge" ,
453+ logutil .Error ("[SPILL] Group operator failed to union batch during merge" ,
454454 zap .Int ("vec_index" , i ), zap .Int ("offset" , offset ), zap .Error (err ))
455455 bat .Clean (proc .Mp ())
456456 for _ , b := range batchesToAdd {
@@ -466,7 +466,7 @@ func (group *Group) restoreAndMergeSpilledAggregators(proc *process.Process, spi
466466 group .ctr .result1 .ToPopped = append (group .ctr .result1 .ToPopped , batchesToAdd ... )
467467 }
468468
469- logutil .Info ("Group operator completed restore and merge of spilled aggregators" ,
469+ logutil .Info ("[SPILL] Group operator completed restore and merge of spilled aggregators" ,
470470 zap .Int ("final_batch_count" , len (group .ctr .result1 .ToPopped )))
471471
472472 return nil
0 commit comments