From f3899950e5623f49600f3b71e0b4fd1bbb0912c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 23 Jul 2016 12:31:53 +0000 Subject: [PATCH] Adding thread pool for Monitor Ping --- include/thread.h | 47 ++++++++++++++++++++++++++ include/wqueue.h | 71 +++++++++++++++++++++++++++++++++++++++ lib/Makefile | 2 +- lib/MySQL_Monitor.cpp | 45 ++++++++++++++++++++++--- lib/thread.cpp | 77 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 237 insertions(+), 5 deletions(-) create mode 100644 include/thread.h create mode 100644 include/wqueue.h create mode 100644 lib/thread.cpp diff --git a/include/thread.h b/include/thread.h new file mode 100644 index 0000000000..8745917670 --- /dev/null +++ b/include/thread.h @@ -0,0 +1,47 @@ +/* + thread.h + + Header for a Java style thread class in C++. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __thread_h__ +#define __thread_h__ + +#include + +class Thread +{ + public: + Thread(); + virtual ~Thread(); + + int start(); + int join(); + int detach(); + pthread_t self(); + + virtual void* run() = 0; + + private: + pthread_t m_tid; + int m_running; + int m_detached; +}; + +#endif diff --git a/include/wqueue.h b/include/wqueue.h new file mode 100644 index 0000000000..272d53afcb --- /dev/null +++ b/include/wqueue.h @@ -0,0 +1,71 @@ +/* + wqueue.h + + Worker thread queue based on the Standard C++ library list + template class. + + ------------------------------------------ + + Copyright @ 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __wqueue_h__ +#define __wqueue_h__ + +#include +#include + +using namespace std; + +template class wqueue +{ + list m_queue; + pthread_mutex_t m_mutex; + pthread_cond_t m_condv; + + public: + wqueue() { + pthread_mutex_init(&m_mutex, NULL); + pthread_cond_init(&m_condv, NULL); + } + ~wqueue() { + pthread_mutex_destroy(&m_mutex); + pthread_cond_destroy(&m_condv); + } + void add(T item) { + pthread_mutex_lock(&m_mutex); + m_queue.push_back(item); + pthread_cond_signal(&m_condv); + pthread_mutex_unlock(&m_mutex); + } + T remove() { + pthread_mutex_lock(&m_mutex); + while (m_queue.size() == 0) { + pthread_cond_wait(&m_condv, &m_mutex); + } + T item = m_queue.front(); + m_queue.pop_front(); + pthread_mutex_unlock(&m_mutex); + return item; + } + int size() { + pthread_mutex_lock(&m_mutex); + int size = m_queue.size(); + pthread_mutex_unlock(&m_mutex); + return size; + } +}; + +#endif diff --git a/lib/Makefile b/lib/Makefile index 95f13c2cbf..cc4d8708d6 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -50,7 +50,7 @@ default: libproxysql.a _OBJ = c_tokenizer.o OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ)) -_OBJ_CPP = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo simple_kv.oo sqlite3db.oo global_variables.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo MySQL_Monitor.oo MySQL_Logger.oo +_OBJ_CPP = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo simple_kv.oo sqlite3db.oo global_variables.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo OBJ_CPP = $(patsubst %,$(ODIR)/%,$(_OBJ_CPP)) %.ko: %.cpp diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 11d1745112..05dd20de01 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -4,6 +4,8 @@ #include "proxysql.h" #include "cpp.h" +#include "thread.h" +#include "wqueue.h" #ifdef DEBUG #define DEB "_DEBUG" @@ -13,6 +15,8 @@ #define MYSQL_MONITOR_VERSION "0.2.0902" DEB + +#define MONTHREADS 4 #include extern ProxySQL_Admin *GloAdmin; @@ -39,6 +43,28 @@ static MySQL_Monitor *GloMyMon; static void state_machine_handler(int fd, short event, void *arg); +class ConsumerThreadPing : public Thread { + wqueue& m_queue; + void *(*routine) (void *); + int thrn; + public: + ConsumerThreadPing(wqueue& queue, void *(*start_routine) (void *), int _n) : m_queue(queue) { + routine=start_routine; + thrn=_n; + } + void* run() { + // Remove 1 item at a time and process it. Blocks if no items are + // available to process. + for (int i = 0;; i++) { + printf("thread %d, loop %d - waiting for item...\n", thrn, i); + MySQL_Monitor_State_Data* mmsd = (MySQL_Monitor_State_Data*)m_queue.remove(); + printf("thread %d, loop %d - got one item\n", thrn, i); + routine((void *)mmsd); + } + return NULL; + } +}; + static int wait_for_mysql(MYSQL *mysql, int status) { struct pollfd pfd; @@ -1198,6 +1224,12 @@ void * MySQL_Monitor::monitor_ping() { unsigned long long t2; unsigned long long start_time; unsigned long long next_loop_at=0; + wqueue queue; + ConsumerThreadPing **threads= (ConsumerThreadPing **)malloc(sizeof(ConsumerThreadPing *)*MONTHREADS); + for (int i=0;istart(); + } while (shutdown==false) { @@ -1236,10 +1268,11 @@ void * MySQL_Monitor::monitor_ping() { SQLite3_row *r=*it; MySQL_Monitor_State_Data *mmsd = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); mmsd->mondb=monitordb; - pthread_t thr_; - if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) { - perror("Thread creation monitor_ping_thread"); - } +// pthread_t thr_; +// if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) { +// perror("Thread creation monitor_ping_thread"); +// } + queue.add(mmsd); } } @@ -1386,6 +1419,10 @@ void * MySQL_Monitor::monitor_ping() { delete mysql_thr; mysql_thr=NULL; } + for (int i=0;ijoin(); + } + free(threads); return NULL; } diff --git a/lib/thread.cpp b/lib/thread.cpp new file mode 100644 index 0000000000..fb3403d773 --- /dev/null +++ b/lib/thread.cpp @@ -0,0 +1,77 @@ +/* + thread.cpp + + Definition of a Java style thread class in C++. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "thread.h" + +static void* runThread(void* arg) +{ + return ((Thread*)arg)->run(); +} + +Thread::Thread() : m_tid(0), m_running(0), m_detached(0) {} + +Thread::~Thread() +{ + if (m_running == 1 && m_detached == 0) { + pthread_detach(m_tid); + } + if (m_running == 1) { + pthread_cancel(m_tid); + } +} + +int Thread::start() +{ + int result = pthread_create(&m_tid, NULL, runThread, this); + if (result == 0) { + m_running = 1; + } + return result; +} + +int Thread::join() +{ + int result = -1; + if (m_running == 1) { + result = pthread_join(m_tid, NULL); + if (result == 0) { + m_detached = 0; + } + } + return result; +} + +int Thread::detach() +{ + int result = -1; + if (m_running == 1 && m_detached == 0) { + result = pthread_detach(m_tid); + if (result == 0) { + m_detached = 1; + } + } + return result; +} + +pthread_t Thread::self() { + return m_tid; +}