diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 995e340518..1415884455 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -541,9 +541,10 @@ using address_t = std::string; using port_t = unsigned int; using read_only_t = int; using current_replication_lag = int; +using replace_current_replication_lag = bool; using read_only_server_t = std::tuple; -using replication_lag_server_t = std::tuple; +using replication_lag_server_t = std::tuple; enum READ_ONLY_SERVER_T { ROS_HOSTNAME = 0, @@ -557,6 +558,7 @@ enum REPLICATION_LAG_SERVER_T { RLS_ADDRESS, RLS_PORT, RLS_CURRENT_REPLICATION_LAG, + RLS_OVERRIDE_REPLICATION_LAG, RLS__SIZE }; @@ -1090,7 +1092,7 @@ class MySQL_HostGroups_Manager { void push_MyConn_to_pool_array(MySQL_Connection **, unsigned int); void destroy_MyConn_from_pool(MySQL_Connection *, bool _lock=true); - void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int); + void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int, bool); void replication_lag_action(const std::list& mysql_servers); void read_only_action(char *hostname, int port, int read_only); void read_only_action_v2(const std::list& mysql_servers); diff --git a/include/SQLite3_Server.h b/include/SQLite3_Server.h index 6b0526983e..09fe3f9bc0 100644 --- a/include/SQLite3_Server.h +++ b/include/SQLite3_Server.h @@ -56,7 +56,7 @@ class SQLite3_Server { std::vector *tables_defs_readonly; #endif // TEST_READONLY #ifdef TEST_REPLICATIONLAG - std::unordered_map replicationlag_map; + std::unordered_map> replicationlag_map; std::vector* tables_defs_replicationlag; #endif // TEST_REPLICATIONLAG #if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG) @@ -105,7 +105,7 @@ class SQLite3_Server { #ifdef TEST_REPLICATIONLAG pthread_mutex_t test_replicationlag_mutex; void load_replicationlag_table(MySQL_Session* sess); - int replicationlag_test_value(const char* p); + int* replicationlag_test_value(const char* p); int replicationlag_map_size() { return replicationlag_map.size(); } diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index d380c3b29a..0a724fe922 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -2685,9 +2685,10 @@ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) { myhgc->mysrvs->add(mysrvc); } -void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port, int current_replication_lag) { +void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port, + int current_replication_lag, bool override_repl_lag) { - if (current_replication_lag == -1) { + if (current_replication_lag == -1 && override_repl_lag == true) { current_replication_lag = myhgc->get_monitor_slave_lag_when_null(); proxy_error("Replication lag on server %s:%d is NULL, using value %d\n", address, port, current_replication_lag); } @@ -2730,7 +2731,7 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const if ( (current_replication_lag>=0 && ((unsigned int)current_replication_lag <= mysrvc->max_replication_lag)) || - (current_replication_lag==-2) // see issue 959 + (current_replication_lag==-2 && override_repl_lag == true) // see issue 959 ) { mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; proxy_warning("Re-enabling server %s:%d from HG %u with replication lag of %d second\n", address, port, myhgc->hid, current_replication_lag); @@ -2756,18 +2757,19 @@ void MySQL_HostGroups_Manager::replication_lag_action(const std::list(server); const unsigned int port = std::get(server); const int current_replication_lag = std::get(server); + const bool override_repl_lag = std::get(server); if (mysql_thread___monitor_replication_lag_group_by_host == false) { // legacy check. 1 check per server per hostgroup MyHGC *myhgc = MyHGC_find(hid); - replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag); + replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag); } else { // only 1 check per server, no matter the hostgroup // all hostgroups must be searched for (unsigned int i=0; ilen; i++) { MyHGC*myhgc=(MyHGC*)MyHostGroups->index(i); - replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag); + replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag); } } } diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 1034129653..d306f7dbb0 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -2756,6 +2756,7 @@ void * monitor_replication_lag_thread(void *arg) { ASSERT_SQLITE_OK(rc, mmsd->mondb); // 'replication_lag' to be feed to 'replication_lag_action' int repl_lag=-2; + bool override_repl_lag = true; rc=(*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); rc=(*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); unsigned long long time_now=realtime_time(); @@ -2792,14 +2793,16 @@ void * monitor_replication_lag_thread(void *arg) { MYSQL_ROW row=mysql_fetch_row(mmsd->result); if (row) { repl_lag=-1; // this is old behavior + override_repl_lag = true; if (row[j]) { // if Seconds_Behind_Master is not NULL repl_lag=atoi(row[j]); + override_repl_lag = false; } else { MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG); } } } - if (repl_lag>=0) { + if (/*repl_lag >= 0 ||*/ override_repl_lag == false) { rc=(*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb); } else { rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); @@ -2820,7 +2823,7 @@ void * monitor_replication_lag_thread(void *arg) { rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); MyHGM->replication_lag_action( std::list { - replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag} + replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag } } ); (*proxy_sqlite3_finalize)(statement); if (mmsd->mysql_error_msg == NULL) { @@ -7739,8 +7742,7 @@ void MySQL_Monitor::monitor_gr_async_actions_handler( bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vector& mmsds) { - - std::list> mysql_servers; + std::list mysql_servers; for (auto& mmsd : mmsds) { @@ -7782,6 +7784,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto ASSERT_SQLITE_OK(rc, mmsd->mondb); // 'replication_lag' to be feed to 'replication_lag_action' int repl_lag = -2; + bool override_repl_lag = true; rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); unsigned long long time_now = realtime_time(); @@ -7818,14 +7821,16 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto MYSQL_ROW row = mysql_fetch_row(mmsd->result); if (row) { repl_lag = -1; // this is old behavior + override_repl_lag = true; if (row[j]) { // if Seconds_Behind_Master is not NULL repl_lag = atoi(row[j]); + override_repl_lag = false; } else { MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG); } } } - if (repl_lag >= 0) { + if (/*repl_lag >= 0 ||*/ override_repl_lag == false) { rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb); } else { rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); @@ -7847,7 +7852,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); //MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag); (*proxy_sqlite3_finalize)(statement); - mysql_servers.push_back( std::tuple { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag }); + mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag }); } //executing replication lag action diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index ad269a6078..926e54783e 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -879,11 +879,17 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p // probably never initialized GloSQLite3Server->load_replicationlag_table(sess); } - const int rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS ")); + const int* rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS ")); free(query); - char* a = (char*)"SELECT %d as Seconds_Behind_Master"; - query = (char*)malloc(strlen(a) + 2); - sprintf(query, a, rc); + if (rc == nullptr) { + const char* a = (char*)"SELECT null as Seconds_Behind_Master"; + query = (char*)malloc(strlen(a) + 2); + sprintf(query, a); + } else { + const char* a = (char*)"SELECT %d as Seconds_Behind_Master"; + query = (char*)malloc(strlen(a) + 2); + sprintf(query, a, *rc); + } pthread_mutex_unlock(&GloSQLite3Server->test_replicationlag_mutex); } } @@ -1845,7 +1851,7 @@ bool SQLite3_Server::init() { insert_into_tables_defs(tables_defs_replicationlag, (const char*)"REPLICATIONLAG_HOST_STATUS", (const char*)"CREATE TABLE REPLICATIONLAG_HOST_STATUS (" - "hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT NOT NULL, PRIMARY KEY (hostname, port)" + "hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT DEFAULT NULL, PRIMARY KEY (hostname, port)" ")" ); @@ -2016,7 +2022,14 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) { for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { SQLite3_row* r = *it; const std::string& s = std::string(r->fields[0]) + ":" + std::string(r->fields[1]); - replicationlag_map[s] = atoi(r->fields[2]); + + if (r->fields[2] == nullptr) { + replicationlag_map[s] = nullptr; + } else { + int* repl_lag = new int; + *repl_lag = atoi(r->fields[2]); + replicationlag_map[s] = std::unique_ptr(repl_lag); + } } } delete resultset; @@ -2024,7 +2037,7 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) { GloAdmin->admindb->execute_statement((char*)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE hostgroup_id BETWEEN 5202 AND 5700", &error, &cols, &affected_rows, &resultset); for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { SQLite3_row* r = *it; - const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",0)"; + const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",null)"; sessdb->execute(s.c_str()); } delete resultset; @@ -2032,11 +2045,11 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) { GloAdmin->mysql_servers_wrunlock(); } -int SQLite3_Server::replicationlag_test_value(const char* p) { - int rc = 0; // default - std::unordered_map::iterator it = replicationlag_map.find(std::string(p)); +int* SQLite3_Server::replicationlag_test_value(const char* p) { + int* rc = 0; // default + std::unordered_map>::iterator it = replicationlag_map.find(std::string(p)); if (it != replicationlag_map.end()) { - rc = it->second; + rc = it->second.get(); } return rc; }