Skip to content

Commit

Permalink
Merge pull request vitessio#11517 from planetscale/stream-exec-once
Browse files Browse the repository at this point in the history
fix: fail over reconnect in stream execution for connection with transaction
  • Loading branch information
harshit-gangal authored Oct 18, 2022
2 parents 460e0f1 + 51fb5f7 commit a6a1e24
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 17 deletions.
14 changes: 11 additions & 3 deletions .github/workflows/cluster_endtoend_vttablet_prscomplex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ env:
jobs:
build:
name: Run endtoend tests on Cluster (vttablet_prscomplex)
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04

steps:
- name: Check if workflow needs to be skipped
Expand Down Expand Up @@ -71,8 +71,16 @@ jobs:
- name: Get dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |
# Setup Percona Server for MySQL 8.0
sudo apt-get update
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
sudo apt-get install -y lsb-release gnupg2 curl
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo DEBIAN_FRONTEND="noninteractive" dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo percona-release setup ps80
sudo apt-get update
# Install everything else we need, and configure
sudo apt-get install -y percona-server-server percona-server-client make unzip g++ etcd git wget eatmydata xz-utils
sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
Expand All @@ -96,7 +104,7 @@ jobs:
- name: Run cluster endtoend test
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 30
timeout-minutes: 45
run: |
# We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file
# which musn't be more than 107 characters long.
Expand Down
114 changes: 114 additions & 0 deletions go/test/endtoend/vtgate/transaction/restart/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package misc

import (
"context"
_ "embed"
"flag"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "ks"
cell = "test"

//go:embed schema.sql
schemaSQL string
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, "localhost")
defer clusterInstance.Teardown()

// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
}

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
if err != nil {
return 1
}

// Start vtgate
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--planner-version=gen4",
"--mysql_default_workload=olap")
err = clusterInstance.StartVtgate()
if err != nil {
return 1
}

vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
}

/*
TestStreamTxRestart tests that when a connection is killed my mysql (may be due to restart),
then the transaction should not continue to serve the query via reconnect.
*/
func TestStreamTxRestart(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "begin")
// BeginStreamExecute
_ = utils.Exec(t, conn, "select connection_id()")

// StreamExecute
_ = utils.Exec(t, conn, "select connection_id()")

// restart the mysql to terminate all the existing connections.
primTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()
err = primTablet.MysqlctlProcess.Stop()
require.NoError(t, err)
err = primTablet.MysqlctlProcess.StartProvideInit(false)
require.NoError(t, err)

// query should return connection error
_, err = utils.ExecAllowError(t, conn, "select connection_id()")
require.Error(t, err)
assert.Contains(t, err.Error(), "broken pipe (errno 2006) (sqlstate HY000)")
}
5 changes: 5 additions & 0 deletions go/test/endtoend/vtgate/transaction/restart/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
18 changes: 18 additions & 0 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*
return err
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
func (dbc *DBConn) StreamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error {
resultSent := false
return dbc.streamOnce(
ctx,
query,
func(r *sqltypes.Result) error {
if !resultSent {
resultSent = true
r = r.StripMetadata(includedFields)
}
return callback(r)
},
alloc,
streamBufferSize,
)
}

var (
getModeSQL = "select @@global.sql_mode"
getAutocommit = "select @@autocommit"
Expand Down
22 changes: 8 additions & 14 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
}

var replaceKeyspace string
if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL {
if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL && qre.tsv.sm.target.Keyspace != qre.tsv.config.DB.DBName {
replaceKeyspace = qre.tsv.sm.target.Keyspace
}

Expand Down Expand Up @@ -1006,29 +1006,23 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.DBConn, isTransaction boo
return callback(result)
}

qd := NewQueryDetail(qre.logStats.Ctx, conn)
start := time.Now()
defer qre.logStats.AddRewrittenSQL(sql, start)

// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn)
if isTransaction {
qre.tsv.statefulql.Add(qd)
defer qre.tsv.statefulql.Remove(qd)
} else {
qre.tsv.olapql.Add(qd)
defer qre.tsv.olapql.Remove(qd)
}

start := time.Now()
err := conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
qre.logStats.AddRewrittenSQL(sql, start)
if err != nil {
// MySQL error that isn't due to a connection issue
return err
return conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
}
return nil
qre.tsv.olapql.Add(qd)
defer qre.tsv.olapql.Remove(qd)
return conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
}

func (qre *QueryExecutor) recordUserQuery(queryType string, duration int64) {
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,15 @@
"RetryMax": 1,
"Tags": []
},
"vtgate_transaction_restart": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/restart"],
"Command": [],
"Manual": false,
"Shard": "vtgate_transaction",
"RetryMax": 1,
"Tags": []
},
"vtgate_transaction_rollback": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/rollback"],
Expand Down

0 comments on commit a6a1e24

Please sign in to comment.