@@ -170,6 +170,8 @@ type cbContext struct {
170170 // tracking statistics for the output topic
171171 trackOutputStats func (ctx context.Context , topic string , size int )
172172
173+ deferreds []func () error
174+
173175 msg * message
174176 done bool
175177 counters struct {
@@ -310,17 +312,25 @@ func (ctx *cbContext) Join(topic Table) interface{} {
310312 if ! ok {
311313 ctx .Fail (fmt .Errorf ("table %s not subscribed" , topic ))
312314 }
313- data , err := v .st .Get (ctx .Key ())
315+ data , getCloser , err := v .st .GetP (ctx .Key ())
314316 if err != nil {
315317 ctx .Fail (fmt .Errorf ("error getting key %s of table %s: %v" , ctx .Key (), topic , err ))
316318 } else if data == nil {
317319 return nil
318320 }
319321
320- value , err := ctx .graph .codec (string (topic )).Decode (data )
322+ if getCloser != nil {
323+ ctx .addDeferred (getCloser .Close )
324+ }
325+
326+ value , decodeCloser , err := ctx .graph .codec (string (topic )).DecodeP (data )
321327 if err != nil {
322328 ctx .Fail (fmt .Errorf ("error decoding value key %s of table %s: %v" , ctx .Key (), topic , err ))
323329 }
330+ if decodeCloser != nil {
331+ ctx .addDeferred (decodeCloser .Close )
332+ }
333+
324334 return value
325335}
326336
@@ -332,10 +342,13 @@ func (ctx *cbContext) Lookup(topic Table, key string) interface{} {
332342 if ! ok {
333343 ctx .Fail (fmt .Errorf ("topic %s not subscribed" , topic ))
334344 }
335- val , err := v .Get (key )
345+ val , getCloser , err := v .GetP (key )
336346 if err != nil {
337347 ctx .Fail (fmt .Errorf ("error getting key %s of table %s: %v" , key , topic , err ))
338348 }
349+ if getCloser != nil {
350+ ctx .addDeferred (getCloser .Close )
351+ }
339352 return val
340353}
341354
@@ -345,17 +358,26 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
345358 return nil , fmt .Errorf ("Cannot access state in stateless processor" )
346359 }
347360
348- data , err := ctx .table .Get (key )
361+ data , closer , err := ctx .table .Get (key )
349362 if err != nil {
350363 return nil , fmt .Errorf ("error reading value: %v" , err )
351- } else if data == nil {
364+ }
365+ if closer != nil {
366+ ctx .addDeferred (closer .Close )
367+ }
368+
369+ if data == nil {
352370 return nil , nil
353371 }
354372
355- value , err := ctx .graph .GroupTable ().Codec ().Decode (data )
373+ value , decodeCloser , err := ctx .graph .GroupTable ().Codec ().DecodeP (data )
356374 if err != nil {
357375 return nil , fmt .Errorf ("error decoding value: %v" , err )
358376 }
377+ if decodeCloser != nil {
378+ ctx .addDeferred (decodeCloser .Close )
379+ }
380+
359381 return value , nil
360382}
361383
@@ -450,6 +472,14 @@ func (ctx *cbContext) tryCommit(err error) {
450472 if ctx .errors .ErrorOrNil () != nil {
451473 ctx .asyncFailer (fmt .Errorf ("could not commit message with key '%s': %w" , ctx .Key (), ctx .errors .ErrorOrNil ()))
452474 } else {
475+
476+ // execute deferred commit functions in reverse order
477+ for i := len (ctx .deferreds ) - 1 ; i >= 0 ; i -- {
478+ if err := ctx .deferreds [i ](); err != nil {
479+ ctx .asyncFailer (fmt .Errorf ("error executing context deferred: %w" , err ))
480+ }
481+ }
482+
453483 ctx .commit ()
454484 }
455485
@@ -483,3 +513,7 @@ func (ctx *cbContext) DeferCommit() func(err error) {
483513 })
484514 }
485515}
516+
517+ func (ctx * cbContext ) addDeferred (def func () error ) {
518+ ctx .deferreds = append (ctx .deferreds , def )
519+ }
0 commit comments