Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(proxy-wasm) improve instance recycling robustness #407

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 84 additions & 47 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters,
pwexec->pool = pool;
pwexec->filter = filter;
pwexec->parent = pwctx;
pwexec->root_id = filter->id;

pwexec->log = ngx_pcalloc(pwexec->pool, sizeof(ngx_log_t));
if (pwexec->log == NULL) {
return NULL;
Expand All @@ -344,14 +346,12 @@ ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters,
pwexec->log_ctx.pwexec = pwexec;
pwexec->log_ctx.orig_log = log;

ictx = ngx_proxy_wasm_get_instance(filter, pwstore, log);
ictx = ngx_proxy_wasm_get_instance(filter, pwstore, pwexec, log);
if (ictx == NULL) {
return NULL;
}

pwexec->ictx = ictx;
pwexec->id = ictx->next_id++;
pwexec->root_id = filter->id;

ngx_wasm_assert(pwexec->index + 1 == pwctx->pwexecs.nelts);

Expand Down Expand Up @@ -461,7 +461,7 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx,
filter = pwexec->filter;
action = pwctx->action;

dd("action: %d", action);
dd("enter (pwexec: %p, action: %d)", pwexec, action);

if (pwexec->ecode) {
if (!pwexec->ecode_logged
Expand Down Expand Up @@ -611,7 +611,6 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step)
{
ngx_int_t rc;
ngx_proxy_wasm_err_e ecode;
ngx_proxy_wasm_action_e action = NGX_PROXY_WASM_ACTION_CONTINUE;
ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent;
ngx_proxy_wasm_filter_t *filter = pwexec->filter;
Expand Down Expand Up @@ -657,12 +656,6 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
#ifdef NGX_WASM_RESPONSE_TRAILERS
case NGX_PROXY_WASM_STEP_RESP_TRAILERS:
#endif
ecode = ngx_proxy_wasm_on_start(ictx, filter, 0);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
pwexec->ecode = ecode;
goto done;
}

rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_LOG:
Expand Down Expand Up @@ -885,15 +878,6 @@ ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store)
ngx_proxy_wasm_release_instance(ictx, 1);
}

while (!ngx_queue_empty(&store->sweep)) {
dd("remove sweep");
q = ngx_queue_head(&store->sweep);
ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q);

ngx_queue_remove(&ictx->q);
ngx_proxy_wasm_instance_destroy(ictx);
}

dd("exit");
}

Expand Down Expand Up @@ -1177,7 +1161,6 @@ ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter)
static ngx_int_t
ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter)
{
ngx_proxy_wasm_err_e ecode;
ngx_proxy_wasm_instance_t *ictx;

ngx_wasm_assert(filter->loaded);
Expand All @@ -1190,24 +1173,12 @@ ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter)
return NGX_OK;
}

ictx = ngx_proxy_wasm_get_instance(filter, filter->store, filter->log);
ictx = ngx_proxy_wasm_get_instance(filter, filter->store, NULL,
filter->log);
if (ictx == NULL) {
return NGX_ERROR;
}

/*
* update instance log
* FFI-injected filters have a valid log while the instance's
* might be outdated.
*/
ngx_wavm_instance_set_data(ictx->instance, ictx, filter->log);

ecode = ngx_proxy_wasm_on_start(ictx, filter, 1);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
filter->ecode = ecode;
return NGX_ERROR;
}

filter->started = 1;

return NGX_OK;
Expand All @@ -1216,12 +1187,16 @@ ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter)

ngx_proxy_wasm_instance_t *
ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,
ngx_proxy_wasm_store_t *store, ngx_log_t *log)
ngx_proxy_wasm_store_t *store, ngx_proxy_wasm_exec_t *pwexec,
ngx_log_t *log)
{
ngx_queue_t *q;
ngx_pool_t *pool;
ngx_wavm_module_t *module = filter->module;
ngx_proxy_wasm_instance_t *ictx = NULL;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_err_e ecode;

dd("enter (pwexec: %p)", pwexec);

if (store == NULL) {
dd("no store, jump to create");
Expand Down Expand Up @@ -1250,7 +1225,7 @@ ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,
}

if (ictx->module == filter->module) {
dd("reuse instance");
dd("reuse busy instance");
ngx_wasm_assert(ictx->nrefs);
goto reuse;
}
Expand All @@ -1262,17 +1237,29 @@ ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,
{
ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q);

if (ictx->instance->trapped) {
ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0,
"\"%V\" filter freeing trapped instance "
"(ictx: %p, store: %p)",
filter->name, ictx, store);

q = ngx_queue_next(&ictx->q);
ngx_proxy_wasm_release_instance(ictx, 1);
continue;
}

if (ictx->module == filter->module) {
dd("reuse free instance");
ngx_wasm_assert(ictx->nrefs == 0);
ngx_queue_remove(&ictx->q);
goto reuse;
}
}

dd("create instance in store: %p", store);

create:

dd("create instance in store: %p", store);

ictx = ngx_pcalloc(pool, sizeof(ngx_proxy_wasm_instance_t));
if (ictx == NULL) {
goto error;
Expand Down Expand Up @@ -1302,49 +1289,88 @@ ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,

goto done;

error:

return NULL;

reuse:

ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0,
"\"%V\" filter reusing instance "
"(ictx: %p, nrefs: %d, store: %p)",
filter->name, ictx, ictx->nrefs + 1, store);

goto done;

done:

if (store && !ictx->nrefs) {
ngx_queue_insert_tail(&store->busy, &ictx->q);
}

if (pwexec) {
if (pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID) {
ngx_wasm_assert(pwexec->id == 0);
pwexec->id = ictx->next_id++;
}

if (pwexec->ecode) {
/* recycled instance */
pwexec->ecode = NGX_PROXY_WASM_ERR_NONE;
pwexec->ecode_logged = 0;
}
}

ictx->nrefs++;
ictx->pwexec = pwexec;

/**
* update instance log
* FFI-injected filters have a valid log while the instance's
* might be outdated.
*/
ngx_wavm_instance_set_data(ictx->instance, ictx, log);

/* create wasm context (root/http) */

ecode = ngx_proxy_wasm_on_start(ictx, filter, pwexec == NULL);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
if (pwexec) {
pwexec->ecode = ecode;

} else {
filter->ecode = ecode;
}

goto error;
}

return ictx;

error:

return NULL;
}


void
ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx,
unsigned sweep)
{
ngx_queue_t *q;

if (sweep) {
ictx->nrefs = 0;

} else if (ictx->nrefs) {
ictx->nrefs--;
}

dd("ictx: %p (nrefs: %ld, sweep: %d)",
ictx, ictx->nrefs, sweep);
dd("ictx: %p (nrefs: %ld, sweep: %d, trapped: %d)",
ictx, ictx->nrefs, sweep, ictx->instance->trapped);

if (ictx->nrefs == 0) {
if (ictx->store) {
dd("remove from busy");
ngx_queue_remove(&ictx->q); /* remove from busy/free */

if (sweep) {
if (sweep || ictx->instance->trapped) {
dd("insert in sweep");
ngx_queue_insert_tail(&ictx->store->sweep, &ictx->q);

Expand All @@ -1353,6 +1379,15 @@ ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx,
ngx_queue_insert_tail(&ictx->store->free, &ictx->q);
}

while (!ngx_queue_empty(&ictx->store->sweep)) {
dd("sweep (destroy)");
q = ngx_queue_head(&ictx->store->sweep);
ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q);

ngx_queue_remove(&ictx->q);
ngx_proxy_wasm_instance_destroy(ictx);
}

} else {
dd("destroy");
ngx_proxy_wasm_instance_destroy(ictx);
Expand Down Expand Up @@ -1421,6 +1456,8 @@ ngx_proxy_wasm_on_start(ngx_proxy_wasm_instance_t *ictx,
ngx_wavm_instance_t *instance = ictx->instance;
wasm_val_vec_t *rets;

dd("enter (pwexec: %p, ictx: %p)", pwexec, ictx);

rexec = ngx_proxy_wasm_root_lookup(ictx, filter->id);
if (rexec == NULL) {
rexec = ngx_pcalloc(ictx->pool, sizeof(ngx_proxy_wasm_exec_t));
Expand Down
2 changes: 1 addition & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ ngx_wavm_ptr_t ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size);
void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store);
ngx_proxy_wasm_instance_t *ngx_proxy_wasm_get_instance(
ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_store_t *store,
ngx_log_t *log);
ngx_proxy_wasm_exec_t *pwexec, ngx_log_t *log);
void ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx,
unsigned sweep);

Expand Down
5 changes: 2 additions & 3 deletions src/common/proxy_wasm/ngx_proxy_wasm_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,6 @@ ngx_proxy_wasm_properties_get_ngx(ngx_proxy_wasm_ctx_t *pwctx,
hash = hash_str(name.data, name.len);

rctx = (ngx_http_wasm_req_ctx_t *) pwctx->data;

if (rctx == NULL || rctx->fake_request) {
ngx_wavm_log_error(NGX_LOG_ERR, pwctx->log, NULL,
"cannot get ngx properties outside of a request");
Expand Down Expand Up @@ -861,8 +860,8 @@ static ngx_int_t
ngx_proxy_wasm_properties_get_host(ngx_proxy_wasm_ctx_t *pwctx,
ngx_str_t *path, ngx_str_t *value)
{
host_props_node_t *hpn;
uint32_t hash;
host_props_node_t *hpn;
#ifdef NGX_WASM_HTTP
ngx_http_wasm_req_ctx_t *rctx = pwctx->data;

Expand Down Expand Up @@ -893,9 +892,9 @@ static ngx_int_t
ngx_proxy_wasm_properties_set_host(ngx_proxy_wasm_ctx_t *pwctx,
ngx_str_t *path, ngx_str_t *value)
{
host_props_node_t *hpn;
uint32_t hash;
unsigned new_entry = 1;
host_props_node_t *hpn;
#ifdef NGX_WASM_HTTP
ngx_http_wasm_req_ctx_t *rctx = pwctx->data;

Expand Down
16 changes: 10 additions & 6 deletions src/common/proxy_wasm/ngx_proxy_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
#endif
ngx_proxy_wasm_filter_t *filter = pwexec->filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_err_e ecode;

dd("enter");

Expand All @@ -212,7 +213,8 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
return;
}

ictx = ngx_proxy_wasm_get_instance(filter, filter->store, filter->log);
ictx = ngx_proxy_wasm_get_instance(filter, filter->store, pwexec,
filter->log);
if (ictx == NULL) {
ngx_wasm_log_error(NGX_LOG_ERR, log, 0,
"tick_handler: no instance");
Expand All @@ -225,17 +227,17 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
#endif
pwexec->in_tick = 1;

pwexec->ecode = ngx_proxy_wasm_run_step(pwexec, ictx,
NGX_PROXY_WASM_STEP_TICK);

pwexec->in_tick = 0;
ecode = ngx_proxy_wasm_run_step(pwexec, ictx, NGX_PROXY_WASM_STEP_TICK);

ngx_proxy_wasm_release_instance(ictx, 0);

if (pwexec->ecode != NGX_PROXY_WASM_ERR_NONE) {
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
pwexec->ecode = ecode;
return;
}

pwexec->in_tick = 0;

if (!ngx_exiting) {
pwexec->ev = ngx_calloc(sizeof(ngx_event_t), log);
if (pwexec->ev == NULL) {
Expand All @@ -246,6 +248,8 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
pwexec->ev->data = pwexec;
pwexec->ev->log = log;

dd("scheduling next tick in %ld", pwexec->tick_period);

ngx_add_timer(pwexec->ev, pwexec->tick_period);
}

Expand Down
2 changes: 1 addition & 1 deletion src/wasm/vm/ngx_wavm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ ngx_wavm_log_error(ngx_uint_t level, ngx_log_t *log, ngx_wrt_err_t *e,
wasm_byte_vec_delete(&trapmsg);

#if (NGX_WASM_HAVE_V8 || NGX_WASM_HAVE_WASMER)
if (ctx->instance) {
if (ctx && ctx->instance) {
wasm_trap_trace(e->trap, &trace);

if (trace.size > 0) {
Expand Down
Loading
Loading