Skip to content

Commit

Permalink
wip: Prevent a slow moving sql query to block replication
Browse files Browse the repository at this point in the history
Signed-off-by: Akshat Sikarwar <[email protected]>
  • Loading branch information
akshatsikarwar committed Sep 11, 2024
1 parent edf1425 commit 090f85a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 11 deletions.
4 changes: 2 additions & 2 deletions db/dohsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ static int dohsql_dist_next_row(struct sqlclntstate *clnt, sqlite3_stmt *stmt)
}

/* did client disconnect? */
if (check_sql_client_disconnect(clnt, __FILE__, __LINE__)) {
if (check_sql_client_disconnect(clnt, __FILE__, __LINE__, NULL)) {
_signal_children_master_is_done(conns);
return SQLITE_EARLYSTOP_DOHSQL;
}
Expand Down Expand Up @@ -1827,7 +1827,7 @@ static int dohsql_dist_next_row_ordered(struct sqlclntstate *clnt,
}

/* did client disconnect? */
if (check_sql_client_disconnect(clnt, __FILE__, __LINE__)) {
if (check_sql_client_disconnect(clnt, __FILE__, __LINE__, NULL)) {
_signal_children_master_is_done(conns);
return SQLITE_EARLYSTOP_DOHSQL;
}
Expand Down
2 changes: 1 addition & 1 deletion db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,7 @@ void ssl_set_clnt_user(struct sqlclntstate *);
/* use backup to restore the sqlite3 plugin interface */
void clnt_plugin_reset(struct sqlclntstate *clnt);

int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line);
int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line, int *did_check);

/* Convert a sequence of Mem * to a serialized sqlite row */
int sqlite3_unpacked_to_packed(Mem *mems, int nmems, char **ret_rec,
Expand Down
19 changes: 16 additions & 3 deletions db/sqlglue.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,14 @@ static int is_sqlite_db_init(BtCursor *pCur)
return 0;
}

int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line, int *did_check)
{
extern int gbl_epoch_time;
extern int gbl_watchdog_disable_at_start;
if (gbl_watchdog_disable_at_start)
return 0;
if (gbl_epoch_time && (gbl_epoch_time - clnt->last_check_time > 5)) {
if (did_check) *did_check = 1;
clnt->last_check_time = gbl_epoch_time;
if (!gbl_notimeouts && peer_dropped_connection(clnt)) {
logmsg(LOGMSG_INFO, "Peer dropped connection %s:%d\n", file, line);
Expand Down Expand Up @@ -688,7 +689,8 @@ static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
}
}

if (check_sql_client_disconnect(clnt, __FILE__, __LINE__)) {
int did_check = 0; // has it been 5 seconds since last check
if (check_sql_client_disconnect(clnt, __FILE__, __LINE__, &did_check)) {
rc = SQLITE_ABORT;
goto done;
}
Expand All @@ -698,6 +700,17 @@ static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
goto done;
}

if (no_recover_deadlock || !did_check) {
goto done;
}

int recover_deadlock_evbuffer(struct sqlclntstate *clnt);
rc = recover_deadlock_evbuffer(clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: recover_deadlock failed sql:\"%.32s%s\"\n",
__func__, clnt->sql, strlen(clnt->sql) > 32 ? "..." : "");
}

done:
Pthread_mutex_unlock(&clnt->sql_tick_lk);
return rc;
Expand Down Expand Up @@ -7315,7 +7328,7 @@ int get_data(BtCursor *pCur, struct schema *sc, uint8_t *in, int fnum, Mem *m,

break;
default:
logmsg(LOGMSG_ERROR, "get_data_int: unhandled type %d\n", f->type);
logmsg(LOGMSG_ERROR, "%s: unhandled type %d query:%s\n", __func__, f->type, pCur->clnt->sql);
break;
}

Expand Down
11 changes: 6 additions & 5 deletions sqlite/src/func.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,8 @@ static void sleepFunc(sqlite3_context *context, int argc, sqlite3_value *argv[])
int i;
for(i = 0; i < n; i++) {
sleep(1);
int rc = comdb2_sql_tick();
if( rc ) {
sqlite3_result_error_code(context, rc);
if( comdb2_sql_tick() ){
sqlite3_result_error_code(context, SQLITE_ABORT);
return;
}
}
Expand All @@ -473,8 +472,10 @@ static void usleepFunc(sqlite3_context *context, int argc, sqlite3_value *argv[]
us = ( remain > 1000000 ) ? 1000000 : remain;
remain -= us;
usleep(us);
if( comdb2_sql_tick() )
break;
if( comdb2_sql_tick() ){
sqlite3_result_error_code(context, SQLITE_ABORT);
return;
}
}
sqlite3_result_int(context, (total - remain));
}
Expand Down

0 comments on commit 090f85a

Please sign in to comment.