Skip to content

Commit

Permalink
fix(proxy-wasm) improve instance recycling robustness
Browse files Browse the repository at this point in the history
* Allow for recycling the root instance (not managed by isolation modes)
* Allow for smoother recycling of http instances (including when the
  root instance has trapped)
* Also recycle trapped instances in the busy queue
* Less verbose context creation of fresh and/or recycled instances
  starting in atypical steps (e.g. on_response_headers).
  • Loading branch information
thibaultcha committed Sep 7, 2023
1 parent a999747 commit 73e76db
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 66 deletions.
105 changes: 70 additions & 35 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters,
pwexec->pool = pool;
pwexec->filter = filter;
pwexec->parent = pwctx;
pwexec->root_id = filter->id;

pwexec->log = ngx_pcalloc(pwexec->pool, sizeof(ngx_log_t));
if (pwexec->log == NULL) {
return NULL;
Expand All @@ -344,14 +346,12 @@ ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters,
pwexec->log_ctx.pwexec = pwexec;
pwexec->log_ctx.orig_log = log;

ictx = ngx_proxy_wasm_get_instance(filter, pwstore, log);
ictx = ngx_proxy_wasm_get_instance(filter, pwstore, pwexec, log);
if (ictx == NULL) {
return NULL;
}

pwexec->ictx = ictx;
pwexec->id = ictx->next_id++;
pwexec->root_id = filter->id;

ngx_wasm_assert(pwexec->index + 1 == pwctx->pwexecs.nelts);

Expand Down Expand Up @@ -461,7 +461,7 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx,
filter = pwexec->filter;
action = pwctx->action;

dd("action: %d", action);
dd("enter (pwexec: %p, action: %d)", pwexec, action);

if (pwexec->ecode) {
if (!pwexec->ecode_logged
Expand Down Expand Up @@ -611,7 +611,6 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step)
{
ngx_int_t rc;
ngx_proxy_wasm_err_e ecode;
ngx_proxy_wasm_action_e action = NGX_PROXY_WASM_ACTION_CONTINUE;
ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent;
ngx_proxy_wasm_filter_t *filter = pwexec->filter;
Expand Down Expand Up @@ -657,12 +656,6 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
#ifdef NGX_WASM_RESPONSE_TRAILERS
case NGX_PROXY_WASM_STEP_RESP_TRAILERS:
#endif
ecode = ngx_proxy_wasm_on_start(ictx, filter, 0);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
pwexec->ecode = ecode;
goto done;
}

rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_LOG:
Expand Down Expand Up @@ -1177,7 +1170,6 @@ ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter)
static ngx_int_t
ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter)
{
ngx_proxy_wasm_err_e ecode;
ngx_proxy_wasm_instance_t *ictx;

ngx_wasm_assert(filter->loaded);
Expand All @@ -1190,24 +1182,12 @@ ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter)
return NGX_OK;
}

ictx = ngx_proxy_wasm_get_instance(filter, filter->store, filter->log);
ictx = ngx_proxy_wasm_get_instance(filter, filter->store, NULL,
filter->log);
if (ictx == NULL) {
return NGX_ERROR;
}

/*
* update instance log
* FFI-injected filters have a valid log while the instance's
* might be outdated.
*/
ngx_wavm_instance_set_data(ictx->instance, ictx, filter->log);

ecode = ngx_proxy_wasm_on_start(ictx, filter, 1);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
filter->ecode = ecode;
return NGX_ERROR;
}

filter->started = 1;

return NGX_OK;
Expand All @@ -1216,12 +1196,16 @@ ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter)

ngx_proxy_wasm_instance_t *
ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,
ngx_proxy_wasm_store_t *store, ngx_log_t *log)
ngx_proxy_wasm_store_t *store, ngx_proxy_wasm_exec_t *pwexec,
ngx_log_t *log)
{
ngx_queue_t *q;
ngx_pool_t *pool;
ngx_wavm_module_t *module = filter->module;
ngx_proxy_wasm_instance_t *ictx = NULL;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_err_e ecode;

dd("enter (pwexec: %p)", pwexec);

if (store == NULL) {
dd("no store, jump to create");
Expand Down Expand Up @@ -1250,7 +1234,7 @@ ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,
}

if (ictx->module == filter->module) {
dd("reuse instance");
dd("reuse busy instance");
ngx_wasm_assert(ictx->nrefs);
goto reuse;
}
Expand All @@ -1262,17 +1246,29 @@ ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,
{
ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q);

if (ictx->instance->trapped) {
ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0,
"\"%V\" filter freeing trapped instance "
"(ictx: %p, store: %p)",
filter->name, ictx, store);

q = ngx_queue_next(&ictx->q);
ngx_proxy_wasm_release_instance(ictx, 1);
continue;
}

if (ictx->module == filter->module) {
dd("reuse free instance");
ngx_wasm_assert(ictx->nrefs == 0);
ngx_queue_remove(&ictx->q);
goto reuse;
}
}

dd("create instance in store: %p", store);

create:

dd("create instance in store: %p", store);

ictx = ngx_pcalloc(pool, sizeof(ngx_proxy_wasm_instance_t));
if (ictx == NULL) {
goto error;
Expand Down Expand Up @@ -1302,26 +1298,63 @@ ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter,

goto done;

error:

return NULL;

reuse:

ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0,
"\"%V\" filter reusing instance "
"(ictx: %p, nrefs: %d, store: %p)",
filter->name, ictx, ictx->nrefs + 1, store);

goto done;

done:

if (store && !ictx->nrefs) {
ngx_queue_insert_tail(&store->busy, &ictx->q);
}

if (pwexec) {
if (pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID) {
ngx_wasm_assert(pwexec->id == 0);
pwexec->id = ictx->next_id++;
}

if (pwexec->ecode) {
/* recycled instance */
pwexec->ecode = NGX_PROXY_WASM_ERR_NONE;
pwexec->ecode_logged = 0;
}
}

ictx->nrefs++;
ictx->pwexec = pwexec;

/**
* update instance log
* FFI-injected filters have a valid log while the instance's
* might be outdated.
*/
ngx_wavm_instance_set_data(ictx->instance, ictx, log);

/* create wasm context (root/http) */

ecode = ngx_proxy_wasm_on_start(ictx, filter, pwexec == NULL);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
if (pwexec) {
pwexec->ecode = ecode;

} else {
filter->ecode = ecode;
}

goto error;
}

return ictx;

error:

return NULL;
}


Expand Down Expand Up @@ -1421,6 +1454,8 @@ ngx_proxy_wasm_on_start(ngx_proxy_wasm_instance_t *ictx,
ngx_wavm_instance_t *instance = ictx->instance;
wasm_val_vec_t *rets;

dd("enter (pwexec: %p, ictx: %p)", pwexec, ictx);

rexec = ngx_proxy_wasm_root_lookup(ictx, filter->id);
if (rexec == NULL) {
rexec = ngx_pcalloc(ictx->pool, sizeof(ngx_proxy_wasm_exec_t));
Expand Down
2 changes: 1 addition & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ ngx_wavm_ptr_t ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size);
void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store);
ngx_proxy_wasm_instance_t *ngx_proxy_wasm_get_instance(
ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_store_t *store,
ngx_log_t *log);
ngx_proxy_wasm_exec_t *pwexec, ngx_log_t *log);
void ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx,
unsigned sweep);

Expand Down
5 changes: 2 additions & 3 deletions src/common/proxy_wasm/ngx_proxy_wasm_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,6 @@ ngx_proxy_wasm_properties_get_ngx(ngx_proxy_wasm_ctx_t *pwctx,
hash = hash_str(name.data, name.len);

rctx = (ngx_http_wasm_req_ctx_t *) pwctx->data;

if (rctx == NULL || rctx->fake_request) {
ngx_wavm_log_error(NGX_LOG_ERR, pwctx->log, NULL,
"cannot get ngx properties outside of a request");
Expand Down Expand Up @@ -861,8 +860,8 @@ static ngx_int_t
ngx_proxy_wasm_properties_get_host(ngx_proxy_wasm_ctx_t *pwctx,
ngx_str_t *path, ngx_str_t *value)
{
host_props_node_t *hpn;
uint32_t hash;
host_props_node_t *hpn;
#ifdef NGX_WASM_HTTP
ngx_http_wasm_req_ctx_t *rctx = pwctx->data;

Expand Down Expand Up @@ -893,9 +892,9 @@ static ngx_int_t
ngx_proxy_wasm_properties_set_host(ngx_proxy_wasm_ctx_t *pwctx,
ngx_str_t *path, ngx_str_t *value)
{
host_props_node_t *hpn;
uint32_t hash;
unsigned new_entry = 1;
host_props_node_t *hpn;
#ifdef NGX_WASM_HTTP
ngx_http_wasm_req_ctx_t *rctx = pwctx->data;

Expand Down
5 changes: 4 additions & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
return;
}

ictx = ngx_proxy_wasm_get_instance(filter, filter->store, filter->log);
ictx = ngx_proxy_wasm_get_instance(filter, filter->store, pwexec,
filter->log);
if (ictx == NULL) {
ngx_wasm_log_error(NGX_LOG_ERR, log, 0,
"tick_handler: no instance");
Expand Down Expand Up @@ -246,6 +247,8 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
pwexec->ev->data = pwexec;
pwexec->ev->log = log;

dd("scheduling next tick in %ld", pwexec->tick_period);

ngx_add_timer(pwexec->ev, pwexec->tick_period);
}

Expand Down
2 changes: 1 addition & 1 deletion src/wasm/vm/ngx_wavm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ ngx_wavm_log_error(ngx_uint_t level, ngx_log_t *log, ngx_wrt_err_t *e,
wasm_byte_vec_delete(&trapmsg);

#if (NGX_WASM_HAVE_V8 || NGX_WASM_HAVE_WASMER)
if (ctx->instance) {
if (ctx && ctx->instance) {
wasm_trap_trace(e->trap, &trace);

if (trace.size > 0) {
Expand Down
28 changes: 28 additions & 0 deletions t/03-proxy_wasm/003-on_tick.t
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,31 @@ qr/.*?(\[error\]|Uncaught RuntimeError: |\s+)tick_period already set.*
[stub2]
[stub3]
--- must_die
=== TEST 6: proxy_wasm - on_tick with trap
Should recycle the root instance
Should not prevent http context/instance from starting
--- skip_no_debug: 7
--- wasm_modules: on_phases
--- config
location /t {
proxy_wasm on_phases 'tick_period=200 on_tick=trap';
return 200;
}
--- response_body
--- grep_error_log eval: qr/\[(info|crit)].*?on_tick.*/
--- grep_error_log_out eval
qr/.*?
\[info\] .*? on_tick 200.*
\[crit\] .*? panicked at 'on_tick trap'.*
.*?
\[info\] .*? on_tick 200.*
\[crit\] .*? panicked at 'on_tick trap'.*/
--- error_log
filter 1/1 resuming "on_request_headers" step
filter 1/1 resuming "on_response_headers" step
--- no_error_log
[emerg]
[alert]
8 changes: 4 additions & 4 deletions t/03-proxy_wasm/007-on_http_instance_isolation.t
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ should use an instance per stream
#0 on_vm_start[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter new instance[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter reusing instance[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_response_body" step in "body_filter" phase[^#*]*
Expand All @@ -156,11 +156,11 @@ should use an instance per stream
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*\Z/,
qr/\A\*\d+ .*? filter new instance[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter reusing instance[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_response_body" step in "body_filter" phase[^#*]*
Expand Down
Loading

0 comments on commit 73e76db

Please sign in to comment.