From 6691a867faa5db46ade3273bf0d367503616638e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 20 Jul 2019 01:16:07 +1000 Subject: [PATCH] Fix issues/FR #2120 , #2121 and #2125 Issue #2120 : Send SESSION_TRACK_GTIDS to client Issue #2121 : Track CLIENT_FOUND_ROWS required by the client Issue #2125 : Track CLIENT_MULTI_STATEMENTS required by the client --- include/MySQL_Session.h | 4 +++ include/MySQL_Thread.h | 1 + include/mysql_backend.h | 2 +- include/mysql_connection.h | 2 ++ include/proxysql_structs.h | 2 ++ lib/MySQL_HostGroups_Manager.cpp | 47 +++++++++++++++++++++------- lib/MySQL_Protocol.cpp | 49 ++++++++++++++++++++++++++--- lib/MySQL_Session.cpp | 24 ++++++++++++++- lib/MySQL_Thread.cpp | 18 +++++++++++ lib/mysql_connection.cpp | 53 ++++++++++++++++++++++++++------ 10 files changed, 176 insertions(+), 26 deletions(-) diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index ac370cbe49..f3492ad7e3 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -190,6 +190,10 @@ class MySQL_Session bool with_gtid; + char gtid_buf[128]; + //uint64_t gtid_trxid; + int gtid_hid; + MySQL_STMTs_meta *sess_STMTs_meta; StmtLongDataHandler *SLDH; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index b0a7d7ca46..eef645b53e 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -458,6 +458,7 @@ class MySQL_Threads_Handler bool stats_time_query_processor; bool query_cache_stores_empty_result; bool kill_backend_connection_when_disconnect; + bool client_session_track_gtid; } variables; struct { unsigned int mirror_sessions_current; diff --git a/include/mysql_backend.h b/include/mysql_backend.h index 855b2e05eb..4b8860e901 100644 --- a/include/mysql_backend.h +++ b/include/mysql_backend.h @@ -10,7 +10,7 @@ class MySQL_Backend void * operator new(size_t); void operator delete(void *); int hostgroup_id; - char gtid_uuid[64]; + char gtid_uuid[128]; uint64_t gtid_trxid; MySQL_Data_Stream *server_myds; // mysql_cp_entry_t *server_mycpe; diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 2b5aead880..86e79fff6f 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -200,5 +200,7 @@ class MySQL_Connection { bool get_gtid(char *buff, uint64_t *trx_id); void reduce_auto_increment_delay_token() { if (auto_increment_delay_token) auto_increment_delay_token--; }; + + bool match_tracked_options(MySQL_Connection *c); }; #endif /* __CLASS_MYSQL_CONNECTION_H */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 8e7ec0733c..3610cacaa7 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -674,6 +674,7 @@ __thread bool mysql_thread___sessions_sort; __thread bool mysql_thread___session_idle_ms; __thread int mysql_thread___hostgroup_manager_verbose; __thread bool mysql_thread___kill_backend_connection_when_disconnect; +__thread bool mysql_thread___client_session_track_gtid; /* variables used for Query Cache */ __thread int mysql_thread___query_cache_size_MB; @@ -805,6 +806,7 @@ extern __thread bool mysql_thread___sessions_sort; extern __thread bool mysql_thread___session_idle_ms; extern __thread int mysql_thread___hostgroup_manager_verbose; extern __thread bool mysql_thread___kill_backend_connection_when_disconnect; +extern __thread bool mysql_thread___client_session_track_gtid; /* variables used for Query Cache */ extern __thread int mysql_thread___query_cache_size_MB; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index e595408996..275646ddbf 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -2614,37 +2614,60 @@ MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff // try to match schemaname AND username char *schema = sess->client_myds->myconn->userinfo->schemaname; char *username = sess->client_myds->myconn->userinfo->username; + MySQL_Connection * client_conn = sess->client_myds->myconn; bool conn_found = false; unsigned int k; + unsigned int options_matching_idx; + bool options_matching_found = false; for (k = i; conn_found == false && k < l; k++) { conn = (MySQL_Connection *)conns->index(k); - if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) { - conn_found = true; - i = k; + if (conn->match_tracked_options(client_conn)) { + if (options_matching_found == false) { + options_matching_found = true; + options_matching_idx = k; + } + if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) { + conn_found = true; + i = k; + } } } if (conn_found == false ) { for (k = 0; conn_found == false && k < i; k++) { conn = (MySQL_Connection *)conns->index(k); - if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) { - conn_found = true; - i = k; + if (conn->match_tracked_options(client_conn)) { + if (options_matching_found == false) { + options_matching_found = true; + options_matching_idx = k; + } + if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) { + conn_found = true; + i = k; + } } } } if (conn_found == true) { conn=(MySQL_Connection *)conns->remove_index_fast(i); } else { - // we may consider creating a new connection - unsigned int conns_free = mysrvc->ConnectionsFree->conns_length(); - unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length(); - if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) { + if (options_matching_found == false) { + // we must create a new connection conn = new MySQL_Connection(); conn->parent=mysrvc; __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); } else { - conn=(MySQL_Connection *)conns->remove_index_fast(i); + // we may consider creating a new connection + unsigned int conns_free = mysrvc->ConnectionsFree->conns_length(); + unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length(); + if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) { + conn = new MySQL_Connection(); + conn->parent=mysrvc; + __sync_fetch_and_add(&MyHGM->status.server_connections_created, 1); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); + } else { + conn=(MySQL_Connection *)conns->remove_index_fast(i); + } } } } else { @@ -3016,6 +3039,8 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Free_Connections() { j["charset"] = _my->charset->nr; j["options"]["charset_name"] = _my->options.charset_name; j["options"]["use_ssl"] = _my->options.use_ssl; + j["client_flag"]["client_found_rows"] = (_my->client_flag & CLIENT_FOUND_ROWS); + j["client_flag"]["client_multi_statements"] = (_my->client_flag & CLIENT_MULTI_STATEMENTS); j["net"]["last_errno"] = _my->net.last_errno; j["net"]["fd"] = _my->net.fd; j["net"]["max_packet_size"] = _my->net.max_packet_size; diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 677fc5a7e0..9ce0f4b666 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -609,14 +609,40 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u char msg_prefix; uint8_t msg_len_len=mysql_encode_length(msg_len, &msg_prefix); + bool client_session_track=false; + //char gtid_buf[128]; + char gtid_prefix; + uint8_t gtid_len=0; + uint8_t gtid_len_len=0; + mysql_hdr myhdr; myhdr.pkt_id=sequence_id; myhdr.pkt_length=1+affected_rows_len+last_insert_id_len+sizeof(uint16_t)+sizeof(uint16_t)+msg_len; if (msg_len) myhdr.pkt_length+=msg_len_len; - unsigned int size=myhdr.pkt_length+sizeof(mysql_hdr); - unsigned char *_ptr=(unsigned char *)l_alloc(size); - memcpy(_ptr, &myhdr, sizeof(mysql_hdr)); - int l=sizeof(mysql_hdr); + + if (*myds && (*myds)->myconn) { + if ((*myds)->myconn->options.client_flag & CLIENT_SESSION_TRACKING) { + if (mysql_thread___client_session_track_gtid) { + if (sess) { + if (sess->gtid_hid >= 0) { + myhdr.pkt_length++; + client_session_track=true; + gtid_len = strlen(sess->gtid_buf); + gtid_len_len = mysql_encode_length(gtid_len, >id_prefix); + myhdr.pkt_length += gtid_len_len; + myhdr.pkt_length += gtid_len; + myhdr.pkt_length += 4; // headers related to GTID + } + } + } + } + } + + + unsigned int size=myhdr.pkt_length+sizeof(mysql_hdr); + unsigned char *_ptr=(unsigned char *)l_alloc(size); + memcpy(_ptr, &myhdr, sizeof(mysql_hdr)); + int l=sizeof(mysql_hdr); _ptr[l]=0x00; l++; l+=write_encoded_length(_ptr+l, affected_rows, affected_rows_len, affected_rows_prefix); l+=write_encoded_length(_ptr+l, last_insert_id, last_insert_id_len, last_insert_id_prefix); @@ -647,6 +673,21 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u l+=write_encoded_length(_ptr+l, msg_len, msg_len_len, msg_prefix); memcpy(_ptr+l, msg, msg_len); } + l+=msg_len; + if (client_session_track == true) { + _ptr[l]=0x00; l++; + if (gtid_len) { + unsigned char gtid_prefix_h1 = gtid_prefix+2; + unsigned char state_change_prefix = gtid_prefix_h1+2; + _ptr[l] = state_change_prefix; l++; + _ptr[l]=0x03; l++; // SESSION_TRACK_GTIDS + _ptr[l] = gtid_prefix_h1; l++; + _ptr[l]=0x00; l++; + // l+=write_encoded_length(_ptr+l, gtid_len, gtid_len_len, gtid_prefix); // overcomplicated + _ptr[l] = gtid_prefix; l++; + memcpy(_ptr+l, sess->gtid_buf, gtid_len); + } + } if (send==true) { (*myds)->PSarrayOUT->add((void *)_ptr,size); switch ((*myds)->DSS) { diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 36904be0e5..b724766d42 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -422,6 +422,10 @@ MySQL_Session::MySQL_Session() { with_gtid = false; use_ssl = false; + //gtid_trxid = 0; + gtid_hid = -1; + memset(gtid_buf,0,sizeof(gtid_buf)); + match_regexes=NULL; /* match_regexes=(Session_Regex **)malloc(sizeof(Session_Regex *)*3); @@ -464,6 +468,12 @@ void MySQL_Session::reset() { mybes=NULL; } mybe=NULL; + + with_gtid = false; + + //gtid_trxid = 0; + gtid_hid = -1; + memset(gtid_buf,0,sizeof(gtid_buf)); } MySQL_Session::~MySQL_Session() { @@ -839,6 +849,8 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) { j["autocommit_on_hostgroup"] = autocommit_on_hostgroup; j["last_insert_id"] = last_insert_id; j["last_HG_affected_rows"] = last_HG_affected_rows; + j["gtid"]["hid"] = gtid_hid; + j["gtid"]["last"] = ( strlen(gtid_buf) ? gtid_buf : "" ); j["client"]["userinfo"]["username"] = ( client_myds->myconn->userinfo->username ? client_myds->myconn->userinfo->username : "" ); #ifdef DEBUG j["client"]["userinfo"]["password"] = ( client_myds->myconn->userinfo->password ? client_myds->myconn->userinfo->password : "" ); @@ -860,6 +872,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) { j["conn"]["charset"] = client_myds->myconn->options.charset; j["conn"]["sql_log_bin"] = client_myds->myconn->options.sql_log_bin; j["conn"]["autocommit"] = client_myds->myconn->options.autocommit; + j["conn"]["client_flag"] = client_myds->myconn->options.client_flag; j["conn"]["no_backslash_escapes"] = client_myds->myconn->options.no_backslash_escapes; j["conn"]["status"]["compression"] = client_myds->myconn->get_status_compression(); j["conn"]["status"]["transaction"] = client_myds->myconn->get_status_transaction(); @@ -869,6 +882,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) { _mybe=(MySQL_Backend *)mybes->index(k); unsigned int i = _mybe->hostgroup_id; j["backends"][i]["hostgroup_id"] = i; + j["backends"][i]["gtid"] = ( strlen(_mybe->gtid_uuid) ? _mybe->gtid_uuid : "" ); if (_mybe->server_myds) { MySQL_Data_Stream *_myds=_mybe->server_myds; sprintf(buff,"%p",_myds); @@ -887,6 +901,7 @@ void MySQL_Session::generate_proxysql_internal_session_json(json &j) { MySQL_Connection * _myconn = _myds->myconn; sprintf(buff,"%p",_myconn); j["backends"][i]["conn"]["address"] = buff; + j["backends"][i]["conn"]["auto_increment_delay_token"] = _myconn->auto_increment_delay_token; j["backends"][i]["conn"]["bytes_recv"] = _myconn->bytes_info.bytes_recv; j["backends"][i]["conn"]["bytes_sent"] = _myconn->bytes_info.bytes_sent; j["backends"][i]["conn"]["questions"] = _myconn->statuses.questions; @@ -3350,8 +3365,15 @@ int MySQL_Session::handler() { (endt.tv_sec*1000000000+endt.tv_nsec) - (begint.tv_sec*1000000000+begint.tv_nsec); } + gtid_hid = -1; if (rc==0) { - myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid); + if (myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid)) { + if (mysql_thread___client_session_track_gtid) { + gtid_hid = current_hostgroup; + memcpy(gtid_buf,mybe->gtid_uuid,sizeof(gtid_buf)); + } + } + // check if multiplexing needs to be disabled char *qdt=CurrentQuery.get_digest_text(); if (qdt) diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index cb8280362f..feb0bf765c 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -302,6 +302,7 @@ static char * mysql_thread_variables_names[]= { (char *)"server_version", (char *)"keep_multiplexing_variables", (char *)"kill_backend_connection_when_disconnect", + (char *)"client_session_track_gtid", (char *)"sessions_sort", #ifdef IDLE_THREADS (char *)"session_idle_show_processlist", @@ -461,6 +462,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.stats_time_query_processor=false; variables.query_cache_stores_empty_result=true; variables.kill_backend_connection_when_disconnect=true; + variables.client_session_track_gtid=true; variables.sessions_sort=true; #ifdef IDLE_THREADS variables.session_idle_ms=1000; @@ -804,6 +806,7 @@ int MySQL_Threads_Handler::get_variable_int(const char *name) { if (!strcmp(name,"stats_time_query_processor")) return (int)variables.stats_time_query_processor; if (!strcmp(name,"query_cache_stores_empty_result")) return (int)variables.query_cache_stores_empty_result; if (!strcmp(name,"kill_backend_connection_when_disconnect")) return (int)variables.kill_backend_connection_when_disconnect; + if (!strcmp(name,"client_session_track_gtid")) return (int)variables.client_session_track_gtid; if (!strcmp(name,"sessions_sort")) return (int)variables.sessions_sort; #ifdef IDLE_THREADS if (!strcmp(name,"session_idle_show_processlist")) return (int)variables.session_idle_show_processlist; @@ -1270,6 +1273,9 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f if (!strcasecmp(name,"kill_backend_connection_when_disconnect")) { return strdup((variables.kill_backend_connection_when_disconnect ? "true" : "false")); } + if (!strcasecmp(name,"client_session_track_gtid")) { + return strdup((variables.client_session_track_gtid ? "true" : "false")); + } if (!strcasecmp(name,"sessions_sort")) { return strdup((variables.sessions_sort ? "true" : "false")); } @@ -2501,6 +2507,17 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t } return false; } + if (!strcasecmp(name,"client_session_track_gtid")) { + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.client_session_track_gtid=true; + return true; + } + if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { + variables.client_session_track_gtid=false; + return true; + } + return false; + } if (!strcasecmp(name,"servers_stats")) { if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { variables.servers_stats=true; @@ -3934,6 +3951,7 @@ void MySQL_Thread::refresh_variables() { variables.query_cache_stores_empty_result=(bool)GloMTH->get_variable_int((char *)"query_cache_stores_empty_result"); mysql_thread___hostgroup_manager_verbose = GloMTH->get_variable_int((char *)"hostgroup_manager_verbose"); mysql_thread___kill_backend_connection_when_disconnect=(bool)GloMTH->get_variable_int((char *)"kill_backend_connection_when_disconnect"); + mysql_thread___client_session_track_gtid=(bool)GloMTH->get_variable_int((char *)"client_session_track_gtid"); mysql_thread___sessions_sort=(bool)GloMTH->get_variable_int((char *)"sessions_sort"); #ifdef IDLE_THREADS mysql_thread___session_idle_show_processlist=(bool)GloMTH->get_variable_int((char *)"session_idle_show_processlist"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index c688b7ad5e..0509c71882 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -451,6 +451,19 @@ bool MySQL_Connection::get_status_sql_log_bin0() { return status_flags & STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0; } +bool MySQL_Connection::match_tracked_options(MySQL_Connection *c) { + uint16_t cf1 = options.client_flag; // own client flags + uint16_t cf2 = c->options.client_flag; // other client flags + if ((cf1 & CLIENT_FOUND_ROWS) == (cf2 & CLIENT_FOUND_ROWS)) { + if ((cf1 & CLIENT_MULTI_STATEMENTS) == (cf2 & CLIENT_MULTI_STATEMENTS)) { + if ((cf1 & CLIENT_IGNORE_SPACE) == (cf2 & CLIENT_IGNORE_SPACE)) { + return true; + } + } + } + return false; +} + // non blocking API void MySQL_Connection::connect_start() { PROXY_TRACE(); @@ -470,12 +483,32 @@ void MySQL_Connection::connect_start() { } mysql_options(mysql, MYSQL_SET_CHARSET_NAME, c->csname); unsigned long client_flags = 0; - if (mysql_thread___client_found_rows) - client_flags += CLIENT_FOUND_ROWS; + //if (mysql_thread___client_found_rows) + // client_flags += CLIENT_FOUND_ROWS; if (parent->compression) client_flags += CLIENT_COMPRESS; - if (mysql_thread___client_multi_statements) - client_flags += CLIENT_MULTI_STATEMENTS; + //if (mysql_thread___client_multi_statements) + // client_flags += CLIENT_MULTI_STATEMENTS; + + if (myds) { + if (myds->sess) { + if (myds->sess->client_myds) { + if (myds->sess->client_myds->myconn) { + uint16_t orig_client_flags = myds->sess->client_myds->myconn->options.client_flag; + if (orig_client_flags & CLIENT_FOUND_ROWS) { + client_flags += CLIENT_FOUND_ROWS; + } + if (orig_client_flags & CLIENT_MULTI_STATEMENTS) { + client_flags += CLIENT_MULTI_STATEMENTS; + } + if (orig_client_flags & CLIENT_IGNORE_SPACE) { + client_flags += CLIENT_IGNORE_SPACE; + } + } + } + } + } + char *auth_password=NULL; if (userinfo->password) { if (userinfo->password[0]=='*') { // we don't have the real password, let's pass sha1 @@ -1980,11 +2013,13 @@ bool MySQL_Connection::get_gtid(char *buff, uint64_t *trx_id) { const char *data; size_t length; if (mysql_session_track_get_first(mysql, SESSION_TRACK_GTIDS, &data, &length) == 0) { - memcpy(buff,data,length); - buff[length]=0; - //fprintf(stderr,"GTID=%s\n",buff); - __sync_fetch_and_add(&myds->sess->thread->status_variables.gtid_session_collected,1); - ret = true; + if (memcmp(buff,data,length)) { + memcpy(buff,data,length); + buff[length]=0; + //fprintf(stderr,"GTID=%s\n",buff); + __sync_fetch_and_add(&myds->sess->thread->status_variables.gtid_session_collected,1); + ret = true; + } } } return ret;