diff --git a/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml b/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml index d9f0316380f..e2b14e835f9 100644 --- a/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml +++ b/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml @@ -14,7 +14,7 @@ env: jobs: build: name: Run endtoend tests on Cluster (vttablet_prscomplex) - runs-on: ubuntu-18.04 + runs-on: ubuntu-20.04 steps: - name: Check if workflow needs to be skipped @@ -71,8 +71,16 @@ jobs: - name: Get dependencies if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' run: | + # Setup Percona Server for MySQL 8.0 sudo apt-get update - sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo apt-get install -y lsb-release gnupg2 curl + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo DEBIAN_FRONTEND="noninteractive" dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo percona-release setup ps80 + sudo apt-get update + + # Install everything else we need, and configure + sudo apt-get install -y percona-server-server percona-server-client make unzip g++ etcd git wget eatmydata xz-utils sudo service mysql stop sudo service etcd stop sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ @@ -96,7 +104,7 @@ jobs: - name: Run cluster endtoend test if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - timeout-minutes: 30 + timeout-minutes: 45 run: | # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file # which musn't be more than 107 characters long. diff --git a/go/test/endtoend/vtgate/transaction/restart/main_test.go b/go/test/endtoend/vtgate/transaction/restart/main_test.go new file mode 100644 index 00000000000..3c7ac710e9d --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/restart/main_test.go @@ -0,0 +1,114 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package misc + +import ( + "context" + _ "embed" + "flag" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "test" + + //go:embed schema.sql + schemaSQL string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false) + if err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--planner-version=gen4", + "--mysql_default_workload=olap") + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +/* +TestStreamTxRestart tests that when a connection is killed my mysql (may be due to restart), +then the transaction should not continue to serve the query via reconnect. +*/ +func TestStreamTxRestart(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + utils.Exec(t, conn, "begin") + // BeginStreamExecute + _ = utils.Exec(t, conn, "select connection_id()") + + // StreamExecute + _ = utils.Exec(t, conn, "select connection_id()") + + // restart the mysql to terminate all the existing connections. + primTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet() + err = primTablet.MysqlctlProcess.Stop() + require.NoError(t, err) + err = primTablet.MysqlctlProcess.StartProvideInit(false) + require.NoError(t, err) + + // query should return connection error + _, err = utils.ExecAllowError(t, conn, "select connection_id()") + require.Error(t, err) + assert.Contains(t, err.Error(), "broken pipe (errno 2006) (sqlstate HY000)") +} diff --git a/go/test/endtoend/vtgate/transaction/restart/schema.sql b/go/test/endtoend/vtgate/transaction/restart/schema.sql new file mode 100644 index 00000000000..3e78cab09d6 --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/restart/schema.sql @@ -0,0 +1,5 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; \ No newline at end of file diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 3b30b46d1f4..971e09c77cf 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -275,6 +275,24 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(* return err } +// StreamOnce executes the query and streams the results. But, does not retry on connection errors. +func (dbc *DBConn) StreamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error { + resultSent := false + return dbc.streamOnce( + ctx, + query, + func(r *sqltypes.Result) error { + if !resultSent { + resultSent = true + r = r.StripMetadata(includedFields) + } + return callback(r) + }, + alloc, + streamBufferSize, + ) +} + var ( getModeSQL = "select @@global.sql_mode" getAutocommit = "select @@autocommit" diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 214fced7005..4c85c947bac 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -303,7 +303,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { } var replaceKeyspace string - if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL { + if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL && qre.tsv.sm.target.Keyspace != qre.tsv.config.DB.DBName { replaceKeyspace = qre.tsv.sm.target.Keyspace } @@ -1006,29 +1006,23 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.DBConn, isTransaction boo return callback(result) } - qd := NewQueryDetail(qre.logStats.Ctx, conn) + start := time.Now() + defer qre.logStats.AddRewrittenSQL(sql, start) // Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it // to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional) // weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go. // This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange // once their grace period is over. + qd := NewQueryDetail(qre.logStats.Ctx, conn) if isTransaction { qre.tsv.statefulql.Add(qd) defer qre.tsv.statefulql.Remove(qd) - } else { - qre.tsv.olapql.Add(qd) - defer qre.tsv.olapql.Remove(qd) - } - - start := time.Now() - err := conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options)) - qre.logStats.AddRewrittenSQL(sql, start) - if err != nil { - // MySQL error that isn't due to a connection issue - return err + return conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options)) } - return nil + qre.tsv.olapql.Add(qd) + defer qre.tsv.olapql.Remove(qd) + return conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options)) } func (qre *QueryExecutor) recordUserQuery(queryType string, duration int64) { diff --git a/test/config.json b/test/config.json index 9d71d274af7..a3f63e172a4 100644 --- a/test/config.json +++ b/test/config.json @@ -801,6 +801,15 @@ "RetryMax": 1, "Tags": [] }, + "vtgate_transaction_restart": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/restart"], + "Command": [], + "Manual": false, + "Shard": "vtgate_transaction", + "RetryMax": 1, + "Tags": [] + }, "vtgate_transaction_rollback": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/rollback"],