Skip to content

Commit

Permalink
Merge pull request #11397 from vegaprotocol/file-streaming-reload
Browse files Browse the repository at this point in the history
feat: enable hot reloading of broker config for ease of debugging
  • Loading branch information
EVODelavega committed Jun 20, 2024
2 parents eb07bfd + 576de79 commit c7a9e90
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 5 deletions.
42 changes: 42 additions & 0 deletions core/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type SocketClient interface {

type FileClientSend interface {
SendBatch(events []events.Event) error
Replace(*FileClient)
Close() error
}

type subscription struct {
Expand Down Expand Up @@ -128,6 +130,46 @@ func New(ctx context.Context, log *logging.Logger, config Config, stats Stats) (
return b, nil
}

func (b *Broker) ReloadConf(config Config) {
// check if we need to update the log level.
if newLvl := config.Level.Get(); newLvl != b.config.Level.Get() {
b.log.SetLevel(newLvl)
b.config.Level = config.Level
}
// if the config has enabled the file client, check if the config is different, and replace or activate the file client.
if config.File.Enabled && config.File != b.config.File {
fc, err := NewFileClient(b.log, &config.File)
if err != nil {
// we can't instantiate the file client, though the config is updated - treat this as a fatal error?
b.log.Fatal("Could not instantiate file client", logging.Error(err))
}
if b.fileClient != nil {
b.fileClient.Replace(fc)
} else {
b.fileClient = fc
}
// update the conifg
b.config.File = config.File
} else if !config.File.Enabled && b.fileClient != nil {
// we were streaming to a file, not anymore, so close the file and set the field to nil.
oldFC := b.fileClient
b.fileClient = nil
if err := oldFC.Close(); err != nil {
b.log.Error("Disabled file streaming, but file was not closed properly", logging.Error(err))
}
b.config.File = config.File
}
// we only allow switching socket config on if it isn't already. Switching sockets is a bit of a pain.
if config.Socket.Enabled && !b.config.Socket.Enabled {
sc, err := newSocketClient(b.ctx, b.log, &config.Socket)
if err != nil {
b.log.Fatal("Could not instantiate new socket client", logging.Error(err))
}
b.socketClient = sc
b.config.Socket = config.Socket
}
}

func (b *Broker) OnTick(ctx context.Context, _ time.Time) {
if len(b.staged) == 0 {
return
Expand Down
89 changes: 84 additions & 5 deletions core/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,81 @@ func TestSendEvent(t *testing.T) {
t.Run("Send only to typed subscriber (also tests TxErrEvents are skipped)", testEventTypeSubscription)
}

func TestFileStreaming(t *testing.T) {
tstBroker := getBroker(t)
defer tstBroker.Finish()
// create a temp file to use, close it so the opening is definitely handled by the file client, but remove it after the tests.
tmp, err := os.CreateTemp("", "file_client_tmp")
require.NoError(t, err)
tmpName := tmp.Name()
require.NoError(t, tmp.Close())
defer os.Remove(tmpName)
cfg := broker.NewDefaultConfig()
// current file contents:
content, err := os.ReadFile(tmpName)
require.NoError(t, err)
tmp2, err := os.CreateTemp("", "file_client_tmp_2")
require.NoError(t, err)
tmp2Name := tmp2.Name()
defer os.Remove(tmp2Name)
require.NoError(t, tmp2.Close())
content2, err := os.ReadFile(tmp2Name)
require.NoError(t, err)
require.Equal(t, content, content2)
t.Run("Reload config, toggling file streaming", func(t *testing.T) {
cfg.File.Enabled = true
cfg.File.File = tmpName
tstBroker.ReloadConf(cfg)
batchSendTest(t, tstBroker)
// now disable file streaming (closes the temp file)
cfg.File.Enabled = false
cfg.File.File = ""
tstBroker.ReloadConf(cfg)
newContent, err := os.ReadFile(tmpName)
require.NoError(t, err)
require.NotEqual(t, newContent, content)
require.True(t, len(content) < len(newContent))
})
t.Run("Reload config, switch file output", func(t *testing.T) {
cfg.File.Enabled = true
cfg.File.File = tmpName
batchSendTest(t, tstBroker)
cfg.File.File = tmp2Name
tstBroker.ReloadConf(cfg)
// turn the file streaming off
cfg.File.Enabled = false
cfg.File.File = ""
tstBroker.ReloadConf(cfg)
tmpCont, err := os.ReadFile(tmpName)
require.NoError(t, err)
tmp2Cont, err := os.ReadFile(tmp2Name)
require.NoError(t, err)
require.NotEqual(t, tmpCont, tmp2Cont)
require.Equal(t, tmp2Cont, content2)
})
t.Run("Reload Config, switch file output, but write to the new file", func(t *testing.T) {
cfg.File.Enabled = true
cfg.File.File = tmpName
tstBroker.ReloadConf(cfg)
// send another batch, this doubles the data in the file because we used tmpName in tests above
batchSendTest(t, tstBroker)
cfg.File.File = tmp2Name
tstBroker.ReloadConf(cfg)
// send the data once here...
batchSendTest(t, tstBroker)
// turn the file streaming off (optional step)
cfg.File.Enabled = false
cfg.File.File = ""
tstBroker.ReloadConf(cfg)
tmpCont, err := os.ReadFile(tmpName)
require.NoError(t, err)
tmp2Cont, err := os.ReadFile(tmp2Name)
require.NoError(t, err)
require.NotEqual(t, tmpCont, tmp2Cont)
require.NotEqual(t, tmp2Cont, content2)
})
}

func testSequenceIDGenSeveralBlocksOrdered(t *testing.T) {
t.Parallel()
tstBroker := getBroker(t)
Expand Down Expand Up @@ -341,13 +416,16 @@ func testUnsubscribeAutomaticallyIfSubscriberIsClosed(t *testing.T) {
func testSendBatch(t *testing.T) {
t.Parallel()
tstBroker := getBroker(t)
defer tstBroker.Finish()
batchSendTest(t, tstBroker)
}

func batchSendTest(t *testing.T, tstBroker *brokerTst) {
t.Helper()
sub := mocks.NewMockSubscriber(tstBroker.ctrl)
cancelCh := make(chan struct{})
defer func() {
tstBroker.Finish()
close(cancelCh)
}()
sub.EXPECT().Types().Times(1).Return(nil)
defer close(cancelCh)
sub.EXPECT().Types().Times(2).Return(nil)
sub.EXPECT().Ack().AnyTimes().Return(true)
sub.EXPECT().SetID(gomock.Any()).Times(1)
k1 := tstBroker.Subscribe(sub)
Expand All @@ -373,6 +451,7 @@ func testSendBatch(t *testing.T) {
require.Equal(t, uint64(3), tstBroker.stats.CurrentEventsInBatch())
tstBroker.stats.NewBatch()
require.Equal(t, uint64(3), tstBroker.stats.TotalEventsLastBatch())
tstBroker.Unsubscribe(k1)
}

func testSendBatchChannel(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions core/broker/file_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ func WriteRawToBufferFile(bufferFile *os.File, bufferSeqNum uint64, rawEvent []b
return nil
}

// Replace - in case of a config change, just replace the file we're writing to with the new one.
func (fc *FileClient) Replace(newFC *FileClient) {
fc.mut.Lock()
defer fc.mut.Unlock()
fc.config = newFC.config
_ = fc.file.Close()
fc.file = newFC.file
}

func (fc *FileClient) Close() error {
return fc.file.Close()
}
1 change: 1 addition & 0 deletions core/protocol/all_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ func (svcs *allServices) registerConfigWatchers() {
func(cfg config.Config) { svcs.banking.ReloadConf(cfg.Banking) },
func(cfg config.Config) { svcs.governance.ReloadConf(cfg.Governance) },
func(cfg config.Config) { svcs.stats.ReloadConf(cfg.Stats) },
func(cfg config.Config) { svcs.broker.ReloadConf(cfg.Broker) },
)

if svcs.conf.HaveEthClient() {
Expand Down

0 comments on commit c7a9e90

Please sign in to comment.