From f4a0c4a2b5e58d1ad503251c12ba6149c5f65914 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 16 Jan 2018 18:28:17 +0100 Subject: [PATCH] ProxySQL reads GTID information from proxysql_mysqlbinlog This is the first commit to pull data from proxysql_mysqlbinlog. It is still in Alpha phase, as it is missing a log of important logics, like error handling, retry mechanism, timeouts, etc --- include/MySQL_HostGroups_Manager.h | 213 ++++++++++++++++++++- include/proxysql_admin.h | 1 + include/proxysql_gtid.h | 14 +- lib/Makefile | 6 +- lib/MySQL_HostGroups_Manager.cpp | 290 +++++++++++++++++++++++++++++ lib/ProxySQL_Admin.cpp | 65 ++++++- src/main.cpp | 1 + 7 files changed, 583 insertions(+), 7 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 3e909da1d8..12fe5a29d0 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -5,10 +5,14 @@ #include "proxysql_gtid.h" #include +#include #include "thread.h" #include "wqueue.h" + +#include "ev.h" + /* Enabling STRESSTEST_POOL ProxySQL will do a lot of loops in the connection pool This is for internal testing ONLY!!!! @@ -43,6 +47,201 @@ enum MySerStatus { MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG }; + +std::string gtid_executed_to_string(gtid_set_t& gtid_executed); +void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed); + +class GTID_Server_Data { + public: + char *address; + uint16_t port; + uint16_t mysql_port; + char *data; + size_t len; + size_t size; + size_t pos; + struct ev_io *w; + char uuid_server[64]; + gtid_set_t gtid_executed; + bool active; + GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) { + active = true; + w = _w; + size = 1024; // 1KB buffer + data = (char *)malloc(size); + uuid_server[0] = 0; + pos = 0; + len = 0; + address = strdup(_address); + port = _port; + mysql_port = _mysql_port; + } + void resize(size_t _s) { + char *data_ = (char *)malloc(_s); + memcpy(data_, data, (_s > size ? size : _s)); + size = _s; + free(data); + data = data_; + } + ~GTID_Server_Data() { + free(address); + free(data); + } + bool readall() { + bool ret = true; + if (size == len) { + // buffer is full, expand + resize(len*2); + } + int rc = 0; + rc = read(w->fd,data+len,size-len); + if (rc > 0) { + len += rc; + } else { + int myerr = errno; + if ( + (rc == 0) || + (rc==-1 && myerr != EINTR && myerr != EAGAIN) + ) { + ret = false; + } + } + return ret; + } + bool writeout() { + bool ret = true; + if (len==0) { + return ret; + } + int rc = 0; + rc = write(w->fd,data+pos,len-pos); + if (rc > 0) { + pos += rc; + if (pos >= len/2) { + memmove(data,data+pos,len-pos); + len = len-pos; + pos = 0; + } + } + return ret; + } + bool read_next_gtid() { + if (len==0) { + return false; + } + void *nlp = NULL; + nlp = memchr(data+pos,'\n',len-pos); + if (nlp == NULL) { + return false; + } + int l = (char *)nlp - (data+pos); + char rec_msg[80]; + if (strncmp(data+pos,(char *)"ST=",3)==0) { + // we are reading the bootstrap + char *bs = (char *)malloc(l+1-3); // length + 1 (null byte) - 3 (header) + memcpy(bs,data+pos+3,l+1-3); + char *saveptr1=NULL; + char *saveptr2=NULL; + //char *saveptr3=NULL; + char *token = NULL; + char *subtoken = NULL; + //char *subtoken2 = NULL; + char *str1 = NULL; + char *str2 = NULL; + //char *str3 = NULL; + for (str1 = bs; ; str1 = NULL) { + token = strtok_r(str1, ",", &saveptr1); + if (token == NULL) { + break; + } + int j = 0; + for (str2 = token; ; str2 = NULL) { + subtoken = strtok_r(str2, ":", &saveptr2); + if (subtoken == NULL) { + break; + } + j++; + if (j%2 == 1) { // we are reading the uuid + char *p = uuid_server; + for (unsigned int k=0; k= len/2) { + memmove(data,data+pos,len-pos); + len = len-pos; + pos = 0; + } + } +}; + + + class MySrvConnList { private: PtrArray *conns; @@ -184,10 +383,16 @@ class MySQL_HostGroups_Manager { std::thread *HGCU_thread; - std::unordered_map gtid_map; - pthread_rwlock_t gtid_rwlock; + std::thread *GTID_syncer_thread; + //pthread_t GTID_syncer_thread_id; + //pthread_t HGCU_thread_id; + public: + pthread_rwlock_t gtid_rwlock; + std::unordered_map gtid_map; + struct ev_async * gtid_ev_async; + struct ev_loop * gtid_ev_loop; struct { unsigned int servers_table_version; pthread_mutex_t servers_table_version_lock; @@ -220,6 +425,7 @@ class MySQL_HostGroups_Manager { wqueue queue; MySQL_HostGroups_Manager(); ~MySQL_HostGroups_Manager(); + void init(); void wrlock(); void wrunlock(); bool server_add(unsigned int hid, char *add, uint16_t p=3306, uint16_t gp=0, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100, unsigned int _max_replication_lag=0, unsigned int _use_ssl=0, unsigned int _max_latency_ms=0, char *comment=NULL); @@ -258,6 +464,9 @@ class MySQL_HostGroups_Manager { void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); void converge_group_replication_config(int _writer_hostgroup); + + SQLite3_result * get_stats_mysql_gtid_executed(); + void generate_mysql_gtid_executed_tables(); }; #endif /* __CLASS_MYSQL_HOSTGROUPS_MANAGER_H */ diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index e689f23ccc..6fecd86d12 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -243,6 +243,7 @@ class ProxySQL_Admin { void stats___proxysql_servers_checksums(); void stats___proxysql_servers_metrics(); void stats___mysql_prepared_statements_info(); + void stats___mysql_gtid_executed(); int Read_Global_Variables_from_configfile(const char *prefix); int Read_MySQL_Users_from_configfile(); diff --git a/include/proxysql_gtid.h b/include/proxysql_gtid.h index bace8aaeff..b2e11614ce 100644 --- a/include/proxysql_gtid.h +++ b/include/proxysql_gtid.h @@ -10,12 +10,24 @@ typedef std::pair gtid_t; typedef std::pair gtid_interval_t; typedef std::unordered_map> gtid_set_t; +/* class Gtid_Server_Info { + public: gtid_set_t executed_gtid_set; - char *address; + char *hostname; uint16_t mysql_port; uint16_t gtid_port; bool active; + Gtid_Server_Info(char *_h, uint16_t _mp, uint16_t _gp) { + hostname = strdup(_h); + mysql_port = _mp; + gtid_port = _gp; + active = true; + }; + ~Gtid_Server_Info() { + free(hostname); + }; }; +*/ #endif /* PROXYSQL_GTID */ diff --git a/lib/Makefile b/lib/Makefile index b122483c19..0bd9edb8b2 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -32,9 +32,13 @@ MICROHTTPD_IDIR=-I$(MICROHTTPD_DIR) -I$(MICROHTTPD_DIR)/src/include CURL_DIR=$(DEPS_PATH)/curl/curl CURL_IDIR=-I$(CURL_DIR)/include +EV_DIR=$(DEPS_PATH)/libev/libev/ +EV_IDIR=$(EV_DIR) + + IDIR=../include -IDIRS=-I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) $(MICROHTTPD_IDIR) $(CURL_IDIR) +IDIRS=-I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) $(MICROHTTPD_IDIR) $(CURL_IDIR) -I$(EV_DIR) LDIRS=-L$(JEMALLOC_PATH)/lib -L$(RE2_PATH)/obj -L$(INJECTION_PATH) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index de09198b98..83f334035d 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -8,6 +8,8 @@ #include "thread.h" #include "wqueue.h" +#include "ev.h" + #define SAFE_SQLITE3_STEP(_stmt) do {\ do {\ rc=sqlite3_step(_stmt);\ @@ -29,6 +31,20 @@ class MySrvC; class MySrvList; class MyHGC; +//static struct ev_async * gtid_ev_async; + +static pthread_mutex_t ev_loop_mutex; + +//static std::unordered_map gtid_map; + +static void gtid_async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents) { + pthread_mutex_lock(&ev_loop_mutex); + MyHGM->generate_mysql_gtid_executed_tables(); + pthread_mutex_unlock(&ev_loop_mutex); + return; +} + + static int wait_for_mysql(MYSQL *mysql, int status) { struct pollfd pfd; int timeout, res; @@ -53,6 +69,173 @@ static int wait_for_mysql(MYSQL *mysql, int status) { } } +void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { + pthread_mutex_lock(&ev_loop_mutex); + if (revents & EV_READ) { + GTID_Server_Data *sd = (GTID_Server_Data *)w->data; + bool rc = true; + rc = sd->readall(); + if (rc == false) { + delete sd; + ev_io_stop(MyHGM->gtid_ev_loop, w); + free(w); + } else { + sd->dump(); + } + } + pthread_mutex_unlock(&ev_loop_mutex); +} + +void connect_cb(EV_P_ ev_io *w, int revents) { + struct ev_io * c = w; + if (revents & EV_WRITE) { + int optval = 0; + socklen_t optlen = sizeof(optval); + if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) || + (optval != 0)) { + /* Connection failed; try the next address in the list. */ + int errnum = optval ? optval : errno; + ev_io_stop(MyHGM->gtid_ev_loop, w); + close(w->fd); + GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; + delete custom_data; + free(c); + } else { + ev_io_stop(MyHGM->gtid_ev_loop, w); + int fd=w->fd; + free(w); + struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io)); + ev_io_init(new_w, reader_cb, fd, EV_READ); + ev_io_start(MyHGM->gtid_ev_loop, new_w); + } + } +} + +struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) { + struct sockaddr_in a; + int s; + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + perror("socket"); + close(s); + return NULL; + } + memset(&a, 0, sizeof(a)); + a.sin_port = htons(gtid_port); + a.sin_family = AF_INET; + if (!inet_aton(address, (struct in_addr *) &a.sin_addr.s_addr)) { + perror("bad IP address format"); + close(s); + return NULL; + } + ioctl_FIONBIO(s,1); + + int status = connect(s, (struct sockaddr *) &a, sizeof(a)); + if ((status == 0) || ((status == -1) && (errno == EINPROGRESS))) { + struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io)); + if (c) { + ev_io_init(c, connect_cb, s, EV_WRITE); + GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port); + c->data = (void *)custom_data; + return c; + } + /* else error */ + } + return NULL; +} + + +std::string gtid_executed_to_string(gtid_set_t& gtid_executed) { + std::string gtid_set; + for (auto it=gtid_executed.begin(); it!=gtid_executed.end(); ++it) { + std::string s = it->first; + s.insert(8,"-"); + s.insert(13,"-"); + s.insert(18,"-"); + s.insert(23,"-"); + s = s + ":"; + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) { + std::string s2 = s; + s2 = s2 + std::to_string(itr->first); + s2 = s2 + "-"; + s2 = s2 + std::to_string(itr->second); + s2 = s2 + ","; + gtid_set = gtid_set + s2; + } + } + gtid_set.pop_back(); + return gtid_set; +} + + + +void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) { + auto it = gtid_executed.find(gtid.first); + if (it == gtid_executed.end()) + { + gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second); + return; + } + + bool flag = true; + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) + { + if (gtid.second >= itr->first && gtid.second <= itr->second) + return; + if (gtid.second + 1 == itr->first) + { + --itr->first; + flag = false; + break; + } + else if (gtid.second == itr->second + 1) + { + ++itr->second; + flag = false; + break; + } + else if (gtid.second < itr->first) + { + it->second.emplace(itr, gtid.second, gtid.second); + return; + } + } + + if (flag) + it->second.emplace_back(gtid.second, gtid.second); + + for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) + { + auto next_itr = std::next(itr); + if (next_itr != it->second.end() && itr->second + 1 == next_itr->first) + { + itr->second = next_itr->second; + it->second.erase(next_itr); + break; + } + } +} + +static void * GTID_syncer_run() { + //struct ev_loop * gtid_ev_loop; + //gtid_ev_loop = NULL; + MyHGM->gtid_ev_loop = ev_loop_new (EVBACKEND_POLL | EVFLAG_NOENV); + if (MyHGM->gtid_ev_loop == NULL) { + proxy_error("could not initialise GTID sync loop\n"); + exit(EXIT_FAILURE); + } + MyHGM->gtid_ev_async = (struct ev_async *)malloc(sizeof(struct ev_async)); + //ev_async_init(gtid_ev_async, gtid_async_cb); + //ev_async_start(gtid_ev_loop, gtid_ev_async); + ev_async_init(MyHGM->gtid_ev_async, gtid_async_cb); + ev_async_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_async); + //ev_ref(gtid_ev_loop); + ev_run(MyHGM->gtid_ev_loop, 0); + //sleep(1000); + return NULL; +} + +//static void * HGCU_thread_run() { static void * HGCU_thread_run() { PtrArray *conn_array=new PtrArray(); while(1) { @@ -380,12 +563,28 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() { MyHostGroups=new PtrArray(); incoming_replication_hostgroups=NULL; incoming_group_replication_hostgroups=NULL; + pthread_rwlock_init(>id_rwlock, NULL); + gtid_ev_async = (struct ev_async *)malloc(sizeof(struct ev_async)); +} +void MySQL_HostGroups_Manager::init() { + //conn_reset_queue = NULL; + //conn_reset_queue = new wqueue(); HGCU_thread = new std::thread(&HGCU_thread_run); + //pthread_create(&HGCU_thread_id, NULL, HGCU_thread_run , NULL); + + // gtid initialization; + GTID_syncer_thread = new std::thread(>ID_syncer_run); + + //pthread_create(>ID_syncer_thread_id, NULL, GTID_syncer_run , NULL); } MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() { queue.add(NULL); HGCU_thread->join(); + //pthread_join(HGCU_thread_id, NULL); + //pthread_join(GTID_syncer_thread_id, NULL); + GTID_syncer_thread->join(); + free(gtid_ev_async); while (MyHostGroups->len) { MyHGC *myhgc=(MyHGC *)MyHostGroups->remove_index_fast(0); delete myhgc; @@ -866,6 +1065,8 @@ bool MySQL_HostGroups_Manager::commit() { pthread_mutex_unlock(&GloVars.checksum_mutex); } + ev_async_send(gtid_ev_loop, gtid_ev_async); + __sync_fetch_and_add(&status.servers_table_version,1); pthread_cond_broadcast(&status.servers_table_version_cond); pthread_mutex_unlock(&status.servers_table_version_lock); @@ -877,6 +1078,64 @@ bool MySQL_HostGroups_Manager::commit() { } +void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { + pthread_rwlock_wrlock(>id_rwlock); + // first, set them all as active = false + std::unordered_map::iterator it = gtid_map.begin(); + while(it != gtid_map.end()) { + GTID_Server_Data * gtid_si = it->second; + gtid_si->active = false; + it++; + } + + for (unsigned int i=0; ilen; i++) { + MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); + MySrvC *mysrvc=NULL; + for (unsigned int j=0; jmysrvs->servers->len; j++) { + mysrvc=myhgc->mysrvs->idx(j); + if (mysrvc->gtid_port) { + std::string s1 = mysrvc->address; + s1.append(":"); + s1.append(std::to_string(mysrvc->port)); + std::unordered_map ::iterator it2; + it2 = gtid_map.find(s1); + GTID_Server_Data *gtid_is=NULL; + if (it2!=gtid_map.end()) { + gtid_is=it2->second; + gtid_is->active = true; + } else { + // we didn't find it. Create it + /* + struct ev_io *watcher = (struct ev_io *)malloc(sizeof(struct ev_io)); + gtid_is = new GTID_Server_Data(watcher, mysrvc->address, mysrvc->port, mysrvc->gtid_port); + gtid_map.emplace(s1,gtid_is); + */ + struct ev_io * c = NULL; + c = new_connector(mysrvc->address, mysrvc->gtid_port, mysrvc->port); + if (c) { + gtid_is = (GTID_Server_Data *)c->data; + gtid_map.emplace(s1,gtid_is); + ev_io_start(MyHGM->gtid_ev_loop,c); + } + } + } + } + } + std::vector to_remove; + it = gtid_map.begin(); + while(it != gtid_map.end()) { + GTID_Server_Data * gtid_si = it->second; + if (gtid_si->active == false) { + to_remove.push_back(it->first); + } + it++; + } + for (std::vector::iterator it3=to_remove.begin(); it3!=to_remove.end(); ++it3) { + gtid_map.erase(*it3); + } + pthread_rwlock_unlock(>id_rwlock); +} + void MySQL_HostGroups_Manager::purge_mysql_servers_table() { for (unsigned int i=0; ilen; i++) { MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); @@ -2600,3 +2859,34 @@ void MySQL_HostGroups_Manager::converge_group_replication_config(int _writer_hos } pthread_mutex_unlock(&Group_Replication_Info_mutex); } + +SQLite3_result * MySQL_HostGroups_Manager::get_stats_mysql_gtid_executed() { + const int colnum = 3; + SQLite3_result * result = new SQLite3_result(colnum); + result->add_column_definition(SQLITE_TEXT,"hostname"); + result->add_column_definition(SQLITE_TEXT,"port"); + result->add_column_definition(SQLITE_TEXT,"gtid_executed"); + int k; + pthread_rwlock_wrlock(>id_rwlock); + std::unordered_map::iterator it = gtid_map.begin(); + while(it != gtid_map.end()) { + GTID_Server_Data * gtid_si = it->second; + char buf[64]; + char **pta=(char **)malloc(sizeof(char *)*colnum); + pta[0]=strdup(gtid_si->address); + sprintf(buf,"%d", (int)gtid_si->mysql_port); + pta[1]=strdup(buf); + //sprintf(buf,"%d", mysrvc->port); + string s1 = gtid_executed_to_string(gtid_si->gtid_executed); + pta[2]=strdup(s1.c_str()); + result->add_row(pta); + for (k=0; k=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )" // mysql_servers in v2.0.0 -#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS_V2_0_0 "CREATE TABLE mysql_servers (hostgroup_id INT CHECK (hostgroup_id>=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , gtid_port INT NOT NULL DEFAULT 0 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )" +#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS_V2_0_0 "CREATE TABLE mysql_servers (hostgroup_id INT CHECK (hostgroup_id>=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , gtid_port INT CHECK (gtid_port <> port) NOT NULL DEFAULT 0 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )" #define ADMIN_SQLITE_TABLE_MYSQL_SERVERS ADMIN_SQLITE_TABLE_MYSQL_SERVERS_V2_0_0 @@ -276,6 +276,8 @@ static int http_handler(void *cls, struct MHD_Connection *connection, const char #define STATS_SQLITE_TABLE_MEMORY_METRICS "CREATE TABLE stats_memory_metrics (Variable_Name VARCHAR NOT NULL PRIMARY KEY , Variable_Value VARCHAR NOT NULL)" +#define STATS_SQLITE_TABLE_MYSQL_GTID_EXECUTED "CREATE TABLE stats_mysql_gtid_executed (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , gtid_executed VARCHAR)" + #ifdef DEBUG #define ADMIN_SQLITE_TABLE_DEBUG_LEVELS "CREATE TABLE debug_levels (module VARCHAR NOT NULL PRIMARY KEY , verbosity INT NOT NULL DEFAULT 0)" #endif /* DEBUG */ @@ -1849,6 +1851,7 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign bool stats_mysql_commands_counters=false; bool stats_mysql_query_rules=false; bool stats_mysql_users=false; + bool stats_mysql_gtid_executed=false; bool dump_global_variables=false; bool runtime_scheduler=false; @@ -1898,6 +1901,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign { stats_mysql_query_rules=true; refresh=true; } if (strstr(query_no_space,"stats_mysql_users")) { stats_mysql_users=true; refresh=true; } + if (strstr(query_no_space,"stats_mysql_gtid_executed")) + { stats_mysql_gtid_executed=true; refresh=true; } if (strstr(query_no_space,"stats_proxysql_servers_checksums")) { stats_proxysql_servers_checksums = true; refresh = true; } @@ -1975,6 +1980,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign stats___mysql_commands_counters(); if (stats_mysql_users) stats___mysql_users(); + if (stats_mysql_gtid_executed) + stats___mysql_gtid_executed(); // cluster if (stats_proxysql_servers_metrics) { @@ -3483,6 +3490,7 @@ bool ProxySQL_Admin::init() { insert_into_tables_defs(tables_defs_stats,"stats_mysql_query_digest", STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST); insert_into_tables_defs(tables_defs_stats,"stats_mysql_query_digest_reset", STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST_RESET); insert_into_tables_defs(tables_defs_stats,"stats_mysql_global", STATS_SQLITE_TABLE_MYSQL_GLOBAL); + insert_into_tables_defs(tables_defs_stats,"stats_mysql_gtid_executed", STATS_SQLITE_TABLE_MYSQL_GTID_EXECUTED); insert_into_tables_defs(tables_defs_stats,"stats_memory_metrics", STATS_SQLITE_TABLE_MEMORY_METRICS); insert_into_tables_defs(tables_defs_stats,"stats_mysql_users", STATS_SQLITE_TABLE_MYSQL_USERS); insert_into_tables_defs(tables_defs_stats,"global_variables", ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES); // workaround for issue #708 @@ -6483,6 +6491,57 @@ void ProxySQL_Admin::stats___mysql_users() { free(ads); } +void ProxySQL_Admin::stats___mysql_gtid_executed() { + int i; + statsdb->execute("DELETE FROM stats_mysql_gtid_executed"); + SQLite3_result *resultset=NULL; + resultset = MyHGM->get_stats_mysql_gtid_executed(); + if (resultset) { + int rc; + sqlite3_stmt *statement1=NULL; + sqlite3_stmt *statement32=NULL; + sqlite3 *mydb3=statsdb->get_db(); + char *query1=NULL; + char *query32=NULL; + query1=(char *)"INSERT INTO stats_mysql_gtid_executed VALUES (?1, ?2, ?3)"; + query32=(char *)"INSERT INTO stats_mysql_gtid_executed VALUES (?1, ?2, ?3), (?4, ?5, ?6), (?7, ?8, ?9), (?10, ?11, ?12), (?13, ?14, ?15), (?16, ?17, ?18), (?19, ?20, ?21), (?22, ?23, ?24), (?25, ?26, ?27), (?28, ?29, ?30), (?31, ?32, ?33), (?34, ?35, ?36), (?37, ?38, ?39), (?40, ?41, ?42), (?43, ?44, ?45), (?46, ?47, ?48), (?49, ?50, ?51), (?52, ?53, ?54), (?55, ?56, ?57), (?58, ?59, ?60), (?61, ?62, ?63), (?64, ?65, ?66), (?67, ?68, ?69), (?70, ?71, ?72), (?73, ?74, ?75), (?76, ?77, ?78), (?79, ?80, ?81), (?82, ?83, ?84), (?85, ?86, ?87), (?88, ?89, ?90), (?91, ?92, ?93), (?94, ?95, ?96)"; + + rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0); + assert(rc==SQLITE_OK); + rc=sqlite3_prepare_v2(mydb3, query32, -1, &statement32, 0); + assert(rc==SQLITE_OK); + int row_idx=0; + int max_bulk_row_idx=resultset->rows_count/32; + max_bulk_row_idx=max_bulk_row_idx*32; + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r1=*it; + int idx=row_idx%32; + if (row_idxfields[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement32, (idx*3)+2, atoi(r1->fields[1])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement32, (idx*3)+3, r1->fields[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + if (idx==31) { + SAFE_SQLITE3_STEP(statement32); + rc=sqlite3_clear_bindings(statement32); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement32); assert(rc==SQLITE_OK); + } + } else { // single row + rc=sqlite3_bind_text(statement1, 1, r1->fields[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement1, 2, atoi(r1->fields[1])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement1, 3, r1->fields[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement1); + rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK); + } + row_idx++; + } + sqlite3_finalize(statement1); + sqlite3_finalize(statement32); + delete resultset; + resultset = NULL; + } +} + void ProxySQL_Admin::save_scheduler_runtime_to_database(bool _runtime) { char *query=NULL; if (_runtime) { @@ -7374,7 +7433,7 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { } } server.lookupValue("port", port); - server.lookupValue("gtid_port", port); + server.lookupValue("gtid_port", gtid_port); if (server.lookupValue("hostgroup", hostgroup)==false) continue; server.lookupValue("status", status); if ( @@ -7395,7 +7454,7 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { char *o1=strdup(comment.c_str()); char *o=escape_string_single_quotes(o1, false); char *query=(char *)malloc(strlen(q)+strlen(status.c_str())+strlen(address.c_str())+strlen(o)+128); - sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag, use_ssl, max_latency_ms, o); + sprintf(query,q, address.c_str(), port, gtid_port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag, use_ssl, max_latency_ms, o); //fprintf(stderr, "%s\n", query); admindb->execute(query); if (o!=o1) free(o); diff --git a/src/main.cpp b/src/main.cpp index 693d13a17f..2e54acd264 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -402,6 +402,7 @@ void ProxySQL_Main_init_main_modules() { } MyHGM=new MySQL_HostGroups_Manager(); + MyHGM->init(); GloMTH=new MySQL_Threads_Handler(); GloMyLogger = new MySQL_Logger(); GloMyStmt=new MySQL_STMT_Manager_v14();