diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index a616dfb5bf..8b22ecdade 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -53,6 +53,17 @@ class MySQL_Monitor_State_Data { int handler(int fd, short event); void next_event(int new_st, int status); void unregister(); + SQLite3DB *mondb; + // we are copying these from MySQL_Connection +// short wait_events; +// unsigned long long timeout; + MDB_ASYNC_ST async_state_machine; + int async_exit_status; +// int async_ping(short); +// void ping_start(); +// void ping_cont(short event); +// MDB_ASYNC_ST handler2(short); +// void next_event(MDB_ASYNC_ST new_st); }; @@ -73,6 +84,7 @@ class MySQL_Monitor { ~MySQL_Monitor(); void print_version(); void * monitor_connect(); +// void * monitor_connect_thread(void *); void * monitor_ping(); void * monitor_read_only(); void * monitor_replication_lag(); diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index bfd74423e8..14c8507e27 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -23,6 +23,10 @@ static MySQL_Monitor *GloMyMon; #define NEXT_IMMEDIATE(new_st) do { ST= new_st; goto again; } while (0) +/* +#define NEXT_IMMEDIATE2(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) +*/ + #define SAFE_SQLITE3_STEP(_stmt) do {\ do {\ rc=sqlite3_step(_stmt);\ @@ -35,6 +39,48 @@ static MySQL_Monitor *GloMyMon; static void state_machine_handler(int fd, short event, void *arg); + +static int wait_for_mysql(MYSQL *mysql, int status) { + struct pollfd pfd; + int timeout, res; + + pfd.fd = mysql_get_socket(mysql); + pfd.events = + (status & MYSQL_WAIT_READ ? POLLIN : 0) | + (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) | + (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); + timeout = 10; + res = poll(&pfd, 1, timeout); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res < 0) + return MYSQL_WAIT_TIMEOUT; + else { + int status = 0; + if (pfd.revents & POLLIN) status |= MYSQL_WAIT_READ; + if (pfd.revents & POLLOUT) status |= MYSQL_WAIT_WRITE; + if (pfd.revents & POLLPRI) status |= MYSQL_WAIT_EXCEPT; + return status; + } +} + +/* +static int +mysql_status2(short event, short cont) { + int status= 0; + if (event & POLLIN) + status|= MYSQL_WAIT_READ; + if (event & POLLOUT) + status|= MYSQL_WAIT_WRITE; +// if (event==0 && cont==true) { +// status |= MYSQL_WAIT_TIMEOUT; +// } +// FIXME: handle timeout +// if (event & PROXY_TIMEOUT) +// status|= MYSQL_WAIT_TIMEOUT; + return status; +} +*/ static void close_mysql(MYSQL *my) { if (my->net.vio) { char buff[5]; @@ -71,9 +117,9 @@ struct state_data { */ static int connect__num_active_connections; -static int total_connect__num_active_connections=0; +//static int total_connect__num_active_connections=0; static int ping__num_active_connections; -static int total_ping__num_active_connections=0; +//static int total_ping__num_active_connections=0; static int replication_lag__num_active_connections; static int total_replication_lag__num_active_connections=0; static int read_only__num_active_connections; @@ -763,15 +809,183 @@ void MySQL_Monitor::check_and_build_standard_tables(SQLite3DB *db, std::vectorexecute("PRAGMA foreign_keys = ON"); }; +void * monitor_connect_thread(void *arg) { + MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Thread * mysql_thr = new MySQL_Thread(); + mysql_thr->curtime=monotonic_time(); + mysql_thr->refresh_variables(); + + mmsd->mysql=mysql_init(NULL); + assert(mmsd->mysql); + if (mmsd->use_ssl) { + mysql_ssl_set(mmsd->mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher); + } + unsigned int timeout=mysql_thread___monitor_connect_timeout/1000; + mysql_options(mmsd->mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(mmsd->mysql, MYSQL_OPT_READ_TIMEOUT, &timeout); + mysql_options(mmsd->mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + MYSQL *myrc=NULL; + if (mmsd->port) { + myrc=mysql_real_connect(mmsd->mysql, mmsd->hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, mmsd->port, NULL, 0); + } else { + myrc=mysql_real_connect(mmsd->mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, mmsd->hostname, 0); + } + if (myrc==NULL) { + mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); + } + + unsigned long long start_time=mysql_thr->curtime; + mmsd->t1=start_time; + mmsd->t2=monotonic_time(); + + sqlite3_stmt *statement; + sqlite3 *mondb=mmsd->mondb->get_db(); + int rc; + char *query=NULL; + query=(char *)"INSERT OR REPLACE INTO mysql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; + rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + sqlite3_finalize(statement); + + mysql_close(mmsd->mysql); + delete mysql_thr; + return NULL; +} + +void * monitor_ping_thread(void *arg) { + MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Thread * mysql_thr = new MySQL_Thread(); + mysql_thr->curtime=monotonic_time(); + mysql_thr->refresh_variables(); + + mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port); + unsigned long long start_time=mysql_thr->curtime; + + mmsd->t1=start_time; + + if (mmsd->mysql==NULL) { // we don't have a connection, let's create it + mmsd->mysql=mysql_init(NULL); + assert(mmsd->mysql); + if (mmsd->use_ssl) { + mysql_ssl_set(mmsd->mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher); + } + unsigned int timeout=mysql_thread___monitor_connect_timeout/1000; + mysql_options(mmsd->mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(mmsd->mysql, MYSQL_OPT_READ_TIMEOUT, &timeout); + mysql_options(mmsd->mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + MYSQL *myrc=NULL; + if (mmsd->port) { + myrc=mysql_real_connect(mmsd->mysql, mmsd->hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, mmsd->port, NULL, 0); + } else { + myrc=mysql_real_connect(mmsd->mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, mmsd->hostname, 0); + } + if (myrc==NULL) { + mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); + goto __exit_monitor_ping_thread; + } else { + // mariadb client library disables NONBLOCK for SSL connections ... re-enable it! + mysql_options(mmsd->mysql, MYSQL_OPT_NONBLOCK, 0); + int f=fcntl(mmsd->mysql->net.fd, F_GETFL); +#ifdef FD_CLOEXEC + // asynchronously set also FD_CLOEXEC , this to prevent then when a fork happens the FD are duplicated to new process + fcntl(mmsd->mysql->net.fd, F_SETFL, f|O_NONBLOCK|FD_CLOEXEC); +#else + fcntl(mmsd->mysql->net.fd, F_SETFL, f|O_NONBLOCK); +#endif /* FD_CLOEXEC */ + } + } + + mmsd->t1=monotonic_time(); + //async_exit_status=mysql_change_user_start(&ret_bool, mysql,"msandbox2","msandbox2","information_schema"); + mmsd->async_exit_status=mysql_ping_start(&mmsd->interr,mmsd->mysql); + while (mmsd->async_exit_status) { + mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + mmsd->async_exit_status=mysql_ping_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); + unsigned long long now=monotonic_time(); + if (now > mmsd->t1 + mysql_thread___monitor_ping_timeout * 1000) { + mmsd->mysql_error_msg=strdup("timeout during ping"); + goto __exit_monitor_ping_thread; + } + if (GloMyMon->shutdown==true) { + goto __fast_exit_monitor_ping_thread; // exit immediately + } + } + if (mmsd->interr) { // ping failed + mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); + } else { + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + mmsd->mysql=NULL; + } + mmsd->t2=monotonic_time(); + +__exit_monitor_ping_thread: + mmsd->t2=monotonic_time(); + { + sqlite3_stmt *statement; + sqlite3 *mondb=mmsd->mondb->get_db(); + int rc; + char *query=NULL; + query=(char *)"INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; + rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + sqlite3_finalize(statement); +/* + free(sds); + query=(char *)"INSERT OR REPLACE INTO mysql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; + rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + sqlite3_finalize(statement); +*/ + } +__fast_exit_monitor_ping_thread: + if (mmsd->mysql) { + mysql_close(mmsd->mysql); // if we reached here we didn't put the connection back + } + delete mysql_thr; + return NULL; +} + void * MySQL_Monitor::monitor_connect() { // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) - struct event_base *libevent_base; + //struct event_base *libevent_base; unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); mysql_thr->refresh_variables(); + + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); +// pthread_attr_setstacksize (&attr, 192*1024); + + unsigned long long t1; unsigned long long t2; unsigned long long next_loop_at=0; @@ -782,9 +996,6 @@ void * MySQL_Monitor::monitor_connect() { int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; - int i=0; - MySQL_Monitor_State_Data **sds=NULL; - //char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers"; // add support for SSL char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers GROUP BY hostname, port"; unsigned int glover; @@ -802,13 +1013,7 @@ void * MySQL_Monitor::monitor_connect() { } next_loop_at=t1+1000*mysql_thread___monitor_connect_interval; - struct timeval tv_out; - evutil_gettimeofday(&tv_out, NULL); - start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); - - connect__num_active_connections=0; - // create libevent base - libevent_base= event_base_new(); + start_time=monotonic_time(); proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); @@ -820,24 +1025,20 @@ void * MySQL_Monitor::monitor_connect() { if (resultset->rows_count==0) { goto __end_monitor_connect_loop; } - sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *)); for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; - sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2])); - sds[i]->task_id=MON_CONNECT; - connect__num_active_connections++; - total_connect__num_active_connections++; - state_machine_handler(-1,-1,sds[i]); - i++; + MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); + mmsd->mondb=monitordb; + pthread_t thr_; + if ( pthread_create(&thr_, &attr, monitor_connect_thread, (void *)mmsd) != 0 ) { + perror("Thread creation monitor_connect_thread"); + } } } - // start libevent loop - event_base_dispatch(libevent_base); - __end_monitor_connect_loop: - if (sds) { + /* if (sds) */ { sqlite3_stmt *statement; sqlite3 *mondb=monitordb->get_db(); int rc; @@ -854,8 +1055,149 @@ void * MySQL_Monitor::monitor_connect() { rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); sqlite3_finalize(statement); + } + if (resultset) + delete resultset; + +__sleep_monitor_connect_loop: + t2=monotonic_time(); + if (t2 500000) { + st = 500000; + } + usleep(st); + } + } + if (mysql_thr) { + delete mysql_thr; + mysql_thr=NULL; + } + return NULL; +} + + +void * MySQL_Monitor::monitor_ping() { + // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) +// struct event_base *libevent_base; + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; + MySQL_Thread * mysql_thr = new MySQL_Thread(); + mysql_thr->curtime=monotonic_time(); + MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); + mysql_thr->refresh_variables(); + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); +// pthread_attr_setstacksize (&attr, 192*1024); + + unsigned long long t1; + unsigned long long t2; + unsigned long long start_time; + unsigned long long next_loop_at=0; + //unsigned int t1; + //unsigned int t2; + //t1=monotonic_time(); + + while (shutdown==false) { + + unsigned int glover; + char *error=NULL; + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; +// MySQL_Monitor_State_Data **sds=NULL; + //int i=0; + //char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD'"; + // add support for SSL + char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers WHERE status!='OFFLINE_HARD' GROUP BY hostname, port"; + t1=monotonic_time(); + + glover=GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; + mysql_thr->refresh_variables(); + next_loop_at=0; + } + + if (t1 < next_loop_at) { + goto __sleep_monitor_ping_loop; + } + next_loop_at=t1+1000*mysql_thread___monitor_ping_interval; + +/* + struct timeval tv_out; + evutil_gettimeofday(&tv_out, NULL); + start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); +*/ + start_time=monotonic_time(); +// ping__num_active_connections=0; +// // create libevent base +// libevent_base= event_base_new(); + + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + goto __end_monitor_ping_loop; + } else { + if (resultset->rows_count==0) { + goto __end_monitor_ping_loop; + } +// sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *)); + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + MySQL_Monitor_State_Data *mmsd = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); +/* + sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2])); + sds[i]->task_id=MON_PING; + ping__num_active_connections++; + total_ping__num_active_connections++; + MySQL_Monitor_State_Data *_mmsd=sds[i]; +*/ +// mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port); + mmsd->mondb=monitordb; + pthread_t thr_; + if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) { + perror("Thread creation monitor_ping_thread"); + } +/* + if (_mmsd->mysql==NULL) { + state_machine_handler(-1,-1,_mmsd); + } else { + int fd=mysql_get_socket(_mmsd->mysql); + _mmsd->ST=7; + state_machine_handler(fd,-1,_mmsd); + } + i++; +*/ + } + } + + // start libevent loop +// event_base_dispatch(libevent_base); - query=(char *)"INSERT OR REPLACE INTO mysql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; +__end_monitor_ping_loop: + /* if (sds) */ { + sqlite3_stmt *statement; + sqlite3 *mondb=monitordb->get_db(); + int rc; + char *query=NULL; + query=(char *)"DELETE FROM mysql_server_ping_log WHERE time_start < ?1"; + rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + if (mysql_thread___monitor_history < mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 )) { // issue #626 + if (mysql_thread___monitor_ping_interval < 3600000) + mysql_thread___monitor_history = mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 ); + } + rc=sqlite3_bind_int64(statement, 1, start_time-mysql_thread___monitor_history*1000); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + sqlite3_finalize(statement); +/* + query=(char *)"INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); assert(rc==SQLITE_OK); while (i>0) { @@ -873,13 +1215,120 @@ void * MySQL_Monitor::monitor_connect() { } sqlite3_finalize(statement); free(sds); +*/ } - if (resultset) + + if (resultset) { delete resultset; + resultset=NULL; + } - event_base_free(libevent_base); +// event_base_free(libevent_base); -__sleep_monitor_connect_loop: + // now it is time to shun all problematic hosts + query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NOT NULL"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + // get all addresses and ports + int i=0; + int j=0; + char **addresses=(char **)malloc(resultset->rows_count * sizeof(char *)); + char **ports=(char **)malloc(resultset->rows_count * sizeof(char *)); + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + addresses[i]=strdup(r->fields[0]); + ports[i]=strdup(r->fields[1]); + i++; + } + if (resultset) { + delete resultset; + resultset=NULL; + } + char *new_query=NULL; + new_query=(char *)"SELECT 1 FROM (SELECT hostname,port,ping_error FROM mysql_server_ping_log WHERE hostname='%s' AND port='%s' ORDER BY time_start DESC LIMIT %d) a WHERE ping_error IS NOT NULL GROUP BY hostname,port HAVING COUNT(*)=%d"; + for (j=0;jexecute_statement(buff, &error , &cols , &affected_rows , &resultset); + if (!error) { + if (resultset) { + if (resultset->rows_count) { + // disable host + proxy_error("Server %s:%s missed %d heartbeats, shunning it and killing all the connections\n", addresses[j], ports[j], max_failures); + MyHGM->shun_and_killall(addresses[j],atoi(ports[j])); + } + delete resultset; + resultset=NULL; + } + } else { + proxy_error("Error on %s : %s\n", query, error); + } + free(buff); + } + + while (i) { // now free all the addresses/ports + i--; + free(addresses[i]); + free(ports[i]); + } + free(addresses); + free(ports); + } + + + // now it is time to update current_lantency_ms + query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NULL"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + // get all addresses and ports + int i=0; + int j=0; + char **addresses=(char **)malloc(resultset->rows_count * sizeof(char *)); + char **ports=(char **)malloc(resultset->rows_count * sizeof(char *)); + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + addresses[i]=strdup(r->fields[0]); + ports[i]=strdup(r->fields[1]); + i++; + } + if (resultset) { + delete resultset; + resultset=NULL; + } + char *new_query=NULL; + + new_query=(char *)"SELECT hostname,port,COALESCE(CAST(AVG(ping_success_time) AS INTEGER),10000) FROM (SELECT hostname,port,ping_success_time,ping_error FROM mysql_server_ping_log WHERE hostname='%s' AND port='%s' ORDER BY time_start DESC LIMIT 3) a WHERE ping_error IS NULL GROUP BY hostname,port"; + for (j=0;jexecute_statement(buff, &error , &cols , &affected_rows , &resultset); + if (!error) { + if (resultset) { + if (resultset->rows_count) { + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; // this should be called just once, but we create a generic for loop + // update current_latency_ms + MyHGM->set_server_current_latency_us(addresses[j],atoi(ports[j]), atoi(r->fields[2])); + } + } + delete resultset; + resultset=NULL; + } + } else { + proxy_error("Error on %s : %s\n", query, error); + } + free(buff); + } + } + +__sleep_monitor_ping_loop: t2=monotonic_time(); if (t2refresh_variables(); std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this); std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this); - std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this); - std::thread * monitor_replication_lag_thread = new std::thread(&MySQL_Monitor::monitor_replication_lag,this); +// std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this); +// std::thread * monitor_replication_lag_thread = new std::thread(&MySQL_Monitor::monitor_replication_lag,this); while (shutdown==false) { unsigned int glover=GloMTH->get_global_version(); if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { @@ -1544,11 +1994,137 @@ void * MySQL_Monitor::run() { } monitor_connect_thread->join(); monitor_ping_thread->join(); - monitor_read_only_thread->join(); - monitor_replication_lag_thread->join(); +// monitor_read_only_thread->join(); +// monitor_replication_lag_thread->join(); if (mysql_thr) { delete mysql_thr; mysql_thr=NULL; } return NULL; }; + + + +/* +MDB_ASYNC_ST MySQL_Monitor_State_Data::handler2(short event) { + +handler_again: + switch (async_state_machine) { + case ASYNC_PING_START: + ping_start(); + if (async_exit_status) { + next_event(ASYNC_PING_CONT); + } else { + NEXT_IMMEDIATE2(ASYNC_PING_END); + } + break; + case ASYNC_PING_CONT: +// assert(myds->sess->status==PINGING_SERVER); + if (event) { + ping_cont(event); + } +// if (async_exit_status) { +// if (myds->sess->thread->curtime >= myds->wait_until) { +// NEXT_IMMEDIATE(ASYNC_PING_TIMEOUT); +// } else { +// next_event(ASYNC_PING_CONT); +// } +// } else { +// NEXT_IMMEDIATE(ASYNC_PING_END); +// } + break; + case ASYNC_PING_END: + if (interr) { + NEXT_IMMEDIATE2(ASYNC_PING_FAILED); + } else { + NEXT_IMMEDIATE2(ASYNC_PING_SUCCESSFUL); + } + break; + case ASYNC_PING_SUCCESSFUL: + break; + case ASYNC_PING_FAILED: + break; + case ASYNC_PING_TIMEOUT: + break; + default: + break; + } + return async_state_machine; +} + +int MySQL_Monitor_State_Data::async_ping(short event) { + PROXY_TRACE(); + assert(mysql); + switch (async_state_machine) { + case ASYNC_PING_SUCCESSFUL: + async_state_machine=ASYNC_IDLE; + return 0; + break; + case ASYNC_PING_FAILED: + return -1; + break; + case ASYNC_PING_TIMEOUT: + return -2; + break; + case ASYNC_IDLE: + async_state_machine=ASYNC_PING_START; + default: + handler2(event); + break; + } + + // check again + switch (async_state_machine) { + case ASYNC_PING_SUCCESSFUL: + async_state_machine=ASYNC_IDLE; + return 0; + break; + case ASYNC_PING_FAILED: + return -1; + break; + case ASYNC_PING_TIMEOUT: + return -2; + break; + default: + return 1; + break; + } + return 1; +} + +void MySQL_Monitor_State_Data::next_event(MDB_ASYNC_ST new_st) { +#ifdef DEBUG + int fd; +#endif // DEBUG + wait_events=0; + + if (async_exit_status & MYSQL_WAIT_READ) + wait_events |= POLLIN; + if (async_exit_status & MYSQL_WAIT_WRITE) + wait_events|= POLLOUT; + if (wait_events) +#ifdef DEBUG + fd= mysql_get_socket(mysql); +#else + mysql_get_socket(mysql); +#endif // DEBUG + else +#ifdef DEBUG + fd= -1; +#endif // DEBUG + if (async_exit_status & MYSQL_WAIT_TIMEOUT) { + timeout=10000; + } else { + } + proxy_debug(PROXY_DEBUG_NET, 8, "fd=%d, wait_events=%d , old_ST=%d, new_ST=%d\n", fd, wait_events, async_state_machine, new_st); + async_state_machine = new_st; +} + +void MySQL_Monitor_State_Data::ping_start() { + async_exit_status = mysql_ping_start(&interr,mysql); +} + +void MySQL_Monitor_State_Data::ping_cont(short event) { + async_exit_status = mysql_ping_cont(&interr,mysql, mysql_status2(event, true)); +} +*/