Skip to content

Commit

Permalink
Merge pull request #3946 from sysown/v2.x-3923
Browse files Browse the repository at this point in the history
WIP: Fix 'mysql-auto_increment_delay_multiplex_timeout_ms' - Closes #3923
  • Loading branch information
renecannao authored Sep 14, 2022
2 parents 8c48b37 + 45d2bf2 commit 83ffb72
Show file tree
Hide file tree
Showing 7 changed files with 958 additions and 173 deletions.
17 changes: 16 additions & 1 deletion include/MySQL_Session.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#ifndef __CLASS_MYSQL_SESSION_H
#define __CLASS_MYSQL_SESSION_H

#include <functional>
#include <vector>

#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Variables.h"
Expand Down Expand Up @@ -155,7 +159,11 @@ class MySQL_Session
void init();
void reset();
void add_ldap_comment_to_pkt(PtrSize_t *);

/**
* @brief Performs the required housekeeping operations over the session and its connections before
* performing any processing on received client packets.
*/
void housekeeping_before_pkts();
int get_pkts_from_client(bool&, PtrSize_t&);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t&);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t&);
Expand Down Expand Up @@ -215,6 +223,12 @@ class MySQL_Session
PtrArray *mybes;
MySQL_Data_Stream *client_myds;
MySQL_Data_Stream *server_myds;
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. These values will be used to release the retained connections in the specific
* hostgroups in housekeeping operations, before client packet processing. Currently 'housekeeping_before_pkts'.
*/
std::vector<int32_t> hgs_expired_conns {};
char * default_schema;
char * user_attributes;

Expand Down Expand Up @@ -319,6 +333,7 @@ class MySQL_Session
void Memory_Stats();
void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds);
bool handle_command_query_kill(PtrSize_t *);
void update_expired_conns(const std::vector<std::function<bool(MySQL_Connection*)>>&);
/**
* @brief Performs the final operations after current query has finished to be executed. It updates the session
* 'transaction_persistent_hostgroup', and updates the 'MySQL_Data_Stream' and 'MySQL_Connection' before
Expand Down
2 changes: 1 addition & 1 deletion include/mysql_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class MySQL_Connection {
bool IsServerOffline();
bool IsAutoCommit();
bool AutocommitFalse_AndSavepoint();
bool MultiplexDisabled();
bool MultiplexDisabled(bool check_delay_token = true);
bool IsKeepMultiplexEnabledVariables(char *query_digest_text);
void ProcessQueryAndSetStatusFlags(char *query_digest_text);
void optimize();
Expand Down
1 change: 1 addition & 0 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,7 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo
MySrvC *mysrvc=NULL;
if (_lock)
wrlock();
c->auto_increment_delay_token = 0;
status.myconnpoll_push++;
mysrvc=(MySrvC *)c->parent;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
Expand Down
84 changes: 81 additions & 3 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

#define EXPMARIA

using std::function;
using std::vector;

static inline char is_digit(char c) {
if(c >= '0' && c <= '9')
Expand Down Expand Up @@ -694,6 +696,29 @@ MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) {
return NULL; // NULL = backend not found
};

void MySQL_Session::update_expired_conns(const vector<function<bool(MySQL_Connection*)>>& checks) {
for (uint32_t i = 0; i < mybes->len; i++) {
MySQL_Backend* mybe = static_cast<MySQL_Backend*>(mybes->index(i));
MySQL_Data_Stream* myds = mybe != nullptr ? mybe->server_myds : nullptr;
MySQL_Connection* myconn = myds != nullptr ? myds->myconn : nullptr;

if (myconn != nullptr) {
const bool is_active_transaction = myconn->IsActiveTransaction();
const bool multiplex_disabled = myconn->MultiplexDisabled(false);
const bool is_idle = myconn->async_state_machine == ASYNC_IDLE;

// Make sure the connection is reusable before performing any check
if (myconn->reusable==true && is_active_transaction==false && multiplex_disabled==false && is_idle) {
for (const function<bool(MySQL_Connection*)>& check : checks) {
if (check(myconn)) {
this->hgs_expired_conns.push_back(mybe->hostgroup_id);
break;
}
}
}
}
}
}

MySQL_Backend * MySQL_Session::create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) {
MySQL_Backend *_mybe=new MySQL_Backend();
Expand Down Expand Up @@ -4395,6 +4420,9 @@ int MySQL_Session::RunQuery(MySQL_Data_Stream *myds, MySQL_Connection *myconn) {

// this function was inline
void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
// NOTE: Maintenance of 'multiplex_delayed' has been moved to 'housekeeping_before_pkts'. The previous impl
// is left below as an example of how to perform a more passive maintenance over session connections.
/*
if (mybes) {
MySQL_Backend *_mybe;
unsigned int i;
Expand All @@ -4415,6 +4443,34 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
}
}
}
*/
}

void MySQL_Session::housekeeping_before_pkts() {
if (mysql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
MySQL_Backend* mybe = find_backend(hg_id);

if (mybe != nullptr) {
MySQL_Data_Stream* myds = mybe->server_myds;

if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) {
if (mysql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
} else {
myds->return_MySQL_Connection_To_Pool();
}
}
}
// We are required to perform a cleanup after consuming the elements, thus preventing any subsequent
// 'handler' call to perform recomputing of the already processed elements.
if (hgs_expired_conns.empty() == false) {
hgs_expired_conns.clear();
}
}
}

// this function was inline
Expand Down Expand Up @@ -4469,6 +4525,7 @@ int MySQL_Session::handler() {
}
}

housekeeping_before_pkts();
handler_ret = get_pkts_from_client(wrong_pass, pkt);
if (handler_ret != 0) {
return handler_ret;
Expand Down Expand Up @@ -7307,6 +7364,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_
new_myds->myprot.init(&new_myds, new_myds->myconn->userinfo, NULL);
new_sess->status = RESETTING_CONNECTION;
mc->async_state_machine = ASYNC_IDLE; // may not be true, but is used to correctly perform error handling
mc->auto_increment_delay_token = 0;
new_myds->DSS = STATE_MARIADB_QUERY;
thread->register_session_connection_handler(new_sess,true);
if (new_myds->mypolls==NULL) {
Expand Down Expand Up @@ -7427,9 +7485,29 @@ void MySQL_Session::finishQuery(MySQL_Data_Stream *myds, MySQL_Connection *mycon
myds->myconn->set_status(true, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX);
}
}
if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
if (mysql_thread___connection_delay_multiplex_ms && mirror==false) {
myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000;

const bool is_active_transaction = myds->myconn->IsActiveTransaction();
const bool multiplex_disabled_by_status = myds->myconn->MultiplexDisabled(false);

const bool multiplex_delayed = myds->myconn->auto_increment_delay_token > 0;
const bool multiplex_delayed_with_timeout =
!multiplex_disabled_by_status && multiplex_delayed && mysql_thread___auto_increment_delay_multiplex_timeout_ms > 0;

const bool multiplex_disabled = !multiplex_disabled_by_status && (!multiplex_delayed || multiplex_delayed_with_timeout);
const bool conn_is_reusable = myds->myconn->reusable == true && !is_active_transaction && multiplex_disabled;

if (mysql_thread___multiplexing && conn_is_reusable) {
if ((mysql_thread___connection_delay_multiplex_ms || multiplex_delayed_with_timeout) && mirror==false) {
if (multiplex_delayed_with_timeout) {
uint64_t delay_multiplex_us = mysql_thread___connection_delay_multiplex_ms * 1000;
uint64_t auto_increment_delay_us = mysql_thread___auto_increment_delay_multiplex_timeout_ms * 1000;
uint64_t delay_us = delay_multiplex_us > auto_increment_delay_us ? delay_multiplex_us : auto_increment_delay_us;

myds->wait_until=thread->curtime + delay_us;
} else {
myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000;
}

myconn->async_state_machine=ASYNC_IDLE;
myconn->multiplex_delayed=true;
myds->DSS=STATE_MARIADB_GENERIC;
Expand Down
40 changes: 35 additions & 5 deletions lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
//#define __CLASS_STANDARD_MYSQL_THREAD_H

#include <functional>
#include <vector>

#include "MySQL_HostGroups_Manager.h"
#include "prometheus_helpers.h"
#define MYSQL_THREAD_IMPLEMENTATION
Expand All @@ -17,6 +21,9 @@
#include "MySQL_PreparedStatement.h"
#include "MySQL_Logger.hpp"

using std::vector;
using std::function;

#ifdef DEBUG
MySQL_Session *sess_stopat;
#endif
Expand Down Expand Up @@ -1641,6 +1648,14 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi
// integer variable ?
std::unordered_map<std::string, std::tuple<int *, int, int, bool>>::const_iterator it = VariablesPointers_int.find(nameS);
if (it != VariablesPointers_int.end()) {
// Log warnings for variables with possibly wrong values
if (nameS == "auto_increment_delay_multiplex_timeout_ms") {
int intv = atoi(value);
if (intv <= 60) {
proxy_warning("'mysql-auto_increment_delay_multiplex_timeout_ms' is set to a low value: %ums. Remember value is in 'ms'\n", intv);
}
}

bool special_variable = std::get<3>(it->second); // if special_variable is true, min and max values are ignored, and more input validation is needed
if (special_variable == false) {
int intv=atoi(value);
Expand Down Expand Up @@ -3733,12 +3748,27 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig
}
}

if (sess->mybe && sess->mybe->server_myds && sess->mybe->server_myds->myconn) {
MySQL_Connection* myconn = sess->mybe->server_myds->myconn;
// Perform the maintenance for expired connections on the session
if (mysql_thread___multiplexing) {
const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const uint64_t multiplex_timeout_ms = mysql_thread___auto_increment_delay_multiplex_timeout_ms;
const bool multiplex_delayed_enabled = multiplex_timeout_ms != 0 && myconn->auto_increment_delay_token > 0;
const bool timeout_expired = multiplex_delayed_enabled && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime;
return timeout_expired;
};

if (mysql_thread___auto_increment_delay_multiplex_timeout_ms != 0 && (sess_time/1000 > (unsigned long long)mysql_thread___auto_increment_delay_multiplex_timeout_ms)) {
myconn->auto_increment_delay_token = 0;
}
const auto conn_delay_multiplex = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const bool multiplex_delayed = mysql_thread___connection_delay_multiplex_ms != 0 && myconn->multiplex_delayed == true;
const bool timeout_expired = multiplex_delayed && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime;
return timeout_expired;
};

const vector<function<bool(MySQL_Connection*)>> expire_conn_checks {
auto_incr_delay_multiplex_check,
conn_delay_multiplex
};

sess->update_expired_conns(expire_conn_checks);
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/mysql_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2321,14 +2321,14 @@ bool MySQL_Connection::IsAutoCommit() {
return ret;
}

bool MySQL_Connection::MultiplexDisabled() {
bool MySQL_Connection::MultiplexDisabled(bool check_delay_token) {
// status_flags stores information about the status of the connection
// can be used to determine if multiplexing can be enabled or not
bool ret=false;
if (status_flags & (STATUS_MYSQL_CONNECTION_TRANSACTION|STATUS_MYSQL_CONNECTION_USER_VARIABLE|STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT|STATUS_MYSQL_CONNECTION_LOCK_TABLES|STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE|STATUS_MYSQL_CONNECTION_GET_LOCK|STATUS_MYSQL_CONNECTION_NO_MULTIPLEX|STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0|STATUS_MYSQL_CONNECTION_FOUND_ROWS|STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) ) {
ret=true;
}
if (auto_increment_delay_token) return true;
if (check_delay_token && auto_increment_delay_token) return true;
return ret;
}

Expand Down
Loading

0 comments on commit 83ffb72

Please sign in to comment.