Skip to content

Commit

Permalink
fix(factory): on reload, close the connection right away. (#235)
Browse files Browse the repository at this point in the history
* fix(factory): on reload, close the connection right away.

graceful shutdown logic should be implemented in the close function, not in the factory.

* fix(cron): test flakes
  • Loading branch information
Reasno committed Jan 20, 2022
1 parent eaf7ad9 commit eefb5df
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 38 deletions.
2 changes: 1 addition & 1 deletion cron/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestJobOption(t *testing.T) {
},
func(ctx context.Context) error {
entryCount++
time.Sleep(5 * time.Millisecond)
time.Sleep(6 * time.Millisecond)
return nil
},
func(t *testing.T) {
Expand Down
14 changes: 1 addition & 13 deletions di/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package di

import (
"context"
"reflect"
"runtime"
"sync"

"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -69,17 +67,7 @@ func (f *Factory) SubscribeReloadEventFrom(dispatcher contract.Dispatcher) {
if pair.Closer == nil {
return true
}
finalized := make(chan struct{})
if reflect.TypeOf(pair.Conn).Kind() == reflect.Ptr {
runtime.SetFinalizer(pair.Conn, func(_ interface{}) { finalized <- struct{}{} })
}
go func() {
select {
case <-ctx.Done():
case <-finalized:
}
pair.Closer()
}()
pair.Closer()
return true
})
return nil
Expand Down
27 changes: 3 additions & 24 deletions di/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"math/rand"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -115,41 +114,21 @@ func TestFactory_Watch(t *testing.T) {
func TestFactory_SubscribeReloadEventFrom(t *testing.T) {
t.Parallel()

var (
ptr = &struct {
Dummy string
}{Dummy: "dummy"}
closed = make(chan struct{})
)
closed := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f := NewFactory(func(_ string) (Pair, error) {
return Pair{
Conn: ptr,
Conn: &struct{}{},
Closer: func() { close(closed) },
}, nil
})
dispatcher := events.SyncDispatcher{}
f.SubscribeReloadEventFrom(&dispatcher)

foo, err := f.Make("default")
assert.NoError(t, err)
assert.Same(t, ptr, foo)

f.Make("default")
_ = dispatcher.Dispatch(ctx, events.OnReload, events.OnReloadPayload{})

// We don't want to interrupt ongoing request, so foo should not be closed by now
select {
case <-closed:
t.Fatalf("foo should not be closed.")
default:
}

// now that foo is garbage collected, we can safely close foo.
ptr = nil //nolint
foo = nil //nolint
runtime.GC()
cancel()
select {
case <-closed:
case <-time.After(4 * time.Second):
Expand Down

0 comments on commit eefb5df

Please sign in to comment.