Skip to content

Commit

Permalink
Implemented data packet history queue for storing packets sent/receiv…
Browse files Browse the repository at this point in the history
…ed by frontend and backend.
  • Loading branch information
rahim-kanji committed May 19, 2023
1 parent 490c445 commit fb7edb5
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 5 deletions.
2 changes: 2 additions & 0 deletions include/MySQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class MySQL_Data_Stream

PtrSizeArray *PSarrayIN;
PtrSizeArray *PSarrayOUT;
FixedSizeQueue data_packets_history_IN;
FixedSizeQueue data_packets_history_OUT;
//PtrSizeArray *PSarrayOUTpending;
PtrSizeArray *resultset;
unsigned int resultset_length;
Expand Down
1 change: 1 addition & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ class MySQL_Threads_Handler
bool enable_server_deprecate_eof;
bool enable_load_data_local_infile;
bool log_mysql_warnings_enabled;
int data_packets_history_size;
} variables;
struct {
unsigned int mirror_sessions_current;
Expand Down
82 changes: 81 additions & 1 deletion include/gen_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#define __CLASS_PTR_ARRAY_H

#include <memory>

#include <queue>
#include "proxysql.h"
#include "sqlite3db.h"

Expand Down Expand Up @@ -189,6 +189,86 @@ class PtrSizeArray {
return intsize;
}
};

typedef struct {
void* data = nullptr;
size_t len = 0;
size_t capacity = 0;
} buffer_t;

class FixedSizeQueue : public std::queue<buffer_t> {
private:
using std::queue<buffer_t>::push;
using std::queue<buffer_t>::emplace;
using std::queue<buffer_t>::swap;
size_t _max_size = 0;

public:
FixedSizeQueue() = default;
FixedSizeQueue(size_t max_size) : _max_size(max_size) {}
~FixedSizeQueue() {
while (empty() == false) {
auto& node = front();
l_free(node.len, node.data);
pop();
}
}

inline
size_t get_max_size() const {
return _max_size;
}

void set_max_size(size_t max_size) {
if (_max_size == max_size)
return;

_max_size = max_size;

if (size() > max_size) {
while (size() != max_size) {
auto& node = front();
l_free(node.len, node.data);
pop();
}
}
}

// using template here to create compile-time separate definition of push, one for true and one for false
template<bool ALLOC_MEM = true>
void push(void* buff, size_t len) {
if (_max_size == 0) return;
assert(buff && len);

buffer_t mybuff;

if (size() == _max_size) {
mybuff = front();
pop();
}

if (ALLOC_MEM == true) {
if (mybuff.capacity < len) {
if (mybuff.data) free(mybuff.data);

mybuff.data = l_alloc(len);
mybuff.capacity = len;
}

memcpy(mybuff.data, buff, len);
mybuff.len = len;

} else {
if (mybuff.data) free(mybuff.data);

mybuff.data = buff;
mybuff.capacity = mybuff.len = len;
}

emplace(mybuff);
}
};

#endif /* __CLASS_PTR_ARRAY_H */


Expand Down
1 change: 1 addition & 0 deletions include/proxysql_glovars.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class ProxySQL_GlobalVariables {
char * ssl_key_pem_mem;
char * ssl_cert_pem_mem;
bool sqlite3_server;
int data_packets_history_size;
#ifdef PROXYSQLCLICKHOUSE
bool clickhouse_server;
#endif /* PROXYSQLCLICKHOUSE */
Expand Down
14 changes: 13 additions & 1 deletion lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"stats_time_backend_query",
(char *)"stats_time_query_processor",
(char *)"query_cache_stores_empty_result",
(char *)"data_packets_history_size",
NULL
};

Expand Down Expand Up @@ -1198,6 +1199,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.enable_server_deprecate_eof=true;
variables.enable_load_data_local_infile=false;
variables.log_mysql_warnings_enabled=false;
variables.data_packets_history_size=0;
// status variables
status_variables.mirror_sessions_current=0;
__global_MySQL_Thread_Variables_version=1;
Expand Down Expand Up @@ -2081,6 +2083,16 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi
}
return false;
}
if (!strcasecmp(name,"data_packets_history_size")) {
int intv=atoi(value);
if (intv >= 0 && intv < INT_MAX) {
variables.data_packets_history_size = intv;
GloVars.global.data_packets_history_size = intv;
return true;
} else {
return false;
}
}
return false;
}

Expand Down Expand Up @@ -2259,7 +2271,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["binlog_reader_connect_retry_msec"] = make_tuple(&variables.binlog_reader_connect_retry_msec, 0, 0, true);
VariablesPointers_int["eventslog_format"] = make_tuple(&variables.eventslog_format, 0, 0, true);
VariablesPointers_int["wait_timeout"] = make_tuple(&variables.wait_timeout, 0, 0, true);

VariablesPointers_int["data_packets_history_size"] = make_tuple(&variables.data_packets_history_size, 0, 0, true);

}

Expand Down
1 change: 1 addition & 0 deletions lib/ProxySQL_GloVars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() :
#endif

global.sqlite3_server=false;
global.data_packets_history_size=0;
#ifdef PROXYSQLCLICKHOUSE
global.clickhouse_server=false;
#endif /* PROXYSQLCLICKHOUSE */
Expand Down
32 changes: 29 additions & 3 deletions lib/mysql_data_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,23 @@ static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len)
#define queue_r_ptr(_q) ((unsigned char *)_q.buffer+_q.tail)
#define queue_w_ptr(_q) ((unsigned char *)_q.buffer+_q.head)

#define add_to_data_packet_history(_o,_p,_s) if (unlikely(GloVars.global.data_packets_history_size)) { \
if (static_cast<int>(_o.get_max_size()) != GloVars.global.data_packets_history_size) { \
_o.set_max_size(GloVars.global.data_packets_history_size); \
} \
_o.push(_p,_s);\
}


// memory deallocation responsibility is now transferred to the queue as the buffer is directly assigned to it.
// if the size of data_packet_history is 0, the memory will be released.
#define add_to_data_packet_history_without_alloc(_o,_p,_s) if (unlikely(GloVars.global.data_packets_history_size)) { \
if (static_cast<int>(_o.get_max_size()) != GloVars.global.data_packets_history_size) { \
_o.set_max_size(GloVars.global.data_packets_history_size); \
} \
_o.push<false>(_p,_s);\
} else { \
l_free(_s,_p); \
}
//enum sslstatus { SSLSTATUS_OK, SSLSTATUS_WANT_IO, SSLSTATUS_FAIL};

static enum sslstatus get_sslstatus(SSL* ssl, int n)
Expand Down Expand Up @@ -417,6 +432,11 @@ void MySQL_Data_Stream::init() {
if (PSarrayOUT==NULL) PSarrayOUT= new PtrSizeArray();
// if (PSarrayOUTpending==NULL) PSarrayOUTpending= new PtrSizeArray();
if (resultset==NULL) resultset = new PtrSizeArray();

if (unlikely(GloVars.global.data_packets_history_size)) {
data_packets_history_IN.set_max_size(GloVars.global.data_packets_history_size);
data_packets_history_OUT.set_max_size(GloVars.global.data_packets_history_size);
}
}
if (myds_type!=MYDS_FRONTEND) {
queue_destroy(queueIN);
Expand Down Expand Up @@ -941,6 +961,7 @@ int MySQL_Data_Stream::buffer2array() {
memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size);
queue_r(queueIN, queueIN.pkt.size);
PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size);
add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size);
queueIN.pkt.ptr = NULL;
} else {
if (PSarrayIN->len == 0) {
Expand All @@ -951,6 +972,7 @@ int MySQL_Data_Stream::buffer2array() {
memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size);
queue_r(queueIN, queueIN.pkt.size);
PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size);
add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size);
queueIN.pkt.ptr = NULL;
} else {
// get a pointer to the last entry in PSarrayIN
Expand All @@ -963,6 +985,7 @@ int MySQL_Data_Stream::buffer2array() {
memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size);
queue_r(queueIN, queueIN.pkt.size);
PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size);
add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size);
queueIN.pkt.ptr = NULL;
} else {
// we append the packet at the end of the previous packet
Expand Down Expand Up @@ -1125,6 +1148,7 @@ int MySQL_Data_Stream::buffer2array() {
queueIN.pkt.ptr=NULL;
} else {
PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size);
add_to_data_packet_history(data_packets_history_IN,queueIN.pkt.ptr,queueIN.pkt.size);
pkts_recv++;
queueIN.pkt.size=0;
queueIN.pkt.ptr=NULL;
Expand Down Expand Up @@ -1245,7 +1269,8 @@ int MySQL_Data_Stream::array2buffer() {
if (PSarrayOUT->len-idx) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Removing a packet from array\n", sess, this);
if (queueOUT.pkt.ptr) {
l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
//l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
add_to_data_packet_history_without_alloc(data_packets_history_OUT,queueOUT.pkt.ptr,queueOUT.pkt.size);
queueOUT.pkt.ptr=NULL;
}
//VALGRIND_ENABLE_ERROR_REPORTING;
Expand Down Expand Up @@ -1290,7 +1315,8 @@ int MySQL_Data_Stream::array2buffer() {
ret=b;
if (queueOUT.partial==queueOUT.pkt.size) {
if (queueOUT.pkt.ptr) {
l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
//l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
add_to_data_packet_history_without_alloc(data_packets_history_OUT,queueOUT.pkt.ptr,queueOUT.pkt.size);
queueOUT.pkt.ptr=NULL;
}
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Packet completely written into send buffer\n", sess, this);
Expand Down

0 comments on commit fb7edb5

Please sign in to comment.