Skip to content

Commit

Permalink
Implemeting of more locking in SELECT statements
Browse files Browse the repository at this point in the history
SELECT ... FOR SHARE
SELECT ... LOCK IN SHARE MODE
SELECT ... FOR (UPDATE|SHARE) [OF ...] (NOWAIT|SKIP LOCKED)
  • Loading branch information
renecannao committed Jul 1, 2019
1 parent a51a23a commit 9ed7094
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 10 deletions.
1 change: 1 addition & 0 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ class MySQL_HostGroups_Manager {
unsigned long long access_denied_wrong_password;
unsigned long long access_denied_max_connections;
unsigned long long access_denied_max_user_connections;
unsigned long long select_for_update_or_equivalent;
} status;
wqueue<MySQL_Connection *> queue;
MySQL_HostGroups_Manager();
Expand Down
2 changes: 2 additions & 0 deletions include/MySQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Query_Info {

int QueryLength;
enum MYSQL_COM_QUERY_command MyComQueryCmd;
bool bool_is_select_NOT_for_update;
bool bool_is_select_NOT_for_update_computed;

Query_Info();
~Query_Info();
Expand Down
1 change: 1 addition & 0 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
status.access_denied_wrong_password=0;
status.access_denied_max_connections=0;
status.access_denied_max_user_connections=0;
status.select_for_update_or_equivalent=0;
pthread_mutex_init(&readonly_mutex, NULL);
pthread_mutex_init(&Group_Replication_Info_mutex, NULL);
pthread_mutex_init(&Galera_Info_mutex, NULL);
Expand Down
79 changes: 74 additions & 5 deletions lib/MySQL_PreparedStatement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,87 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, unsigned int h,
// Query_Info::is_select_NOT_for_update()
if (ql >= 7) {
if (strncasecmp(q, (char *)"SELECT ", 7) == 0) { // is a SELECT
is_select_NOT_for_update = true;
if (ql >= 17) {
char *p = (char *)q;
char *p = q;
p += ql - 11;
if (strncasecmp(p, " FOR UPDATE", 11) ==
0) { // is a SELECT FOR UPDATE
is_select_NOT_for_update = false;
if (strncasecmp(p, " FOR UPDATE", 11) == 0) { // is a SELECT FOR UPDATE
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
p = q;
p += ql-10;
if (strncasecmp(p, " FOR SHARE", 10) == 0) { // is a SELECT FOR SHARE
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
if (ql >= 25) {
p = q;
p += ql-19;
if (strncasecmp(p, " LOCK IN SHARE MODE", 19) == 0) { // is a SELECT LOCK IN SHARE MODE
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
p = q;
p += ql-7;
if (strncasecmp(p," NOWAIT",7)==0) {
// let simplify. If NOWAIT is used, we assume FOR UPDATE|SHARE is used
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
/*
if (strcasestr(q," FOR UPDATE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
if (strcasestr(q," FOR SHARE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
*/
}
p = q;
p += ql-12;
if (strncasecmp(p," SKIP LOCKED",12)==0) {
// let simplify. If SKIP LOCKED is used, we assume FOR UPDATE|SHARE is used
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
/*
if (strcasestr(q," FOR UPDATE ")==NULL) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
if (strcasestr(q," FOR SHARE ")==NULL) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
*/
}
p=q;
char buf[129];
if (ql>=128) { // for long query, just check the last 128 bytes
p+=ql-128;
memcpy(buf,p,128);
buf[128]=0;
} else {
memcpy(buf,p,ql);
buf[ql]=0;
}
if (strcasestr(buf," FOR ")) {
if (strcasestr(buf," FOR UPDATE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
if (strcasestr(buf," FOR SHARE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
goto __exit_MySQL_STMT_Global_info___search_select;
}
}
}
}
is_select_NOT_for_update = true;
}
}
}
__exit_MySQL_STMT_Global_info___search_select:

// set default properties:
properties.cache_ttl = -1;
Expand Down
96 changes: 91 additions & 5 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ Query_Info::Query_Info() {
QueryParserArgs.digest_text=NULL;
QueryParserArgs.first_comment=NULL;
stmt_info=NULL;
bool_is_select_NOT_for_update=false;
bool_is_select_NOT_for_update_computed=false;
}

Query_Info::~Query_Info() {
Expand All @@ -185,6 +187,8 @@ void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) {
if (mysql_thread___commands_stats)
query_parser_command_type();
}
bool_is_select_NOT_for_update=false;
bool_is_select_NOT_for_update_computed=false;
}

void Query_Info::end() {
Expand Down Expand Up @@ -215,6 +219,8 @@ void Query_Info::init(unsigned char *_p, int len, bool mysql_header) {
QueryLength=(mysql_header ? len-5 : len);
QueryPointer=(mysql_header ? _p+5 : _p);
MyComQueryCmd = MYSQL_COM_QUERY__UNINITIALIZED;
bool_is_select_NOT_for_update=false;
bool_is_select_NOT_for_update_computed=false;
}

void Query_Info::query_parser_init() {
Expand Down Expand Up @@ -251,24 +257,104 @@ bool Query_Info::is_select_NOT_for_update() {
if (stmt_info) { // we are processing a prepared statement. We already have the information
return stmt_info->is_select_NOT_for_update;
}
// to avoid an expensive strlen() on the digest_text, we consider only the real query
if (QueryPointer==NULL) {
return false;
}
if (bool_is_select_NOT_for_update_computed) {
return bool_is_select_NOT_for_update;
}
bool_is_select_NOT_for_update_computed=true;
if (QueryLength<7) {
return false;
}
if (strncasecmp((char *)QueryPointer,(char *)"SELECT ",7)) {
char *QP = (char *)QueryPointer;
size_t ql = QueryLength;
// we try to use the digest, if avaiable
if (QueryParserArgs.digest_text) {
QP = QueryParserArgs.digest_text;
ql = strlen(QP);
}
if (strncasecmp(QP,(char *)"SELECT ",7)) {
return false;
}
// if we arrive till here, it is a SELECT
if (QueryLength>=17) {
char *p=(char *)QueryPointer;
p+=QueryLength-11;
if (ql>=17) {
char *p=QP;
p+=ql-11;
if (strncasecmp(p," FOR UPDATE",11)==0) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
p=QP;
p+=ql-10;
if (strncasecmp(p," FOR SHARE",10)==0) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
if (ql>=25) {
char *p=QP;
p+=ql-19;
if (strncasecmp(p," LOCK IN SHARE MODE",19)==0) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
p=QP;
p+=ql-7;
if (strncasecmp(p," NOWAIT",7)==0) {
// let simplify. If NOWAIT is used, we assume FOR UPDATE|SHARE is used
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
/*
if (strcasestr(QP," FOR UPDATE ")==NULL) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
if (strcasestr(QP," FOR SHARE ")==NULL) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
*/
}
p=QP;
p+=ql-12;
if (strncasecmp(p," SKIP LOCKED",12)==0) {
// let simplify. If SKIP LOCKED is used, we assume FOR UPDATE|SHARE is used
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
/*
if (strcasestr(QP," FOR UPDATE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
if (strcasestr(QP," FOR SHARE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
*/
}
p=QP;
char buf[129];
if (ql>=128) { // for long query, just check the last 128 bytes
p+=ql-128;
memcpy(buf,p,128);
buf[128]=0;
} else {
memcpy(buf,p,ql);
buf[ql]=0;
}
if (strcasestr(buf," FOR ")) {
if (strcasestr(buf," FOR UPDATE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
if (strcasestr(buf," FOR SHARE ")) {
__sync_fetch_and_add(&MyHGM->status.select_for_update_or_equivalent, 1);
return false;
}
}
}
}
bool_is_select_NOT_for_update=true;
return true;
}

Expand Down
6 changes: 6 additions & 0 deletions lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4330,6 +4330,12 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) {
pta[1]=buf;
result->add_row(pta);
}
{ // Queries that are SELECT for update or equivalent
pta[0]=(char *)"Selects_for_update__autocommit0";
sprintf(buf,"%llu",MyHGM->status.select_for_update_or_equivalent);
pta[1]=buf;
result->add_row(pta);
}
{ // Slow queries
pta[0]=(char *)"Slow_queries";
sprintf(buf,"%llu",get_slow_queries());
Expand Down
14 changes: 14 additions & 0 deletions test/select_for_update.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
SELECT 1 FROM dual FOR UPDATE;
SET autocommit=0;
SELECT 1 FROM dual FOR UPDATE;
SELECT 1 FROM dual FOR UPDATE OF tbl1 ;
SELECT 1 FROM dual a JOIN dual b;
SELECT 1, 3, 5, 6 , 'aa' FROM dual a JOIN dual b;
ROLLBACK;
SELECT 1 FROM dual FOR SHARE;
ROLLBACK;
SELECT 1 FROM dual LOCK IN SHARE MODE ;
ROLLBACK;
SET autocommit=1;
SELECT 1 FROM dual FOR SHARE;
SELECT 1 FROM dual FOR UPDATE;

0 comments on commit 9ed7094

Please sign in to comment.