Skip to content

Commit 185b31e

Browse files
[release-19.0] Fix deadlock in messager and health streamer (vitessio#17230) (vitessio#17233)
Signed-off-by: Manan Gupta <[email protected]> Co-authored-by: Manan Gupta <[email protected]> Co-authored-by: Manan Gupta <[email protected]>
1 parent 3a35868 commit 185b31e

File tree

5 files changed

+135
-35
lines changed

5 files changed

+135
-35
lines changed

go/vt/vttablet/tabletserver/health_streamer.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,18 @@ type healthStreamer struct {
6868
degradedThreshold time.Duration
6969
unhealthyThreshold atomic.Int64
7070

71-
mu sync.Mutex
72-
ctx context.Context
73-
cancel context.CancelFunc
74-
clients map[chan *querypb.StreamHealthResponse]struct{}
75-
state *querypb.StreamHealthResponse
71+
// cancelMu is a mutex used to protect the cancel variable
72+
// and for ensuring we don't call setup functions in parallel.
73+
cancelMu sync.Mutex
74+
ctx context.Context
75+
cancel context.CancelFunc
76+
77+
// fieldsMu is used to protect access to the fields below.
78+
// We require two separate mutexes, so that we don't have to acquire the same mutex
79+
// in Close and reload that can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
80+
fieldsMu sync.Mutex
81+
clients map[chan *querypb.StreamHealthResponse]struct{}
82+
state *querypb.StreamHealthResponse
7683
// isServingPrimary stores if this tablet is currently the serving primary or not.
7784
isServingPrimary bool
7885

@@ -126,8 +133,8 @@ func (hs *healthStreamer) InitDBConfig(target *querypb.Target, cp dbconfigs.Conn
126133
}
127134

128135
func (hs *healthStreamer) Open() {
129-
hs.mu.Lock()
130-
defer hs.mu.Unlock()
136+
hs.cancelMu.Lock()
137+
defer hs.cancelMu.Unlock()
131138

132139
if hs.cancel != nil {
133140
return
@@ -140,8 +147,8 @@ func (hs *healthStreamer) Open() {
140147
}
141148

142149
func (hs *healthStreamer) Close() {
143-
hs.mu.Lock()
144-
defer hs.mu.Unlock()
150+
hs.cancelMu.Lock()
151+
defer hs.cancelMu.Unlock()
145152

146153
if hs.cancel != nil {
147154
hs.se.UnregisterNotifier("healthStreamer")
@@ -182,13 +189,16 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
182189
}
183190

184191
func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
185-
hs.mu.Lock()
186-
defer hs.mu.Unlock()
192+
hs.cancelMu.Lock()
193+
defer hs.cancelMu.Unlock()
187194

188195
if hs.cancel == nil {
189196
return nil, nil
190197
}
191198

199+
hs.fieldsMu.Lock()
200+
defer hs.fieldsMu.Unlock()
201+
192202
ch := make(chan *querypb.StreamHealthResponse, streamHealthBufferSize)
193203
hs.clients[ch] = struct{}{}
194204

@@ -198,15 +208,15 @@ func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, contex
198208
}
199209

200210
func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
201-
hs.mu.Lock()
202-
defer hs.mu.Unlock()
211+
hs.fieldsMu.Lock()
212+
defer hs.fieldsMu.Unlock()
203213

204214
delete(hs.clients, ch)
205215
}
206216

207217
func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimestamp time.Time, lag time.Duration, err error, serving bool) {
208-
hs.mu.Lock()
209-
defer hs.mu.Unlock()
218+
hs.fieldsMu.Lock()
219+
defer hs.fieldsMu.Unlock()
210220

211221
hs.state.Target.TabletType = tabletType
212222
if tabletType == topodatapb.TabletType_PRIMARY {
@@ -260,8 +270,8 @@ func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse)
260270
}
261271

262272
func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
263-
hs.mu.Lock()
264-
defer hs.mu.Unlock()
273+
hs.fieldsMu.Lock()
274+
defer hs.fieldsMu.Unlock()
265275
if hs.state.Target.TabletType == topodatapb.TabletType_PRIMARY {
266276
return details
267277
}
@@ -306,8 +316,8 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
306316
// MakePrimary tells the healthstreamer that the current tablet is now the primary,
307317
// so it can read and write to the MySQL instance for schema-tracking.
308318
func (hs *healthStreamer) MakePrimary(serving bool) {
309-
hs.mu.Lock()
310-
defer hs.mu.Unlock()
319+
hs.fieldsMu.Lock()
320+
defer hs.fieldsMu.Unlock()
311321
hs.isServingPrimary = serving
312322
// We register for notifications from the schema Engine only when schema tracking is enabled,
313323
// and we are going to a serving primary state.
@@ -322,15 +332,15 @@ func (hs *healthStreamer) MakePrimary(serving bool) {
322332

323333
// MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
324334
func (hs *healthStreamer) MakeNonPrimary() {
325-
hs.mu.Lock()
326-
defer hs.mu.Unlock()
335+
hs.fieldsMu.Lock()
336+
defer hs.fieldsMu.Unlock()
327337
hs.isServingPrimary = false
328338
}
329339

330340
// reload reloads the schema from the underlying mysql for the tables that we get the alert on.
331341
func (hs *healthStreamer) reload(full map[string]*schema.Table, created, altered, dropped []*schema.Table) error {
332-
hs.mu.Lock()
333-
defer hs.mu.Unlock()
342+
hs.fieldsMu.Lock()
343+
defer hs.fieldsMu.Unlock()
334344
// Schema Reload to happen only on primary when it is serving.
335345
// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
336346
// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.

go/vt/vttablet/tabletserver/health_streamer_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,3 +564,46 @@ func testStream(hs *healthStreamer) (<-chan *querypb.StreamHealthResponse, conte
564564
func testBlpFunc() (int64, int32) {
565565
return 1, 2
566566
}
567+
568+
// TestDeadlockBwCloseAndReload tests the deadlock observed between Close and Reload
569+
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
570+
func TestDeadlockBwCloseAndReload(t *testing.T) {
571+
db := fakesqldb.New(t)
572+
defer db.Close()
573+
cfg := newConfig(db)
574+
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary")
575+
alias := &topodatapb.TabletAlias{
576+
Cell: "cell",
577+
Uid: 1,
578+
}
579+
se := schema.NewEngineForTests()
580+
// Create a new health streamer and set it to a serving primary state
581+
hs := newHealthStreamer(env, alias, se)
582+
hs.signalWhenSchemaChange = true
583+
hs.InitDBConfig(&querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, cfg.DB.DbaWithDB())
584+
hs.Open()
585+
hs.MakePrimary(true)
586+
defer hs.Close()
587+
588+
wg := sync.WaitGroup{}
589+
wg.Add(2)
590+
// Try running Close and reload in parallel multiple times.
591+
// This reproduces the deadlock quite readily.
592+
go func() {
593+
defer wg.Done()
594+
for i := 0; i < 100; i++ {
595+
hs.Close()
596+
hs.Open()
597+
}
598+
}()
599+
600+
go func() {
601+
defer wg.Done()
602+
for i := 0; i < 100; i++ {
603+
se.BroadcastForTesting(nil, nil, nil)
604+
}
605+
}()
606+
607+
// Wait for wait group to finish.
608+
wg.Wait()
609+
}

go/vt/vttablet/tabletserver/messager/engine.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,16 @@ type VStreamer interface {
5151

5252
// Engine is the engine for handling messages.
5353
type Engine struct {
54-
mu sync.Mutex
55-
isOpen bool
56-
managers map[string]*messageManager
54+
// mu is a mutex used to protect the isOpen variable
55+
// and for ensuring we don't call setup functions in parallel.
56+
mu sync.Mutex
57+
isOpen bool
58+
59+
// managersMu is a mutex used to protect the managers field.
60+
// We require two separate mutexes, so that we don't have to acquire the same mutex
61+
// in Close and schemaChanged which can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229.
62+
managersMu sync.Mutex
63+
managers map[string]*messageManager
5764

5865
tsv TabletService
5966
se *schema.Engine
@@ -75,15 +82,12 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
7582
// Open starts the Engine service.
7683
func (me *Engine) Open() {
7784
me.mu.Lock()
85+
defer me.mu.Unlock()
7886
if me.isOpen {
79-
me.mu.Unlock()
8087
return
8188
}
8289
me.isOpen = true
83-
me.mu.Unlock()
8490
log.Info("Messager: opening")
85-
// Unlock before invoking RegisterNotifier because it
86-
// obtains the same lock.
8791
me.se.RegisterNotifier("messages", me.schemaChanged, true)
8892
}
8993

@@ -101,6 +105,8 @@ func (me *Engine) Close() {
101105
log.Infof("messager Engine - unregistering notifiers")
102106
me.se.UnregisterNotifier("messages")
103107
log.Infof("messager Engine - closing all managers")
108+
me.managersMu.Lock()
109+
defer me.managersMu.Unlock()
104110
for _, mm := range me.managers {
105111
mm.Close()
106112
}
@@ -109,8 +115,8 @@ func (me *Engine) Close() {
109115
}
110116

111117
func (me *Engine) GetGenerator(name string) (QueryGenerator, error) {
112-
me.mu.Lock()
113-
defer me.mu.Unlock()
118+
me.managersMu.Lock()
119+
defer me.managersMu.Unlock()
114120
mm := me.managers[name]
115121
if mm == nil {
116122
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found in schema", name)
@@ -131,6 +137,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
131137
if !me.isOpen {
132138
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "messager engine is closed, probably because this is not a primary any more")
133139
}
140+
me.managersMu.Lock()
141+
defer me.managersMu.Unlock()
134142
mm := me.managers[name]
135143
if mm == nil {
136144
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found", name)
@@ -139,8 +147,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
139147
}
140148

141149
func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table) {
142-
me.mu.Lock()
143-
defer me.mu.Unlock()
150+
me.managersMu.Lock()
151+
defer me.managersMu.Unlock()
144152
for _, table := range append(dropped, altered...) {
145153
name := table.Name.String()
146154
mm := me.managers[name]

go/vt/vttablet/tabletserver/messager/engine_test.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package messager
1919
import (
2020
"context"
2121
"reflect"
22+
"sync"
2223
"testing"
2324

2425
"vitess.io/vitess/go/sqltypes"
@@ -156,7 +157,7 @@ func newTestEngine() *Engine {
156157
tsv := &fakeTabletServer{
157158
Env: tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "MessagerTest"),
158159
}
159-
se := schema.NewEngine(tsv)
160+
se := schema.NewEngineForTests()
160161
te := NewEngine(tsv, se, newFakeVStreamer())
161162
te.Open()
162163
return te
@@ -169,3 +170,33 @@ func newEngineReceiver() (f func(qr *sqltypes.Result) error, ch chan *sqltypes.R
169170
return nil
170171
}, ch
171172
}
173+
174+
// TestDeadlockBwCloseAndSchemaChange tests the deadlock observed between Close and schemaChanged
175+
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229.
176+
func TestDeadlockBwCloseAndSchemaChange(t *testing.T) {
177+
engine := newTestEngine()
178+
defer engine.Close()
179+
se := engine.se
180+
181+
wg := sync.WaitGroup{}
182+
wg.Add(2)
183+
// Try running Close and schemaChanged in parallel multiple times.
184+
// This reproduces the deadlock quite readily.
185+
go func() {
186+
defer wg.Done()
187+
for i := 0; i < 100; i++ {
188+
engine.Close()
189+
engine.Open()
190+
}
191+
}()
192+
193+
go func() {
194+
defer wg.Done()
195+
for i := 0; i < 100; i++ {
196+
se.BroadcastForTesting(nil, nil, nil)
197+
}
198+
}()
199+
200+
// Wait for wait group to finish.
201+
wg.Wait()
202+
}

go/vt/vttablet/tabletserver/schema/engine.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,13 @@ func (se *Engine) broadcast(created, altered, dropped []*Table) {
743743
}
744744
}
745745

746+
// BroadcastForTesting is meant to be a testing function that triggers a broadcast call.
747+
func (se *Engine) BroadcastForTesting(created, altered, dropped []*Table) {
748+
se.mu.Lock()
749+
defer se.mu.Unlock()
750+
se.broadcast(created, altered, dropped)
751+
}
752+
746753
// GetTable returns the info for a table.
747754
func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
748755
se.mu.Lock()
@@ -829,6 +836,7 @@ func NewEngineForTests() *Engine {
829836
tables: make(map[string]*Table),
830837
historian: newHistorian(false, 0, nil),
831838
env: tabletenv.NewEnv(vtenv.NewTestEnv(), tabletenv.NewDefaultConfig(), "SchemaEngineForTests"),
839+
notifiers: make(map[string]notifier),
832840
}
833841
return se
834842
}

0 commit comments

Comments (0)