Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Prevent a slow moving sql query from blocking replication #4670

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions db/comdb2.h
Original file line number Diff line number Diff line change
Expand Up @@ -3682,6 +3682,9 @@ extern int gbl_sql_release_locks_on_slow_reader;
extern int gbl_fail_client_write_lock;
extern int gbl_server_admin_mode;

extern int gbl_epoch_time;
extern int gbl_watchdog_disable_at_start;

void csc2_free_all(void);

/* hack to temporary allow bools on production stage */
Expand Down
2 changes: 1 addition & 1 deletion db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ extern int gbl_altersc_latency_inc;
extern int gbl_sc_history_max_rows;
extern int gbl_sc_status_max_rows;
extern int gbl_rep_process_pstack_time;
extern int gbl_sql_recover_ddlk_duration;

extern void set_snapshot_impl(snap_impl_enum impl);
extern const char *snap_impl_str(snap_impl_enum impl);
Expand Down Expand Up @@ -1890,4 +1891,3 @@ const char *tunable_error(comdb2_tunable_err code)
}
return "????";
}

2 changes: 2 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2434,4 +2434,6 @@ REGISTER_TUNABLE("sc_status_max_rows", "Max number of rows returned in comdb2_sc
TUNABLE_INTEGER, &gbl_sc_status_max_rows, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("rep_process_pstack_time", "pstack the server if rep_process runs longer than time specified in secs (Default: 30s)",
TUNABLE_INTEGER, &gbl_rep_process_pstack_time, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sql_recover_ddlk_duration", "Run recover_deadlock after specified duration, if an SQL statement has waiters. To disable, set to 0 (Default: 60s)",
TUNABLE_INTEGER, &gbl_sql_recover_ddlk_duration, 0, NULL, NULL, NULL, NULL);
#endif /* _DB_TUNABLES_H */
1 change: 0 additions & 1 deletion db/process_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,6 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
dbenv->txns_committed, dbenv->txns_aborted, txns_applied,
n_retries, gbl_verify_tran_replays, rep_retry, max_retries);

extern int gbl_epoch_time;
extern int gbl_starttime;
logmsg(LOGMSG_USER, "uptime %ds\n",
gbl_epoch_time - gbl_starttime);
Expand Down
2 changes: 1 addition & 1 deletion db/reqlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -2389,7 +2389,7 @@ static void update_clientstats_cache(nodestats_t *entry) {
Pthread_mutex_lock(&clientstats_cache_mtx);
listc_maybe_rfl(&clientstats_cache, entry);
if (!entry->ref) {
logmsg(LOGMSG_INFO, "%s: no active references to nodestats %u, marking as evictable.\n", __func__, entry->checksum);
//logmsg(LOGMSG_INFO, "%s: no active references to nodestats %u, marking as evictable.\n", __func__, entry->checksum);
listc_abl(&clientstats_cache, entry);
}
Pthread_mutex_unlock(&clientstats_cache_mtx);
Expand Down
1 change: 1 addition & 0 deletions db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ struct sqlclntstate {

pthread_t debug_sqlclntstate;
int last_check_time;
int last_recover_ddlk;
int query_timeout;
int statement_timedout;
struct conninfo conn;
Expand Down
28 changes: 23 additions & 5 deletions db/sqlglue.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@
#include <portmuxapi.h>
#include "cdb2_constants.h"
#include <translistener.h>
#include <sqlwriter.h>

int gbl_sql_recover_ddlk_duration = 60;
int gbl_delay_sql_lock_release_sec = 5;

unsigned long long get_id(bdb_state_type *);
Expand Down Expand Up @@ -611,8 +613,6 @@ static int is_sqlite_db_init(BtCursor *pCur)

int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
{
extern int gbl_epoch_time;
extern int gbl_watchdog_disable_at_start;
if (gbl_watchdog_disable_at_start)
return 0;
if (gbl_epoch_time && (gbl_epoch_time - clnt->last_check_time > 5)) {
Expand All @@ -625,6 +625,7 @@ int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
}
return 0;
}

/*
This is called every time the db does something (find/next/etc. on a cursor).
The query is aborted if this returns non-zero.
Expand All @@ -634,7 +635,6 @@ int gbl_debug_sleep_in_analyze;
static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
{
int rc;
extern int gbl_epoch_time;

if (thd == NULL)
return 0;
Expand Down Expand Up @@ -698,6 +698,21 @@ static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
goto done;
}

if (no_recover_deadlock ||
clnt->last_recover_ddlk == 0 ||
gbl_sql_recover_ddlk_duration == 0 ||
gbl_epoch_time - clnt->last_recover_ddlk < gbl_sql_recover_ddlk_duration
){
goto done;
}

rc = recover_deadlock_evbuffer(clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: recover_deadlock failed sql:\"%.32s%s\"\n",
__func__, clnt->sql, strlen(clnt->sql) > 32 ? "..." : "");
}
clnt->last_recover_ddlk = gbl_epoch_time;

done:
Pthread_mutex_unlock(&clnt->sql_tick_lk);
return rc;
Expand Down Expand Up @@ -7315,7 +7330,7 @@ int get_data(BtCursor *pCur, struct schema *sc, uint8_t *in, int fnum, Mem *m,

break;
default:
logmsg(LOGMSG_ERROR, "get_data_int: unhandled type %d\n", f->type);
logmsg(LOGMSG_ERROR, "%s: unhandled type %d query:%s\n", __func__, f->type, pCur->clnt->sql);
break;
}

Expand Down Expand Up @@ -10032,8 +10047,11 @@ static int recover_deadlock_flags_int(bdb_state_type *bdb_state,
} else {
rc = SQLITE_SCHEMA;
}
} else
} else {
rdlock_schema_lk();
rc = get_curtran_flags(thedb->bdb_env, clnt, curtran_flags);
unlock_schema_lk();
}

if (rc) {
char *err;
Expand Down
6 changes: 2 additions & 4 deletions db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -3651,6 +3651,7 @@ void run_stmt_setup(struct sqlclntstate *clnt, sqlite3_stmt *stmt)
} else {
clnt->has_recording = v->recording;
}
clnt->last_recover_ddlk = gbl_epoch_time;
clnt->nsteps = 0;
comdb2_set_sqlite_vdbe_tzname_int(v, clnt);
comdb2_set_sqlite_vdbe_dtprec_int(v, clnt);
Expand Down Expand Up @@ -5522,10 +5523,7 @@ int recover_deadlock_evbuffer(struct sqlclntstate *clnt)
if (gbl_fail_client_write_lock && !(rand() % gbl_fail_client_write_lock)) {
flags = RECOVER_DEADLOCK_FORCE_FAIL;
}
if (!recover_deadlock_flags(env, clnt, NULL, 0, __func__, __LINE__, flags)) {
return -1;
}
return 0;
return recover_deadlock_flags(env, clnt, NULL, 0, __func__, __LINE__, flags);
}

static int recover_deadlock_sbuf(struct sqlclntstate *clnt)
Expand Down
1 change: 0 additions & 1 deletion lua/sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ extern int gbl_lua_new_trans_model;
extern int gbl_max_lua_instructions;
extern int gbl_lua_version;
extern int gbl_notimeouts;
extern int gbl_epoch_time;
extern int gbl_allow_lua_print;
extern int gbl_allow_lua_dynamic_libs;
extern int gbl_lua_prepare_max_retries;
Expand Down
21 changes: 14 additions & 7 deletions net/sqlwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ struct sqlwriter {
unsigned timed_out : 1;
unsigned wr_continue : 1;
unsigned packing : 1; /* 1 if writer is in sql_pack_response and wr_lock is held. */
unsigned recover_ddlk_failed : 1;
struct ssl_data *ssl_data;
int (*wr_evbuffer_fn)(struct sqlwriter *, int);
};

static void sql_trickle_cb(int fd, short what, void *arg);

static void sql_enable_flush(struct sqlwriter *writer)
{
struct timeval recover_ddlk_timeout = {.tv_sec = 1};
Expand Down Expand Up @@ -171,8 +170,8 @@ static int wr_evbuffer(struct sqlwriter *writer, int fd)
static void sql_flush_cb(int fd, short what, void *arg)
{
struct sqlwriter *writer = arg;
if (what & EV_TIMEOUT) {
recover_deadlock_evbuffer(writer->clnt);
if ((what & EV_TIMEOUT) && !writer->recover_ddlk_failed) {
writer->recover_ddlk_failed = !!recover_deadlock_evbuffer(writer->clnt);
}
if (!(what & EV_WRITE)) {
return;
Expand All @@ -195,8 +194,14 @@ static void sql_flush_cb(int fd, short what, void *arg)
static int sql_flush_int(struct sqlwriter *writer)
{
sql_enable_flush(writer);
event_base_dispatch(writer->wr_base);
return (writer->wr_continue && !writer->bad) ? 0 : -1;
event_base_dispatch(writer->wr_base); /* -> sql_flush_cb */
if (writer->bad ||
writer->recover_ddlk_failed ||
writer->wr_continue == 0
){
return -1;
}
return 0;
}

int sql_flush(struct sqlwriter *writer)
Expand Down Expand Up @@ -405,6 +410,7 @@ void sql_reset(struct sqlwriter *writer)
writer->sent_at = time(NULL);
writer->timed_out = 0;
writer->wr_continue = 1;
writer->recover_ddlk_failed = 0;
}

int sql_peer_check(struct sqlwriter *writer)
Expand Down Expand Up @@ -483,7 +489,6 @@ struct sqlwriter *sqlwriter_new(struct sqlwriter_arg *arg)
writer->pack_hb = arg->pack_hb;
writer->timer_base = arg->timer_base;
writer->timer_thd = pthread_self();
writer->wr_continue = 1;
writer->wr_buf = evbuffer_new();

struct event_config *cfg = event_config_new();
Expand All @@ -496,6 +501,8 @@ struct sqlwriter *sqlwriter_new(struct sqlwriter_arg *arg)
writer->heartbeat_ev = event_new(writer->timer_base, arg->fd, EV_PERSIST, sql_heartbeat_cb, writer);
writer->heartbeat_trickle_ev = event_new(writer->timer_base, arg->fd, EV_WRITE, sql_trickle_cb, writer);

sql_reset(writer);

return writer;
}

Expand Down
11 changes: 6 additions & 5 deletions sqlite/src/func.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,8 @@ static void sleepFunc(sqlite3_context *context, int argc, sqlite3_value *argv[])
int i;
for(i = 0; i < n; i++) {
sleep(1);
int rc = comdb2_sql_tick();
if( rc ) {
sqlite3_result_error_code(context, rc);
if( comdb2_sql_tick() ){
sqlite3_result_error_code(context, SQLITE_ERROR);
return;
}
}
Expand All @@ -473,8 +472,10 @@ static void usleepFunc(sqlite3_context *context, int argc, sqlite3_value *argv[]
us = ( remain > 1000000 ) ? 1000000 : remain;
remain -= us;
usleep(us);
if( comdb2_sql_tick() )
break;
if( us == 1000000 && comdb2_sql_tick() ){
sqlite3_result_error_code(context, SQLITE_ERROR);
return;
}
}
sqlite3_result_int(context, (total - remain));
}
Expand Down
1 change: 1 addition & 0 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@
(name='sql_optimize_shadows', description='', type='BOOLEAN', value='OFF', read_only='N')
(name='sql_queueing_critical_trace', description='Produce trace when SQL request queue is this deep.', type='INTEGER', value='100', read_only='N')
(name='sql_queueing_disable_trace', description='Disable trace when SQL requests are starting to queue.', type='BOOLEAN', value='OFF', read_only='N')
(name='sql_recover_ddlk_duration', description='Run recover_deadlock after specified duration, if an SQL statement has waiters. To disable, set to 0 (Default: 60s)', type='INTEGER', value='60', read_only='N')
(name='sql_release_locks_in_update_shadows', description='Release sql locks in update_shadows on lockwait', type='BOOLEAN', value='ON', read_only='N')
(name='sql_release_locks_on_emit_row_lockwait', description='Release sql locks when we are about to emit a row', type='BOOLEAN', value='OFF', read_only='N')
(name='sql_release_locks_on_si_lockwait', description='Release sql locks from si if the rep thread is waiting', type='BOOLEAN', value='ON', read_only='N')
Expand Down