diff --git a/config.yaml b/config.yaml index 3331fc53c1..20ace0ee49 100644 --- a/config.yaml +++ b/config.yaml @@ -16,6 +16,9 @@ postgres.connection.maxidle: -1 postgres.connection.maxopen: -1 # Timeout for a transaction in minutes postgres.transaction.timeout: 5m +# For LISTEN/NOTIFY connections +postgres.listennotify.minreconnectinterval: 10s +postgres.listennotify.maxreconnectinterval: 5m #------------------------ # HTTP configuration diff --git a/configuration/configuration.go b/configuration/configuration.go index a455edd1ab..e41b5902cf 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -69,6 +69,10 @@ const ( varHeaderMaxLength = "header.maxlength" varEnvironment = "environment" + // Postgres LISTEN/NOTIFY + varPostgresListenNotifyMinReconnectionInterval = "postgres.listennotify.minreconnectinterval" + varPostgresListenNotifyMaxReconnectionInterval = "postgres.listennotify.maxreconnectinterval" + // cache control settings for a list of resources varCacheControlWorkItems = "cachecontrol.workitems" varCacheControlWorkItemEvents = "cachecontrol.workitemevents" @@ -180,6 +184,8 @@ func (c *Registry) setConfigDefaults() { c.v.SetDefault(varPostgresConnectionTimeout, 5) c.v.SetDefault(varPostgresConnectionMaxIdle, -1) c.v.SetDefault(varPostgresConnectionMaxOpen, -1) + c.v.SetDefault(varPostgresListenNotifyMinReconnectionInterval, time.Duration(10*time.Second)) + c.v.SetDefault(varPostgresListenNotifyMaxReconnectionInterval, time.Duration(5*time.Minute)) // Number of seconds to wait before trying to connect again c.v.SetDefault(varPostgresConnectionRetrySleep, time.Duration(time.Second)) @@ -335,6 +341,20 @@ func (c *Registry) GetPostgresConfigString() string { ) } +// GetPostgresListenNotifyMinReconnectInterval controls the duration to wait +// before trying to re-establish the database connection after connection loss. +// After each consecutive failure this interval is doubled, until +// GetPostgresListenNotifyMaxReconnectInterval is reached. +func (c *Registry) GetPostgresListenNotifyMinReconnectInterval() time.Duration { + return c.v.GetDuration(varPostgresListenNotifyMinReconnectionInterval) +} + +// GetPostgresListenNotifyMaxReconnectInterval see +// GetPostgresListenNotifyMinReconnectInterval. +func (c *Registry) GetPostgresListenNotifyMaxReconnectInterval() time.Duration { + return c.v.GetDuration(varPostgresListenNotifyMaxReconnectionInterval) +} + // GetPopulateCommonTypes returns true if the (as set via default, config file, or environment variable) // the common work item types such as bug or feature shall be created. func (c *Registry) GetPopulateCommonTypes() bool { diff --git a/gormsupport/listener.go b/gormsupport/listener.go new file mode 100644 index 0000000000..25fa04dd79 --- /dev/null +++ b/gormsupport/listener.go @@ -0,0 +1,80 @@ +package gormsupport + +import ( + "time" + + "github.com/fabric8-services/fabric8-wit/configuration" + "github.com/fabric8-services/fabric8-wit/log" + "github.com/lib/pq" + errs "github.com/pkg/errors" +) + +const ( + // ChanSpaceTemplateUpdates is the name for the postgres notification + // channel on which subscribers are informed about updates to the space + // templates (e.g. when a migration has happened). + ChanSpaceTemplateUpdates = "f8_space_template_updates" +) + +// A SubscriberFunc describes the function signature that a subscriber needs to +// have. The channel parameter is just an arbitrary identifier string the +// identities a channel. The extra parameter is can contain optional data that +// was sent along with the notification. +type SubscriberFunc func(channel, extra string) + +// SetupDatabaseListener sets up a Postgres LISTEN/NOTIFY connection and listens +// on events that we have subscribers for. +func SetupDatabaseListener(config configuration.Registry, subscribers map[string]SubscriberFunc) error { + if len(subscribers) == 0 { + return nil + } + + dbConnectCallback := func(ev pq.ListenerEventType, err error) { + switch ev { + case pq.ListenerEventConnected: + log.Logger().Infof("database connection for LISTEN/NOTIFY established successfully") + case pq.ListenerEventDisconnected: + log.Logger().Errorf("lost LISTEN/NOTIFY database connection: %+v", err) + case pq.ListenerEventReconnected: + log.Logger().Infof("database connection for LISTEN/NOTIFY re-established successfully") + case pq.ListenerEventConnectionAttemptFailed: + log.Logger().Errorf("failed to connect to database for LISTEN/NOTIFY: %+v", err) + } + } + + listener := pq.NewListener(config.GetPostgresConfigString(), config.GetPostgresListenNotifyMinReconnectInterval(), config.GetPostgresListenNotifyMaxReconnectInterval(), dbConnectCallback) + + // listen on every subscribed channel + for channel := range subscribers { + err := listener.Listen(channel) + if err != nil { + log.Logger().Errorf("unable to open connection to database for LISTEN/NOTIFY %v", err) + return errs.Wrapf(err, "failed listen to postgres channel \"%s\"", channel) + } + } + + // asynchronously handle notifications + go func() { + for { + select { + case n := <-listener.Notify: + sub, ok := subscribers[n.Channel] + if ok { + log.Logger().Debugf("received notification from postgres channel \"%s\": %s", n.Channel, n.Extra) + sub(n.Channel, n.Extra) + } + case <-time.After(90 * time.Second): + log.Logger().Infof("received no events for 90 seconds, checking connection") + go func() { + err := listener.Ping() + if err != nil { + log.Panic(nil, map[string]interface{}{ + "err": err, + }, "failed to ping for LISTEN/NOTIFY database connection") + } + }() + } + } + }() + return nil +} diff --git a/gormsupport/listener_test.go b/gormsupport/listener_test.go new file mode 100644 index 0000000000..39ac11d5c5 --- /dev/null +++ b/gormsupport/listener_test.go @@ -0,0 +1,71 @@ +package gormsupport_test + +import ( + "sync" + "testing" + + "github.com/fabric8-services/fabric8-wit/gormsupport" + "github.com/fabric8-services/fabric8-wit/gormtestsupport" + "github.com/fabric8-services/fabric8-wit/migration" + "github.com/fabric8-services/fabric8-wit/resource" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type TestListenerSuite struct { + gormtestsupport.DBTestSuite +} + +func TestListener(t *testing.T) { + resource.Require(t, resource.Database) + suite.Run(t, &TestListenerSuite{DBTestSuite: gormtestsupport.NewDBTestSuite()}) +} + +func (s *TestListenerSuite) TestSetupDatabaseListener() { + s.T().Run("setup listener", func(t *testing.T) { + // given + channelName := "f8_custom_event_channel" + payload := "some additional info about the event" + wg := sync.WaitGroup{} + wg.Add(2) + var executedMigration bool + + err := gormsupport.SetupDatabaseListener(*s.Configuration, map[string]gormsupport.SubscriberFunc{ + // This is the channel we send to from this test + channelName: func(channel, extra string) { + t.Logf("received notification on channel %s: %s", channel, extra) + require.Equal(t, channelName, channel) + require.Equal(t, payload, extra) + wg.Done() + }, + // This is the channel that we send to from + // migration.PopulateCommonTypes() which is called by + // gormtestsupport.DBTestSuite internally. + gormsupport.ChanSpaceTemplateUpdates: func(channel, extra string) { + // potentially the migration is executed twice but we're only + // interested in one event. + if !executedMigration { + executedMigration = true + t.Logf("received notification on channel %s: %s", channel, extra) + require.Equal(t, gormsupport.ChanSpaceTemplateUpdates, channel) + require.Equal(t, "", extra) + wg.Done() + } + }, + }) + require.NoError(t, err) + + // Send a notification from a completely different connection than the + // one we established to listen to channels. + db := s.DB.Debug().Exec("SELECT pg_notify($1, $2)", channelName, payload) + require.NoError(t, db.Error) + + // This will send a notification on the + // gormsupport.ChanSpaceTemplateUpdates channel + err = migration.PopulateCommonTypes(nil, s.DB) + require.NoError(t, err) + + // wait until notification was received + wg.Wait() + }) +} diff --git a/main.go b/main.go index 83ce572b44..df1ca644be 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,9 @@ import ( "runtime" "time" + "github.com/fabric8-services/fabric8-wit/gormsupport" + "github.com/fabric8-services/fabric8-wit/workitem" + "github.com/fabric8-services/fabric8-wit/closeable" "github.com/fabric8-services/fabric8-wit/account" @@ -140,6 +143,13 @@ func main() { os.Exit(0) } + // Ensure we delete the work item cache when we receive a notification from postgres + gormsupport.SetupDatabaseListener(*config, map[string]gormsupport.SubscriberFunc{ + gormsupport.ChanSpaceTemplateUpdates: func(channel, extra string) { + workitem.ClearGlobalWorkItemTypeCache() + }, + }) + // Make sure the database is populated with the correct types (e.g. bug etc.) if config.GetPopulateCommonTypes() { ctx := migration.NewMigrationContext(context.Background()) diff --git a/migration/migration.go b/migration/migration.go index 2ecbad7b45..44d8d40e39 100644 --- a/migration/migration.go +++ b/migration/migration.go @@ -12,6 +12,7 @@ import ( "text/template" "github.com/fabric8-services/fabric8-wit/errors" + "github.com/fabric8-services/fabric8-wit/gormsupport" "github.com/fabric8-services/fabric8-wit/log" "github.com/fabric8-services/fabric8-wit/ptr" "github.com/fabric8-services/fabric8-wit/space" @@ -674,5 +675,16 @@ func PopulateCommonTypes(ctx context.Context, db *gorm.DB) error { log.Debug(ctx, nil, `imported space template #%d "%s"`, idx, t.Template.Name) } workitem.ClearGlobalWorkItemTypeCache() // Clear the WIT cache after updating existing WITs + + // Ensure the WIT cache is cleared in all pods + db = db.Exec("SELECT pg_notify($1, '')", gormsupport.ChanSpaceTemplateUpdates) + if db.Error != nil { + log.Error(ctx, map[string]interface{}{ + "err": db.Error, + "channel": gormsupport.ChanSpaceTemplateUpdates, + }, `failed to notify postgres event subscribers about template updates`) + return errs.Wrapf(db.Error, `failed to notify postgres event subscribers about template updates`) + } + return nil }