diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 7f24206605..85c2b91280 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -113,6 +113,8 @@ class MySQL_Data_Stream PtrSizeArray *PSarrayIN; PtrSizeArray *PSarrayOUT; + FixedSizeQueue data_packets_history_IN; + FixedSizeQueue data_packets_history_OUT; //PtrSizeArray *PSarrayOUTpending; PtrSizeArray *resultset; unsigned int resultset_length; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index c64a25ff7f..942a548c31 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -584,6 +584,7 @@ class MySQL_Threads_Handler bool enable_server_deprecate_eof; bool enable_load_data_local_infile; bool log_mysql_warnings_enabled; + int data_packets_history_size; } variables; struct { unsigned int mirror_sessions_current; diff --git a/include/gen_utils.h b/include/gen_utils.h index 571c797f60..314de078c9 100644 --- a/include/gen_utils.h +++ b/include/gen_utils.h @@ -2,7 +2,7 @@ #define __CLASS_PTR_ARRAY_H #include - +#include #include "proxysql.h" #include "sqlite3db.h" @@ -189,6 +189,86 @@ class PtrSizeArray { return intsize; } }; + +typedef struct { + void* data = nullptr; + size_t len = 0; + size_t capacity = 0; +} buffer_t; + +class FixedSizeQueue : public std::queue { +private: + using std::queue::push; + using std::queue::emplace; + using std::queue::swap; + size_t _max_size = 0; + +public: + FixedSizeQueue() = default; + FixedSizeQueue(size_t max_size) : _max_size(max_size) {} + ~FixedSizeQueue() { + while (empty() == false) { + auto& node = front(); + l_free(node.len, node.data); + pop(); + } + } + + inline + size_t get_max_size() const { + return _max_size; + } + + void set_max_size(size_t max_size) { + if (_max_size == max_size) + return; + + _max_size = max_size; + + if (size() > max_size) { + while (size() != max_size) { + auto& node = front(); + l_free(node.len, node.data); + pop(); + } + } + } + + // using template here to create compile-time separate definition of push, one for true and one for false + template + void push(void* buff, size_t len) { + if (_max_size == 0) return; + assert(buff && len); + + buffer_t mybuff; + + if (size() == _max_size) { + mybuff = front(); + pop(); + } + + if (ALLOC_MEM == true) { + if (mybuff.capacity < len) { + if (mybuff.data) free(mybuff.data); + + mybuff.data = l_alloc(len); + mybuff.capacity = len; + } + + memcpy(mybuff.data, buff, len); + mybuff.len = len; + + } else { + if (mybuff.data) free(mybuff.data); + + mybuff.data = buff; + mybuff.capacity = mybuff.len = len; + } + + emplace(mybuff); + } +}; + #endif /* __CLASS_PTR_ARRAY_H */ diff --git a/include/proxysql_glovars.hpp b/include/proxysql_glovars.hpp index 8a0dbe02c6..7e861ce550 100644 --- a/include/proxysql_glovars.hpp +++ b/include/proxysql_glovars.hpp @@ -121,6 +121,7 @@ class ProxySQL_GlobalVariables { char * ssl_key_pem_mem; char * ssl_cert_pem_mem; bool sqlite3_server; + int data_packets_history_size; #ifdef PROXYSQLCLICKHOUSE bool clickhouse_server; #endif /* PROXYSQLCLICKHOUSE */ diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index c70024fb7e..f353974c2c 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -579,6 +579,7 @@ static char * mysql_thread_variables_names[]= { (char *)"stats_time_backend_query", (char *)"stats_time_query_processor", (char *)"query_cache_stores_empty_result", + (char *)"data_packets_history_size", NULL }; @@ -1198,6 +1199,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.enable_server_deprecate_eof=true; variables.enable_load_data_local_infile=false; variables.log_mysql_warnings_enabled=false; + variables.data_packets_history_size=0; // status variables status_variables.mirror_sessions_current=0; __global_MySQL_Thread_Variables_version=1; @@ -2081,6 +2083,16 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi } return false; } + if (!strcasecmp(name,"data_packets_history_size")) { + int intv=atoi(value); + if (intv >= 0 && intv < INT_MAX) { + variables.data_packets_history_size = intv; + GloVars.global.data_packets_history_size = intv; + return true; + } else { + return false; + } + } return false; } @@ -2259,7 +2271,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["binlog_reader_connect_retry_msec"] = make_tuple(&variables.binlog_reader_connect_retry_msec, 0, 0, true); VariablesPointers_int["eventslog_format"] = make_tuple(&variables.eventslog_format, 0, 0, true); VariablesPointers_int["wait_timeout"] = make_tuple(&variables.wait_timeout, 0, 0, true); - + VariablesPointers_int["data_packets_history_size"] = make_tuple(&variables.data_packets_history_size, 0, 0, true); } diff --git a/lib/ProxySQL_GloVars.cpp b/lib/ProxySQL_GloVars.cpp index 66cb8f8362..490fe51aec 100644 --- a/lib/ProxySQL_GloVars.cpp +++ b/lib/ProxySQL_GloVars.cpp @@ -144,6 +144,7 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() : #endif global.sqlite3_server=false; + global.data_packets_history_size=0; #ifdef PROXYSQLCLICKHOUSE global.clickhouse_server=false; #endif /* PROXYSQLCLICKHOUSE */ diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 4302dace71..9d762715f4 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -141,8 +141,23 @@ static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) #define queue_r_ptr(_q) ((unsigned char *)_q.buffer+_q.tail) #define queue_w_ptr(_q) ((unsigned char *)_q.buffer+_q.head) +#define add_to_data_packet_history(_o,_p,_s) if (unlikely(GloVars.global.data_packets_history_size)) { \ + if (static_cast(_o.get_max_size()) != GloVars.global.data_packets_history_size) { \ + _o.set_max_size(GloVars.global.data_packets_history_size); \ + } \ + _o.push(_p,_s);\ +} - +// memory deallocation responsibility is now transferred to the queue as the buffer is directly assigned to it. +// if the size of data_packet_history is 0, the memory will be released. +#define add_to_data_packet_history_without_alloc(_o,_p,_s) if (unlikely(GloVars.global.data_packets_history_size)) { \ + if (static_cast(_o.get_max_size()) != GloVars.global.data_packets_history_size) { \ + _o.set_max_size(GloVars.global.data_packets_history_size); \ + } \ + _o.push(_p,_s);\ +} else { \ + l_free(_s,_p); \ +} //enum sslstatus { SSLSTATUS_OK, SSLSTATUS_WANT_IO, SSLSTATUS_FAIL}; static enum sslstatus get_sslstatus(SSL* ssl, int n) @@ -417,6 +432,11 @@ void MySQL_Data_Stream::init() { if (PSarrayOUT==NULL) PSarrayOUT= new PtrSizeArray(); // if (PSarrayOUTpending==NULL) PSarrayOUTpending= new PtrSizeArray(); if (resultset==NULL) resultset = new PtrSizeArray(); + + if (unlikely(GloVars.global.data_packets_history_size)) { + data_packets_history_IN.set_max_size(GloVars.global.data_packets_history_size); + data_packets_history_OUT.set_max_size(GloVars.global.data_packets_history_size); + } } if (myds_type!=MYDS_FRONTEND) { queue_destroy(queueIN); @@ -941,6 +961,7 @@ int MySQL_Data_Stream::buffer2array() { memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size); queue_r(queueIN, queueIN.pkt.size); PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size); queueIN.pkt.ptr = NULL; } else { if (PSarrayIN->len == 0) { @@ -951,6 +972,7 @@ int MySQL_Data_Stream::buffer2array() { memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size); queue_r(queueIN, queueIN.pkt.size); PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size); queueIN.pkt.ptr = NULL; } else { // get a pointer to the last entry in PSarrayIN @@ -963,6 +985,7 @@ int MySQL_Data_Stream::buffer2array() { memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size); queue_r(queueIN, queueIN.pkt.size); PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size); queueIN.pkt.ptr = NULL; } else { // we append the packet at the end of the previous packet @@ -1125,6 +1148,7 @@ int MySQL_Data_Stream::buffer2array() { queueIN.pkt.ptr=NULL; } else { PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size); pkts_recv++; queueIN.pkt.size=0; queueIN.pkt.ptr=NULL; @@ -1245,7 +1269,8 @@ int MySQL_Data_Stream::array2buffer() { if (PSarrayOUT->len-idx) { proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Removing a packet from array\n", sess, this); if (queueOUT.pkt.ptr) { - l_free(queueOUT.pkt.size,queueOUT.pkt.ptr); + //l_free(queueOUT.pkt.size,queueOUT.pkt.ptr); + add_to_data_packet_history_without_alloc(data_packets_history_OUT,queueOUT.pkt.ptr,queueOUT.pkt.size); queueOUT.pkt.ptr=NULL; } //VALGRIND_ENABLE_ERROR_REPORTING; @@ -1290,7 +1315,8 @@ int MySQL_Data_Stream::array2buffer() { ret=b; if (queueOUT.partial==queueOUT.pkt.size) { if (queueOUT.pkt.ptr) { - l_free(queueOUT.pkt.size,queueOUT.pkt.ptr); + //l_free(queueOUT.pkt.size,queueOUT.pkt.ptr); + add_to_data_packet_history_without_alloc(data_packets_history_OUT,queueOUT.pkt.ptr,queueOUT.pkt.size); queueOUT.pkt.ptr=NULL; } proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Packet completely written into send buffer\n", sess, this);