diff --git a/include/MySQL_PreparedStatement.h b/include/MySQL_PreparedStatement.h index d2048b65d3..709f41dd89 100644 --- a/include/MySQL_PreparedStatement.h +++ b/include/MySQL_PreparedStatement.h @@ -59,6 +59,7 @@ class MySQL_STMT_Global_info { uint16_t num_params; uint16_t warning_count; MYSQL_FIELD **fields; + char* first_comment; // struct { // int cache_ttl; // int timeout; @@ -66,7 +67,7 @@ class MySQL_STMT_Global_info { // } properties; bool is_select_NOT_for_update; MYSQL_BIND **params; // seems unused (?) - MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h); + MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, char *fc, MYSQL_STMT *stmt, uint64_t _h); void update_metadata(MYSQL_STMT *stmt); ~MySQL_STMT_Global_info(); }; @@ -259,7 +260,7 @@ class MySQL_STMT_Manager_v14 { void unlock() { pthread_rwlock_unlock(&rwlock_); } void ref_count_client(uint64_t _stmt, int _v, bool lock=true); void ref_count_server(uint64_t _stmt, int _v, bool lock=true); - MySQL_STMT_Global_info * add_prepared_statement(char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true); + MySQL_STMT_Global_info * add_prepared_statement(char *u, char *s, char *q, unsigned int ql, char *fc, MYSQL_STMT *stmt, bool lock=true); void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total); SQLite3_result * get_prepared_statements_global_infos(); }; diff --git a/lib/MySQL_PreparedStatement.cpp b/lib/MySQL_PreparedStatement.cpp index 5483e3b70e..c8616e4575 100644 --- a/lib/MySQL_PreparedStatement.cpp +++ b/lib/MySQL_PreparedStatement.cpp @@ -133,6 +133,7 @@ void *StmtLongDataHandler::get(uint32_t _stmt_id, uint16_t _param_id, MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, + char *fc, MYSQL_STMT *stmt, uint64_t _h) { pthread_rwlock_init(&rwlock_, NULL); statement_id = id; @@ -145,6 +146,11 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, memcpy(query, q, ql); query[ql] = '\0'; // add NULL byte query_length = ql; + if (fc) { + first_comment = strdup(fc); + } else { + first_comment = NULL; + } MyComQueryCmd = MYSQL_COM_QUERY__UNINITIALIZED; num_params = stmt->param_count; num_columns = stmt->field_count; @@ -476,6 +482,9 @@ MySQL_STMT_Global_info::~MySQL_STMT_Global_info() { free(username); free(schemaname); free(query); + if (first_comment) { + free(first_comment); + } if (num_columns) { uint16_t i; for (i = 0; i < num_columns; i++) { @@ -812,10 +821,10 @@ bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) { MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement( char *u, char *s, char *q, unsigned int ql, - MYSQL_STMT *stmt, bool lock) { + char *fc, MYSQL_STMT *stmt, bool lock) { MySQL_STMT_Global_info *ret = NULL; uint64_t hash = stmt_compute_hash( - u, s, q, ql); // this identifies the prepared statement + u, s, q, ql); // this identifies the prepared statement if (lock) { pthread_rwlock_wrlock(&rwlock_); } @@ -847,7 +856,7 @@ MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement( //next_statement_id++; MySQL_STMT_Global_info *a = - new MySQL_STMT_Global_info(next_id, u, s, q, ql, stmt, hash); + new MySQL_STMT_Global_info(next_id, u, s, q, ql, fc, stmt, hash); // insert it in both maps map_stmt_id_to_info.insert(std::make_pair(a->statement_id, a)); map_stmt_hash_to_info.insert(std::make_pair(a->hash, a)); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 67fdcb1577..d8d9594c8d 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -3052,7 +3052,12 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (client_myds->myconn->local_stmts==NULL) { client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true); } - uint64_t hash=client_myds->myconn->local_stmts->compute_hash((char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); + uint64_t hash=client_myds->myconn->local_stmts->compute_hash( + (char *)client_myds->myconn->userinfo->username, + (char *)client_myds->myconn->userinfo->schemaname, + (char *)CurrentQuery.QueryPointer, + CurrentQuery.QueryLength + ); MySQL_STMT_Global_info *stmt_info=NULL; // we first lock GloStmt GloMyStmt->wrlock(); @@ -3137,7 +3142,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (thread->variables.stats_time_query_processor) { clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); } - qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); + qpo=GloQPro->process_mysql_query(this,NULL,0,&CurrentQuery); if (qpo->max_lag_ms >= 0) { thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; } @@ -3858,6 +3863,7 @@ bool MySQL_Session::handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, (char *)client_myds->myconn->userinfo->schemaname, (char *)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, + CurrentQuery.QueryParserArgs.first_comment, CurrentQuery.mysql_stmt, false); if (CurrentQuery.QueryParserArgs.digest_text) { @@ -4449,6 +4455,12 @@ int MySQL_Session::handler() { stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id); CurrentQuery.QueryLength=stmt_info->query_length; CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query; + // NOTE: Update 'first_comment' with the the from the retrieved + // 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its + // own copy of 'first_comment' because it will later be free by 'QueryInfo::end'. + if (stmt_info->first_comment) { + CurrentQuery.QueryParserArgs.first_comment=strdup(stmt_info->first_comment); + } previous_status.push(PROCESSING_STMT_EXECUTE); NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE); if (CurrentQuery.stmt_global_id!=stmt_info->statement_id) { diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index 63b52a465c..9d92602bf3 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -1319,7 +1319,7 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses qp=&stmt_exec_qp; qp->digest = qi->stmt_info->digest; qp->digest_text = qi->stmt_info->digest_text; - qp->first_comment = NULL; + qp->first_comment = qi->stmt_info->first_comment; } } #define stackbuffer_size 128 @@ -1710,7 +1710,9 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses if (len < stackbuffer_size) { // query is in the stack } else { - l_free(len+1,query); + if (ptr) { + l_free(len+1,query); + } } if (sess->mirror==false) { // we process comments only on original queries, not on mirrors if (qp && qp->first_comment) { diff --git a/test/tap/tests/reg_test_3427-stmt_first_comment1-param-t.cpp b/test/tap/tests/reg_test_3427-stmt_first_comment1-param-t.cpp new file mode 120000 index 0000000000..e365158207 --- /dev/null +++ b/test/tap/tests/reg_test_3427-stmt_first_comment1-param-t.cpp @@ -0,0 +1 @@ +reg_test_3427-stmt_first_comment1-t.cpp \ No newline at end of file diff --git a/test/tap/tests/reg_test_3427-stmt_first_comment1-t.cpp b/test/tap/tests/reg_test_3427-stmt_first_comment1-t.cpp new file mode 100644 index 0000000000..91bed356e5 --- /dev/null +++ b/test/tap/tests/reg_test_3427-stmt_first_comment1-t.cpp @@ -0,0 +1,273 @@ +/** + * @file reg_test_3427-stmt_first_comment-t.cpp + * @brief This test is a regression test for exercising all code related to + * 'first_comment' changes added in PR #3453. + * @details Testing revealed that the fix introduced for proper routing of + * prepared statements with query rules has invalid interaction with query + * annotation 'hostgroup' feature. + * For solving the issue, 'first_comment' was made part of 'MySQL_STMT_Global_info'. + * This test aims to exercise all the parts of ProxySQL affected by this change. + * + * Procedure: + * ========= + * + * The test creates a number of prepared statements and execute them, until passing + * the limit of prepared statements allowed per connection. After the connection + * has been reset by ProxySQL because of the limit exceeding, it tries to execute + * the same prepared statements again. This way those prepared statements wont be + * available in the connection and will need to be fetched by ProxySQL for the + * reset connection. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "proxysql_utils.h" +#include "tap.h" +#include "command_line.h" +#include "utils.h" +#include "errno.h" + +/** + * @brief String size of the columns created for the testing table. + */ +const int STRING_SIZE=32; +/** + * @brief Number of max stmt per connection to be configured for + * ProxySQL. + */ +const uint32_t MAX_STMT_NUM_QUERIES = 20; +/** + * @brief Number of queries to RESET the connection being target, + * it's simply: MAX_STMT_NUM_QUERIES + 1 + */ +const uint32_t RESET_CONNECTION_QUERIES = 2*MAX_STMT_NUM_QUERIES; +/** + * @brief Id for the current writer hostgroup. + */ +const uint32_t WRITER_HOSTGROUP_ID = 0; + +int main(int argc, char** argv) { + int res = EXIT_SUCCESS; + + CommandLine cl; + + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + bool param = false; + { + // we parse argv[0] to see if filename includes "param" + std::string str = std::string(argv[0]); + std::size_t found = str.find("param"); + if (found!=std::string::npos) { + param = true; + } + } + + MYSQL_STMT* stmt = nullptr; + MYSQL* proxysql_mysql = mysql_init(NULL); + MYSQL* proxysql_admin = mysql_init(NULL); + + if (!mysql_real_connect(proxysql_mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_mysql)); + return -1; + } + + if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + + stmt = mysql_stmt_init(proxysql_mysql); + if (!stmt) { + diag("mysql_stmt_init(), out of memory"); + res = EXIT_FAILURE; + goto exit; + } + + // Insert data in the table to be queried + // ************************************************************************* + + MYSQL_QUERY(proxysql_mysql, "CREATE DATABASE IF NOT EXISTS test"); + MYSQL_QUERY(proxysql_mysql, "DROP TABLE IF EXISTS test.reg_test_3427"); + MYSQL_QUERY( + proxysql_mysql, + "CREATE TABLE IF NOT EXISTS test.reg_test_3427" + " (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, `c1` BIGINT, `c2` varchar(32))" + ); + MYSQL_QUERY(proxysql_mysql, "INSERT INTO test.reg_test_3427(c1, c2) VALUES (100, 'abcde')"); + + mysql_close(proxysql_mysql); + + // Initialize the connection again + proxysql_mysql = mysql_init(NULL); + + if (!mysql_real_connect(proxysql_mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_mysql)); + return -1; + } + + // ************************************************************************* + + { + // Set the number of maximum connections for servers in the writer hostgroup + std::string t_update_mysql_servers { + "UPDATE mysql_servers SET max_connections=1 WHERE hostgroup_id=%d" + }; + std::string update_mysql_queries {}; + string_format(t_update_mysql_servers, update_mysql_queries, WRITER_HOSTGROUP_ID); + MYSQL_QUERY(proxysql_admin, update_mysql_queries.c_str()); + MYSQL_QUERY(proxysql_admin, "LOAD MYSQL SERVERS TO RUNTIME"); + + // Set the number of maximum prepared statements per connection + std::string t_max_stmt_query { + "SET mysql-max_stmts_per_connection=%d" + }; + std::string max_stmt_query {}; + string_format(t_max_stmt_query, max_stmt_query, MAX_STMT_NUM_QUERIES); + MYSQL_QUERY(proxysql_admin, max_stmt_query.c_str()); + MYSQL_QUERY(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + + uint32_t query_id = 0; + + for (uint32_t i = 0; i < RESET_CONNECTION_QUERIES; i++) { + if (i <= MAX_STMT_NUM_QUERIES) { + query_id = i; + } else if (i == MAX_STMT_NUM_QUERIES + 1) { + query_id = 0; + } else { + query_id += 1; + } + + // create unique stmt + std::string query_t {}; + + if (param) { + query_t = "SELECT /*+ ;hostgroup=0;%d */ * FROM test.reg_test_3427 WHERE id IN (?)"; + } else { + query_t = "SELECT /*+ ;hostgroup=0;%d */ * FROM test.reg_test_3427"; + } + + std::string query {}; + string_format(query_t, query, query_id); + + if (mysql_stmt_prepare(stmt, query.c_str(), strlen(query.c_str()))) { + diag("mysql_stmt_prepare at line %d failed: %s", __LINE__ , mysql_error(proxysql_mysql)); + mysql_close(proxysql_mysql); + res = EXIT_FAILURE; + goto exit; + } + + if (param) { + MYSQL_BIND bind_params; + int64_t data_param = 1; + + memset(&bind_params, 0, sizeof(MYSQL_BIND)); + bind_params.buffer_type = MYSQL_TYPE_LONGLONG; + bind_params.buffer = (char *)&data_param; + bind_params.buffer_length = sizeof(int64_t); + + if (mysql_stmt_bind_param(stmt, &bind_params)) { + diag( + "mysql_stmt_bind_result at line %d failed: %s", __LINE__ , + mysql_stmt_error(stmt) + ); + res = EXIT_FAILURE; + goto exit; + } + } + + if (mysql_stmt_execute(stmt)) { + diag( + "mysql_stmt_execute at line %d failed: %s", __LINE__ , + mysql_stmt_error(stmt) + ); + res = EXIT_FAILURE; + goto exit; + } + + MYSQL_BIND bind[3]; + int data_id; + int64_t data_c1; + char data_c2[STRING_SIZE]; + char is_null[3]; + long unsigned int length[3]; + char error[3]; + memset(bind, 0, sizeof(bind)); + + bind[0].buffer_type = MYSQL_TYPE_LONG; + bind[0].buffer = (char *)&data_id; + bind[0].buffer_length = sizeof(int); + bind[0].is_null = &is_null[0]; + bind[0].length = &length[0]; + + bind[1].buffer_type = MYSQL_TYPE_LONGLONG; + bind[1].buffer = (char *)&data_c1; + bind[1].buffer_length = sizeof(int64_t); + bind[1].is_null = &is_null[1]; + bind[1].length = &length[1]; + + bind[2].buffer_type = MYSQL_TYPE_STRING; + bind[2].buffer = (char *)&data_c2; + bind[2].buffer_length = STRING_SIZE; + bind[2].is_null = &is_null[2]; + bind[2].length = &length[2]; + bind[2].error = &error[2]; + + if (mysql_stmt_bind_result(stmt, bind)) { + diag( + "mysql_stmt_bind_result at line %d failed: %s", __LINE__, + mysql_stmt_error(stmt) + ); + res = EXIT_FAILURE; + goto exit; + } + + if (mysql_stmt_fetch(stmt) == 1) { + diag( + "mysql_stmt_fetch at line %d failed: %s", __LINE__, + mysql_stmt_error(stmt) + ); + res = EXIT_FAILURE; + goto exit; + } + + bool data_match_expected = + (data_id == static_cast(1)) && + (data_c1 == static_cast(100)) && + (strcmp(data_c2, "abcde") == 0); + + if (data_match_expected == false) { + diag( + "Prepared statement SELECT result didn't matched expected -" + " Exp=(id:1, c1:100, c2:'abcde'), Act=(id:%d, c1:%ld, c2:'%s')", + data_id, + data_c1, + data_c2 + ); + res = EXIT_FAILURE; + goto exit; + } + } + } + +exit: + if (stmt) { mysql_stmt_close(stmt); } + mysql_close(proxysql_mysql); + mysql_close(proxysql_admin); + + return exit_status(); +} diff --git a/test/tap/tests/reg_test_3427-stmt_first_comment2-param-t.cpp b/test/tap/tests/reg_test_3427-stmt_first_comment2-param-t.cpp new file mode 120000 index 0000000000..e365158207 --- /dev/null +++ b/test/tap/tests/reg_test_3427-stmt_first_comment2-param-t.cpp @@ -0,0 +1 @@ +reg_test_3427-stmt_first_comment1-t.cpp \ No newline at end of file diff --git a/test/tap/tests/reg_test_3427-stmt_first_comment2-t.cpp b/test/tap/tests/reg_test_3427-stmt_first_comment2-t.cpp new file mode 120000 index 0000000000..e365158207 --- /dev/null +++ b/test/tap/tests/reg_test_3427-stmt_first_comment2-t.cpp @@ -0,0 +1 @@ +reg_test_3427-stmt_first_comment1-t.cpp \ No newline at end of file diff --git a/test/tap/tests/test_query_rules_routing-t.cpp b/test/tap/tests/test_query_rules_routing-t.cpp new file mode 100644 index 0000000000..ba5ec6037b --- /dev/null +++ b/test/tap/tests/test_query_rules_routing-t.cpp @@ -0,0 +1,594 @@ +/** + * @file test_query_rules_routing-t.cpp + * @brief This test is an initial version for testing query routing to + * different hostgroups through 'query rules'. It aims to check that + * arbitrary query rules are properly matched and queries are executed in + * the target hostgroups for both 'text protocol' and 'prepared statements'. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "command_line.h" +#include "proxysql_utils.h" +#include "tap.h" +#include "utils.h" + +int g_seed = 0; + +inline int fastrand() { + g_seed = (214013*g_seed+2531011); + return (g_seed>>16)&0x7FFF; +} + +inline unsigned long long monotonic_time() { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000); +} + +void gen_random_str(char *s, const int len) { + g_seed = monotonic_time() ^ getpid() ^ pthread_self(); + static const char alphanum[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + for (int i = 0; i < len; ++i) { + s[i] = alphanum[fastrand() % (sizeof(alphanum) - 1)]; + } + + s[len] = 0; +} + +/** + * @brief For now a query rules test for destination hostgroup is going + * to consist into: + * + * - A set of rules to apply. + * - A set of queries to exercise those rules. + * - The destination hostgroup in which the queries are supposed to end. + */ +using dst_hostgroup_test = + std::pair, std::vector>>; + + +/** + * @brief All supplied queries should be unique, to know that two queries + * are going to be executed in the backend when a prepared statement + * is executed: + */ +std::vector dst_hostgroup_tests { + { + { + "INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)" + " VALUES (1,1,'^SELECT.*FOR UPDATE',0,1)", + "INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)" + " VALUES (2,1,'^SELECT',1,1)" + }, + { + { + "SELECT /*+ ;%s */ 1", + 1 + }, + { + "SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id=1", + 1 + }, + { + "SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 20", + 1 + }, + { + "SELECT /*+ ;%s */ SUM(k) c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10", + 1 + }, + { + "INSERT /*+ ;%s */ INTO test.reg_test_3427_0 (k) VALUES (2)", + 0 + }, + { + "UPDATE /*+ ;%s */ test.reg_test_3427_0 SET pad=\"random\" WHERE id=2", + 0 + }, + { + "SELECT DISTINCT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10 ORDER BY c", + 1 + } + } + }, + { + { + "INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)" + " VALUES (1,1,'^SELECT.*FROM test.reg_test_3427_0 .*',1,1)", + "INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)" + " VALUES (2,1,'^SELECT.*FROM test.reg_test_3427_1 .*',0,1)", + }, + { + { + "UPDATE /*+ ;%s */ test.reg_test_3427_0 SET pad=\"random\" WHERE id=2", + 0 + }, + { + "SELECT DISTINCT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10 ORDER BY c", + 1 + }, + { + "SELECT /*+ ;%s */ c FROM test.reg_test_3427_1 WHERE id BETWEEN 1 AND 10 ORDER BY c", + 0 + }, + { + "INSERT /*+ ;%s */ INTO test.reg_test_3427_0 (k) VALUES (2)", + 0 + }, + { + "SELECT DISTINCT /*+ ;hostgroup=0;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10 ORDER BY c", + 0 + }, + } + }, + { + { + "INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)" + " VALUES (1,1,'^SELECT.*FOR UPDATE',0,1)", + "INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)" + " VALUES (2,1,'^SELECT',1,1)" + }, + { + { + "UPDATE /*+ ;%s */ test.reg_test_3427_0 SET pad=\"random\" WHERE id=2", + 0 + }, + { + "SELECT /*+ ;hostgroup=0;%s */ c FROM test.reg_test_3427_0 WHERE id=1", + 0 + }, + { + "SELECT /*+ ;hostgroup=0;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 20", + 0 + }, + { + "SELECT /*+ ;hostgroup=0;%s */ SUM(k) c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10", + 0 + }, + { + "SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id=1", + 1 + }, + { + "SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 20", + 1 + }, + { + "SELECT /*+ ;%s */ SUM(k) c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10", + 1 + } + } + } +}; + +/** + * @brief Get the current query count for a specific hostgroup. + * + * @param proxysql_admin A already opened MYSQL connection to ProxySQL admin + * interface. + * @param hostgroup_id The 'hostgroup_id' from which get the query count. + * + * @return The number of queries that have been executed in that hostgroup id. + */ +int get_hostgroup_query_count(MYSQL* proxysql_admin, const int hostgroup_id) { + if (proxysql_admin == NULL) { + return EXIT_FAILURE; + } + + int query_count = -1; + + std::string t_query { + "SELECT SUM(Queries) FROM stats.stats_mysql_connection_pool WHERE hostgroup=%d" + }; + std::string query {}; + string_format(t_query, query, hostgroup_id); + + MYSQL_QUERY(proxysql_admin, query.c_str()); + MYSQL_RES* sum_res = mysql_store_result(proxysql_admin); + MYSQL_ROW row = mysql_fetch_row(sum_res); + + if (row[0]) { + query_count = atoi(row[0]); + } + + mysql_free_result(sum_res); + + return query_count; +} + +/** + * @brief Simple function that performs a text protocol query and discards the result. + * + * @param proxysql A already opened MYSQL connection to ProxySQL. + * @param query The query to be executed. + * + * @return The error code of executing the query. + */ +int perform_text_procotol_query(MYSQL* proxysql, const std::string& query) { + int rc = mysql_query(proxysql, query.c_str()); + + if (rc == 0) { + MYSQL_RES* query_res = mysql_store_result(proxysql); + + if (query_res) { + mysql_free_result(query_res); + } + } + + return rc; +} + +/** + * @brief Simple function that performs a stmt query and discards the result. + * + * @param proxysql A already opened MYSQL connection to ProxySQL. + * @param query The query to be executed. + * + * @return The error code of executing the query. + */ +int perform_stmt_query(MYSQL* proxysql, const std::string& query) { + int rc = EXIT_FAILURE; + + MYSQL_STMT* stmt = mysql_stmt_init(proxysql); + if (stmt == NULL) { return EXIT_FAILURE; } + + rc = mysql_stmt_prepare(stmt, query.c_str(), strlen(query.c_str())); + if (rc) { return EXIT_FAILURE; } + + rc = mysql_stmt_execute(stmt); + if (rc) { return EXIT_FAILURE; } + + rc = mysql_stmt_close(stmt); + if (rc) { return EXIT_FAILURE; } + + return rc; +} + +/** + * @brief Simple helper function for creating a 'sysbench' + * alike testing table. + * + * @param proxysql A already opened MYSQL connection to ProxySQL. + * + * @return EXIT_FAILURE in case of failure or EXIT_SUCCESS otherwise. + */ +int create_testing_tables(MYSQL* proxysql, uint32_t num_tables) { + if (proxysql == NULL) { return EXIT_FAILURE; } + + MYSQL_QUERY(proxysql, "CREATE DATABASE IF NOT EXISTS test"); + + for (uint32_t i = 0; i < num_tables; i++) { + std::string t_drop_table_query { + "DROP TABLE IF EXISTS test.reg_test_3427_%d" + }; + std::string t_create_table_query { + "CREATE TABLE IF NOT EXISTS test.reg_test_3427_%d (" + " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY," + " `k` int(11) NOT NULL DEFAULT '0'," + " `c` char(120) NOT NULL DEFAULT ''," + " `pad` char(60) NOT NULL DEFAULT ''," + " KEY `k_1` (`k`)" + ")" + }; + std::string t_insert_trivial_val { + "INSERT INTO test.reg_test_3427_%d (k, c, pad) VALUES (3427, 'foo', 'bar')" + }; + + // Format the queries + std::string drop_table_query {}; + string_format(t_drop_table_query, drop_table_query, i); + + std::string create_table_query {}; + string_format(t_create_table_query, create_table_query, i); + + std::string insert_trivial_val {}; + string_format(t_insert_trivial_val, insert_trivial_val, i); + + // Perform the queries + MYSQL_QUERY(proxysql, drop_table_query.c_str()); + MYSQL_QUERY(proxysql, create_table_query.c_str()); + // Insert trivial value, we are only interesting in routing + MYSQL_QUERY(proxysql, insert_trivial_val.c_str()); + } + + return EXIT_SUCCESS; +} + +const double COLISSION_PROB = 1e-8; + +/** + * @brief Helper function to wait for replication to complete, + * performs a simple supplied queried until it succeed or the + * timeout expires. + * + * @param proxysql A already opened MYSQL connection to ProxySQL. + * @param proxysql_admin A already opened MYSQL connection to ProxySQL Admin interface. + * @param check The query to perform until timeout expires. + * @param timeout The timeout in seconds to retry the query. + * @param reader_hostgroup The current 'reader hostgroup' for which + * servers replication needs to be waited. + * + * @return EXIT_SUCCESS in case of success, EXIT_FAILURE + * otherwise. + */ +int wait_for_replication( + MYSQL* proxysql, + MYSQL* proxysql_admin, + const std::string& check, + uint32_t timeout, + uint32_t read_hostgroup +) { + if (proxysql == NULL) { return EXIT_FAILURE; } + + const std::string t_count_reader_hg_servers { + "SELECT COUNT(*) FROM mysql_servers WHERE hostgroup_id=%d" + }; + std::string count_reader_hg_servers {}; + size_t size = + snprintf( + nullptr, 0, t_count_reader_hg_servers.c_str(), read_hostgroup + ) + 1; + { + std::unique_ptr buf(new char[size]); + snprintf(buf.get(), size, t_count_reader_hg_servers.c_str(), read_hostgroup); + count_reader_hg_servers = std::string(buf.get(), buf.get() + size - 1); + } + + MYSQL_QUERY(proxysql_admin, count_reader_hg_servers.c_str()); + MYSQL_RES* hg_count_res = mysql_store_result(proxysql_admin); + MYSQL_ROW row = mysql_fetch_row(hg_count_res); + uint32_t srv_count = strtoul(row[0], NULL, 10); + mysql_free_result(hg_count_res); + + if (srv_count > UINT_MAX) { + return EXIT_FAILURE; + } + + int waited = 0; + int queries = 0; + int result = EXIT_FAILURE; + + if (srv_count != 0) { + int retries = + ceil( + log10(COLISSION_PROB) / + log10(static_cast(1)/srv_count) + ); + auto start = std::chrono::system_clock::now(); + std::chrono::duration elapsed {}; + + while (elapsed.count() < timeout && queries < retries) { + int rc = mysql_query(proxysql, check.c_str()); + + if (rc == EXIT_SUCCESS) { + MYSQL_RES* st_res = mysql_store_result(proxysql); + if (st_res) { + mysql_free_result(st_res); + } + + queries += 1; + continue; + } else { + queries = 0; + waited += 1; + sleep(1); + } + + auto it_end = std::chrono::system_clock::now(); + elapsed = it_end - start; + } + + if (queries == retries) { + result = EXIT_SUCCESS; + } + } else { + result = EXIT_SUCCESS; + } + + return result; +} + +int main(int argc, char** argv) { + CommandLine cl; + + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + plan(dst_hostgroup_tests.size()); + + MYSQL* proxysql_admin = mysql_init(NULL); + MYSQL* proxysql_text = mysql_init(NULL); + MYSQL* proxysql_stmt = mysql_init(NULL); + + if (!mysql_real_connect(proxysql_text, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_text)); + return -1; + } + if (!mysql_real_connect(proxysql_stmt, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_stmt)); + return -1; + } + if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + + // Disable 'auto_increment_delay_multiplex' for avoiding unintentionally + // disabling multiplexing due to inserts. + MYSQL_QUERY(proxysql_admin, "SET mysql-auto_increment_delay_multiplex=0"); + MYSQL_QUERY(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + + // Create the testing table + int c_table_res = create_testing_tables(proxysql_text, 2); + if (c_table_res) { return EXIT_FAILURE; } + + int rep_err = wait_for_replication( + proxysql_text, + proxysql_admin, + "SELECT c FROM test.reg_test_3427_0 WHERE id=1", + 10, + 1 + ); + if (rep_err) { + fprintf(stderr, + "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, "Waiting for replication failed." + ); + return EXIT_FAILURE; + } + + for (const auto& dst_hostgroup_test : dst_hostgroup_tests) { + const auto& query_rules = dst_hostgroup_test.first; + const auto& queries_hids = dst_hostgroup_test.second; + + // First prepare the query rules + // ******************************************************************** + MYSQL_QUERY(proxysql_admin, "DELETE FROM mysql_query_rules"); + + for (const auto& query_rule : query_rules) { + MYSQL_QUERY(proxysql_admin, query_rule.c_str()); + } + + MYSQL_QUERY(proxysql_admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); + + // ******************************************************************** + + // Secondly execute the queries and check the hostgroup + // ******************************************************************** + + bool queries_properly_routed = true; + std::vector text_queries_failed_to_route {}; + std::vector stmt_queries_failed_to_route {}; + + for (const auto& query_hid : queries_hids) { + // Create an unique query + std::string query {}; + std::string rnd_str(static_cast(20), '\0'); + gen_random_str(&rnd_str[0], 20); + string_format(query_hid.first, query, rnd_str.c_str()); + + // First execute the query for text protocol + // ******************************************************************** + + // Get the current hosgtroup queries + int cur_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second); + + // Perform the query in a text protocol connection + int text_prot_res = perform_text_procotol_query(proxysql_text, query); + if (text_prot_res) { + diag( + "Executing 'text_protocol' query: '%s' failed with err code: '%d'", + query.c_str(), + text_prot_res + ); + return EXIT_FAILURE; + } + + // Get the new hosgtroup queries + int new_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second); + + if (new_hid_queries - cur_hid_queries != 1) { + queries_properly_routed = false; + text_queries_failed_to_route.push_back(query); + } + + // Secondly execute the query for binary protocol + // ******************************************************************** + + // Get the current hosgtroup queries + cur_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second); + + // Perform the query in a stmt protocol connection + int stmt_res = perform_stmt_query(proxysql_stmt, query); + if (stmt_res) { + diag( + "Executing 'stmt' query: '%s' failed with err code: '%d', err: '%s'", + query.c_str(), + stmt_res, + mysql_error(proxysql_stmt) + ); + return EXIT_FAILURE; + } + + // Get the new hosgtroup queries + new_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second); + + if (new_hid_queries - cur_hid_queries != 2) { + queries_properly_routed = false; + stmt_queries_failed_to_route.push_back(query); + } + } + + if (queries_properly_routed == false) { + std::string str_query_rules = + std::accumulate( + query_rules.begin(), + query_rules.end(), + std::string {}, + [](const std::string& a, const std::string& b) -> std::string { + return a + (a.length() > 0 ? "\n" : "") + b; + } + ); + + std::string str_text_queries = + std::accumulate( + text_queries_failed_to_route.begin(), + text_queries_failed_to_route.end(), + std::string {}, + [](const std::string& a, const std::string& b) -> std::string { + return a + (a.length() > 0 ? "\n" : "") + b; + } + ); + + std::string str_stmt_queries = + std::accumulate( + stmt_queries_failed_to_route.begin(), + stmt_queries_failed_to_route.end(), + std::string {}, + [](const std::string& a, const std::string& b) -> std::string { + return a + (a.length() > 0 ? "\n" : "") + b; + } + ); + + diag( + "Test with rules:\n\n%s\n\nFailed to route the following text queries:\n\n%s\n", + str_query_rules.c_str(), + str_text_queries.c_str() + ); + + diag( + "Test with rules:\n\n%s\n\nFailed to route the following stmt queries:\n\n%s\n", + str_query_rules.c_str(), + str_stmt_queries.c_str() + ); + } + + ok(queries_properly_routed, "Queries for test were properly routed to the target hostgroups"); + } + + mysql_close(proxysql_admin); + mysql_close(proxysql_stmt); + mysql_close(proxysql_text); + + return exit_status(); +}