Skip to content

Commit

Permalink
Merge branch 'v1.4.0' into refactor_headers
Browse files Browse the repository at this point in the history
  • Loading branch information
renecannao authored Dec 21, 2016
2 parents 30dbcc2 + b118592 commit 3665ba0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 102 deletions.
141 changes: 40 additions & 101 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,124 +107,63 @@ static void close_mysql(MYSQL *my) {
}



struct cmp_str {
bool operator()(char const *a, char const *b) const
{
return strcmp(a, b) < 0;
}
};

class MySQL_Monitor_Connection_Pool {
private:
pthread_mutex_t mutex;
int size;
std::map<char *, PtrArray *, cmp_str> my_connections;
public:
MySQL_Monitor_Connection_Pool();
~MySQL_Monitor_Connection_Pool();
private:
std::mutex mutex;
std::map<std::pair<std::string, int>, std::vector<MYSQL*> > my_connections;
public:
MYSQL * get_connection(char *hostname, int port);
void put_connection(char *hostname, int port, MYSQL *my);
void purge_idle_connections();
};

MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() {
size=0;
pthread_mutex_init(&mutex,NULL);
}

MySQL_Monitor_Connection_Pool::~MySQL_Monitor_Connection_Pool() {
// FIXME: destructor is yet not completed
std::map<char *, PtrArray *>::iterator it;
for(it = my_connections.begin(); it != my_connections.end(); it++) {
PtrArray *lst=it->second;
delete lst;
char *host=it->first;
free(host);
}
my_connections.erase(my_connections.begin(),my_connections.end());
}

void MySQL_Monitor_Connection_Pool::purge_idle_connections() {
unsigned long long now=monotonic_time();
pthread_mutex_lock(&mutex);
std::map<char *, PtrArray *>::iterator it;
//fprintf(stderr,"conn pool size: %d\n",my_connections.size());
unsigned int totconn;
totconn=0;
for(it = my_connections.begin(); it != my_connections.end(); it++) {
PtrArray *lst=it->second;
totconn+=lst->len;
}
__loop_purge_idle_connections:
//fprintf(stderr,"tot conn in pool: %d\n",totconn);
for(it = my_connections.begin(); it != my_connections.end(); it++) {
PtrArray *lst=it->second;
if (lst->len) {
unsigned int it3;
for(it3 = 0; it3 < lst->len; it3++) {
MYSQL *my=(MYSQL *)(lst->index(it3));
unsigned long long then=0;
memcpy(&then,my->net.buff,sizeof(unsigned long long));
if (now > (then + mysql_thread___monitor_ping_interval*1000 * 3)) {
MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false);
mmsd->mysql=my;
WorkItem *item;
item=new WorkItem(mmsd,NULL);
GloMyMon->queue.add(item);
lst->remove_index_fast(it3);
it3--;
}
}
unsigned long long now = monotonic_time();
std::lock_guard<std::mutex> lock(mutex);
for(auto it = my_connections.begin(); it != my_connections.end();) {
auto& lst = it->second;
for(auto it3 = lst.begin(); it3 != lst.end();) {
MYSQL *my = *it3;
unsigned long long then = *(unsigned long long*)my->net.buff;
if (now > (then + mysql_thread___monitor_ping_interval*1000 * 3)) {
MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false);
mmsd->mysql=my;
GloMyMon->queue.add(new WorkItem(mmsd,NULL));
std::swap(*it3, lst.back());
if(it3 == lst.end() - 1)
it3 = lst.erase(it3);
else
lst.pop_back();
} else
++it3;
}
if (lst.size()) {
++it;
} else {
my_connections.erase(it);
goto __loop_purge_idle_connections;
it = my_connections.erase(it);
}
}
pthread_mutex_unlock(&mutex);
}


MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) {
std::map<char *, PtrArray * , cmp_str >::iterator it;
char *buf=(char *)malloc(16+strlen(hostname));
sprintf(buf,"%s:%d",hostname,port);
pthread_mutex_lock(&mutex);
it = my_connections.find(buf);
free(buf);
if (it != my_connections.end()) {
PtrArray *lst=it->second;
if (lst->len) {
MYSQL *ret=(MYSQL *)lst->remove_index_fast(0);
size--;
pthread_mutex_unlock(&mutex);
memset(ret->net.buff,0,sizeof(unsigned long long)); // reset what was polluted
return ret;
}
}
pthread_mutex_unlock(&mutex);
return NULL;
std::lock_guard<std::mutex> lock(mutex);
auto it = my_connections.find(std::make_pair(hostname, port));
if (it == my_connections.end() || !it->second.size())
return NULL;
MYSQL *my = it->second.back();
it->second.pop_back();
*(unsigned long long*)my->net.buff = 0;
return my;
}

void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYSQL *my) {
size++;
std::map<char *, PtrArray * , cmp_str >::iterator it;
char * buf=(char *)malloc(16+strlen(hostname));
sprintf(buf,"%s:%d",hostname,port);
unsigned long long now=monotonic_time();
memcpy(my->net.buff,&now,sizeof(unsigned long long)); //mark insert time
pthread_mutex_lock(&mutex);
it = my_connections.find(buf);
PtrArray *lst=NULL;
if (it==my_connections.end()) {
lst=new PtrArray();
my_connections.insert(my_connections.begin(), std::pair<char *, PtrArray *>(buf,lst));
} else {
free(buf);
lst=it->second;
}
lst->add(my);
pthread_mutex_unlock(&mutex);
unsigned long long now = monotonic_time();
std::lock_guard<std::mutex> lock(mutex);
*(unsigned long long*)my->net.buff = now;
auto it = my_connections.emplace(std::piecewise_construct,
std::forward_as_tuple(hostname, port), std::forward_as_tuple()).first;
it->second.push_back(my);
}

MySQL_Monitor_State_Data::MySQL_Monitor_State_Data(char *h, int p, struct event_base *b, bool _use_ssl, int g) {
Expand Down
6 changes: 5 additions & 1 deletion lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,11 @@ int MySQL_Session::handler() {
myconn->async_state_machine=ASYNC_IDLE;
myds->DSS=STATE_MARIADB_GENERIC;
if (transaction_persistent==true) {
transaction_persistent_hostgroup=current_hostgroup;
if (transaction_persistent_hostgroup==-1) { // change only if not set already, do not allow to change it again
if (myds->myconn->IsActiveTransaction()==true) { // only active transaction is important here. Ignore other criterias
transaction_persistent_hostgroup=current_hostgroup;
}
}
}
}
} else {
Expand Down

0 comments on commit 3665ba0

Please sign in to comment.