diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index e8b9c5c671..f01a030ee2 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -1092,6 +1092,8 @@ class MySQL_HostGroups_Manager { void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); unsigned long long Get_Memory_Stats(); + void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector>& new_servers); + void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 8454cf939e..4116ac3898 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -56,6 +56,9 @@ struct cmp_str { #define N_L_ASE 16 +#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com" +#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology" + /* Implementation of monitoring in AWS Aurora will be different than previous modules @@ -197,7 +200,8 @@ enum MySQL_Monitor_State_Data_Task_Type { MON_GROUP_REPLICATION, MON_REPLICATION_LAG, MON_GALERA, - MON_AWS_AURORA + MON_AWS_AURORA, + MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY }; enum class MySQL_Monitor_State_Data_Task_Result { @@ -229,6 +233,7 @@ class MySQL_Monitor_State_Data { char *hostname; int port; int writer_hostgroup; // used only by group replication + int reader_hostgroup; bool writer_is_also_reader; // used only by group replication int max_transactions_behind; // used only by group replication int max_transactions_behind_count; // used only by group replication @@ -442,6 +447,7 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); + void process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, int reader_hostgroup); private: std::vector *tables_defs_monitor; @@ -553,7 +559,7 @@ class MySQL_Monitor { * Note: Calling init_async is mandatory before executing tasks asynchronously. */ void monitor_ping_async(SQLite3_result* resultset); - void monitor_read_only_async(SQLite3_result* resultset); + void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check); void monitor_replication_lag_async(SQLite3_result* resultset); void monitor_group_replication_async(); void monitor_galera_async(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 86fdd26348..906b4ab433 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -304,6 +304,7 @@ struct p_th_gauge { mysql_monitor_ping_interval, mysql_monitor_ping_timeout, mysql_monitor_ping_max_failures, + mysql_monitor_aws_rds_topology_discovery_interval, mysql_monitor_read_only_interval, mysql_monitor_read_only_timeout, mysql_monitor_writer_is_also_reader, @@ -385,6 +386,8 @@ class MySQL_Threads_Handler int monitor_ping_max_failures; //! Monitor ping timeout. Unit: 'ms'. int monitor_ping_timeout; + //! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. + int monitor_aws_rds_topology_discovery_interval; //! Monitor read only timeout. Unit: 'ms'. int monitor_read_only_interval; //! Monitor read only timeout. Unit: 'ms'. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index be78d1c69c..28f98ac8c5 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -902,6 +902,7 @@ __thread int mysql_thread___monitor_connect_timeout; __thread int mysql_thread___monitor_ping_interval; __thread int mysql_thread___monitor_ping_max_failures; __thread int mysql_thread___monitor_ping_timeout; +__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval; __thread int mysql_thread___monitor_read_only_interval; __thread int mysql_thread___monitor_read_only_timeout; __thread int mysql_thread___monitor_read_only_max_timeout_count; @@ -1073,6 +1074,7 @@ extern __thread int mysql_thread___monitor_connect_timeout; extern __thread int mysql_thread___monitor_ping_interval; extern __thread int mysql_thread___monitor_ping_max_failures; extern __thread int mysql_thread___monitor_ping_timeout; +extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval; extern __thread int mysql_thread___monitor_read_only_interval; extern __thread int mysql_thread___monitor_read_only_timeout; extern __thread int mysql_thread___monitor_read_only_max_timeout_count; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 37c848fece..b185416af8 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -2261,6 +2261,7 @@ bool MySQL_HostGroups_Manager::commit( // fill Hostgroup_Manager_Mapping with latest records update_hostgroup_manager_mappings(); + ev_async_send(gtid_ev_loop, gtid_ev_async); __sync_fetch_and_add(&status.servers_table_version,1); @@ -8583,3 +8584,80 @@ MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *h } return NULL; } + +/** +* @brief Updates replication hostgroups by adding autodiscovered mysql servers. +* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table. +* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. +* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server. +*/ +void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups( + const vector>& new_servers +) { + int added_new_server = -1; + + GloAdmin->mysql_servers_wrlock(); + wrlock(); + + // Add the discovered server with default values + for (const tuple& s : new_servers) { + string host = std::get<0>(s); + uint16_t port = std::get<1>(s); + long int hostgroup_id = std::get<2>(s); + + srv_info_t srv_info { host.c_str(), port, "AWS RDS" }; + srv_opts_t srv_opts { -1, -1, -1 }; + + added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts); + } + + // If servers were added, perform necessary updates to internal structures + if (added_new_server > -1) { + purge_mysql_servers_table(); + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + generate_mysql_servers_table(); + + // Update the global checksums after 'mysql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; + string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; + save_runtime_mysql_servers(resultset.release()); + + // Update the runtime_mysql_servers checksum with the new checksum + uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0; + table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum; + + // This is required for preserving coherence in the checksums, otherwise they would be inconsistent with `commit` generated checksums + SpookyHash rep_hgs_hash {}; + bool init = false; + uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2]; + + if (servers_v2_hash) { + if (init == false) { + init = true; + rep_hgs_hash.Init(19, 3); + } + + rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash)); + } + + CUCFT1( + rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup", + table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS] + ); + + proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str()); + + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_mysql_servers_checksum(mysrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); + } + + update_table_mysql_servers_for_monitor(false); + update_hostgroup_manager_mappings(); + } + + wrunlock(); + GloAdmin->mysql_servers_wrunlock(); +} diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index c58e0ccd55..a943f10a98 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -616,6 +616,12 @@ void MySQL_Monitor_State_Data::init_async() { task_timeout_ = mysql_thread___monitor_read_only_timeout; task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; break; + case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY: + query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY; + async_state_machine_ = ASYNC_QUERY_START; + task_timeout_ = mysql_thread___monitor_read_only_timeout; + task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; + break; #else // TEST_READONLY case MON_READ_ONLY: case MON_INNODB_READ_ONLY: @@ -1623,6 +1629,8 @@ void * monitor_read_only_thread(void *arg) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only&@@global.innodb_read_only read_only"); } else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only"); + } else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY); } else { // default mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only"); } @@ -3282,6 +3290,68 @@ VALGRIND_ENABLE_ERROR_REPORTING; return ret; } +/** +* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'. +* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers'. +* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found. +* @param discovered_servers A vector of servers discovered when querying the cluster's topology. +* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers. +*/ +void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, int reader_hostgroup) { + char *error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result *runtime_mysql_servers = NULL; + + char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers); + + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + vector> new_servers; + vector saved_hostnames; + saved_hostnames.push_back(originating_server_hostname); + + // Do an initial loop through the query results to save existing runtime server hostnames + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + string current_runtime_hostname = r1->fields[0]; + + saved_hostnames.push_back(current_runtime_hostname); + } + + // Loop through discovered servers and process the ones we haven't saved yet + for (MYSQL_ROW s : discovered_servers) { + string current_discovered_hostname = s[2]; + string current_discovered_port_string = s[3]; + int current_discovered_port_int; + + try { + current_discovered_port_int = stoi(s[3]); + } catch (...) { + proxy_error( + "Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n", + originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str() + ); + return; + } + + if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) { + tuple new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup); + new_servers.push_back(new_server); + saved_hostnames.push_back(current_discovered_hostname); + } + } + + // Add the new servers if any + if (!new_servers.empty()) { + MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); + } + } +} + void * MySQL_Monitor::monitor_read_only() { mysql_close(mysql_init(NULL)); // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) @@ -3295,14 +3365,17 @@ void * MySQL_Monitor::monitor_read_only() { unsigned long long t1; unsigned long long t2; unsigned long long next_loop_at=0; + int topology_loop = 0; + int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval; while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) { + bool do_discovery_check = false; unsigned int glover; char *error=NULL; SQLite3_result *resultset=NULL; // add support for SSL - char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()"; + char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()"; t1=monotonic_time(); if (!GloMTH) return NULL; // quick exit during shutdown/restart @@ -3313,6 +3386,7 @@ void * MySQL_Monitor::monitor_read_only() { next_loop_at=0; } + if (t1 < next_loop_at) { goto __sleep_monitor_read_only; } @@ -3329,8 +3403,14 @@ void * MySQL_Monitor::monitor_read_only() { goto __end_monitor_read_only_loop; } + if (topology_loop >= topology_loop_max) { + do_discovery_check = true; + topology_loop = 0; + } + topology_loop += 1; + // resultset must be initialized before calling monitor_read_only_async - monitor_read_only_async(resultset); + monitor_read_only_async(resultset, do_discovery_check); if (shutdown) return NULL; __end_monitor_read_only_loop: @@ -7199,7 +7279,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector mysql_servers; for (auto& mmsd : mmsds) { - + string originating_server_hostname = mmsd->hostname; const auto task_result = mmsd->get_task_result(); assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); @@ -7269,6 +7349,38 @@ VALGRIND_ENABLE_ERROR_REPORTING; } rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + // Process the read_only field as above and store the first server + vector discovered_servers; + for (k = 0; k < num_fields; k++) { + if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) { + j = k; + } + } + if (j > -1) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + if (row) { + discovered_servers.push_back(row); +VALGRIND_DISABLE_ERROR_REPORTING; + if (row[j]) { + if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF")) + read_only = 0; + } +VALGRIND_ENABLE_ERROR_REPORTING; + } + } + + // Store the remaining servers + int num_rows = mysql_num_rows(mmsd->result); + for (int i = 1; i < num_rows; i++) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + discovered_servers.push_back(row); + } + + // Process the discovered servers and add them to 'runtime_mysql_servers' + if (!discovered_servers.empty()) { + process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup); + } } else { proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); @@ -7329,7 +7441,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; return true; } -void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { +void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) { assert(resultset); std::vector> mmsds; @@ -7352,11 +7464,18 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { } else if (strcasecmp(r->fields[3], (char*)"read_only|innodb_read_only") == 0) { task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY; } + + // Change task type if it's time to do discovery check. Only for aws rds endpoints + string hostname = r->fields[0]; + if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { + task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY; + } } std::unique_ptr mmsd( new MySQL_Monitor_State_Data(task_type, r->fields[0], atoi(r->fields[1]), atoi(r->fields[2]))); + mmsd->reader_hostgroup = atoi(r->fields[4]); // set reader_hostgroup mmsd->mondb = monitordb; mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 48ec0e3ea7..167b2aa733 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -307,6 +307,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_ping_interval", (char *)"monitor_ping_max_failures", (char *)"monitor_ping_timeout", + (char *)"monitor_aws_rds_topology_discovery_interval", (char *)"monitor_read_only_interval", (char *)"monitor_read_only_timeout", (char *)"monitor_read_only_max_timeout_count", @@ -823,6 +824,12 @@ th_metrics_map = std::make_tuple( "Reached maximum ping attempts from monitor.", metric_tags {} ), + std::make_tuple ( + p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval, + "proxysql_mysql_monitor_aws_rds_topology_discovery_interval", + "How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ", + metric_tags {} + ), std::make_tuple ( p_th_gauge::mysql_monitor_read_only_interval, "proxysql_mysql_monitor_read_only_interval_seconds", @@ -914,6 +921,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_ping_interval=8000; variables.monitor_ping_max_failures=3; variables.monitor_ping_timeout=1000; + variables.monitor_aws_rds_topology_discovery_interval=1000; variables.monitor_read_only_interval=1000; variables.monitor_read_only_timeout=800; variables.monitor_read_only_max_timeout_count=3; @@ -2081,6 +2089,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false); + VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false); VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false); @@ -4230,6 +4239,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval"); mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures"); mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout"); + mysql_thread___monitor_aws_rds_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval"); mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval"); mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout"); mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count"); @@ -5555,6 +5565,7 @@ void MySQL_Threads_Handler::p_update_metrics() { this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures); + this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval]->Set(this->variables.monitor_aws_rds_topology_discovery_interval); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_timeout]->Set(this->variables.monitor_read_only_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_writer_is_also_reader]->Set(this->variables.monitor_writer_is_also_reader); diff --git a/lib/proxysql_utils.cpp b/lib/proxysql_utils.cpp index a5c31b7d52..fe30562ed7 100644 --- a/lib/proxysql_utils.cpp +++ b/lib/proxysql_utils.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include