Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve error handling in VTGate VStreams #17558

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 53 additions & 38 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func(events []*binlogdatapb.VEvent) error) error {
vgtid, filter, flags, err := vsm.resolveParams(ctx, tabletType, vgtid, filter, flags)
if err != nil {
return err
return vterrors.Wrap(err, "failed to resolve vstream parameters")
}
ts, err := vsm.toposerv.GetTopoServer()
if err != nil {
return err
return vterrors.Wrap(err, "failed to get topology server")
}
if ts == nil {
log.Errorf("unable to get topo server in VStream()")
return fmt.Errorf("unable to get topo server")
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "unable to get topoology server")
}
vs := &vstream{
vgtid: vgtid,
Expand Down Expand Up @@ -215,7 +215,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
flags = &vtgatepb.VStreamFlags{}
}
if vgtid == nil || len(vgtid.ShardGtids) == 0 {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position")
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position in ShardGtids")
}
// To fetch from all keyspaces, the input must contain a single ShardGtid
// that has an empty keyspace, and the Gtid must be "current".
Expand All @@ -228,7 +228,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
newvgtid := &binlogdatapb.VGtid{}
keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, vterrors.Wrapf(err, "failed to get keyspace names in cell %s", vsm.cell)
}

if isEmpty {
Expand All @@ -244,7 +244,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
} else {
re, err := regexp.Compile(strings.Trim(inputKeyspace, "/"))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, vterrors.Wrapf(err, "failed to compile regexp using %s", inputKeyspace)
}
for _, keyspace := range keyspaces {
if re.MatchString(keyspace) {
Expand All @@ -262,12 +262,13 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
for _, sgtid := range vgtid.ShardGtids {
if sgtid.Shard == "" {
if sgtid.Gtid != "current" && sgtid.Gtid != "" {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %v", vgtid)
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %+v",
vgtid)
}
// TODO(sougou): this should work with the new Migrate workflow
_, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, vterrors.Wrapf(err, "failed to get shards in keyspace %s", sgtid.Keyspace)
}
for _, shard := range allShards {
newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{
Expand Down Expand Up @@ -329,23 +330,23 @@ func (vs *vstream) sendEvents(ctx context.Context) {
send := func(evs []*binlogdatapb.VEvent) error {
if err := vs.send(evs); err != nil {
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, "error sending events")
})
return err
return vterrors.Wrap(err, "error sending events")
}
return nil
}
for {
select {
case <-ctx.Done():
vs.once.Do(func() {
vs.setError(fmt.Errorf("context canceled"))
vs.setError(ctx.Err(), "context ended while sending events")
})
return
case evs := <-vs.eventCh:
if err := send(evs); err != nil {
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, "error sending events")
})
return
}
Expand All @@ -359,7 +360,7 @@ func (vs *vstream) sendEvents(ctx context.Context) {
}}
if err := send(evs); err != nil {
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, "error sending heartbeat")
})
return
}
Expand All @@ -378,7 +379,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
if err != nil {
log.Errorf("Error in vstream for %+v: %s", sgtid, err)
vs.once.Do(func() {
vs.setError(err)
vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
vs.cancel()
})
}
Expand Down Expand Up @@ -464,10 +465,12 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent,
}
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for skew to reduce for stream %s from %s/%s",
streamID, keyspace, shard)
case <-time.After(time.Duration(vs.skewTimeoutSeconds) * time.Second):
log.Errorf("timed out while waiting for skew to reduce: %s", streamID)
return fmt.Errorf("timed out while waiting for skew to reduce: %s", streamID)
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "timed out while waiting for skew to reduce for stream %s from %s/%s",
streamID, keyspace, shard)
case <-vs.skewCh:
// once skew is fixed the channel is closed and all waiting streams "wake up"
}
Expand Down Expand Up @@ -500,7 +503,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
for {
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from %s/%s", sgtid.Keyspace, sgtid.Shard)
case <-journalDone:
// Unreachable.
// This can happen if a server misbehaves and does not end
Expand Down Expand Up @@ -544,8 +547,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err != nil {
return tabletPickerErr(err)
}
log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), tabletAliasString, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))

target := &querypb.Target{
Keyspace: sgtid.Keyspace,
Expand All @@ -556,7 +560,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target)
if err != nil {
log.Errorf(err.Error())
return err
return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletAliasString)
}

errCh := make(chan error, 1)
Expand All @@ -565,9 +569,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
switch {
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletAliasString)
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
err = fmt.Errorf("health check failed on %s", tabletAliasString)
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
Expand All @@ -580,6 +584,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletAliasString)
errCh <- err
return err
}
Expand All @@ -604,7 +609,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
Options: options,
}
var vstreamCreatedOnce sync.Once
log.Infof("Starting to vstream from %s, with req %+v", topoproto.TabletAliasString(tablet.Alias), req)
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand All @@ -617,9 +622,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha

select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
case streamErr := <-errCh:
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
case <-journalDone:
// Unreachable.
// This can happen if a server misbehaves and does not end
Expand All @@ -628,6 +635,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
default:
}

aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard)
sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString)

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for i, event := range events {
switch event.Type {
Expand All @@ -648,11 +658,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
return vterrors.Wrap(err, aligningStreamsErr)
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
return vterrors.Wrap(err, sendingEventsErr)
}
eventss = nil
sendevents = nil
Expand All @@ -664,11 +674,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
return vterrors.Wrap(err, aligningStreamsErr)
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
return vterrors.Wrap(err, sendingEventsErr)
}
eventss = nil
sendevents = nil
Expand All @@ -677,7 +687,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Otherwise they can accumulate indefinitely if there are no real events.
// TODO(sougou): figure out a model for this.
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
return vterrors.Wrap(err, aligningStreamsErr)
}
case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
Expand All @@ -698,14 +708,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
return vterrors.Wrap(err, sendingEventsErr)
}
eventss = nil
sendevents = nil
}
je, err := vs.getJournalEvent(ctx, sgtid, journal)
if err != nil {
return err
return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
}
if je != nil {
var endTimer *time.Timer
Expand All @@ -725,7 +736,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
journalDone = je.done
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
case <-journalDone:
if endTimer != nil {
<-endTimer.C
Expand All @@ -752,13 +764,15 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
if err == nil {
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s in %s/%s",
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
}

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s",
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
Expand All @@ -768,7 +782,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up",
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
}
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}
Expand Down Expand Up @@ -884,10 +899,10 @@ func (vs *vstream) getError() error {
return vs.err
}

func (vs *vstream) setError(err error) {
func (vs *vstream) setError(err error, msg string) {
vs.errMu.Lock()
defer vs.errMu.Unlock()
vs.err = err
vs.err = vterrors.Wrap(err, msg)
}

// getJournalEvent returns a journalEvent. The caller has to wait on its done channel.
Expand Down
Loading