From e7aa5ff4c5392e93327e963e91a3fcae516384bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sun, 21 Apr 2024 13:31:06 +0000 Subject: [PATCH] Split of MySQL_HostGroups_Manager.cpp into multiple files First commit to split MySQL_HostGroups_Manager.cpp into multiple files. More to split. --- include/GTID_Server_Data.h | 27 + include/MySQL_HostGroups_Manager.h | 5 +- lib/GTID_Server_Data.cpp | 469 ++++++++++ lib/Makefile | 1 + lib/MyHGC.cpp | 384 +++++++++ lib/MySQL_HostGroups_Manager.cpp | 1272 +--------------------------- lib/MySrvC.cpp | 198 +++++ lib/MySrvConnList.cpp | 256 ++++++ lib/MySrvList.cpp | 44 + 9 files changed, 1385 insertions(+), 1271 deletions(-) create mode 100644 include/GTID_Server_Data.h create mode 100644 lib/GTID_Server_Data.cpp create mode 100644 lib/MyHGC.cpp create mode 100644 lib/MySrvC.cpp create mode 100644 lib/MySrvConnList.cpp create mode 100644 lib/MySrvList.cpp diff --git a/include/GTID_Server_Data.h b/include/GTID_Server_Data.h new file mode 100644 index 0000000000..9bc9219fda --- /dev/null +++ b/include/GTID_Server_Data.h @@ -0,0 +1,27 @@ +#ifndef CLASS_GTID_Server_Data_H +#define CLASS_GTID_Server_Data_H +class GTID_Server_Data { + public: + char *address; + uint16_t port; + uint16_t mysql_port; + char *data; + size_t len; + size_t size; + size_t pos; + struct ev_io *w; + char uuid_server[64]; + unsigned long long events_read; + gtid_set_t gtid_executed; + bool active; + GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port); + void resize(size_t _s); + ~GTID_Server_Data(); + bool readall(); + bool writeout(); + bool read_next_gtid(); + bool gtid_exists(char *gtid_uuid, uint64_t gtid_trxid); + void read_all_gtids(); + void dump(); +}; +#endif // CLASS_GTID_Server_Data_H diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index f01a030ee2..eaa71a293c 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ 
-130,6 +130,9 @@ class MyHGC; std::string gtid_executed_to_string(gtid_set_t& gtid_executed); void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed); +#include "GTID_Server_Data.h" + +/* class GTID_Server_Data { public: char *address; @@ -154,7 +157,7 @@ class GTID_Server_Data { void read_all_gtids(); void dump(); }; - +*/ class MySrvConnList { diff --git a/lib/GTID_Server_Data.cpp b/lib/GTID_Server_Data.cpp new file mode 100644 index 0000000000..6dbf572354 --- /dev/null +++ b/lib/GTID_Server_Data.cpp @@ -0,0 +1,469 @@ +#include "MySQL_HostGroups_Manager.h" + +#include "ev.h" +#include + + +extern ProxySQL_Admin *GloAdmin; + +extern MySQL_Threads_Handler *GloMTH; + +extern MySQL_Monitor *GloMyMon; + +static pthread_mutex_t ev_loop_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void gtid_async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents) { + if (glovars.shutdown) { + ev_break(loop); + } + pthread_mutex_lock(&ev_loop_mutex); + MyHGM->gtid_missing_nodes = false; + MyHGM->generate_mysql_gtid_executed_tables(); + pthread_mutex_unlock(&ev_loop_mutex); + return; +} + +static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int revents) { + if (GloMTH == nullptr) { return; } + ev_timer_stop(loop, timer); + ev_timer_set(timer, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0); + if (glovars.shutdown) { + ev_break(loop); + } + if (MyHGM->gtid_missing_nodes) { + pthread_mutex_lock(&ev_loop_mutex); + MyHGM->gtid_missing_nodes = false; + MyHGM->generate_mysql_gtid_executed_tables(); + pthread_mutex_unlock(&ev_loop_mutex); + } + ev_timer_start(loop, timer); + return; +} + +void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { + pthread_mutex_lock(&ev_loop_mutex); + if (revents & EV_READ) { + GTID_Server_Data *sd = (GTID_Server_Data *)w->data; + bool rc = true; + rc = sd->readall(); + if (rc == false) { + //delete sd; + std::string s1 = sd->address; + s1.append(":"); + 
s1.append(std::to_string(sd->mysql_port)); + MyHGM->gtid_missing_nodes = true; + proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); + std::unordered_map ::iterator it2; + it2 = MyHGM->gtid_map.find(s1); + if (it2 != MyHGM->gtid_map.end()) { + //MyHGM->gtid_map.erase(it2); + it2->second = NULL; + delete sd; + } + ev_io_stop(MyHGM->gtid_ev_loop, w); + free(w); + } else { + sd->dump(); + } + } + pthread_mutex_unlock(&ev_loop_mutex); +} + +void connect_cb(EV_P_ ev_io *w, int revents) { + pthread_mutex_lock(&ev_loop_mutex); + struct ev_io * c = w; + if (revents & EV_WRITE) { + int optval = 0; + socklen_t optlen = sizeof(optval); + if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) || + (optval != 0)) { + /* Connection failed; try the next address in the list. */ + //int errnum = optval ? optval : errno; + ev_io_stop(MyHGM->gtid_ev_loop, w); + close(w->fd); + MyHGM->gtid_missing_nodes = true; + GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; + GTID_Server_Data *sd = custom_data; + std::string s1 = sd->address; + s1.append(":"); + s1.append(std::to_string(sd->mysql_port)); + proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); + std::unordered_map ::iterator it2; + it2 = MyHGM->gtid_map.find(s1); + if (it2 != MyHGM->gtid_map.end()) { + //MyHGM->gtid_map.erase(it2); + it2->second = NULL; + delete sd; + } + //delete custom_data; + free(c); + } else { + ev_io_stop(MyHGM->gtid_ev_loop, w); + int fd=w->fd; + struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io)); + new_w->data = w->data; + GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data; + custom_data->w = new_w; + free(w); + ev_io_init(new_w, reader_cb, fd, EV_READ); + ev_io_start(MyHGM->gtid_ev_loop, new_w); + } + } + pthread_mutex_unlock(&ev_loop_mutex); +} + +struct ev_io * new_connector(char 
*address, uint16_t gtid_port, uint16_t mysql_port) { + //struct sockaddr_in a; + int s; + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + perror("socket"); + close(s); + return NULL; + } +/* + memset(&a, 0, sizeof(a)); + a.sin_port = htons(gtid_port); + a.sin_family = AF_INET; + if (!inet_aton(address, (struct in_addr *) &a.sin_addr.s_addr)) { + perror("bad IP address format"); + close(s); + return NULL; + } +*/ + ioctl_FIONBIO(s,1); + + struct addrinfo hints; + struct addrinfo *res = NULL; + memset(&hints, 0, sizeof(hints)); + hints.ai_protocol= IPPROTO_TCP; + hints.ai_family= AF_UNSPEC; + hints.ai_socktype= SOCK_STREAM; + + char str_port[NI_MAXSERV+1]; + sprintf(str_port,"%d", gtid_port); + int gai_rc = getaddrinfo(address, str_port, &hints, &res); + if (gai_rc) { + freeaddrinfo(res); + //exit here + return NULL; + } + + //int status = connect(s, (struct sockaddr *) &a, sizeof(a)); + int status = connect(s, res->ai_addr, res->ai_addrlen); + if ((status == 0) || ((status == -1) && (errno == EINPROGRESS))) { + struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io)); + if (c) { + ev_io_init(c, connect_cb, s, EV_WRITE); + GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port); + c->data = (void *)custom_data; + return c; + } + /* else error */ + } + return NULL; +} + + + +GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) { + active = true; + w = _w; + size = 1024; // 1KB buffer + data = (char *)malloc(size); + memset(uuid_server, 0, sizeof(uuid_server)); + pos = 0; + len = 0; + address = strdup(_address); + port = _port; + mysql_port = _mysql_port; + events_read = 0; +} + +void GTID_Server_Data::resize(size_t _s) { + char *data_ = (char *)malloc(_s); + memcpy(data_, data, (_s > size ? 
size : _s)); + size = _s; + free(data); + data = data_; +} + +GTID_Server_Data::~GTID_Server_Data() { + free(address); + free(data); +} + +bool GTID_Server_Data::readall() { + bool ret = true; + if (size == len) { + // buffer is full, expand + resize(len*2); + } + int rc = 0; + rc = read(w->fd,data+len,size-len); + if (rc > 0) { + len += rc; + } else { + int myerr = errno; + proxy_error("Read returned %d bytes, error %d\n", rc, myerr); + if ( + (rc == 0) || + (rc==-1 && myerr != EINTR && myerr != EAGAIN) + ) { + ret = false; + } + } + return ret; +} + + +bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) { + std::string s = gtid_uuid; + auto it = gtid_executed.find(s); +// fprintf(stderr,"Checking if server %s:%d has GTID %s:%lu ... ", address, port, gtid_uuid, gtid_trxid); + if (it == gtid_executed.end()) { +// fprintf(stderr,"NO\n"); + return false; + } + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) { + if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) { +// fprintf(stderr,"YES\n"); + return true; + } + } +// fprintf(stderr,"NO\n"); + return false; +} + +void GTID_Server_Data::read_all_gtids() { + while (read_next_gtid()) { + } + } + +void GTID_Server_Data::dump() { + if (len==0) { + return; + } + read_all_gtids(); + //int rc = write(1,data+pos,len-pos); + fflush(stdout); + ///pos += rc; + if (pos >= len/2) { + memmove(data,data+pos,len-pos); + len = len-pos; + pos = 0; + } +} + +bool GTID_Server_Data::writeout() { + bool ret = true; + if (len==0) { + return ret; + } + int rc = 0; + rc = write(w->fd,data+pos,len-pos); + if (rc > 0) { + pos += rc; + if (pos >= len/2) { + memmove(data,data+pos,len-pos); + len = len-pos; + pos = 0; + } + } + return ret; +} + +bool GTID_Server_Data::read_next_gtid() { + if (len==0) { + return false; + } + void *nlp = NULL; + nlp = memchr(data+pos,'\n',len-pos); + if (nlp == NULL) { + return false; + } + int l = (char *)nlp - (data+pos); + char rec_msg[80]; + if 
(strncmp(data+pos,(char *)"ST=",3)==0) { + // we are reading the bootstrap + char *bs = (char *)malloc(l+1-3); // length + 1 (null byte) - 3 (header) + memcpy(bs, data+pos+3, l-3); + bs[l-3] = '\0'; + char *saveptr1=NULL; + char *saveptr2=NULL; + //char *saveptr3=NULL; + char *token = NULL; + char *subtoken = NULL; + //char *subtoken2 = NULL; + char *str1 = NULL; + char *str2 = NULL; + //char *str3 = NULL; + for (str1 = bs; ; str1 = NULL) { + token = strtok_r(str1, ",", &saveptr1); + if (token == NULL) { + break; + } + int j = 0; + for (str2 = token; ; str2 = NULL) { + subtoken = strtok_r(str2, ":", &saveptr2); + if (subtoken == NULL) { + break; + } + j++; + if (j%2 == 1) { // we are reading the uuid + char *p = uuid_server; + for (unsigned int k=0; kfirst; + s.insert(8,"-"); + s.insert(13,"-"); + s.insert(18,"-"); + s.insert(23,"-"); + s = s + ":"; + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) { + std::string s2 = s; + s2 = s2 + std::to_string(itr->first); + s2 = s2 + "-"; + s2 = s2 + std::to_string(itr->second); + s2 = s2 + ","; + gtid_set = gtid_set + s2; + } + } + // Extract latest comma only in case 'gtid_executed' isn't empty + if (gtid_set.empty() == false) { + gtid_set.pop_back(); + } + return gtid_set; +} + + + +void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) { + auto it = gtid_executed.find(gtid.first); + if (it == gtid_executed.end()) + { + gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second); + return; + } + + bool flag = true; + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) + { + if (gtid.second >= itr->first && gtid.second <= itr->second) + return; + if (gtid.second + 1 == itr->first) + { + --itr->first; + flag = false; + break; + } + else if (gtid.second == itr->second + 1) + { + ++itr->second; + flag = false; + break; + } + else if (gtid.second < itr->first) + { + it->second.emplace(itr, gtid.second, gtid.second); + return; + } + } + + if (flag) + 
it->second.emplace_back(gtid.second, gtid.second); + + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) + { + auto next_itr = std::next(itr); + if (next_itr != it->second.end() && itr->second + 1 == next_itr->first) + { + itr->second = next_itr->second; + it->second.erase(next_itr); + break; + } + } +} + +void * GTID_syncer_run() { + //struct ev_loop * gtid_ev_loop; + //gtid_ev_loop = NULL; + MyHGM->gtid_ev_loop = ev_loop_new (EVBACKEND_POLL | EVFLAG_NOENV); + if (MyHGM->gtid_ev_loop == NULL) { + proxy_error("could not initialise GTID sync loop\n"); + exit(EXIT_FAILURE); + } + //ev_async_init(gtid_ev_async, gtid_async_cb); + //ev_async_start(gtid_ev_loop, gtid_ev_async); + MyHGM->gtid_ev_timer = (struct ev_timer *)malloc(sizeof(struct ev_timer)); + ev_async_init(MyHGM->gtid_ev_async, gtid_async_cb); + ev_async_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_async); + //ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0); + ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, 3, 0); + ev_timer_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_timer); + //ev_ref(gtid_ev_loop); + ev_run(MyHGM->gtid_ev_loop, 0); + //sleep(1000); + return NULL; +} + diff --git a/lib/Makefile b/lib/Makefile index 6cf7396ec5..8ab3daea5f 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -128,6 +128,7 @@ default: libproxysql.a _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo 
font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo c_tokenizer.oo proxysql_utils.oo proxysql_coredump.oo proxysql_sslkeylog.oo \ sha256crypt.oo \ QP_rule_text.oo QP_query_digest_stats.oo \ + GTID_Server_Data.oo MyHGC.oo MySrvConnList.oo MySrvList.oo MySrvC.oo \ proxysql_find_charset.oo ProxySQL_Poll.oo OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX)) HEADERS := ../include/*.h ../include/*.hpp diff --git a/lib/MyHGC.cpp b/lib/MyHGC.cpp new file mode 100644 index 0000000000..6daa6be295 --- /dev/null +++ b/lib/MyHGC.cpp @@ -0,0 +1,384 @@ +#include "MySQL_HostGroups_Manager.h" + + +extern MySQL_Threads_Handler *GloMTH; + +MyHGC::MyHGC(int _hid) { + hid=_hid; + mysrvs=new MySrvList(this); + current_time_now = 0; + new_connections_now = 0; + attributes.initialized = false; + reset_attributes(); + // Uninitialized server defaults. Should later be initialized via 'mysql_hostgroup_attributes'. + servers_defaults.weight = -1; + servers_defaults.max_connections = -1; + servers_defaults.use_ssl = -1; +} + +void MyHGC::reset_attributes() { + if (attributes.initialized == false) { + attributes.init_connect = NULL; + attributes.comment = NULL; + attributes.ignore_session_variables_text = NULL; + } + attributes.initialized = true; + attributes.configured = false; + attributes.max_num_online_servers = 1000000; + attributes.throttle_connections_per_sec = 1000000; + attributes.autocommit = -1; + attributes.free_connections_pct = 10; + attributes.handle_warnings = -1; + attributes.multiplex = true; + attributes.connection_warming = false; + free(attributes.init_connect); + attributes.init_connect = NULL; + free(attributes.comment); + attributes.comment = NULL; + free(attributes.ignore_session_variables_text); + attributes.ignore_session_variables_text = NULL; + attributes.ignore_session_variables_json = json(); +} + +MyHGC::~MyHGC() { + reset_attributes(); // free all memory + delete mysrvs; +} + +MySrvC *MyHGC::get_random_MySrvC(char * gtid_uuid, uint64_t 
gtid_trxid, int max_lag_ms, MySQL_Session *sess) { + MySrvC *mysrvc=NULL; + unsigned int j; + unsigned int sum=0; + unsigned int TotalUsedConn=0; + unsigned int l=mysrvs->cnt(); + static time_t last_hg_log = 0; +#ifdef TEST_AURORA + unsigned long long a1 = array_mysrvc_total/10000; + array_mysrvc_total += l; + unsigned long long a2 = array_mysrvc_total/10000; + if (a2 > a1) { + fprintf(stderr, "Total: %llu, Candidates: %llu\n", array_mysrvc_total-l, array_mysrvc_cands); + } +#endif // TEST_AURORA + MySrvC *mysrvcCandidates_static[32]; + MySrvC **mysrvcCandidates = mysrvcCandidates_static; + unsigned int num_candidates = 0; + bool max_connections_reached = false; + if (l>32) { + mysrvcCandidates = (MySrvC **)malloc(sizeof(MySrvC *)*l); + } + if (l) { + //int j=0; + for (j=0; jidx(j); + if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { // consider this server only if ONLINE + if (mysrvc->ConnectionsUsed->conns_length() < mysrvc->max_connections) { // consider this server only if didn't reach max_connections + if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? 
mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far + if (gtid_trxid) { + if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } else { + if (max_lag_ms >= 0) { + if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } else { + sess->thread->status_variables.stvar[st_var_aws_aurora_replicas_skipped_during_query]++; + } + } else { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } + } + } else { + max_connections_reached = true; + } + } else { + if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) { + // try to recover shunned servers + if (mysrvc->shunned_automatic && mysql_thread___shun_recovery_time_sec) { + time_t t; + t=time(NULL); + // we do all these changes without locking . We assume the server is not used from long + // even if the server is still in used and any of the follow command fails it is not critical + // because this is only an attempt to recover a server that is probably dead anyway + + // the next few lines of code try to solve issue #530 + int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? 
mysql_thread___connect_timeout_server_max/1000 - 1 : mysql_thread___shun_recovery_time_sec ); + if (max_wait_sec < 1) { // min wait time should be at least 1 second + max_wait_sec = 1; + } + if (t > mysrvc->time_last_detected_error && (t - mysrvc->time_last_detected_error) > max_wait_sec) { + if ( + (mysrvc->shunned_and_kill_all_connections==false) // it is safe to bring it back online + || + (mysrvc->shunned_and_kill_all_connections==true && mysrvc->ConnectionsUsed->conns_length()==0 && mysrvc->ConnectionsFree->conns_length()==0) // if shunned_and_kill_all_connections is set, ensure all connections are already dropped + ) { +#ifdef DEBUG + if (GloMTH->variables.hostgroup_manager_verbose >= 3) { + proxy_info("Unshunning server %s:%d.\n", mysrvc->address, mysrvc->port); + } +#endif + mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; + mysrvc->shunned_automatic=false; + mysrvc->shunned_and_kill_all_connections=false; + mysrvc->connect_ERR_at_time_last_detected_error=0; + mysrvc->time_last_detected_error=0; + // note: the following function scans all the hostgroups. + // This is ok for now because we only have a global mutex. + // If one day we implement a mutex per hostgroup (unlikely, + // but possible), this must be taken into consideration + if (mysql_thread___unshun_algorithm == 1) { + MyHGM->unshun_server_all_hostgroups(mysrvc->address, mysrvc->port, t, max_wait_sec, &mysrvc->myhgc->hid); + } + // if a server is taken back online, consider it immediately + if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? 
mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far + if (gtid_trxid) { + if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } else { + if (max_lag_ms >= 0) { + if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } else { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } + } + } + } + } + } + } + } + if (max_lag_ms > 0) { // we are using AWS Aurora, as this logic is implemented only here + unsigned int min_num_replicas = sess->thread->variables.aurora_max_lag_ms_only_read_from_replicas; + if (min_num_replicas) { + if (num_candidates >= min_num_replicas) { // there are at least N replicas + // we try to remove the writer + unsigned int total_aws_aurora_current_lag_us=0; + for (j=0; jaws_aurora_current_lag_us; + } + if (total_aws_aurora_current_lag_us) { // we are just double checking that we don't have all servers with aws_aurora_current_lag_us==0 + for (j=0; jaws_aurora_current_lag_us==0) { + sum-=mysrvc->weight; + TotalUsedConn-=mysrvc->ConnectionsUsed->conns_length(); + if (j < num_candidates-1) { + mysrvcCandidates[j]=mysrvcCandidates[num_candidates-1]; + } + num_candidates--; + } + } + } + } + } + } + if (sum==0) { + // per issue #531 , we try a desperate attempt to bring back online any shunned server + // we do this lowering the maximum wait time to 10% + // most of the follow code is copied from few lines above + time_t t; + t=time(NULL); + int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? 
mysql_thread___connect_timeout_server_max/10000 - 1 : mysql_thread___shun_recovery_time_sec/10 ); + if (max_wait_sec < 1) { // min wait time should be at least 1 second + max_wait_sec = 1; + } + if (t - last_hg_log > 1) { // log this at most once per second to avoid spamming the logs + last_hg_log = time(NULL); + + if (gtid_trxid) { + proxy_error("Hostgroup %u has no servers ready for GTID '%s:%ld'. Waiting for replication...\n", hid, gtid_uuid, gtid_trxid); + } else { + proxy_error("Hostgroup %u has no servers available%s! Checking servers shunned for more than %u second%s\n", hid, + (max_connections_reached ? " or max_connections reached for all servers" : ""), max_wait_sec, max_wait_sec == 1 ? "" : "s"); + } + } + for (j=0; jidx(j); + if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED && mysrvc->shunned_automatic==true) { + if ((t - mysrvc->time_last_detected_error) > max_wait_sec) { + mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; + mysrvc->shunned_automatic=false; + mysrvc->connect_ERR_at_time_last_detected_error=0; + mysrvc->time_last_detected_error=0; + // if a server is taken back online, consider it immediately + if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? 
mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far + if (gtid_trxid) { + if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } else { + if (max_lag_ms >= 0) { + if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } else { + sum+=mysrvc->weight; + TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); + mysrvcCandidates[num_candidates]=mysrvc; + num_candidates++; + } + } + } + } + } + } + } + if (sum==0) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n"); + if (l>32) { + free(mysrvcCandidates); + } +#ifdef TEST_AURORA + array_mysrvc_cands += num_candidates; +#endif // TEST_AURORA + return NULL; // if we reach here, we couldn't find any target + } + +/* + unsigned int New_sum=0; + unsigned int New_TotalUsedConn=0; + // we will now scan again to ignore overloaded servers + for (j=0; jConnectionsUsed->conns_length(); + if ((len * sum) <= (TotalUsedConn * mysrvc->weight * 1.5 + 1)) { + + New_sum+=mysrvc->weight; + New_TotalUsedConn+=len; + } else { + // remove the candidate + if (j+1 < num_candidates) { + mysrvcCandidates[j] = mysrvcCandidates[num_candidates-1]; + } + j--; + num_candidates--; + } + } +*/ + + unsigned int New_sum=sum; + + if (New_sum==0) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n"); + if (l>32) { + free(mysrvcCandidates); + } +#ifdef TEST_AURORA + array_mysrvc_cands += num_candidates; +#endif // TEST_AURORA + return NULL; // if we reach here, we couldn't find any target + } + + // latency awareness algorithm is enabled only when compiled 
with USE_MYSRVC_ARRAY + if (sess && sess->thread->variables.min_num_servers_lantency_awareness) { + if ((int) num_candidates >= sess->thread->variables.min_num_servers_lantency_awareness) { + unsigned int servers_with_latency = 0; + unsigned int total_latency_us = 0; + // scan and verify that all servers have some latency + for (j=0; jcurrent_latency_us) { + servers_with_latency++; + total_latency_us += mysrvc->current_latency_us; + } + } + if (servers_with_latency == num_candidates) { + // all servers have some latency. + // That is good. If any server have no latency, something is wrong + // and we will skip this algorithm + sess->thread->status_variables.stvar[st_var_ConnPool_get_conn_latency_awareness]++; + unsigned int avg_latency_us = 0; + avg_latency_us = total_latency_us/num_candidates; + for (j=0; jcurrent_latency_us > avg_latency_us) { + // remove the candidate + if (j+1 < num_candidates) { + mysrvcCandidates[j] = mysrvcCandidates[num_candidates-1]; + } + j--; + num_candidates--; + } + } + // we scan again to adjust weight + New_sum = 0; + for (j=0; jweight; + } + } + } + } + + + unsigned int k; + if (New_sum > 32768) { + k=rand()%New_sum; + } else { + k=fastrand()%New_sum; + } + k++; + New_sum=0; + + for (j=0; jweight; + if (k<=New_sum) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC %p, server %s:%d\n", mysrvc, mysrvc->address, mysrvc->port); + if (l>32) { + free(mysrvcCandidates); + } +#ifdef TEST_AURORA + array_mysrvc_cands += num_candidates; +#endif // TEST_AURORA + return mysrvc; + } + } + } else { + time_t t = time(NULL); + + if (t - last_hg_log > 1) { + last_hg_log = time(NULL); + proxy_error("Hostgroup %u has no servers available!\n", hid); + } + } + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL\n"); + if (l>32) { + free(mysrvcCandidates); + } +#ifdef TEST_AURORA + array_mysrvc_cands += num_candidates; +#endif // TEST_AURORA + return NULL; // if we reach here, we couldn't find any target +} diff --git 
a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index d1f6188ec4..90bc415367 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -57,39 +57,9 @@ class MySrvC; class MySrvList; class MyHGC; -//static struct ev_async * gtid_ev_async; +struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port); +void * GTID_syncer_run(); -static pthread_mutex_t ev_loop_mutex; - -//static std::unordered_map gtid_map; - -static void gtid_async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents) { - if (glovars.shutdown) { - ev_break(loop); - } - pthread_mutex_lock(&ev_loop_mutex); - MyHGM->gtid_missing_nodes = false; - MyHGM->generate_mysql_gtid_executed_tables(); - pthread_mutex_unlock(&ev_loop_mutex); - return; -} - -static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int revents) { - if (GloMTH == nullptr) { return; } - ev_timer_stop(loop, timer); - ev_timer_set(timer, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0); - if (glovars.shutdown) { - ev_break(loop); - } - if (MyHGM->gtid_missing_nodes) { - pthread_mutex_lock(&ev_loop_mutex); - MyHGM->gtid_missing_nodes = false; - MyHGM->generate_mysql_gtid_executed_tables(); - pthread_mutex_unlock(&ev_loop_mutex); - } - ev_timer_start(loop, timer); - return; -} static int wait_for_mysql(MYSQL *mysql, int status) { struct pollfd pfd; @@ -115,432 +85,6 @@ static int wait_for_mysql(MYSQL *mysql, int status) { } } -void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { - pthread_mutex_lock(&ev_loop_mutex); - if (revents & EV_READ) { - GTID_Server_Data *sd = (GTID_Server_Data *)w->data; - bool rc = true; - rc = sd->readall(); - if (rc == false) { - //delete sd; - std::string s1 = sd->address; - s1.append(":"); - s1.append(std::to_string(sd->mysql_port)); - MyHGM->gtid_missing_nodes = true; - proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server 
%s:%d\n", sd->port, sd->address, sd->mysql_port); - std::unordered_map ::iterator it2; - it2 = MyHGM->gtid_map.find(s1); - if (it2 != MyHGM->gtid_map.end()) { - //MyHGM->gtid_map.erase(it2); - it2->second = NULL; - delete sd; - } - ev_io_stop(MyHGM->gtid_ev_loop, w); - free(w); - } else { - sd->dump(); - } - } - pthread_mutex_unlock(&ev_loop_mutex); -} - -void connect_cb(EV_P_ ev_io *w, int revents) { - pthread_mutex_lock(&ev_loop_mutex); - struct ev_io * c = w; - if (revents & EV_WRITE) { - int optval = 0; - socklen_t optlen = sizeof(optval); - if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) || - (optval != 0)) { - /* Connection failed; try the next address in the list. */ - //int errnum = optval ? optval : errno; - ev_io_stop(MyHGM->gtid_ev_loop, w); - close(w->fd); - MyHGM->gtid_missing_nodes = true; - GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; - GTID_Server_Data *sd = custom_data; - std::string s1 = sd->address; - s1.append(":"); - s1.append(std::to_string(sd->mysql_port)); - proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); - std::unordered_map ::iterator it2; - it2 = MyHGM->gtid_map.find(s1); - if (it2 != MyHGM->gtid_map.end()) { - //MyHGM->gtid_map.erase(it2); - it2->second = NULL; - delete sd; - } - //delete custom_data; - free(c); - } else { - ev_io_stop(MyHGM->gtid_ev_loop, w); - int fd=w->fd; - struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io)); - new_w->data = w->data; - GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data; - custom_data->w = new_w; - free(w); - ev_io_init(new_w, reader_cb, fd, EV_READ); - ev_io_start(MyHGM->gtid_ev_loop, new_w); - } - } - pthread_mutex_unlock(&ev_loop_mutex); -} - -struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) { - //struct sockaddr_in a; - int s; - - if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - perror("socket"); - 
close(s); - return NULL; - } -/* - memset(&a, 0, sizeof(a)); - a.sin_port = htons(gtid_port); - a.sin_family = AF_INET; - if (!inet_aton(address, (struct in_addr *) &a.sin_addr.s_addr)) { - perror("bad IP address format"); - close(s); - return NULL; - } -*/ - ioctl_FIONBIO(s,1); - - struct addrinfo hints; - struct addrinfo *res = NULL; - memset(&hints, 0, sizeof(hints)); - hints.ai_protocol= IPPROTO_TCP; - hints.ai_family= AF_UNSPEC; - hints.ai_socktype= SOCK_STREAM; - - char str_port[NI_MAXSERV+1]; - sprintf(str_port,"%d", gtid_port); - int gai_rc = getaddrinfo(address, str_port, &hints, &res); - if (gai_rc) { - freeaddrinfo(res); - //exit here - return NULL; - } - - //int status = connect(s, (struct sockaddr *) &a, sizeof(a)); - int status = connect(s, res->ai_addr, res->ai_addrlen); - if ((status == 0) || ((status == -1) && (errno == EINPROGRESS))) { - struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io)); - if (c) { - ev_io_init(c, connect_cb, s, EV_WRITE); - GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port); - c->data = (void *)custom_data; - return c; - } - /* else error */ - } - return NULL; -} - - - -GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) { - active = true; - w = _w; - size = 1024; // 1KB buffer - data = (char *)malloc(size); - memset(uuid_server, 0, sizeof(uuid_server)); - pos = 0; - len = 0; - address = strdup(_address); - port = _port; - mysql_port = _mysql_port; - events_read = 0; -} - -void GTID_Server_Data::resize(size_t _s) { - char *data_ = (char *)malloc(_s); - memcpy(data_, data, (_s > size ? 
size : _s)); - size = _s; - free(data); - data = data_; -} - -GTID_Server_Data::~GTID_Server_Data() { - free(address); - free(data); -} - -bool GTID_Server_Data::readall() { - bool ret = true; - if (size == len) { - // buffer is full, expand - resize(len*2); - } - int rc = 0; - rc = read(w->fd,data+len,size-len); - if (rc > 0) { - len += rc; - } else { - int myerr = errno; - proxy_error("Read returned %d bytes, error %d\n", rc, myerr); - if ( - (rc == 0) || - (rc==-1 && myerr != EINTR && myerr != EAGAIN) - ) { - ret = false; - } - } - return ret; -} - - -bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) { - std::string s = gtid_uuid; - auto it = gtid_executed.find(s); -// fprintf(stderr,"Checking if server %s:%d has GTID %s:%lu ... ", address, port, gtid_uuid, gtid_trxid); - if (it == gtid_executed.end()) { -// fprintf(stderr,"NO\n"); - return false; - } - for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) { - if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) { -// fprintf(stderr,"YES\n"); - return true; - } - } -// fprintf(stderr,"NO\n"); - return false; -} - -void GTID_Server_Data::read_all_gtids() { - while (read_next_gtid()) { - } - } - -void GTID_Server_Data::dump() { - if (len==0) { - return; - } - read_all_gtids(); - //int rc = write(1,data+pos,len-pos); - fflush(stdout); - ///pos += rc; - if (pos >= len/2) { - memmove(data,data+pos,len-pos); - len = len-pos; - pos = 0; - } -} - -bool GTID_Server_Data::writeout() { - bool ret = true; - if (len==0) { - return ret; - } - int rc = 0; - rc = write(w->fd,data+pos,len-pos); - if (rc > 0) { - pos += rc; - if (pos >= len/2) { - memmove(data,data+pos,len-pos); - len = len-pos; - pos = 0; - } - } - return ret; -} - -bool GTID_Server_Data::read_next_gtid() { - if (len==0) { - return false; - } - void *nlp = NULL; - nlp = memchr(data+pos,'\n',len-pos); - if (nlp == NULL) { - return false; - } - int l = (char *)nlp - (data+pos); - char rec_msg[80]; - if 
(strncmp(data+pos,(char *)"ST=",3)==0) { - // we are reading the bootstrap - char *bs = (char *)malloc(l+1-3); // length + 1 (null byte) - 3 (header) - memcpy(bs, data+pos+3, l-3); - bs[l-3] = '\0'; - char *saveptr1=NULL; - char *saveptr2=NULL; - //char *saveptr3=NULL; - char *token = NULL; - char *subtoken = NULL; - //char *subtoken2 = NULL; - char *str1 = NULL; - char *str2 = NULL; - //char *str3 = NULL; - for (str1 = bs; ; str1 = NULL) { - token = strtok_r(str1, ",", &saveptr1); - if (token == NULL) { - break; - } - int j = 0; - for (str2 = token; ; str2 = NULL) { - subtoken = strtok_r(str2, ":", &saveptr2); - if (subtoken == NULL) { - break; - } - j++; - if (j%2 == 1) { // we are reading the uuid - char *p = uuid_server; - for (unsigned int k=0; kfirst; - s.insert(8,"-"); - s.insert(13,"-"); - s.insert(18,"-"); - s.insert(23,"-"); - s = s + ":"; - for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) { - std::string s2 = s; - s2 = s2 + std::to_string(itr->first); - s2 = s2 + "-"; - s2 = s2 + std::to_string(itr->second); - s2 = s2 + ","; - gtid_set = gtid_set + s2; - } - } - // Extract latest comma only in case 'gtid_executed' isn't empty - if (gtid_set.empty() == false) { - gtid_set.pop_back(); - } - return gtid_set; -} - - - -void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) { - auto it = gtid_executed.find(gtid.first); - if (it == gtid_executed.end()) - { - gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second); - return; - } - - bool flag = true; - for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) - { - if (gtid.second >= itr->first && gtid.second <= itr->second) - return; - if (gtid.second + 1 == itr->first) - { - --itr->first; - flag = false; - break; - } - else if (gtid.second == itr->second + 1) - { - ++itr->second; - flag = false; - break; - } - else if (gtid.second < itr->first) - { - it->second.emplace(itr, gtid.second, gtid.second); - return; - } - } - - if (flag) - 
it->second.emplace_back(gtid.second, gtid.second); - - for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) - { - auto next_itr = std::next(itr); - if (next_itr != it->second.end() && itr->second + 1 == next_itr->first) - { - itr->second = next_itr->second; - it->second.erase(next_itr); - break; - } - } -} - -static void * GTID_syncer_run() { - //struct ev_loop * gtid_ev_loop; - //gtid_ev_loop = NULL; - MyHGM->gtid_ev_loop = ev_loop_new (EVBACKEND_POLL | EVFLAG_NOENV); - if (MyHGM->gtid_ev_loop == NULL) { - proxy_error("could not initialise GTID sync loop\n"); - exit(EXIT_FAILURE); - } - //ev_async_init(gtid_ev_async, gtid_async_cb); - //ev_async_start(gtid_ev_loop, gtid_ev_async); - MyHGM->gtid_ev_timer = (struct ev_timer *)malloc(sizeof(struct ev_timer)); - ev_async_init(MyHGM->gtid_ev_async, gtid_async_cb); - ev_async_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_async); - //ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0); - ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, 3, 0); - ev_timer_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_timer); - //ev_ref(gtid_ev_loop); - ev_run(MyHGM->gtid_ev_loop, 0); - //sleep(1000); - return NULL; -} //static void * HGCU_thread_run() { static void * HGCU_thread_run() { @@ -661,266 +205,6 @@ static void * HGCU_thread_run() { } -MySQL_Connection *MySrvConnList::index(unsigned int _k) { - return (MySQL_Connection *)conns->index(_k); -} - -MySQL_Connection * MySrvConnList::remove(int _k) { - return (MySQL_Connection *)conns->remove_index_fast(_k); -} - -/* -unsigned int MySrvConnList::conns_length() { - return conns->len; -} -*/ - -MySrvConnList::MySrvConnList(MySrvC *_mysrvc) { - mysrvc=_mysrvc; - conns=new PtrArray(); -} - -void MySrvConnList::add(MySQL_Connection *c) { - conns->add(c); -} - -MySrvConnList::~MySrvConnList() { - mysrvc=NULL; - while (conns_length()) { - MySQL_Connection *conn=(MySQL_Connection 
*)conns->remove_index_fast(0); - delete conn; - } - delete conns; -} - -MySrvList::MySrvList(MyHGC *_myhgc) { - myhgc=_myhgc; - servers=new PtrArray(); -} - -void MySrvList::add(MySrvC *s) { - if (s->myhgc==NULL) { - s->myhgc=myhgc; - } - servers->add(s); -} - - -int MySrvList::find_idx(MySrvC *s) { - for (unsigned int i=0; ilen; i++) { - MySrvC *mysrv=(MySrvC *)servers->index(i); - if (mysrv==s) { - return (unsigned int)i; - } - } - return -1; -} - -void MySrvList::remove(MySrvC *s) { - int i=find_idx(s); - assert(i>=0); - servers->remove_index_fast((unsigned int)i); -} - -void MySrvConnList::drop_all_connections() { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Dropping all connections (%u total) on MySrvConnList %p for server %s:%d , hostgroup=%d , status=%d\n", conns_length(), this, mysrvc->address, mysrvc->port, mysrvc->myhgc->hid, mysrvc->status); - while (conns_length()) { - MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0); - delete conn; - } -} - - -MySrvC::MySrvC( - char* add, uint16_t p, uint16_t gp, int64_t _weight, enum MySerStatus _status, unsigned int _compression, - int64_t _max_connections, unsigned int _max_replication_lag, int32_t _use_ssl, unsigned int _max_latency_ms, - char* _comment -) { - address=strdup(add); - port=p; - gtid_port=gp; - weight=_weight; - status=_status; - compression=_compression; - max_connections=_max_connections; - max_replication_lag=_max_replication_lag; - use_ssl=_use_ssl; - cur_replication_lag=0; - cur_replication_lag_count=0; - max_latency_us=_max_latency_ms*1000; - current_latency_us=0; - aws_aurora_current_lag_us = 0; - connect_OK=0; - connect_ERR=0; - queries_sent=0; - bytes_sent=0; - bytes_recv=0; - max_connections_used=0; - queries_gtid_sync=0; - time_last_detected_error=0; - connect_ERR_at_time_last_detected_error=0; - shunned_automatic=false; - shunned_and_kill_all_connections=false; // false to default - //charset=_charset; - myhgc=NULL; - comment=strdup(_comment); - ConnectionsUsed=new 
MySrvConnList(this); - ConnectionsFree=new MySrvConnList(this); -} - -void MySrvC::connect_error(int err_num, bool get_mutex) { - // NOTE: this function operates without any mutex - // although, it is not extremely important if any counter is lost - // as a single connection failure won't make a significant difference - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Connect failed with code '%d'\n", err_num); - __sync_fetch_and_add(&connect_ERR,1); - __sync_fetch_and_add(&MyHGM->status.server_connections_aborted,1); - if (err_num >= 1048 && err_num <= 1052) - return; - if (err_num >= 1054 && err_num <= 1075) - return; - if (err_num >= 1099 && err_num <= 1104) - return; - if (err_num >= 1106 && err_num <= 1113) - return; - if (err_num >= 1116 && err_num <= 1118) - return; - if (err_num == 1136 || (err_num >= 1138 && err_num <= 1149)) - return; - switch (err_num) { - case 1007: // Can't create database - case 1008: // Can't drop database - case 1044: // access denied - case 1045: // access denied -/* - case 1048: // Column cannot be null - case 1049: // Unknown database - case 1050: // Table already exists - case 1051: // Unknown table - case 1052: // Column is ambiguous -*/ - case 1120: - case 1203: // User %s already has more than 'max_user_connections' active connections - case 1226: // User '%s' has exceeded the '%s' resource (current value: %ld) - case 3118: // Access denied for user '%s'. Account is locked.. - return; - break; - default: - break; - } - time_t t=time(NULL); - if (t > time_last_detected_error) { - time_last_detected_error=t; - connect_ERR_at_time_last_detected_error=1; - } else { - if (t < time_last_detected_error) { - // time_last_detected_error is in the future - // this means that monitor has a ping interval too big and tuned that in the future - return; - } - // same time - /** - * @brief The expected configured retries set by 'mysql-connect_retries_on_failure' + '2' extra expected - * connection errors. 
- * @details This two extra connections errors are expected: - * 1. An initial connection error generated by the datastream and the connection when being created, - * this is, right after the session has requested a connection to the connection pool. This error takes - * places directly in the state machine from 'MySQL_Connection'. Because of this, we consider this - * additional error to be a consequence of the two states machines, and it's not considered for - * 'connect_retries'. - * 2. A second connection connection error, which is the initial connection error generated by 'MySQL_Session' - * when already in the 'CONNECTING_SERVER' state. This error is an 'extra error' to always consider, since - * it's not part of the retries specified by 'mysql_thread___connect_retries_on_failure', thus, we set the - * 'connect_retries' to be 'mysql_thread___connect_retries_on_failure + 1'. - */ - int connect_retries = mysql_thread___connect_retries_on_failure + 1; - int max_failures = mysql_thread___shun_on_failures > connect_retries ? connect_retries : mysql_thread___shun_on_failures; - - if (__sync_add_and_fetch(&connect_ERR_at_time_last_detected_error,1) >= (unsigned int)max_failures) { - bool _shu=false; - if (get_mutex==true) - MyHGM->wrlock(); // to prevent race conditions, lock here. See #627 - if (status==MYSQL_SERVER_STATUS_ONLINE) { - status=MYSQL_SERVER_STATUS_SHUNNED; - shunned_automatic=true; - _shu=true; - } else { - _shu=false; - } - if (get_mutex==true) - MyHGM->wrunlock(); - if (_shu) { - proxy_error("Shunning server %s:%d with %u errors/sec. 
Shunning for %u seconds\n", address, port, connect_ERR_at_time_last_detected_error , mysql_thread___shun_recovery_time_sec); - } - } - } -} - -void MySrvC::shun_and_killall() { - status=MYSQL_SERVER_STATUS_SHUNNED; - shunned_automatic=true; - shunned_and_kill_all_connections=true; -} - -MySrvC::~MySrvC() { - if (address) free(address); - if (comment) free(comment); - delete ConnectionsUsed; - delete ConnectionsFree; -} - -MySrvList::~MySrvList() { - myhgc=NULL; - while (servers->len) { - MySrvC *mysrvc=(MySrvC *)servers->remove_index_fast(0); - delete mysrvc; - } - delete servers; -} - - -MyHGC::MyHGC(int _hid) { - hid=_hid; - mysrvs=new MySrvList(this); - current_time_now = 0; - new_connections_now = 0; - attributes.initialized = false; - reset_attributes(); - // Uninitialized server defaults. Should later be initialized via 'mysql_hostgroup_attributes'. - servers_defaults.weight = -1; - servers_defaults.max_connections = -1; - servers_defaults.use_ssl = -1; -} - -void MyHGC::reset_attributes() { - if (attributes.initialized == false) { - attributes.init_connect = NULL; - attributes.comment = NULL; - attributes.ignore_session_variables_text = NULL; - } - attributes.initialized = true; - attributes.configured = false; - attributes.max_num_online_servers = 1000000; - attributes.throttle_connections_per_sec = 1000000; - attributes.autocommit = -1; - attributes.free_connections_pct = 10; - attributes.handle_warnings = -1; - attributes.multiplex = true; - attributes.connection_warming = false; - free(attributes.init_connect); - attributes.init_connect = NULL; - free(attributes.comment); - attributes.comment = NULL; - free(attributes.ignore_session_variables_text); - attributes.ignore_session_variables_text = NULL; - attributes.ignore_session_variables_json = json(); -} - -MyHGC::~MyHGC() { - reset_attributes(); // free all memory - delete mysrvs; -} - using metric_name = std::string; using metric_help = std::string; using metric_tags = std::map; @@ -1296,7 +580,6 @@ 
hg_metrics_map = std::make_tuple( ); MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() { - pthread_mutex_init(&ev_loop_mutex, NULL); status.client_connections=0; status.client_connections_aborted=0; status.client_connections_created=0; @@ -3137,557 +2420,6 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool_array(MySQL_Connection **ca, wrunlock(); } -MySrvC *MyHGC::get_random_MySrvC(char * gtid_uuid, uint64_t gtid_trxid, int max_lag_ms, MySQL_Session *sess) { - MySrvC *mysrvc=NULL; - unsigned int j; - unsigned int sum=0; - unsigned int TotalUsedConn=0; - unsigned int l=mysrvs->cnt(); - static time_t last_hg_log = 0; -#ifdef TEST_AURORA - unsigned long long a1 = array_mysrvc_total/10000; - array_mysrvc_total += l; - unsigned long long a2 = array_mysrvc_total/10000; - if (a2 > a1) { - fprintf(stderr, "Total: %llu, Candidates: %llu\n", array_mysrvc_total-l, array_mysrvc_cands); - } -#endif // TEST_AURORA - MySrvC *mysrvcCandidates_static[32]; - MySrvC **mysrvcCandidates = mysrvcCandidates_static; - unsigned int num_candidates = 0; - bool max_connections_reached = false; - if (l>32) { - mysrvcCandidates = (MySrvC **)malloc(sizeof(MySrvC *)*l); - } - if (l) { - //int j=0; - for (j=0; jidx(j); - if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { // consider this server only if ONLINE - if (mysrvc->ConnectionsUsed->conns_length() < mysrvc->max_connections) { // consider this server only if didn't reach max_connections - if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? 
mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far - if (gtid_trxid) { - if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } else { - if (max_lag_ms >= 0) { - if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } else { - sess->thread->status_variables.stvar[st_var_aws_aurora_replicas_skipped_during_query]++; - } - } else { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } - } - } else { - max_connections_reached = true; - } - } else { - if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) { - // try to recover shunned servers - if (mysrvc->shunned_automatic && mysql_thread___shun_recovery_time_sec) { - time_t t; - t=time(NULL); - // we do all these changes without locking . We assume the server is not used from long - // even if the server is still in used and any of the follow command fails it is not critical - // because this is only an attempt to recover a server that is probably dead anyway - - // the next few lines of code try to solve issue #530 - int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? 
mysql_thread___connect_timeout_server_max/1000 - 1 : mysql_thread___shun_recovery_time_sec ); - if (max_wait_sec < 1) { // min wait time should be at least 1 second - max_wait_sec = 1; - } - if (t > mysrvc->time_last_detected_error && (t - mysrvc->time_last_detected_error) > max_wait_sec) { - if ( - (mysrvc->shunned_and_kill_all_connections==false) // it is safe to bring it back online - || - (mysrvc->shunned_and_kill_all_connections==true && mysrvc->ConnectionsUsed->conns_length()==0 && mysrvc->ConnectionsFree->conns_length()==0) // if shunned_and_kill_all_connections is set, ensure all connections are already dropped - ) { -#ifdef DEBUG - if (GloMTH->variables.hostgroup_manager_verbose >= 3) { - proxy_info("Unshunning server %s:%d.\n", mysrvc->address, mysrvc->port); - } -#endif - mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; - mysrvc->shunned_automatic=false; - mysrvc->shunned_and_kill_all_connections=false; - mysrvc->connect_ERR_at_time_last_detected_error=0; - mysrvc->time_last_detected_error=0; - // note: the following function scans all the hostgroups. - // This is ok for now because we only have a global mutex. - // If one day we implement a mutex per hostgroup (unlikely, - // but possible), this must be taken into consideration - if (mysql_thread___unshun_algorithm == 1) { - MyHGM->unshun_server_all_hostgroups(mysrvc->address, mysrvc->port, t, max_wait_sec, &mysrvc->myhgc->hid); - } - // if a server is taken back online, consider it immediately - if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? 
mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far - if (gtid_trxid) { - if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } else { - if (max_lag_ms >= 0) { - if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } else { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } - } - } - } - } - } - } - } - if (max_lag_ms > 0) { // we are using AWS Aurora, as this logic is implemented only here - unsigned int min_num_replicas = sess->thread->variables.aurora_max_lag_ms_only_read_from_replicas; - if (min_num_replicas) { - if (num_candidates >= min_num_replicas) { // there are at least N replicas - // we try to remove the writer - unsigned int total_aws_aurora_current_lag_us=0; - for (j=0; jaws_aurora_current_lag_us; - } - if (total_aws_aurora_current_lag_us) { // we are just double checking that we don't have all servers with aws_aurora_current_lag_us==0 - for (j=0; jaws_aurora_current_lag_us==0) { - sum-=mysrvc->weight; - TotalUsedConn-=mysrvc->ConnectionsUsed->conns_length(); - if (j < num_candidates-1) { - mysrvcCandidates[j]=mysrvcCandidates[num_candidates-1]; - } - num_candidates--; - } - } - } - } - } - } - if (sum==0) { - // per issue #531 , we try a desperate attempt to bring back online any shunned server - // we do this lowering the maximum wait time to 10% - // most of the follow code is copied from few lines above - time_t t; - t=time(NULL); - int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? 
mysql_thread___connect_timeout_server_max/10000 - 1 : mysql_thread___shun_recovery_time_sec/10 ); - if (max_wait_sec < 1) { // min wait time should be at least 1 second - max_wait_sec = 1; - } - if (t - last_hg_log > 1) { // log this at most once per second to avoid spamming the logs - last_hg_log = time(NULL); - - if (gtid_trxid) { - proxy_error("Hostgroup %u has no servers ready for GTID '%s:%ld'. Waiting for replication...\n", hid, gtid_uuid, gtid_trxid); - } else { - proxy_error("Hostgroup %u has no servers available%s! Checking servers shunned for more than %u second%s\n", hid, - (max_connections_reached ? " or max_connections reached for all servers" : ""), max_wait_sec, max_wait_sec == 1 ? "" : "s"); - } - } - for (j=0; jidx(j); - if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED && mysrvc->shunned_automatic==true) { - if ((t - mysrvc->time_last_detected_error) > max_wait_sec) { - mysrvc->status=MYSQL_SERVER_STATUS_ONLINE; - mysrvc->shunned_automatic=false; - mysrvc->connect_ERR_at_time_last_detected_error=0; - mysrvc->time_last_detected_error=0; - // if a server is taken back online, consider it immediately - if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? 
mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far - if (gtid_trxid) { - if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } else { - if (max_lag_ms >= 0) { - if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } else { - sum+=mysrvc->weight; - TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length(); - mysrvcCandidates[num_candidates]=mysrvc; - num_candidates++; - } - } - } - } - } - } - } - if (sum==0) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n"); - if (l>32) { - free(mysrvcCandidates); - } -#ifdef TEST_AURORA - array_mysrvc_cands += num_candidates; -#endif // TEST_AURORA - return NULL; // if we reach here, we couldn't find any target - } - -/* - unsigned int New_sum=0; - unsigned int New_TotalUsedConn=0; - // we will now scan again to ignore overloaded servers - for (j=0; jConnectionsUsed->conns_length(); - if ((len * sum) <= (TotalUsedConn * mysrvc->weight * 1.5 + 1)) { - - New_sum+=mysrvc->weight; - New_TotalUsedConn+=len; - } else { - // remove the candidate - if (j+1 < num_candidates) { - mysrvcCandidates[j] = mysrvcCandidates[num_candidates-1]; - } - j--; - num_candidates--; - } - } -*/ - - unsigned int New_sum=sum; - - if (New_sum==0) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n"); - if (l>32) { - free(mysrvcCandidates); - } -#ifdef TEST_AURORA - array_mysrvc_cands += num_candidates; -#endif // TEST_AURORA - return NULL; // if we reach here, we couldn't find any target - } - - // latency awareness algorithm is enabled only when compiled 
with USE_MYSRVC_ARRAY - if (sess && sess->thread->variables.min_num_servers_lantency_awareness) { - if ((int) num_candidates >= sess->thread->variables.min_num_servers_lantency_awareness) { - unsigned int servers_with_latency = 0; - unsigned int total_latency_us = 0; - // scan and verify that all servers have some latency - for (j=0; jcurrent_latency_us) { - servers_with_latency++; - total_latency_us += mysrvc->current_latency_us; - } - } - if (servers_with_latency == num_candidates) { - // all servers have some latency. - // That is good. If any server have no latency, something is wrong - // and we will skip this algorithm - sess->thread->status_variables.stvar[st_var_ConnPool_get_conn_latency_awareness]++; - unsigned int avg_latency_us = 0; - avg_latency_us = total_latency_us/num_candidates; - for (j=0; jcurrent_latency_us > avg_latency_us) { - // remove the candidate - if (j+1 < num_candidates) { - mysrvcCandidates[j] = mysrvcCandidates[num_candidates-1]; - } - j--; - num_candidates--; - } - } - // we scan again to adjust weight - New_sum = 0; - for (j=0; jweight; - } - } - } - } - - - unsigned int k; - if (New_sum > 32768) { - k=rand()%New_sum; - } else { - k=fastrand()%New_sum; - } - k++; - New_sum=0; - - for (j=0; jweight; - if (k<=New_sum) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC %p, server %s:%d\n", mysrvc, mysrvc->address, mysrvc->port); - if (l>32) { - free(mysrvcCandidates); - } -#ifdef TEST_AURORA - array_mysrvc_cands += num_candidates; -#endif // TEST_AURORA - return mysrvc; - } - } - } else { - time_t t = time(NULL); - - if (t - last_hg_log > 1) { - last_hg_log = time(NULL); - proxy_error("Hostgroup %u has no servers available!\n", hid); - } - } - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL\n"); - if (l>32) { - free(mysrvcCandidates); - } -#ifdef TEST_AURORA - array_mysrvc_cands += num_candidates; -#endif // TEST_AURORA - return NULL; // if we reach here, we couldn't find any target -} - -//unsigned int 
MySrvList::cnt() { -// return servers->len; -//} - -//MySrvC * MySrvList::idx(unsigned int i) { return (MySrvC *)servers->index(i); } - -void MySrvConnList::get_random_MyConn_inner_search(unsigned int start, unsigned int end, unsigned int& conn_found_idx, unsigned int& connection_quality_level, unsigned int& number_of_matching_session_variables, const MySQL_Connection * client_conn) { - char *schema = client_conn->userinfo->schemaname; - MySQL_Connection * conn=NULL; - unsigned int k; - for (k = start; k < end; k++) { - conn = (MySQL_Connection *)conns->index(k); - if (conn->match_tracked_options(client_conn)) { - if (connection_quality_level == 0) { - // this is our best candidate so far - connection_quality_level = 1; - conn_found_idx = k; - } - if (conn->requires_CHANGE_USER(client_conn)==false) { - if (connection_quality_level == 1) { - // this is our best candidate so far - connection_quality_level = 2; - conn_found_idx = k; - } - unsigned int cnt_match = 0; // number of matching session variables - unsigned int not_match = 0; // number of not matching session variables - cnt_match = conn->number_of_matching_session_variables(client_conn, not_match); - if (strcmp(conn->userinfo->schemaname,schema)==0) { - cnt_match++; - } else { - not_match++; - } - if (not_match==0) { - // it seems we found the perfect connection - number_of_matching_session_variables = cnt_match; - connection_quality_level = 3; - conn_found_idx = k; - return; // exit immediately, we found the perfect connection - } else { - // we didn't find the perfect connection - // but maybe is better than what we have so far? 
- if (cnt_match > number_of_matching_session_variables) { - // this is our best candidate so far - number_of_matching_session_variables = cnt_match; - conn_found_idx = k; - } - } - } else { - if (connection_quality_level == 1) { - int rca = mysql_thread___reset_connection_algorithm; - if (rca==1) { - int ql = GloMTH->variables.connpoll_reset_queue_length; - if (ql==0) { - // if: - // mysql-reset_connection_algorithm=1 and - // mysql-connpoll_reset_queue_length=0 - // we will not return a connection with connection_quality_level == 1 - // because we want to run COM_CHANGE_USER - // This change was introduced to work around Galera bug - // https://github.com/codership/galera/issues/613 - connection_quality_level = 0; - } - } - } - } - } - } -} - - - -MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff) { - MySQL_Connection * conn=NULL; - unsigned int i; - unsigned int conn_found_idx; - unsigned int l=conns_length(); - unsigned int connection_quality_level = 0; - bool needs_warming = false; - // connection_quality_level: - // 0 : not found any good connection, tracked options are not OK - // 1 : tracked options are OK , but CHANGE USER is required - // 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed - // 3 : tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required - unsigned int number_of_matching_session_variables = 0; // this includes session variables AND schema - bool connection_warming = mysql_thread___connection_warming; - int free_connections_pct = mysql_thread___free_connections_pct; - if (mysrvc->myhgc->attributes.configured == true) { - // mysql_hostgroup_attributes takes priority - connection_warming = mysrvc->myhgc->attributes.connection_warming; - free_connections_pct = mysrvc->myhgc->attributes.free_connections_pct; - } - if (connection_warming == true) { - unsigned int total_connections = 
mysrvc->ConnectionsFree->conns_length()+mysrvc->ConnectionsUsed->conns_length(); - unsigned int expected_warm_connections = free_connections_pct*mysrvc->max_connections/100; - if (total_connections < expected_warm_connections) { - needs_warming = true; - } - } - if (l && ff==false && needs_warming==false) { - if (l>32768) { - i=rand()%l; - } else { - i=fastrand()%l; - } - if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) { - MySQL_Connection * client_conn = sess->client_myds->myconn; - get_random_MyConn_inner_search(i, l, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn); - if (connection_quality_level !=3 ) { // we didn't find the perfect connection - get_random_MyConn_inner_search(0, i, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn); - } - // connection_quality_level: - // 1 : tracked options are OK , but CHANGE USER is required - // 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed - switch (connection_quality_level) { - case 0: // not found any good connection, tracked options are not OK - // we must check if connections need to be freed before - // creating a new connection - { - unsigned int conns_free = mysrvc->ConnectionsFree->conns_length(); - unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length(); - unsigned int pct_max_connections = (3 * mysrvc->max_connections) / 4; - unsigned int connections_to_free = 0; - - if (conns_free >= 1) { - // connection cleanup is triggered when connections exceed 3/4 of the total - // allowed max connections, this cleanup ensures that at least *one connection* - // will be freed. 
- if (pct_max_connections <= (conns_free + conns_used)) { - connections_to_free = (conns_free + conns_used) - pct_max_connections; - if (connections_to_free == 0) connections_to_free = 1; - } - - while (conns_free && connections_to_free) { - MySQL_Connection* conn = mysrvc->ConnectionsFree->remove(0); - delete conn; - - conns_free = mysrvc->ConnectionsFree->conns_length(); - connections_to_free -= 1; - } - } - - // we must create a new connection - conn = new MySQL_Connection(); - conn->parent=mysrvc; - // if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa - conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); - __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); - } - break; - case 1: //tracked options are OK , but CHANGE USER is required - // we may consider creating a new connection - { - unsigned int conns_free = mysrvc->ConnectionsFree->conns_length(); - unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length(); - if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) { - conn = new MySQL_Connection(); - conn->parent=mysrvc; - // if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. 
And vice-versa - conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); - __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); - } else { - conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx); - } - } - break; - case 2: // tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed - case 3: // tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required - // here we return the best connection we have, no matter if connection_quality_level is 2 or 3 - conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx); - break; - default: // this should never happen - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } - } else { - conn=(MySQL_Connection *)conns->remove_index_fast(i); - } - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); - return conn; - } else { - unsigned long long curtime = monotonic_time(); - curtime = curtime / 1000 / 1000; // convert to second - MyHGC *_myhgc = mysrvc->myhgc; - if (curtime > _myhgc->current_time_now) { - _myhgc->current_time_now = curtime; - _myhgc->new_connections_now = 0; - } - _myhgc->new_connections_now++; - unsigned int throttle_connections_per_sec_to_hostgroup = (unsigned int) mysql_thread___throttle_connections_per_sec_to_hostgroup; - if (_myhgc->attributes.configured == true) { - // mysql_hostgroup_attributes takes priority - throttle_connections_per_sec_to_hostgroup = _myhgc->attributes.throttle_connections_per_sec; - } - if (_myhgc->new_connections_now > (unsigned int) throttle_connections_per_sec_to_hostgroup) { - __sync_fetch_and_add(&MyHGM->status.server_connections_delayed, 1); - return 
NULL; - } else { - conn = new MySQL_Connection(); - conn->parent=mysrvc; - // if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa - conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); - __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); - return conn; - } - } - return NULL; // never reach here -} - void MySQL_HostGroups_Manager::unshun_server_all_hostgroups(const char * address, uint16_t port, time_t t, int max_wait_sec, unsigned int *skip_hid) { // we scan all hostgroups looking for a specific server to unshun // if skip_hid is not NULL , the specific hostgroup is skipped diff --git a/lib/MySrvC.cpp b/lib/MySrvC.cpp new file mode 100644 index 0000000000..94401e9016 --- /dev/null +++ b/lib/MySrvC.cpp @@ -0,0 +1,198 @@ +#include "MySQL_HostGroups_Manager.h" +/* +#include "proxysql.h" +#include "cpp.h" + +#include "MySQL_PreparedStatement.h" +#include "MySQL_Data_Stream.h" + +#include +#include +#include + +#include +#include +#include +#include + +#include "prometheus_helpers.h" +#include "proxysql_utils.h" + +#define char_malloc (char *)malloc +#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); } + +#include "thread.h" +#include "wqueue.h" + +#include "ev.h" + +#include +#include +#include + +using std::function; + +#ifdef TEST_AURORA +static unsigned long long array_mysrvc_total = 0; +static unsigned long long array_mysrvc_cands = 0; +#endif // TEST_AURORA + +#define SAFE_SQLITE3_STEP(_stmt) do {\ + do {\ + rc=(*proxy_sqlite3_step)(_stmt);\ + if (rc!=SQLITE_DONE) {\ + assert(rc==SQLITE_LOCKED);\ + usleep(100);\ + }\ + } while (rc!=SQLITE_DONE);\ +} while (0) + +extern ProxySQL_Admin *GloAdmin; + +extern MySQL_Threads_Handler *GloMTH; + +extern MySQL_Monitor *GloMyMon; +*/ + 
+class MySrvConnList; +class MySrvC; +class MySrvList; +class MyHGC; + +MySrvC::MySrvC( + char* add, uint16_t p, uint16_t gp, int64_t _weight, enum MySerStatus _status, unsigned int _compression, + int64_t _max_connections, unsigned int _max_replication_lag, int32_t _use_ssl, unsigned int _max_latency_ms, + char* _comment +) { + address=strdup(add); + port=p; + gtid_port=gp; + weight=_weight; + status=_status; + compression=_compression; + max_connections=_max_connections; + max_replication_lag=_max_replication_lag; + use_ssl=_use_ssl; + cur_replication_lag=0; + cur_replication_lag_count=0; + max_latency_us=_max_latency_ms*1000; + current_latency_us=0; + aws_aurora_current_lag_us = 0; + connect_OK=0; + connect_ERR=0; + queries_sent=0; + bytes_sent=0; + bytes_recv=0; + max_connections_used=0; + queries_gtid_sync=0; + time_last_detected_error=0; + connect_ERR_at_time_last_detected_error=0; + shunned_automatic=false; + shunned_and_kill_all_connections=false; // false to default + //charset=_charset; + myhgc=NULL; + comment=strdup(_comment); + ConnectionsUsed=new MySrvConnList(this); + ConnectionsFree=new MySrvConnList(this); +} + +void MySrvC::connect_error(int err_num, bool get_mutex) { + // NOTE: this function operates without any mutex + // although, it is not extremely important if any counter is lost + // as a single connection failure won't make a significant difference + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Connect failed with code '%d'\n", err_num); + __sync_fetch_and_add(&connect_ERR,1); + __sync_fetch_and_add(&MyHGM->status.server_connections_aborted,1); + if (err_num >= 1048 && err_num <= 1052) + return; + if (err_num >= 1054 && err_num <= 1075) + return; + if (err_num >= 1099 && err_num <= 1104) + return; + if (err_num >= 1106 && err_num <= 1113) + return; + if (err_num >= 1116 && err_num <= 1118) + return; + if (err_num == 1136 || (err_num >= 1138 && err_num <= 1149)) + return; + switch (err_num) { + case 1007: // Can't create database + case 
1008: // Can't drop database + case 1044: // access denied + case 1045: // access denied +/* + case 1048: // Column cannot be null + case 1049: // Unknown database + case 1050: // Table already exists + case 1051: // Unknown table + case 1052: // Column is ambiguous +*/ + case 1120: + case 1203: // User %s already has more than 'max_user_connections' active connections + case 1226: // User '%s' has exceeded the '%s' resource (current value: %ld) + case 3118: // Access denied for user '%s'. Account is locked.. + return; + break; + default: + break; + } + time_t t=time(NULL); + if (t > time_last_detected_error) { + time_last_detected_error=t; + connect_ERR_at_time_last_detected_error=1; + } else { + if (t < time_last_detected_error) { + // time_last_detected_error is in the future + // this means that monitor has a ping interval too big and tuned that in the future + return; + } + // same time + /** + * @brief The expected configured retries set by 'mysql-connect_retries_on_failure' + '2' extra expected + * connection errors. + * @details These two extra connection errors are expected: + * 1. An initial connection error generated by the datastream and the connection when being created, + * that is, right after the session has requested a connection to the connection pool. This error takes + * place directly in the state machine from 'MySQL_Connection'. Because of this, we consider this + * additional error to be a consequence of the two state machines, and it's not considered for + * 'connect_retries'. + * 2. A second connection error, which is the initial connection error generated by 'MySQL_Session' + * when already in the 'CONNECTING_SERVER' state. This error is an 'extra error' to always consider, since + * it's not part of the retries specified by 'mysql_thread___connect_retries_on_failure', thus, we set the + * 'connect_retries' to be 'mysql_thread___connect_retries_on_failure + 1'.
+ */ + int connect_retries = mysql_thread___connect_retries_on_failure + 1; + int max_failures = mysql_thread___shun_on_failures > connect_retries ? connect_retries : mysql_thread___shun_on_failures; + + if (__sync_add_and_fetch(&connect_ERR_at_time_last_detected_error,1) >= (unsigned int)max_failures) { + bool _shu=false; + if (get_mutex==true) + MyHGM->wrlock(); // to prevent race conditions, lock here. See #627 + if (status==MYSQL_SERVER_STATUS_ONLINE) { + status=MYSQL_SERVER_STATUS_SHUNNED; + shunned_automatic=true; + _shu=true; + } else { + _shu=false; + } + if (get_mutex==true) + MyHGM->wrunlock(); + if (_shu) { + proxy_error("Shunning server %s:%d with %u errors/sec. Shunning for %u seconds\n", address, port, connect_ERR_at_time_last_detected_error , mysql_thread___shun_recovery_time_sec); + } + } + } +} + +void MySrvC::shun_and_killall() { + status=MYSQL_SERVER_STATUS_SHUNNED; + shunned_automatic=true; + shunned_and_kill_all_connections=true; +} + +MySrvC::~MySrvC() { + if (address) free(address); + if (comment) free(comment); + delete ConnectionsUsed; + delete ConnectionsFree; +} diff --git a/lib/MySrvConnList.cpp b/lib/MySrvConnList.cpp new file mode 100644 index 0000000000..abe0c44ee7 --- /dev/null +++ b/lib/MySrvConnList.cpp @@ -0,0 +1,256 @@ +#include "MySQL_HostGroups_Manager.h" + +#include "MySQL_Data_Stream.h" + +extern ProxySQL_Admin *GloAdmin; + +extern MySQL_Threads_Handler *GloMTH; + +extern MySQL_Monitor *GloMyMon; + +class MySrvConnList; +class MySrvC; +class MySrvList; +class MyHGC; + +MySQL_Connection *MySrvConnList::index(unsigned int _k) { + return (MySQL_Connection *)conns->index(_k); +} + +MySQL_Connection * MySrvConnList::remove(int _k) { + return (MySQL_Connection *)conns->remove_index_fast(_k); +} + +MySrvConnList::MySrvConnList(MySrvC *_mysrvc) { + mysrvc=_mysrvc; + conns=new PtrArray(); +} + +void MySrvConnList::add(MySQL_Connection *c) { + conns->add(c); +} + +MySrvConnList::~MySrvConnList() { + mysrvc=NULL; + while 
(conns_length()) { + MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0); + delete conn; + } + delete conns; +} + +void MySrvConnList::drop_all_connections() { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Dropping all connections (%u total) on MySrvConnList %p for server %s:%d , hostgroup=%d , status=%d\n", conns_length(), this, mysrvc->address, mysrvc->port, mysrvc->myhgc->hid, mysrvc->status); + while (conns_length()) { + MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0); + delete conn; + } +} + +void MySrvConnList::get_random_MyConn_inner_search(unsigned int start, unsigned int end, unsigned int& conn_found_idx, unsigned int& connection_quality_level, unsigned int& number_of_matching_session_variables, const MySQL_Connection * client_conn) { + char *schema = client_conn->userinfo->schemaname; + MySQL_Connection * conn=NULL; + unsigned int k; + for (k = start; k < end; k++) { + conn = (MySQL_Connection *)conns->index(k); + if (conn->match_tracked_options(client_conn)) { + if (connection_quality_level == 0) { + // this is our best candidate so far + connection_quality_level = 1; + conn_found_idx = k; + } + if (conn->requires_CHANGE_USER(client_conn)==false) { + if (connection_quality_level == 1) { + // this is our best candidate so far + connection_quality_level = 2; + conn_found_idx = k; + } + unsigned int cnt_match = 0; // number of matching session variables + unsigned int not_match = 0; // number of not matching session variables + cnt_match = conn->number_of_matching_session_variables(client_conn, not_match); + if (strcmp(conn->userinfo->schemaname,schema)==0) { + cnt_match++; + } else { + not_match++; + } + if (not_match==0) { + // it seems we found the perfect connection + number_of_matching_session_variables = cnt_match; + connection_quality_level = 3; + conn_found_idx = k; + return; // exit immediately, we found the perfect connection + } else { + // we didn't find the perfect connection + // but maybe is better 
than what we have so far? + if (cnt_match > number_of_matching_session_variables) { + // this is our best candidate so far + number_of_matching_session_variables = cnt_match; + conn_found_idx = k; + } + } + } else { + if (connection_quality_level == 1) { + int rca = mysql_thread___reset_connection_algorithm; + if (rca==1) { + int ql = GloMTH->variables.connpoll_reset_queue_length; + if (ql==0) { + // if: + // mysql-reset_connection_algorithm=1 and + // mysql-connpoll_reset_queue_length=0 + // we will not return a connection with connection_quality_level == 1 + // because we want to run COM_CHANGE_USER + // This change was introduced to work around Galera bug + // https://github.com/codership/galera/issues/613 + connection_quality_level = 0; + } + } + } + } + } + } +} + + + +MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff) { + MySQL_Connection * conn=NULL; + unsigned int i; + unsigned int conn_found_idx; + unsigned int l=conns_length(); + unsigned int connection_quality_level = 0; + bool needs_warming = false; + // connection_quality_level: + // 0 : not found any good connection, tracked options are not OK + // 1 : tracked options are OK , but CHANGE USER is required + // 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed + // 3 : tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required + unsigned int number_of_matching_session_variables = 0; // this includes session variables AND schema + bool connection_warming = mysql_thread___connection_warming; + int free_connections_pct = mysql_thread___free_connections_pct; + if (mysrvc->myhgc->attributes.configured == true) { + // mysql_hostgroup_attributes takes priority + connection_warming = mysrvc->myhgc->attributes.connection_warming; + free_connections_pct = mysrvc->myhgc->attributes.free_connections_pct; + } + if (connection_warming == true) { + unsigned int 
total_connections = mysrvc->ConnectionsFree->conns_length()+mysrvc->ConnectionsUsed->conns_length(); + unsigned int expected_warm_connections = free_connections_pct*mysrvc->max_connections/100; + if (total_connections < expected_warm_connections) { + needs_warming = true; + } + } + if (l && ff==false && needs_warming==false) { + if (l>32768) { + i=rand()%l; + } else { + i=fastrand()%l; + } + if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) { + MySQL_Connection * client_conn = sess->client_myds->myconn; + get_random_MyConn_inner_search(i, l, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn); + if (connection_quality_level !=3 ) { // we didn't find the perfect connection + get_random_MyConn_inner_search(0, i, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn); + } + // connection_quality_level: + // 1 : tracked options are OK , but CHANGE USER is required + // 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed + switch (connection_quality_level) { + case 0: // not found any good connection, tracked options are not OK + // we must check if connections need to be freed before + // creating a new connection + { + unsigned int conns_free = mysrvc->ConnectionsFree->conns_length(); + unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length(); + unsigned int pct_max_connections = (3 * mysrvc->max_connections) / 4; + unsigned int connections_to_free = 0; + + if (conns_free >= 1) { + // connection cleanup is triggered when connections exceed 3/4 of the total + // allowed max connections, this cleanup ensures that at least *one connection* + // will be freed. 
+ if (pct_max_connections <= (conns_free + conns_used)) { + connections_to_free = (conns_free + conns_used) - pct_max_connections; + if (connections_to_free == 0) connections_to_free = 1; + } + + while (conns_free && connections_to_free) { + MySQL_Connection* conn = mysrvc->ConnectionsFree->remove(0); + delete conn; + + conns_free = mysrvc->ConnectionsFree->conns_length(); + connections_to_free -= 1; + } + } + + // we must create a new connection + conn = new MySQL_Connection(); + conn->parent=mysrvc; + // if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa + conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); + __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); + } + break; + case 1: //tracked options are OK , but CHANGE USER is required + // we may consider creating a new connection + { + unsigned int conns_free = mysrvc->ConnectionsFree->conns_length(); + unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length(); + if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) { + conn = new MySQL_Connection(); + conn->parent=mysrvc; + // if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. 
And vice-versa + conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); + __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); + } else { + conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx); + } + } + break; + case 2: // tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed + case 3: // tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required + // here we return the best connection we have, no matter if connection_quality_level is 2 or 3 + conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx); + break; + default: // this should never happen + // LCOV_EXCL_START + assert(0); + break; + // LCOV_EXCL_STOP + } + } else { + conn=(MySQL_Connection *)conns->remove_index_fast(i); + } + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); + return conn; + } else { + unsigned long long curtime = monotonic_time(); + curtime = curtime / 1000 / 1000; // convert to second + MyHGC *_myhgc = mysrvc->myhgc; + if (curtime > _myhgc->current_time_now) { + _myhgc->current_time_now = curtime; + _myhgc->new_connections_now = 0; + } + _myhgc->new_connections_now++; + unsigned int throttle_connections_per_sec_to_hostgroup = (unsigned int) mysql_thread___throttle_connections_per_sec_to_hostgroup; + if (_myhgc->attributes.configured == true) { + // mysql_hostgroup_attributes takes priority + throttle_connections_per_sec_to_hostgroup = _myhgc->attributes.throttle_connections_per_sec; + } + if (_myhgc->new_connections_now > (unsigned int) throttle_connections_per_sec_to_hostgroup) { + __sync_fetch_and_add(&MyHGM->status.server_connections_delayed, 1); + return 
NULL; + } else { + conn = new MySQL_Connection(); + conn->parent=mysrvc; + // if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa + conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); + __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); + return conn; + } + } + return NULL; // never reach here +} + diff --git a/lib/MySrvList.cpp b/lib/MySrvList.cpp new file mode 100644 index 0000000000..22ed9b1427 --- /dev/null +++ b/lib/MySrvList.cpp @@ -0,0 +1,44 @@ +#include "MySQL_HostGroups_Manager.h" + +class MySrvConnList; +class MySrvC; +class MySrvList; +class MyHGC; + +MySrvList::MySrvList(MyHGC *_myhgc) { + myhgc=_myhgc; + servers=new PtrArray(); +} + +void MySrvList::add(MySrvC *s) { + if (s->myhgc==NULL) { + s->myhgc=myhgc; + } + servers->add(s); +} + + +int MySrvList::find_idx(MySrvC *s) { + for (unsigned int i=0; i<servers->len; i++) { + MySrvC *mysrv=(MySrvC *)servers->index(i); + if (mysrv==s) { + return (unsigned int)i; + } + } + return -1; +} + +void MySrvList::remove(MySrvC *s) { + int i=find_idx(s); + assert(i>=0); + servers->remove_index_fast((unsigned int)i); +} + +MySrvList::~MySrvList() { + myhgc=NULL; + while (servers->len) { + MySrvC *mysrvc=(MySrvC *)servers->remove_index_fast(0); + delete mysrvc; + } + delete servers; +}