Skip to content

Commit

Permalink
ProxySQL reads GTID information from proxysql_mysqlbinlog
Browse files Browse the repository at this point in the history
This is the first commit to pull data from proxysql_mysqlbinlog.
It is still in Alpha phase, as it is missing a log of important logics, like error handling, retry mechanism, timeouts, etc
  • Loading branch information
renecannao committed Jan 16, 2018
1 parent a15917e commit f4a0c4a
Show file tree
Hide file tree
Showing 7 changed files with 583 additions and 7 deletions.
213 changes: 211 additions & 2 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
#include "proxysql_gtid.h"

#include <thread>
#include <iostream>

#include "thread.h"
#include "wqueue.h"


#include "ev.h"

/*
Enabling STRESSTEST_POOL ProxySQL will do a lot of loops in the connection pool
This is for internal testing ONLY!!!!
Expand Down Expand Up @@ -43,6 +47,201 @@ enum MySerStatus {
MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG
};


std::string gtid_executed_to_string(gtid_set_t& gtid_executed);
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed);

class GTID_Server_Data {
public:
char *address;
uint16_t port;
uint16_t mysql_port;
char *data;
size_t len;
size_t size;
size_t pos;
struct ev_io *w;
char uuid_server[64];
gtid_set_t gtid_executed;
bool active;
GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
active = true;
w = _w;
size = 1024; // 1KB buffer
data = (char *)malloc(size);
uuid_server[0] = 0;
pos = 0;
len = 0;
address = strdup(_address);
port = _port;
mysql_port = _mysql_port;
}
void resize(size_t _s) {
char *data_ = (char *)malloc(_s);
memcpy(data_, data, (_s > size ? size : _s));
size = _s;
free(data);
data = data_;
}
~GTID_Server_Data() {
free(address);
free(data);
}
bool readall() {
bool ret = true;
if (size == len) {
// buffer is full, expand
resize(len*2);
}
int rc = 0;
rc = read(w->fd,data+len,size-len);
if (rc > 0) {
len += rc;
} else {
int myerr = errno;
if (
(rc == 0) ||
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
) {
ret = false;
}
}
return ret;
}
bool writeout() {
bool ret = true;
if (len==0) {
return ret;
}
int rc = 0;
rc = write(w->fd,data+pos,len-pos);
if (rc > 0) {
pos += rc;
if (pos >= len/2) {
memmove(data,data+pos,len-pos);
len = len-pos;
pos = 0;
}
}
return ret;
}
bool read_next_gtid() {
if (len==0) {
return false;
}
void *nlp = NULL;
nlp = memchr(data+pos,'\n',len-pos);
if (nlp == NULL) {
return false;
}
int l = (char *)nlp - (data+pos);
char rec_msg[80];
if (strncmp(data+pos,(char *)"ST=",3)==0) {
// we are reading the bootstrap
char *bs = (char *)malloc(l+1-3); // length + 1 (null byte) - 3 (header)
memcpy(bs,data+pos+3,l+1-3);
char *saveptr1=NULL;
char *saveptr2=NULL;
//char *saveptr3=NULL;
char *token = NULL;
char *subtoken = NULL;
//char *subtoken2 = NULL;
char *str1 = NULL;
char *str2 = NULL;
//char *str3 = NULL;
for (str1 = bs; ; str1 = NULL) {
token = strtok_r(str1, ",", &saveptr1);
if (token == NULL) {
break;
}
int j = 0;
for (str2 = token; ; str2 = NULL) {
subtoken = strtok_r(str2, ":", &saveptr2);
if (subtoken == NULL) {
break;
}
j++;
if (j%2 == 1) { // we are reading the uuid
char *p = uuid_server;
for (unsigned int k=0; k<strlen(subtoken); k++) {
if (subtoken[k]!='-') {
*p = subtoken[k];
p++;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from;
uint64_t trx_to;
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std::string s = uuid_server;
gtid_executed[s].emplace_back(trx_from, trx_to);
}
}
}
pos += l+1;
free(bs);
//return true;
} else {
strncpy(rec_msg,data+pos,l);
pos += l+1;
rec_msg[l]=0;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if (rec_msg[0]=='I') {
//char rec_uuid[80];
uint64_t rec_trxid;
char *a = NULL;
int ul = 0;
switch (rec_msg[1]) {
case '1':
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr(rec_msg+3,':');
ul = a-rec_msg-3;
strncpy(uuid_server,rec_msg+3,ul);
uuid_server[ul] = 0;
rec_trxid=atoll(a+1);
break;
case '2':
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid=atoll(rec_msg+3);
break;
default:
break;
}
fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std::string s = uuid_server;
gtid_t new_gtid = std::make_pair(s,rec_trxid);
addGtid(new_gtid,gtid_executed);
//return true;
}
}
std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true;
}
void read_all_gtids() {
while (read_next_gtid()) {
}
}
void dump() {
if (len==0) {
return;
}
read_all_gtids();
//int rc = write(1,data+pos,len-pos);
fflush(stdout);
///pos += rc;
if (pos >= len/2) {
memmove(data,data+pos,len-pos);
len = len-pos;
pos = 0;
}
}
};



class MySrvConnList {
private:
PtrArray *conns;
Expand Down Expand Up @@ -184,10 +383,16 @@ class MySQL_HostGroups_Manager {

std::thread *HGCU_thread;

std::unordered_map <string, Gtid_Server_Info *> gtid_map;
pthread_rwlock_t gtid_rwlock;
std::thread *GTID_syncer_thread;
//pthread_t GTID_syncer_thread_id;
//pthread_t HGCU_thread_id;


public:
pthread_rwlock_t gtid_rwlock;
std::unordered_map <string, GTID_Server_Data *> gtid_map;
struct ev_async * gtid_ev_async;
struct ev_loop * gtid_ev_loop;
struct {
unsigned int servers_table_version;
pthread_mutex_t servers_table_version_lock;
Expand Down Expand Up @@ -220,6 +425,7 @@ class MySQL_HostGroups_Manager {
wqueue<MySQL_Connection *> queue;
MySQL_HostGroups_Manager();
~MySQL_HostGroups_Manager();
void init();
void wrlock();
void wrunlock();
bool server_add(unsigned int hid, char *add, uint16_t p=3306, uint16_t gp=0, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100, unsigned int _max_replication_lag=0, unsigned int _use_ssl=0, unsigned int _max_latency_ms=0, char *comment=NULL);
Expand Down Expand Up @@ -258,6 +464,9 @@ class MySQL_HostGroups_Manager {
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
void converge_group_replication_config(int _writer_hostgroup);

SQLite3_result * get_stats_mysql_gtid_executed();
void generate_mysql_gtid_executed_tables();
};

#endif /* __CLASS_MYSQL_HOSTGROUPS_MANAGER_H */
1 change: 1 addition & 0 deletions include/proxysql_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ class ProxySQL_Admin {
void stats___proxysql_servers_checksums();
void stats___proxysql_servers_metrics();
void stats___mysql_prepared_statements_info();
void stats___mysql_gtid_executed();

int Read_Global_Variables_from_configfile(const char *prefix);
int Read_MySQL_Users_from_configfile();
Expand Down
14 changes: 13 additions & 1 deletion include/proxysql_gtid.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,24 @@ typedef std::pair<std::string, int64_t> gtid_t;
typedef std::pair<int64_t, int64_t> gtid_interval_t;
typedef std::unordered_map<std::string, std::list<gtid_interval_t>> gtid_set_t;

/*
class Gtid_Server_Info {
public:
gtid_set_t executed_gtid_set;
char *address;
char *hostname;
uint16_t mysql_port;
uint16_t gtid_port;
bool active;
Gtid_Server_Info(char *_h, uint16_t _mp, uint16_t _gp) {
hostname = strdup(_h);
mysql_port = _mp;
gtid_port = _gp;
active = true;
};
~Gtid_Server_Info() {
free(hostname);
};
};
*/

#endif /* PROXYSQL_GTID */
6 changes: 5 additions & 1 deletion lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ MICROHTTPD_IDIR=-I$(MICROHTTPD_DIR) -I$(MICROHTTPD_DIR)/src/include
CURL_DIR=$(DEPS_PATH)/curl/curl
CURL_IDIR=-I$(CURL_DIR)/include

EV_DIR=$(DEPS_PATH)/libev/libev/
EV_IDIR=$(EV_DIR)


IDIR=../include

IDIRS=-I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) $(MICROHTTPD_IDIR) $(CURL_IDIR)
IDIRS=-I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) $(MICROHTTPD_IDIR) $(CURL_IDIR) -I$(EV_DIR)

LDIRS=-L$(JEMALLOC_PATH)/lib -L$(RE2_PATH)/obj -L$(INJECTION_PATH)

Expand Down
Loading

0 comments on commit f4a0c4a

Please sign in to comment.