From cb2d2918e95d80ce3882cdee12313534618b161b Mon Sep 17 00:00:00 2001
From: Harshit Gangal <harshit@planetscale.com>
Date: Mon, 17 Oct 2022 22:10:27 +0530
Subject: [PATCH 1/3] fix: stream exec once in case of transactional connection

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
---
 .../vttablet/tabletserver/connpool/dbconn.go  | 18 +++++++++++++++
 go/vt/vttablet/tabletserver/query_executor.go | 22 +++++++------------
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go
index 3b30b46d1f4..971e09c77cf 100644
--- a/go/vt/vttablet/tabletserver/connpool/dbconn.go
+++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go
@@ -275,6 +275,24 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*
 	return err
 }
 
+// StreamOnce executes the query and streams the results to callback, but does not retry on connection errors. Only the first streamed result has StripMetadata(includedFields) applied.
+func (dbc *DBConn) StreamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error {
+	resultSent := false // set on the first result so that later results skip StripMetadata
+	return dbc.streamOnce(
+		ctx,
+		query,
+		func(r *sqltypes.Result) error {
+			if !resultSent {
+				resultSent = true
+				r = r.StripMetadata(includedFields)
+			}
+			return callback(r)
+		},
+		alloc,
+		streamBufferSize,
+	)
+}
+
 var (
 	getModeSQL    = "select @@global.sql_mode"
 	getAutocommit = "select @@autocommit"
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 214fced7005..4c85c947bac 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -303,7 +303,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
 	}
 
 	var replaceKeyspace string
-	if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL {
+	if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL && qre.tsv.sm.target.Keyspace != qre.tsv.config.DB.DBName {
 		replaceKeyspace = qre.tsv.sm.target.Keyspace
 	}
 
@@ -1006,29 +1006,23 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.DBConn, isTransaction boo
 		return callback(result)
 	}
 
-	qd := NewQueryDetail(qre.logStats.Ctx, conn)
+	start := time.Now()
+	defer qre.logStats.AddRewrittenSQL(sql, start)
 
 	// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
 	// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
 	// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
 	// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
 	// once their grace period is over.
+	qd := NewQueryDetail(qre.logStats.Ctx, conn)
 	if isTransaction {
 		qre.tsv.statefulql.Add(qd)
 		defer qre.tsv.statefulql.Remove(qd)
-	} else {
-		qre.tsv.olapql.Add(qd)
-		defer qre.tsv.olapql.Remove(qd)
-	}
-
-	start := time.Now()
-	err := conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
-	qre.logStats.AddRewrittenSQL(sql, start)
-	if err != nil {
-		// MySQL error that isn't due to a connection issue
-		return err
+		return conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
 	}
-	return nil
+	qre.tsv.olapql.Add(qd)
+	defer qre.tsv.olapql.Remove(qd)
+	return conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
 }
 
 func (qre *QueryExecutor) recordUserQuery(queryType string, duration int64) {

From fe608cb55defbbb67059115f4944f64e1d360247 Mon Sep 17 00:00:00 2001
From: Harshit Gangal <harshit@planetscale.com>
Date: Mon, 17 Oct 2022 22:55:00 +0530
Subject: [PATCH 2/3] test: added e2e test

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
---
 .../vtgate/transaction/restart/main_test.go   | 114 ++++++++++++++++++
 .../vtgate/transaction/restart/schema.sql     |   5 +
 test/config.json                              |   9 ++
 3 files changed, 128 insertions(+)
 create mode 100644 go/test/endtoend/vtgate/transaction/restart/main_test.go
 create mode 100644 go/test/endtoend/vtgate/transaction/restart/schema.sql

diff --git a/go/test/endtoend/vtgate/transaction/restart/main_test.go b/go/test/endtoend/vtgate/transaction/restart/main_test.go
new file mode 100644
index 00000000000..3c7ac710e9d
--- /dev/null
+++ b/go/test/endtoend/vtgate/transaction/restart/main_test.go
@@ -0,0 +1,114 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package misc
+
+import (
+	"context"
+	_ "embed"
+	"flag"
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"vitess.io/vitess/go/mysql"
+	"vitess.io/vitess/go/test/endtoend/cluster"
+	"vitess.io/vitess/go/test/endtoend/utils"
+)
+
+var (
+	clusterInstance *cluster.LocalProcessCluster
+	vtParams        mysql.ConnParams
+	keyspaceName    = "ks"
+	cell            = "test"
+
+	//go:embed schema.sql
+	schemaSQL string
+)
+
+func TestMain(m *testing.M) {
+	defer cluster.PanicHandler(nil)
+	flag.Parse()
+
+	exitCode := func() int { // run inside a closure so the deferred Teardown executes before os.Exit
+		clusterInstance = cluster.NewCluster(cell, "localhost")
+		defer clusterInstance.Teardown()
+
+		// Start the topo server; any startup failure below aborts the run with exit code 1.
+		err := clusterInstance.StartTopo()
+		if err != nil {
+			return 1
+		}
+
+		// Start an unsharded keyspace whose schema comes from the embedded schema.sql.
+		keyspace := &cluster.Keyspace{
+			Name:      keyspaceName,
+			SchemaSQL: schemaSQL,
+		}
+		err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
+		if err != nil {
+			return 1
+		}
+
+		// Start vtgate with the gen4 planner and OLAP as the default workload.
+		clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
+			"--planner-version=gen4",
+			"--mysql_default_workload=olap")
+		err = clusterInstance.StartVtgate()
+		if err != nil {
+			return 1
+		}
+
+		vtParams = mysql.ConnParams{ // tests connect to vtgate through its MySQL port
+			Host: clusterInstance.Hostname,
+			Port: clusterInstance.VtgateMySQLPort,
+		}
+		return m.Run()
+	}()
+	os.Exit(exitCode)
+}
+
+/*
+TestStreamTxRestart tests that when a transaction's connection is killed by mysql (for example due to a restart),
+the next query in that transaction returns a connection error instead of being served via a reconnect.
+*/
+func TestStreamTxRestart(t *testing.T) {
+	ctx := context.Background()
+	conn, err := mysql.Connect(ctx, &vtParams)
+	require.NoError(t, err)
+	defer conn.Close()
+
+	utils.Exec(t, conn, "begin")
+	// First query after begin: BeginStreamExecute.
+	_ = utils.Exec(t, conn, "select connection_id()")
+
+	// Second query inside the already-open transaction: StreamExecute.
+	_ = utils.Exec(t, conn, "select connection_id()")
+
+	// Restart mysql on the primary tablet to terminate all the existing connections.
+	primTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()
+	err = primTablet.MysqlctlProcess.Stop()
+	require.NoError(t, err)
+	err = primTablet.MysqlctlProcess.StartProvideInit(false)
+	require.NoError(t, err)
+
+	// The query must fail with a connection error rather than succeed on a new connection.
+	_, err = utils.ExecAllowError(t, conn, "select connection_id()")
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "broken pipe (errno 2006) (sqlstate HY000)")
+}
diff --git a/go/test/endtoend/vtgate/transaction/restart/schema.sql b/go/test/endtoend/vtgate/transaction/restart/schema.sql
new file mode 100644
index 00000000000..3e78cab09d6
--- /dev/null
+++ b/go/test/endtoend/vtgate/transaction/restart/schema.sql
@@ -0,0 +1,5 @@
+create table t1(
+                   id1 bigint,
+                   id2 bigint,
+                   primary key(id1)
+) Engine=InnoDB;
\ No newline at end of file
diff --git a/test/config.json b/test/config.json
index 9d71d274af7..a3f63e172a4 100644
--- a/test/config.json
+++ b/test/config.json
@@ -801,6 +801,15 @@
 			"RetryMax": 1,
 			"Tags": []
 		},
+		"vtgate_transaction_restart": {
+			"File": "unused.go",
+			"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/restart"],
+			"Command": [],
+			"Manual": false,
+			"Shard": "vtgate_transaction",
+			"RetryMax": 1,
+			"Tags": []
+		},
 		"vtgate_transaction_rollback": {
 			"File": "unused.go",
 			"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/rollback"],

From 51fb5f70dfcd7b4ed656ec977e34424320893290 Mon Sep 17 00:00:00 2001
From: Harshit Gangal <harshit@planetscale.com>
Date: Mon, 17 Oct 2022 22:55:20 +0530
Subject: [PATCH 3/3] generate ci workflow

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
---
 .../cluster_endtoend_vttablet_prscomplex.yml       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml b/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml
index 692c8afd7e4..1bbcc59753b 100644
--- a/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml
+++ b/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml
@@ -14,7 +14,7 @@ env:
 jobs:
   build:
     name: Run endtoend tests on Cluster (vttablet_prscomplex)
-    runs-on: ubuntu-18.04
+    runs-on: ubuntu-20.04
 
     steps:
     - name: Check if workflow needs to be skipped
@@ -71,8 +71,16 @@ jobs:
     - name: Get dependencies
       if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
       run: |
+        # Setup Percona Server for MySQL 8.0
         sudo apt-get update
-        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
+        sudo apt-get install -y lsb-release gnupg2 curl
+        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
+        sudo DEBIAN_FRONTEND="noninteractive" dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
+        sudo percona-release setup ps80
+        sudo apt-get update
+
+        # Install everything else we need, and configure
+        sudo apt-get install -y percona-server-server percona-server-client make unzip g++ etcd git wget eatmydata xz-utils
         sudo service mysql stop
         sudo service etcd stop
         sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
@@ -96,7 +104,7 @@ jobs:
 
     - name: Run cluster endtoend test
       if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
-      timeout-minutes: 30
+      timeout-minutes: 45
       run: |
         # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file
         # which musn't be more than 107 characters long.