From 95de1fc684aad04a2859dbfb5014ec416b115b33 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 4 Nov 2024 11:59:25 +0100 Subject: [PATCH] Move to native sqlite3 queries (#17124) Signed-off-by: Dirkjan Bussink Co-authored-by: Tim Vaillancourt --- go/vt/external/golib/sqlutils/dialect.go | 53 --- .../external/golib/sqlutils/sqlite_dialect.go | 161 -------- .../golib/sqlutils/sqlite_dialect_test.go | 352 ------------------ go/vt/vtorc/db/db.go | 33 +- go/vt/vtorc/inst/analysis_dao.go | 14 +- go/vt/vtorc/inst/audit_dao.go | 2 +- go/vt/vtorc/inst/instance_dao.go | 71 ++-- go/vt/vtorc/inst/instance_dao_test.go | 55 ++- go/vt/vtorc/logic/disable_recovery.go | 2 +- go/vt/vtorc/logic/topology_recovery_dao.go | 46 +-- go/vt/vtorc/process/election_dao.go | 4 +- go/vt/vtorc/process/health_dao.go | 8 +- 12 files changed, 95 insertions(+), 706 deletions(-) delete mode 100644 go/vt/external/golib/sqlutils/dialect.go delete mode 100644 go/vt/external/golib/sqlutils/sqlite_dialect.go delete mode 100644 go/vt/external/golib/sqlutils/sqlite_dialect_test.go diff --git a/go/vt/external/golib/sqlutils/dialect.go b/go/vt/external/golib/sqlutils/dialect.go deleted file mode 100644 index 8dabe57ccaf..00000000000 --- a/go/vt/external/golib/sqlutils/dialect.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "regexp" - "strings" -) - -type regexpMap struct { - r *regexp.Regexp - replacement string -} - -func (this *regexpMap) process(text string) (result string) { - return this.r.ReplaceAllString(text, this.replacement) -} - -func rmap(regexpExpression string, replacement string) regexpMap { - return regexpMap{ - r: regexp.MustCompile(regexpSpaces(regexpExpression)), - replacement: replacement, - } -} - -func regexpSpaces(statement string) string { - return strings.Replace(statement, " ", `[\s]+`, -1) -} - -func applyConversions(statement string, conversions []regexpMap) string { - for _, rmap := range conversions { - statement = rmap.process(statement) - } - return statement -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect.go b/go/vt/external/golib/sqlutils/sqlite_dialect.go deleted file mode 100644 index c76f920d14a..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect.go +++ /dev/null @@ -1,161 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -// What's this about? -// This is a brute-force regular-expression based conversion from MySQL syntax to sqlite3 syntax. -// It is NOT meant to be a general purpose solution and is only expected & confirmed to run on -// queries issued by orchestrator. There are known limitations to this design. -// It's not even pretty. -// In fact... -// Well, it gets the job done at this time. Call it debt. - -package sqlutils - -import ( - "regexp" -) - -var sqlite3CreateTableConversions = []regexpMap{ - rmap(`(?i) (character set|charset) [\S]+`, ``), - rmap(`(?i)int unsigned`, `int`), - rmap(`(?i)int[\s]*[(][\s]*([0-9]+)[\s]*[)] unsigned`, `int`), - rmap(`(?i)engine[\s]*=[\s]*(innodb|myisam|ndb|memory|tokudb)`, ``), - rmap(`(?i)DEFAULT CHARSET[\s]*=[\s]*[\S]+`, ``), - rmap(`(?i)[\S]*int( not null|) auto_increment`, `integer`), - rmap(`(?i)comment '[^']*'`, ``), - rmap(`(?i)after [\S]+`, ``), - rmap(`(?i)alter table ([\S]+) add (index|key) ([\S]+) (.+)`, `create index ${3}_${1} on $1 $4`), - rmap(`(?i)alter table ([\S]+) add unique (index|key) ([\S]+) (.+)`, `create unique index ${3}_${1} on $1 $4`), - rmap(`(?i)([\S]+) enum[\s]*([(].*?[)])`, `$1 text check($1 in $2)`), - rmap(`(?i)([\s\S]+[/][*] sqlite3-skip [*][/][\s\S]+)`, ``), - rmap(`(?i)timestamp default current_timestamp`, `timestamp default ('')`), - rmap(`(?i)timestamp not null default current_timestamp`, `timestamp not null default ('')`), - - rmap(`(?i)add column (.*int) not null[\s]*$`, `add column $1 not null default 0`), - rmap(`(?i)add column (.* text) not null[\s]*$`, `add column $1 not null default ''`), - rmap(`(?i)add column (.* varchar.*) not null[\s]*$`, `add column $1 not null default ''`), -} - -var sqlite3InsertConversions = []regexpMap{ - rmap(`(?i)insert ignore ([\s\S]+) on duplicate key update [\s\S]+`, `insert or ignore $1`), - rmap(`(?i)insert ignore`, `insert or ignore`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)insert into ([\s\S]+) on duplicate key update [\s\S]+`, `replace into $1`), -} - -var sqlite3GeneralConversions = []regexpMap{ - rmap(`(?i)now[(][)][\s]*[-][\s]*interval [?] ([\w]+)`, `datetime('now', printf('-%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval [?] ([\w]+)`, `datetime('now', printf('+%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[-][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '-${1} $2')`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '+${1} $2')`), - - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[-][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('-%d $2', ?))`), - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[+][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('+%d $2', ?))`), - - rmap(`(?i)unix_timestamp[(][)]`, `strftime('%s', 'now')`), - rmap(`(?i)unix_timestamp[(]([^)]+)[)]`, `strftime('%s', $1)`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)cast[(][\s]*([\S]+) as signed[\s]*[)]`, `cast($1 as integer)`), - - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2)`), - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2 || $3)`), - - rmap(`(?i) rlike `, ` like `), -} - -var sqlite3CreateIndexConversions = []regexpMap{ - rmap(`(?i)create index([\s\S]+)[(][\s]*[0-9]+[\s]*[)]([\s\S]+)`, `create index ${1}${2}`), -} - -var sqlite3DropIndexConversions = []regexpMap{ - rmap(`(?i)drop index ([\S]+) on ([\S]+)`, `drop index if exists $1`), -} - -var ( - sqlite3IdentifyCreateTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create table`)) - sqlite3IdentifyCreateIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create( unique|) index`)) - sqlite3IdentifyDropIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*drop index`)) - sqlite3IdentifyAlterTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*alter table`)) - sqlite3IdentifyInsertStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*(insert|replace)`)) -) - -func IsInsert(statement string) bool { - return sqlite3IdentifyInsertStatement.MatchString(statement) -} - -func IsCreateTable(statement string) bool { - return sqlite3IdentifyCreateTableStatement.MatchString(statement) -} - -func IsCreateIndex(statement string) bool { - return sqlite3IdentifyCreateIndexStatement.MatchString(statement) -} - -func IsDropIndex(statement string) bool { - return sqlite3IdentifyDropIndexStatement.MatchString(statement) -} - -func IsAlterTable(statement string) bool { - return sqlite3IdentifyAlterTableStatement.MatchString(statement) -} - -func ToSqlite3CreateTable(statement string) string { - return applyConversions(statement, sqlite3CreateTableConversions) -} - -func ToSqlite3CreateIndex(statement string) string { - return applyConversions(statement, sqlite3CreateIndexConversions) -} - -func ToSqlite3DropIndex(statement string) string { - return applyConversions(statement, sqlite3DropIndexConversions) -} - -func ToSqlite3Insert(statement string) string { - statement = applyConversions(statement, sqlite3GeneralConversions) - return applyConversions(statement, sqlite3InsertConversions) -} - -// ToSqlite3Dialect converts a statement to sqlite3 dialect. The statement -// is checked in this order: -// 1. If a query, return the statement with sqlite3GeneralConversions applied. -// 2. If an insert/replace, convert with ToSqlite3Insert. -// 3. If a create index, convert with IsCreateIndex. -// 4. If an drop table, convert with IsDropIndex. -// 5. If a create table, convert with IsCreateTable. -// 6. If an alter table, convert with IsAlterTable. -// 7. As fallback, return the statement with sqlite3GeneralConversions applied. -func ToSqlite3Dialect(statement string, potentiallyDMLOrDDL bool) (translated string) { - if potentiallyDMLOrDDL { - switch { - case IsInsert(statement): - return ToSqlite3Insert(statement) - case IsCreateIndex(statement): - return ToSqlite3CreateIndex(statement) - case IsDropIndex(statement): - return ToSqlite3DropIndex(statement) - case IsCreateTable(statement): - return ToSqlite3CreateTable(statement) - case IsAlterTable(statement): - return ToSqlite3CreateTable(statement) - } - } - return applyConversions(statement, sqlite3GeneralConversions) -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go b/go/vt/external/golib/sqlutils/sqlite_dialect_test.go deleted file mode 100644 index cd6d1477d04..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go +++ /dev/null @@ -1,352 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "fmt" - "regexp" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var spacesRegexp = regexp.MustCompile(`[\s]+`) - -func init() { -} - -func stripSpaces(statement string) string { - statement = strings.TrimSpace(statement) - statement = spacesRegexp.ReplaceAllString(statement, " ") - return statement -} - -func TestIsCreateTable(t *testing.T) { - require.True(t, IsCreateTable("create table t(id int)")) - require.True(t, IsCreateTable(" create table t(id int)")) - require.True(t, IsCreateTable("CREATE TABLE t(id int)")) - require.True(t, IsCreateTable(` - create table t(id int) - `)) - require.False(t, IsCreateTable("where create table t(id int)")) - require.False(t, IsCreateTable("insert")) -} - -func TestToSqlite3CreateTable(t *testing.T) { - { - statement := "create table t(id int)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, statement) - } - { - statement := "create table t(id int, v varchar(123) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar(123) NOT NULL default '')") - } - { - statement := "create table t(id int, v varchar ( 123 ) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar ( 123 ) NOT NULL default '')") - } - { - statement := "create table t(i smallint unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint(5) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint ( 5 ) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } -} - -func TestToSqlite3AlterTable(t *testing.T) { - { - statement := ` - ALTER TABLE - database_instance - ADD COLUMN sql_delay INT UNSIGNED NOT NULL AFTER replica_lag_seconds - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - ALTER TABLE - database_instance - add column sql_delay int not null default 0 - `)) - } - { - statement := ` - ALTER TABLE - database_instance - ADD INDEX source_host_port_idx (source_host, source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } - { - statement := ` - ALTER TABLE - topology_recovery - ADD KEY last_detection_idx (last_detection_id) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - create index - last_detection_idx_topology_recovery - on topology_recovery (last_detection_id) - `)) - } - -} - -func TestCreateIndex(t *testing.T) { - { - statement := ` - create index - source_host_port_idx_database_instance - on database_instance (source_host(128), source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } -} - -func TestIsInsert(t *testing.T) { - require.True(t, IsInsert("insert into t")) - require.True(t, IsInsert("insert ignore into t")) - require.True(t, IsInsert(` - insert ignore into t - `)) - require.False(t, IsInsert("where create table t(id int)")) - require.False(t, IsInsert("create table t(id int)")) - require.True(t, IsInsert(` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - `)) -} - -func TestToSqlite3Insert(t *testing.T) { - { - statement := ` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - replace into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - `)) - } -} - -func TestToSqlite3GeneralConversions(t *testing.T) { - { - statement := "select now()" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now')") - } - { - statement := "select now() - interval ? second" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now', printf('-%d second', ?))") - } - { - statement := "select now() + interval ? minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now', printf('+%d minute', ?))") - } - { - statement := "select now() + interval 5 minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now', '+5 minute')") - } - { - statement := "select some_table.some_column + interval ? minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime(some_table.some_column, printf('+%d minute', ?))") - } - { - statement := "AND primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "AND primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d minute', ?))") - } - { - statement := "select concat(primary_instance.port, '') as port" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select (primary_instance.port || '') as port") - } - { - statement := "select concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select ('abc' || 'def') as s") - } - { - statement := "select concat( 'abc' , 'def', last.col) as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select ('abc' || 'def' || last.col) as s") - } - { - statement := "select concat(myself.only) as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select concat(myself.only) as s") - } - { - statement := "select concat(1, '2', 3, '4') as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select concat(1, '2', 3, '4') as s") - } - { - statement := "select group_concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select group_concat( 'abc' , 'def') as s") - } -} - -func TestIsCreateIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"create index my_index on my_table(column);", true}, - {"CREATE INDEX my_index ON my_table(column);", true}, - {"create unique index my_index on my_table(column);", true}, - {"CREATE UNIQUE INDEX my_index ON my_table(column);", true}, - {"create index my_index on my_table(column) where condition;", true}, - {"create unique index my_index on my_table(column) where condition;", true}, - {"create table my_table(column);", false}, - {"drop index my_index on my_table;", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsCreateIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestIsDropIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"drop index my_index on my_table;", true}, - {"DROP INDEX my_index ON my_table;", true}, - {"drop index if exists my_index on my_table;", true}, - {"DROP INDEX IF EXISTS my_index ON my_table;", true}, - {"drop table my_table;", false}, - {"create index my_index on my_table(column);", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsDropIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestToSqlite3Dialect(t *testing.T) { - tests := []struct { - input string - expected string - }{ - {"create table my_table(id int);", "create table my_table(id int);"}, - {"alter table my_table add column new_col int;", "alter table my_table add column new_col int;"}, - {"insert into my_table values (1);", "insert into my_table values (1);"}, - {"", ""}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := ToSqlite3Dialect(test.input, true) - assert.Equal(t, test.expected, result) - }) - } -} - -func buildToSqlite3Dialect_Insert(instances int) string { - var rows []string - for i := 0; i < instances; i++ { - rows = append(rows, `(?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())`) - } - - return fmt.Sprintf(`INSERT ignore INTO database_instance - (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, - version, major_version, version_comment, binlog_server, read_only, binlog_format, - binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, - replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, - source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - %s - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_net_timeout=VALUES(replica_net_timeout), heartbeat_interval=VALUES(heartbeat_interval), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), - semi_sync_enforced=VALUES(semi_sync_enforced), semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) - `, strings.Join(rows, "\n\t\t\t\t")) -} - -func BenchmarkToSqlite3Dialect_Insert1000(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - statement := buildToSqlite3Dialect_Insert(1000) - b.StartTimer() - ToSqlite3Dialect(statement, true) - } -} - -func BenchmarkToSqlite3Dialect_Select(b *testing.B) { - for i := 0; i < b.N; i++ { - statement := "select now() - interval ? second" - ToSqlite3Dialect(statement, false) - } -} diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 097d3732797..688cc2039b8 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -18,7 +18,6 @@ package db import ( "database/sql" - "strings" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" @@ -68,21 +67,13 @@ func OpenVTOrc() (db *sql.DB, err error) { return db, err } -func translateStatement(statement string) string { - return sqlutils.ToSqlite3Dialect(statement, true /* potentiallyDMLOrDDL */) -} - -func translateQueryStatement(statement string) string { - return sqlutils.ToSqlite3Dialect(statement, false /* potentiallyDMLOrDDL */) -} - // registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment func registerVTOrcDeployment(db *sql.DB) error { query := ` replace into vtorc_db_deployments ( deployed_version, deployed_timestamp ) values ( - ?, NOW() + ?, datetime('now') ) ` if _, err := execInternal(db, query, ""); err != nil { @@ -97,25 +88,12 @@ func deployStatements(db *sql.DB, queries []string) error { tx, err := db.Begin() if err != nil { log.Fatal(err.Error()) + return err } for _, query := range queries { - query = translateStatement(query) if _, err := tx.Exec(query); err != nil { - if strings.Contains(err.Error(), "syntax error") { - log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) - return err - } - if !sqlutils.IsAlterTable(query) && !sqlutils.IsCreateIndex(query) && !sqlutils.IsDropIndex(query) { - log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) - return err - } - if !strings.Contains(err.Error(), "duplicate column name") && - !strings.Contains(err.Error(), "Duplicate column name") && - !strings.Contains(err.Error(), "check that column/key exists") && - !strings.Contains(err.Error(), "already exists") && - !strings.Contains(err.Error(), "Duplicate key name") { - log.Errorf("Error initiating vtorc: %+v; query=%+v", err, query) - } + log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) + return err } } if err := tx.Commit(); err != nil { @@ -150,7 +128,6 @@ func initVTOrcDB(db *sql.DB) error { // execInternal func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) { var err error - query = translateStatement(query) res, err := sqlutils.ExecNoPrepare(db, query, args...) return res, err } @@ -166,7 +143,6 @@ func ExecVTOrc(query string, args ...any) (sql.Result, error) { // QueryVTOrcRowsMap func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { - query = translateQueryStatement(query) db, err := OpenVTOrc() if err != nil { return err @@ -177,7 +153,6 @@ func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { // QueryVTOrc func QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { - query = translateQueryStatement(query) db, err := OpenVTOrc() if err != nil { return err diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index ecb47804306..1ef9c5e2aec 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -94,13 +94,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna IFNULL( primary_instance.binary_log_file = database_instance_stale_binlog_coordinates.binary_log_file AND primary_instance.binary_log_pos = database_instance_stale_binlog_coordinates.binary_log_pos - AND database_instance_stale_binlog_coordinates.first_seen < NOW() - interval ? second, + AND database_instance_stale_binlog_coordinates.first_seen < datetime('now', printf('-%d second', ?)), 0 ) ) AS is_stale_binlog_coordinates, MIN( primary_instance.last_checked <= primary_instance.last_seen - and primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? second + and primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d second', ?)) ) = 1 AS is_last_check_valid, /* To be considered a primary, traditional async replication must not be present/valid AND the host should either */ /* not be a replication group member OR be the primary of the replication group */ @@ -684,7 +684,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC sqlResult, err := db.ExecVTOrc(` update database_instance_last_analysis set analysis = ?, - analysis_timestamp = now() + analysis_timestamp = datetime('now') where alias = ? and analysis != ? @@ -709,10 +709,10 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC if !lastAnalysisChanged { // The insert only returns more than 1 row changed if this is the first insertion. sqlResult, err := db.ExecVTOrc(` - insert ignore into database_instance_last_analysis ( + insert or ignore into database_instance_last_analysis ( alias, analysis_timestamp, analysis ) values ( - ?, now(), ? + ?, datetime('now'), ? ) `, tabletAlias, string(analysisCode), @@ -738,7 +738,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC insert into database_instance_analysis_changelog ( alias, analysis_timestamp, analysis ) values ( - ?, now(), ? + ?, datetime('now'), ? ) `, tabletAlias, string(analysisCode), @@ -757,7 +757,7 @@ func ExpireInstanceAnalysisChangelog() error { delete from database_instance_analysis_changelog where - analysis_timestamp < now() - interval ? hour + analysis_timestamp < datetime('now', printf('-%d hour', ?)) `, config.UnseenInstanceForgetHours, ) diff --git a/go/vt/vtorc/inst/audit_dao.go b/go/vt/vtorc/inst/audit_dao.go index 47435be82a0..60ef08f24b7 100644 --- a/go/vt/vtorc/inst/audit_dao.go +++ b/go/vt/vtorc/inst/audit_dao.go @@ -60,7 +60,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error into audit ( audit_timestamp, audit_type, alias, keyspace, shard, message ) VALUES ( - NOW(), ?, ?, ?, ?, ? + datetime('now'), ?, ?, ?, ?, ? ) `, auditType, diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index d0a08f5afb7..e72ff2f0da9 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -111,9 +111,11 @@ func ExecDBWriteFunc(f func() error) error { } func ExpireTableData(tableName string, timestampColumn string) error { - query := fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn) writeFunc := func() error { - _, err := db.ExecVTOrc(query, config.Config.AuditPurgeDays) + _, err := db.ExecVTOrc( + fmt.Sprintf("delete from %s where %s < datetime('now', printf('-%%d DAY', ?))", tableName, timestampColumn), + config.Config.AuditPurgeDays, + ) return err } return ExecDBWriteFunc(writeFunc) @@ -574,9 +576,9 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In query := fmt.Sprintf(` select *, - unix_timestamp() - unix_timestamp(last_checked) as seconds_since_last_checked, + strftime('%%s', 'now') - strftime('%%s', last_checked) as seconds_since_last_checked, ifnull(last_checked <= last_seen, 0) as is_last_check_valid, - unix_timestamp() - unix_timestamp(last_seen) as seconds_since_last_seen + strftime('%%s', 'now') - strftime('%%s', last_seen) as seconds_since_last_seen from vitess_tablet left join database_instance using (alias, hostname, port) @@ -657,11 +659,11 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) and ( (last_seen < last_checked) - or (unix_timestamp() - unix_timestamp(last_checked) > ?) + or (strftime('%%s', 'now') - strftime('%%s', last_checked) > ?) or (replication_sql_thread_state not in (-1 ,1)) or (replication_io_thread_state not in (-1 ,1)) - or (abs(cast(replication_lag_seconds as signed) - cast(sql_delay as signed)) > ?) - or (abs(cast(replica_lag_seconds as signed) - cast(sql_delay as signed)) > ?) + or (abs(cast(replication_lag_seconds as integer) - cast(sql_delay as integer)) > ?) + or (abs(cast(replica_lag_seconds as integer) - cast(sql_delay as integer)) > ?) or (gtid_errant != '') ) ` @@ -723,8 +725,8 @@ func ReadOutdatedInstanceKeys() ([]string, error) { WHERE CASE WHEN last_attempted_check <= last_checked - THEN last_checked < now() - interval ? second - ELSE last_checked < now() - interval ? second + THEN last_checked < datetime('now', printf('-%d second', ?)) + ELSE last_checked < datetime('now', printf('-%d second', ?)) END UNION SELECT @@ -753,7 +755,7 @@ func ReadOutdatedInstanceKeys() ([]string, error) { return res, err } -func mkInsertOdku(table string, columns []string, values []string, nrRows int, insertIgnore bool) (string, error) { +func mkInsert(table string, columns []string, values []string, nrRows int, insertIgnore bool) (string, error) { if len(columns) == 0 { return "", errors.New("Column list cannot be empty") } @@ -765,9 +767,9 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } var q strings.Builder - var ignore string + insertStr := "REPLACE INTO" if insertIgnore { - ignore = "ignore" + insertStr = "INSERT OR IGNORE INTO" } valRow := fmt.Sprintf("(%s)", strings.Join(values, ", ")) var val strings.Builder @@ -778,26 +780,17 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } col := strings.Join(columns, ", ") - var odku strings.Builder - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", columns[0], columns[0])) - for _, c := range columns[1:] { - odku.WriteString(", ") - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", c, c)) - } - - q.WriteString(fmt.Sprintf(`INSERT %s INTO %s + q.WriteString(fmt.Sprintf(`%s %s (%s) VALUES %s - ON DUPLICATE KEY UPDATE - %s `, - ignore, table, col, val.String(), odku.String())) + insertStr, table, col, val.String())) return q.String(), nil } -func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) (string, []any, error) { +func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) (string, []any, error) { if len(instances) == 0 { return "", nil, nil } @@ -876,19 +869,19 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo for i := range columns { values[i] = "?" } - values[3] = "NOW()" // last_checked - values[4] = "NOW()" // last_attempted_check - values[5] = "1" // last_check_partial_success + values[3] = "datetime('now')" // last_checked + values[4] = "datetime('now')" // last_attempted_check + values[5] = "1" // last_check_partial_success if updateLastSeen { columns = append(columns, "last_seen") - values = append(values, "NOW()") + values = append(values, "datetime('now')") } var args []any for _, instance := range instances { // number of columns minus 2 as last_checked and last_attempted_check - // updated with NOW() + // updated with datetime('now') args = append(args, instance.InstanceAlias) args = append(args, instance.Hostname) args = append(args, instance.Port) @@ -951,7 +944,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo args = append(args, instance.LastDiscoveryLatency.Nanoseconds()) } - sql, err := mkInsertOdku("database_instance", columns, values, len(instances), insertIgnore) + sql, err := mkInsert("database_instance", columns, values, len(instances), insertIgnore) if err != nil { errMsg := fmt.Sprintf("Failed to build query: %v", err) log.Errorf(errMsg) @@ -973,7 +966,7 @@ func writeManyInstances(instances []*Instance, instanceWasActuallyFound bool, up if len(writeInstances) == 0 { return nil // nothing to write } - sql, args, err := mkInsertOdkuForInstances(writeInstances, instanceWasActuallyFound, updateLastSeen) + sql, args, err := mkInsertForInstances(writeInstances, instanceWasActuallyFound, updateLastSeen) if err != nil { return err } @@ -1000,7 +993,7 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { update database_instance set - last_checked = NOW(), + last_checked = datetime('now'), last_check_partial_success = ? where alias = ?`, @@ -1029,7 +1022,7 @@ func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { update database_instance set - last_attempted_check = NOW() + last_attempted_check = datetime('now') where alias = ?`, tabletAlias, @@ -1104,7 +1097,7 @@ func ForgetLongUnseenInstances() error { delete from database_instance where - last_seen < NOW() - interval ? hour`, + last_seen < datetime('now', printf('-%d hour', ?))`, config.UnseenInstanceForgetHours, ) if err != nil { @@ -1126,11 +1119,11 @@ func ForgetLongUnseenInstances() error { func SnapshotTopologies() error { writeFunc := func() error { _, err := db.ExecVTOrc(` - insert ignore into + insert or ignore into database_instance_topology_history (snapshot_unix_timestamp, alias, hostname, port, source_host, source_port, keyspace, shard, version) select - UNIX_TIMESTAMP(NOW()), + strftime('%s', 'now'), vitess_tablet.alias, vitess_tablet.hostname, vitess_tablet.port, database_instance.source_host, database_instance.source_port, vitess_tablet.keyspace, vitess_tablet.shard, database_instance.version @@ -1171,12 +1164,12 @@ func RecordStaleInstanceBinlogCoordinates(tabletAlias string, binlogCoordinates return err } _, err = db.ExecVTOrc(` - insert ignore into + insert or ignore into database_instance_stale_binlog_coordinates ( alias, binary_log_file, binary_log_pos, first_seen ) values ( - ?, ?, ?, NOW() + ?, ?, ?, datetime('now') )`, args...) if err != nil { @@ -1193,7 +1186,7 @@ func ExpireStaleInstanceBinlogCoordinates() error { writeFunc := func() error { _, err := db.ExecVTOrc(` delete from database_instance_stale_binlog_coordinates - where first_seen < NOW() - INTERVAL ? SECOND + where first_seen < datetime('now', printf('-%d second', ?)) `, expireSeconds, ) if err != nil { diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 549389f91fe..14b1bdf36d5 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -49,57 +49,44 @@ func mkTestInstances() []*Instance { return instances } -func TestMkInsertOdkuSingle(t *testing.T) { +func TestMkInsertSingle(t *testing.T) { instances := mkTestInstances() - sql, args, err := mkInsertOdkuForInstances(nil, true, true) + sql, args, err := mkInsertForInstances(nil, true, true) require.NoError(t, err) require.Equal(t, sql, "") require.Equal(t, len(args), 0) // one instance - s1 := `INSERT ignore INTO database_instance + s1 := `INSERT OR IGNORE INTO database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), - semi_sync_enforced=VALUES(semi_sync_enforced), semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + VALUES (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,` - sql1, args1, err := mkInsertOdkuForInstances(instances[:1], false, true) + sql1, args1, err := mkInsertForInstances(instances[:1], false, true) require.NoError(t, err) require.Equal(t, normalizeQuery(sql1), normalizeQuery(s1)) require.Equal(t, stripSpaces(fmtArgs(args1)), stripSpaces(a1)) } -func TestMkInsertOdkuThree(t *testing.T) { +func TestMkInsertThree(t *testing.T) { instances := mkTestInstances() // three instances - s3 := `INSERT INTO database_instance + s3 := `REPLACE INTO database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()), - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()), - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), - physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), - semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + VALUES (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ` a3 := ` zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, @@ -107,7 +94,7 @@ func TestMkInsertOdkuThree(t *testing.T) { zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, ` - sql3, args3, err := mkInsertOdkuForInstances(instances[:3], true, true) + sql3, args3, err := mkInsertForInstances(instances[:3], true, true) require.NoError(t, err) require.Equal(t, normalizeQuery(sql3), normalizeQuery(s3)) require.Equal(t, stripSpaces(fmtArgs(args3)), stripSpaces(a3)) @@ -474,27 +461,27 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { }{ { name: "No problems", - sql: []string{"update database_instance set last_checked = now()"}, + sql: []string{"update database_instance set last_checked = datetime('now')"}, instancesRequired: nil, }, { name: "One instance is outdated", sql: []string{ - "update database_instance set last_checked = now()", - "update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", }, instancesRequired: []string{"zone1-0000000100"}, }, { name: "One instance doesn't have myql data", sql: []string{ - "update database_instance set last_checked = now()", + "update database_instance set last_checked = datetime('now')", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103"}, }, { name: "One instance doesn't have myql data and one is outdated", sql: []string{ - "update database_instance set last_checked = now()", - "update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103", "zone1-0000000100"}, @@ -531,10 +518,10 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { errInDataCollection := db.QueryVTOrcRowsMap(`select alias, last_checked, last_attempted_check, -ROUND((JULIANDAY(now()) - JULIANDAY(last_checked)) * 86400) AS difference, +ROUND((JULIANDAY(datetime('now')) - JULIANDAY(last_checked)) * 86400) AS difference, last_attempted_check <= last_checked as use1, -last_checked < now() - interval 1500 second as is_outdated1, -last_checked < now() - interval 3000 second as is_outdated2 +last_checked < datetime('now', '-1500 second') as is_outdated1, +last_checked < datetime('now', '-3000 second') as is_outdated2 from database_instance`, func(rowMap sqlutils.RowMap) error { log.Errorf("Row in database_instance - %+v", rowMap) return nil @@ -558,12 +545,12 @@ func TestUpdateInstanceLastChecked(t *testing.T) { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", partialSuccess: false, - conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = false", + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = false", }, { name: "Verify partial success", tabletAlias: "zone1-0000000100", partialSuccess: true, - conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = true", + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -609,7 +596,7 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", - conditionToCheck: "last_attempted_check >= now() - interval 30 second", + conditionToCheck: "last_attempted_check >= datetime('now', '-30 second')", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", diff --git a/go/vt/vtorc/logic/disable_recovery.go b/go/vt/vtorc/logic/disable_recovery.go index 4a3766055d2..44e4f5a66ff 100644 --- a/go/vt/vtorc/logic/disable_recovery.go +++ b/go/vt/vtorc/logic/disable_recovery.go @@ -63,7 +63,7 @@ func IsRecoveryDisabled() (disabled bool, err error) { // DisableRecovery ensures recoveries are disabled globally func DisableRecovery() error { _, err := db.ExecVTOrc(` - INSERT IGNORE INTO global_recovery_disable + INSERT OR IGNORE INTO global_recovery_disable (disable_recovery) VALUES (1) `, diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 4a7a6c77ef1..37947d53af3 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -41,14 +41,14 @@ func AttemptFailureDetectionRegistration(analysisEntry *inst.ReplicationAnalysis analysisEntry.CountReplicas, analysisEntry.IsActionableRecovery, ) - startActivePeriodHint := "now()" + startActivePeriodHint := "datetime('now')" if analysisEntry.StartActivePeriod != "" { startActivePeriodHint = "?" args = append(args, analysisEntry.StartActivePeriod) } query := fmt.Sprintf(` - insert ignore + insert or ignore into topology_failure_detection ( alias, in_active_period, @@ -95,10 +95,10 @@ func ClearActiveFailureDetections() error { _, err := db.ExecVTOrc(` update topology_failure_detection set in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() + end_active_period_unixtime = strftime('%%s', 'now') where in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? MINUTE + AND start_active_period < datetime('now', printf('-%d MINUTE', ?)) `, config.FailureDetectionPeriodBlockMinutes, ) @@ -111,7 +111,7 @@ func ClearActiveFailureDetections() error { func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) { analysisEntry := topologyRecovery.AnalysisEntry sqlResult, err := db.ExecVTOrc(` - insert ignore + insert or ignore into topology_recovery ( recovery_id, uid, @@ -131,7 +131,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ?, ?, 1, - NOW(), + datetime('now'), 0, ?, ?, @@ -225,10 +225,10 @@ func ClearActiveRecoveries() error { _, err := db.ExecVTOrc(` update topology_recovery set in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() + end_active_period_unixtime = strftime('%%s', 'now') where in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? SECOND + AND start_active_period < datetime('now', printf('-%d SECOND', ?)) `, config.Config.RecoveryPeriodBlockSeconds, ) @@ -243,7 +243,7 @@ func ClearActiveRecoveries() error { func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*TopologyRecovery) error { for _, recovery := range blockingRecoveries { _, err := db.ExecVTOrc(` - insert + replace into blocked_topology_recovery ( alias, keyspace, @@ -256,15 +256,15 @@ func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blocking ?, ?, ?, - NOW(), + datetime('now'), ? ) - on duplicate key update - keyspace=values(keyspace), - shard=values(shard), - analysis=values(analysis), - last_blocked_timestamp=values(last_blocked_timestamp), - blocking_recovery_id=values(blocking_recovery_id) + on conflict(alias) do update set + keyspace=keyspace, + shard=shard, + analysis=analysis, + last_blocked_timestamp=last_blocked_timestamp, + blocking_recovery_id=blocking_recovery_id `, analysisEntry.AnalyzedInstanceAlias, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, @@ -325,7 +325,7 @@ func ExpireBlockedRecoveries() error { delete from blocked_topology_recovery where - last_blocked_timestamp < NOW() - interval ? second + last_blocked_timestamp < datetime('now', printf('-%d second', ?)) `, config.Config.RecoveryPollSeconds*2, ) if err != nil { @@ -339,16 +339,16 @@ func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, w additionalSet := `` if markEndRecovery { additionalSet = ` - end_recovery=IFNULL(end_recovery, NOW()), + end_recovery=IFNULL(end_recovery, datetime('now')), ` } query := fmt.Sprintf(` update topology_recovery set in_active_period = 0, - end_active_period_unixtime = case when end_active_period_unixtime = 0 then UNIX_TIMESTAMP() else end_active_period_unixtime end, + end_active_period_unixtime = case when end_active_period_unixtime = 0 then strftime('%%s', 'now') else end_active_period_unixtime end, %s acknowledged = 1, - acknowledged_at = NOW(), + acknowledged_at = datetime('now'), acknowledged_by = ?, acknowledge_comment = ? where @@ -399,7 +399,7 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { is_successful = ?, successor_alias = ?, all_errors = ?, - end_recovery = NOW() + end_recovery = datetime('now') where uid = ? `, topologyRecovery.IsSuccessful, @@ -531,10 +531,10 @@ func ReadRecentRecoveries(unacknowledgedOnly bool, page int) ([]*TopologyRecover // writeTopologyRecoveryStep writes down a single step in a recovery process func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error { sqlResult, err := db.ExecVTOrc(` - insert ignore + insert or ignore into topology_recovery_steps ( recovery_step_id, recovery_uid, audit_at, message - ) values (?, ?, now(), ?) + ) values (?, ?, datetime('now'), ?) `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryUID, topologyRecoveryStep.Message, ) if err != nil { diff --git a/go/vt/vtorc/process/election_dao.go b/go/vt/vtorc/process/election_dao.go index f723bd48dde..1009c49b56a 100644 --- a/go/vt/vtorc/process/election_dao.go +++ b/go/vt/vtorc/process/election_dao.go @@ -28,10 +28,10 @@ import ( func AttemptElection() (bool, error) { { sqlResult, err := db.ExecVTOrc(` - insert ignore into active_node ( + insert or ignore into active_node ( anchor, hostname, token, first_seen_active, last_seen_active ) values ( - 1, ?, ?, now(), now() + 1, ?, ?, datetime('now'), datetime('now') ) `, ThisHostname, util.ProcessToken.Hash, diff --git a/go/vt/vtorc/process/health_dao.go b/go/vt/vtorc/process/health_dao.go index 59ea557223d..7cab623994f 100644 --- a/go/vt/vtorc/process/health_dao.go +++ b/go/vt/vtorc/process/health_dao.go @@ -37,10 +37,10 @@ func WriteRegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { nodeHealth.onceHistory.Do(func() { _, _ = db.ExecVTOrc(` - insert ignore into node_health_history + insert or ignore into node_health_history (hostname, token, first_seen_active, extra_info, command, app_version) values - (?, ?, NOW(), ?, ?, ?) + (?, ?, datetime('now'), ?, ?, ?) `, nodeHealth.Hostname, nodeHealth.Token, nodeHealth.ExtraInfo, nodeHealth.Command, nodeHealth.AppVersion, @@ -79,11 +79,11 @@ func WriteRegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { { dbBackend := config.Config.SQLite3DataFile sqlResult, err := db.ExecVTOrc(` - insert ignore into node_health + insert or ignore into node_health (hostname, token, first_seen_active, last_seen_active, extra_info, command, app_version, db_backend) values ( ?, ?, - now() - interval ? second, now() - interval ? second, + datetime('now', printf('-%d second', ?)), datetime('now', printf('-%d second', ?)), ?, ?, ?, ?) `, nodeHealth.Hostname, nodeHealth.Token,