diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 8b22ecdade..e052479402 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -54,6 +54,7 @@ class MySQL_Monitor_State_Data { void next_event(int new_st, int status); void unregister(); SQLite3DB *mondb; + bool create_new_connection(); // we are copying these from MySQL_Connection // short wait_events; // unsigned long long timeout; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 14c8507e27..11d1745112 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -123,7 +123,7 @@ static int ping__num_active_connections; static int replication_lag__num_active_connections; static int total_replication_lag__num_active_connections=0; static int read_only__num_active_connections; -static int total_read_only__num_active_connections=0; +//static int total_read_only__num_active_connections=0; struct cmp_str { @@ -815,24 +815,8 @@ void * monitor_connect_thread(void *arg) { mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); - mmsd->mysql=mysql_init(NULL); - assert(mmsd->mysql); - if (mmsd->use_ssl) { - mysql_ssl_set(mmsd->mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher); - } - unsigned int timeout=mysql_thread___monitor_connect_timeout/1000; - mysql_options(mmsd->mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); - mysql_options(mmsd->mysql, MYSQL_OPT_READ_TIMEOUT, &timeout); - mysql_options(mmsd->mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout); - MYSQL *myrc=NULL; - if (mmsd->port) { - myrc=mysql_real_connect(mmsd->mysql, mmsd->hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, mmsd->port, NULL, 0); - } else { - myrc=mysql_real_connect(mmsd->mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, mmsd->hostname, 0); - } - if (myrc==NULL) { - mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); - } + + mmsd->create_new_connection(); unsigned long long start_time=mysql_thr->curtime; mmsd->t1=start_time; @@ -872,34 +856,10 @@ void * monitor_ping_thread(void *arg) { mmsd->t1=start_time; if (mmsd->mysql==NULL) { // we don't have a connection, let's create it - mmsd->mysql=mysql_init(NULL); - assert(mmsd->mysql); - if (mmsd->use_ssl) { - mysql_ssl_set(mmsd->mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher); - } - unsigned int timeout=mysql_thread___monitor_connect_timeout/1000; - mysql_options(mmsd->mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); - mysql_options(mmsd->mysql, MYSQL_OPT_READ_TIMEOUT, &timeout); - mysql_options(mmsd->mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout); - MYSQL *myrc=NULL; - if (mmsd->port) { - myrc=mysql_real_connect(mmsd->mysql, mmsd->hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, mmsd->port, NULL, 0); - } else { - myrc=mysql_real_connect(mmsd->mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, mmsd->hostname, 0); - } - if (myrc==NULL) { - mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); + bool rc; + rc=mmsd->create_new_connection(); + if (rc==false) { goto __exit_monitor_ping_thread; - } else { - // mariadb client library disables NONBLOCK for SSL connections ... re-enable it! - mysql_options(mmsd->mysql, MYSQL_OPT_NONBLOCK, 0); - int f=fcntl(mmsd->mysql->net.fd, F_GETFL); -#ifdef FD_CLOEXEC - // asynchronously set also FD_CLOEXEC , this to prevent then when a fork happens the FD are duplicated to new process - fcntl(mmsd->mysql->net.fd, F_SETFL, f|O_NONBLOCK|FD_CLOEXEC); -#else - fcntl(mmsd->mysql->net.fd, F_SETFL, f|O_NONBLOCK); -#endif /* FD_CLOEXEC */ } } @@ -924,7 +884,6 @@ void * monitor_ping_thread(void *arg) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); mmsd->mysql=NULL; } - mmsd->t2=monotonic_time(); __exit_monitor_ping_thread: mmsd->t2=monotonic_time(); @@ -945,23 +904,166 @@ void * monitor_ping_thread(void *arg) { rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); sqlite3_finalize(statement); -/* - free(sds); - query=(char *)"INSERT OR REPLACE INTO mysql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; + } +__fast_exit_monitor_ping_thread: + if (mmsd->mysql) { + mysql_close(mmsd->mysql); // if we reached here we didn't put the connection back + } + delete mysql_thr; + return NULL; +} + +bool MySQL_Monitor_State_Data::create_new_connection() { + mysql=mysql_init(NULL); + assert(mysql); + if (use_ssl) { + mysql_ssl_set(mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher); + } + unsigned int timeout=mysql_thread___monitor_connect_timeout/1000; + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); +// mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, &timeout); +// mysql_options(mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + MYSQL *myrc=NULL; + if (port) { + myrc=mysql_real_connect(mysql, hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, port, NULL, 0); + } else { + myrc=mysql_real_connect(mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, hostname, 0); + } + if (myrc==NULL) { + mysql_error_msg=strdup(mysql_error(mysql)); + return false; + } else { + // mariadb client library disables NONBLOCK for SSL connections ... re-enable it! + mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0); + int f=fcntl(mysql->net.fd, F_GETFL); +#ifdef FD_CLOEXEC + // asynchronously set also FD_CLOEXEC , this to prevent then when a fork happens the FD are duplicated to new process + fcntl(mysql->net.fd, F_SETFL, f|O_NONBLOCK|FD_CLOEXEC); +#else + fcntl(mysql->net.fd, F_SETFL, f|O_NONBLOCK); +#endif /* FD_CLOEXEC */ + } + return true; +} + +void * monitor_read_only_thread(void *arg) { + MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Thread * mysql_thr = new MySQL_Thread(); + mysql_thr->curtime=monotonic_time(); + mysql_thr->refresh_variables(); + + mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port); + unsigned long long start_time=mysql_thr->curtime; + + + mmsd->t1=start_time; + + if (mmsd->mysql==NULL) { // we don't have a connection, let's create it + bool rc; + rc=mmsd->create_new_connection(); + if (rc==false) { + goto __exit_monitor_read_only_thread; + } + } + + mmsd->t1=monotonic_time(); + //async_exit_status=mysql_change_user_start(&ret_bool, mysql,"msandbox2","msandbox2","information_schema"); + //mmsd->async_exit_status=mysql_ping_start(&mmsd->interr,mmsd->mysql); + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SHOW GLOBAL VARIABLES LIKE 'read_only'"); + while (mmsd->async_exit_status) { + mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + mmsd->async_exit_status=mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); + unsigned long long now=monotonic_time(); + if (now > mmsd->t1 + mysql_thread___monitor_ping_timeout * 1000) { + mmsd->mysql_error_msg=strdup("timeout check"); + goto __exit_monitor_read_only_thread; + } + if (GloMyMon->shutdown==true) { + goto __fast_exit_monitor_read_only_thread; // exit immediately + } + } + mmsd->async_exit_status=mysql_store_result_start(&mmsd->result,mmsd->mysql); + while (mmsd->async_exit_status) { + mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + mmsd->async_exit_status=mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status); + unsigned long long now=monotonic_time(); + if (now > mmsd->t1 + mysql_thread___monitor_ping_timeout * 1000) { + mmsd->mysql_error_msg=strdup("timeout check"); + goto __exit_monitor_read_only_thread; + } + if (GloMyMon->shutdown==true) { + goto __fast_exit_monitor_read_only_thread; // exit immediately + } + } + if (mmsd->interr) { // ping failed + mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); +// } else { +// GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); +// mmsd->mysql=NULL; + } + +__exit_monitor_read_only_thread: + mmsd->t2=monotonic_time(); + { + sqlite3_stmt *statement; + sqlite3 *mondb=mmsd->mondb->get_db(); + int rc; + char *query=NULL; + query=(char *)"INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); assert(rc==SQLITE_OK); + int read_only=1; // as a safety mechanism , read_only=1 is the default rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); - rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + if (mmsd->result) { + int num_fields=0; + int k=0; + MYSQL_FIELD *fields=NULL; + int j=-1; + num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); + for(k = 0; k < num_fields; k++) { + //if (strcmp("VARIABLE_NAME", fields[k].name)==0) { + if (strcmp("Value", fields[k].name)==0) { + j=k; + } + } + if (j>-1) { + MYSQL_ROW row=mysql_fetch_row(mmsd->result); + if (row) { + if (row[j]) { + if (!strcmp(row[j],"0") || !strcasecmp(row[j],"OFF")) + read_only=0; + } + } + } +// if (repl_lag>=0) { + rc=sqlite3_bind_int64(statement, 5, read_only); assert(rc==SQLITE_OK); +// } else { +// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); +// } + mysql_free_result(mmsd->result); + mmsd->result=NULL; + } else { + rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); + } + rc=sqlite3_bind_text(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); SAFE_SQLITE3_STEP(statement); rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + + MyHGM->read_only_action(mmsd->hostname, mmsd->port, read_only); + sqlite3_finalize(statement); -*/ } -__fast_exit_monitor_ping_thread: + if (mmsd->interr) { // check failed + } else { + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + mmsd->mysql=NULL; + } +__fast_exit_monitor_read_only_thread: if (mmsd->mysql) { mysql_close(mmsd->mysql); // if we reached here we didn't put the connection back } @@ -1096,9 +1198,6 @@ void * MySQL_Monitor::monitor_ping() { unsigned long long t2; unsigned long long start_time; unsigned long long next_loop_at=0; - //unsigned int t1; - //unsigned int t2; - //t1=monotonic_time(); while (shutdown==false) { @@ -1107,10 +1206,6 @@ void * MySQL_Monitor::monitor_ping() { int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; -// MySQL_Monitor_State_Data **sds=NULL; - //int i=0; - //char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD'"; - // add support for SSL char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers WHERE status!='OFFLINE_HARD' GROUP BY hostname, port"; t1=monotonic_time(); @@ -1126,15 +1221,7 @@ void * MySQL_Monitor::monitor_ping() { } next_loop_at=t1+1000*mysql_thread___monitor_ping_interval; -/* - struct timeval tv_out; - evutil_gettimeofday(&tv_out, NULL); - start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); -*/ start_time=monotonic_time(); -// ping__num_active_connections=0; -// // create libevent base -// libevent_base= event_base_new(); proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); @@ -1145,39 +1232,17 @@ void * MySQL_Monitor::monitor_ping() { if (resultset->rows_count==0) { goto __end_monitor_ping_loop; } -// sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *)); for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; MySQL_Monitor_State_Data *mmsd = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); -/* - sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2])); - sds[i]->task_id=MON_PING; - ping__num_active_connections++; - total_ping__num_active_connections++; - MySQL_Monitor_State_Data *_mmsd=sds[i]; -*/ -// mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port); mmsd->mondb=monitordb; pthread_t thr_; if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) { perror("Thread creation monitor_ping_thread"); } -/* - if (_mmsd->mysql==NULL) { - state_machine_handler(-1,-1,_mmsd); - } else { - int fd=mysql_get_socket(_mmsd->mysql); - _mmsd->ST=7; - state_machine_handler(fd,-1,_mmsd); - } - i++; -*/ } } - // start libevent loop -// event_base_dispatch(libevent_base); - __end_monitor_ping_loop: /* if (sds) */ { sqlite3_stmt *statement; @@ -1196,276 +1261,6 @@ void * MySQL_Monitor::monitor_ping() { rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); sqlite3_finalize(statement); -/* - query=(char *)"INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; - rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); - assert(rc==SQLITE_OK); - while (i>0) { - i--; - MySQL_Monitor_State_Data *mmsd=sds[i]; - rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); - rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); - SAFE_SQLITE3_STEP(statement); - rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); - rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); - delete mmsd; - } - sqlite3_finalize(statement); - free(sds); -*/ - } - - if (resultset) { - delete resultset; - resultset=NULL; - } - -// event_base_free(libevent_base); - - // now it is time to shun all problematic hosts - query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NOT NULL"; - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); - admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (error) { - proxy_error("Error on %s : %s\n", query, error); - } else { - // get all addresses and ports - int i=0; - int j=0; - char **addresses=(char **)malloc(resultset->rows_count * sizeof(char *)); - char **ports=(char **)malloc(resultset->rows_count * sizeof(char *)); - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - addresses[i]=strdup(r->fields[0]); - ports[i]=strdup(r->fields[1]); - i++; - } - if (resultset) { - delete resultset; - resultset=NULL; - } - char *new_query=NULL; - new_query=(char *)"SELECT 1 FROM (SELECT hostname,port,ping_error FROM mysql_server_ping_log WHERE hostname='%s' AND port='%s' ORDER BY time_start DESC LIMIT %d) a WHERE ping_error IS NOT NULL GROUP BY hostname,port HAVING COUNT(*)=%d"; - for (j=0;jexecute_statement(buff, &error , &cols , &affected_rows , &resultset); - if (!error) { - if (resultset) { - if (resultset->rows_count) { - // disable host - proxy_error("Server %s:%s missed %d heartbeats, shunning it and killing all the connections\n", addresses[j], ports[j], max_failures); - MyHGM->shun_and_killall(addresses[j],atoi(ports[j])); - } - delete resultset; - resultset=NULL; - } - } else { - proxy_error("Error on %s : %s\n", query, error); - } - free(buff); - } - - while (i) { // now free all the addresses/ports - i--; - free(addresses[i]); - free(ports[i]); - } - free(addresses); - free(ports); - } - - - // now it is time to update current_lantency_ms - query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NULL"; - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); - admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (error) { - proxy_error("Error on %s : %s\n", query, error); - } else { - // get all addresses and ports - int i=0; - int j=0; - char **addresses=(char **)malloc(resultset->rows_count * sizeof(char *)); - char **ports=(char **)malloc(resultset->rows_count * sizeof(char *)); - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - addresses[i]=strdup(r->fields[0]); - ports[i]=strdup(r->fields[1]); - i++; - } - if (resultset) { - delete resultset; - resultset=NULL; - } - char *new_query=NULL; - - new_query=(char *)"SELECT hostname,port,COALESCE(CAST(AVG(ping_success_time) AS INTEGER),10000) FROM (SELECT hostname,port,ping_success_time,ping_error FROM mysql_server_ping_log WHERE hostname='%s' AND port='%s' ORDER BY time_start DESC LIMIT 3) a WHERE ping_error IS NULL GROUP BY hostname,port"; - for (j=0;jexecute_statement(buff, &error , &cols , &affected_rows , &resultset); - if (!error) { - if (resultset) { - if (resultset->rows_count) { - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; // this should be called just once, but we create a generic for loop - // update current_latency_ms - MyHGM->set_server_current_latency_us(addresses[j],atoi(ports[j]), atoi(r->fields[2])); - } - } - delete resultset; - resultset=NULL; - } - } else { - proxy_error("Error on %s : %s\n", query, error); - } - free(buff); - } - } - -__sleep_monitor_ping_loop: - t2=monotonic_time(); - if (t2 500000) { - st = 500000; - } - usleep(st); - } - } - if (mysql_thr) { - delete mysql_thr; - mysql_thr=NULL; - } - return NULL; -} -/* -void * MySQL_Monitor::monitor_ping() { - // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) - struct event_base *libevent_base; - unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; - MySQL_Thread * mysql_thr = new MySQL_Thread(); - mysql_thr->curtime=monotonic_time(); - MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); - mysql_thr->refresh_variables(); - - unsigned long long t1; - unsigned long long t2; - unsigned long long start_time; - unsigned long long next_loop_at=0; - //unsigned int t1; - //unsigned int t2; - //t1=monotonic_time(); - - while (shutdown==false) { - - unsigned int glover; - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - MySQL_Monitor_State_Data **sds=NULL; - int i=0; - //char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD'"; - // add support for SSL - char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers WHERE status!='OFFLINE_HARD' GROUP BY hostname, port"; - t1=monotonic_time(); - - glover=GloMTH->get_global_version(); - if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { - MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; - mysql_thr->refresh_variables(); - next_loop_at=0; - } - - if (t1 < next_loop_at) { - goto __sleep_monitor_ping_loop; - } - next_loop_at=t1+1000*mysql_thread___monitor_ping_interval; - - struct timeval tv_out; - evutil_gettimeofday(&tv_out, NULL); - start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); - - ping__num_active_connections=0; - // create libevent base - libevent_base= event_base_new(); - - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); - admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (error) { - proxy_error("Error on %s : %s\n", query, error); - goto __end_monitor_ping_loop; - } else { - if (resultset->rows_count==0) { - goto __end_monitor_ping_loop; - } - sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *)); - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2])); - sds[i]->task_id=MON_PING; - ping__num_active_connections++; - total_ping__num_active_connections++; - MySQL_Monitor_State_Data *_mmsd=sds[i]; - _mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(_mmsd->hostname, _mmsd->port); - if (_mmsd->mysql==NULL) { - state_machine_handler(-1,-1,_mmsd); - } else { - int fd=mysql_get_socket(_mmsd->mysql); - _mmsd->ST=7; - state_machine_handler(fd,-1,_mmsd); - } - i++; - } - } - - // start libevent loop - event_base_dispatch(libevent_base); - -__end_monitor_ping_loop: - if (sds) { - sqlite3_stmt *statement; - sqlite3 *mondb=monitordb->get_db(); - int rc; - char *query=NULL; - query=(char *)"DELETE FROM mysql_server_ping_log WHERE time_start < ?1"; - rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); - assert(rc==SQLITE_OK); - if (mysql_thread___monitor_history < mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 )) { // issue #626 - if (mysql_thread___monitor_ping_interval < 3600000) - mysql_thread___monitor_history = mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 ); - } - rc=sqlite3_bind_int64(statement, 1, start_time-mysql_thread___monitor_history*1000); assert(rc==SQLITE_OK); - SAFE_SQLITE3_STEP(statement); - rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); - rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); - sqlite3_finalize(statement); - - query=(char *)"INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; - rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); - assert(rc==SQLITE_OK); - while (i>0) { - i--; - MySQL_Monitor_State_Data *mmsd=sds[i]; - rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); - rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); - SAFE_SQLITE3_STEP(statement); - rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); - rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); - delete mmsd; - } - sqlite3_finalize(statement); - free(sds); } if (resultset) { @@ -1473,8 +1268,6 @@ void * MySQL_Monitor::monitor_ping() { resultset=NULL; } - event_base_free(libevent_base); - // now it is time to shun all problematic hosts query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NOT NULL"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); @@ -1595,38 +1388,36 @@ void * MySQL_Monitor::monitor_ping() { } return NULL; } -*/ void * MySQL_Monitor::monitor_read_only() { // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) - struct event_base *libevent_base; +// struct event_base *libevent_base; unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); mysql_thr->refresh_variables(); + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); +// pthread_attr_setstacksize (&attr, 192*1024); + unsigned long long t1; unsigned long long t2; unsigned long long start_time; unsigned long long next_loop_at=0; - unsigned int num_fields=0; - unsigned int k=0; - MYSQL_FIELD *fields=NULL; while (shutdown==false) { unsigned int glover; char *error=NULL; -// int cols=0; -// int affected_rows=0; SQLite3_result *resultset=NULL; - MySQL_Monitor_State_Data **sds=NULL; - int i=0; //char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status!='OFFLINE_HARD'"; // add support for SSL char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status!='OFFLINE_HARD' GROUP BY hostname, port"; t1=monotonic_time(); + start_time=t1; glover=GloMTH->get_global_version(); if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { @@ -1639,15 +1430,6 @@ void * MySQL_Monitor::monitor_read_only() { goto __sleep_monitor_read_only; } next_loop_at=t1+1000*mysql_thread___monitor_read_only_interval; - - struct timeval tv_out; - evutil_gettimeofday(&tv_out, NULL); - start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); - - read_only__num_active_connections=0; - // create libevent base - libevent_base= event_base_new(); - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); // admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); resultset = MyHGM->execute_query(query, &error); @@ -1659,33 +1441,19 @@ void * MySQL_Monitor::monitor_read_only() { if (resultset->rows_count==0) { goto __end_monitor_read_only_loop; } - sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *)); for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; - sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2])); - sds[i]->task_id=MON_READ_ONLY; -// sds[i]->hostgroup_id=atoi(r->fields[0]); -// sds[i]->repl_lag=atoi(r->fields[3]); - read_only__num_active_connections++; - total_read_only__num_active_connections++; - MySQL_Monitor_State_Data *_mmsd=sds[i]; - _mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(_mmsd->hostname, _mmsd->port); - if (_mmsd->mysql==NULL) { - state_machine_handler(-1,-1,_mmsd); - } else { - int fd=mysql_get_socket(_mmsd->mysql); - _mmsd->ST=20; - state_machine_handler(fd,-1,_mmsd); + MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); + mmsd->mondb=monitordb; + pthread_t thr_; + if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) { + perror("Thread creation monitor_read_only_thread"); } - i++; } } - // start libevent loop - event_base_dispatch(libevent_base); - __end_monitor_read_only_loop: - if (sds) { + /* if (sds) */ { sqlite3_stmt *statement=NULL; sqlite3 *mondb=monitordb->get_db(); int rc; @@ -1702,70 +1470,12 @@ void * MySQL_Monitor::monitor_read_only() { rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); sqlite3_finalize(statement); - - query=(char *)"INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; - rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); - assert(rc==SQLITE_OK); - while (i>0) { - i--; - int read_only=1; // as a safety mechanism , read_only=1 is the default - MySQL_Monitor_State_Data *mmsd=NULL; - mmsd=sds[i]; - rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); - rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); - if (mmsd->result) { - num_fields=0; - k=0; - fields=NULL; - int j=-1; - num_fields = mysql_num_fields(mmsd->result); - fields = mysql_fetch_fields(mmsd->result); - for(k = 0; k < num_fields; k++) { - //if (strcmp("VARIABLE_NAME", fields[k].name)==0) { - if (strcmp("Value", fields[k].name)==0) { - j=k; - } - } - if (j>-1) { - MYSQL_ROW row=mysql_fetch_row(mmsd->result); - if (row) { - if (row[j]) { - if (!strcmp(row[j],"0") || !strcasecmp(row[j],"OFF")) - read_only=0; - } - } - } -// if (repl_lag>=0) { - rc=sqlite3_bind_int64(statement, 5, read_only); assert(rc==SQLITE_OK); -// } else { -// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); -// } - mysql_free_result(mmsd->result); - mmsd->result=NULL; - } else { - rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); - } - rc=sqlite3_bind_text(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); - SAFE_SQLITE3_STEP(statement); - rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); - rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); - - MyHGM->read_only_action(mmsd->hostname, mmsd->port, read_only); - - delete mmsd; - } - sqlite3_finalize(statement); - free(sds); - } if (resultset) delete resultset; - event_base_free(libevent_base); __sleep_monitor_read_only: @@ -1980,7 +1690,7 @@ void * MySQL_Monitor::run() { mysql_thr->refresh_variables(); std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this); std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this); -// std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this); + std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this); // std::thread * monitor_replication_lag_thread = new std::thread(&MySQL_Monitor::monitor_replication_lag,this); while (shutdown==false) { unsigned int glover=GloMTH->get_global_version(); @@ -1994,7 +1704,7 @@ void * MySQL_Monitor::run() { } monitor_connect_thread->join(); monitor_ping_thread->join(); -// monitor_read_only_thread->join(); + monitor_read_only_thread->join(); // monitor_replication_lag_thread->join(); if (mysql_thr) { delete mysql_thr;