Skip to content

Commit e3cd24c

Browse files
committed
feat: trait for HTTP request handler
1 parent 3a2d869 commit e3cd24c

File tree

6 files changed

+297
-187
lines changed

6 files changed

+297
-187
lines changed

examples/async.rs

Lines changed: 65 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@ use std::time::Instant;
66

77
use ngx::core;
88
use ngx::ffi::{
9-
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt,
10-
ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
11-
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
9+
ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_module_t, ngx_int_t,
10+
ngx_module_t, ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
1211
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG,
1312
};
14-
use ngx::http::{self, HttpModule, MergeConfigError};
15-
use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
16-
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
13+
use ngx::http::{self, HttpModule, HttpModuleLocationConf, HttpRequestHandler, MergeConfigError};
14+
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};
1715
use tokio::runtime::Runtime;
1816

1917
struct Module;
@@ -25,18 +23,10 @@ impl http::HttpModule for Module {
2523

2624
unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
2725
// SAFETY: this function is called with non-NULL cf always
28-
let cf = &mut *cf;
29-
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");
30-
31-
let h = ngx_array_push(
32-
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers,
33-
) as *mut ngx_http_handler_pt;
34-
if h.is_null() {
35-
return core::Status::NGX_ERROR.into();
36-
}
37-
// set an Access phase handler
38-
*h = Some(async_access_handler);
39-
core::Status::NGX_OK.into()
26+
let cf = unsafe { &mut *cf };
27+
http::add_phase_handler::<AsyncAccessHandler, _>(cf)
28+
.map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK)
29+
.into()
4030
}
4131
}
4232

@@ -139,63 +129,70 @@ impl Drop for RequestCTX {
139129
}
140130
}
141131

142-
http_request_handler!(async_access_handler, |request: &mut http::Request| {
143-
let co = Module::location_conf(request).expect("module config is none");
132+
struct AsyncAccessHandler;
144133

145-
ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
134+
impl HttpRequestHandler for AsyncAccessHandler {
135+
const PHASE: ngx::http::Phases = ngx::http::Phases::Access;
136+
type ReturnType = Option<ngx_int_t>;
146137

147-
if !co.enable {
148-
return core::Status::NGX_DECLINED;
149-
}
138+
fn handler(request: &mut http::Request) -> Option<ngx_int_t> {
139+
let co = Module::location_conf(request).expect("module config is none");
140+
141+
ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
150142

151-
if let Some(ctx) =
152-
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
153-
{
154-
if !ctx.done.load(Ordering::Relaxed) {
155-
return core::Status::NGX_AGAIN;
143+
if !co.enable {
144+
return Some(core::Status::NGX_DECLINED.into());
156145
}
157146

158-
return core::Status::NGX_OK;
159-
}
147+
if let Some(ctx) =
148+
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
149+
{
150+
if !ctx.done.load(Ordering::Relaxed) {
151+
return Some(core::Status::NGX_AGAIN.into());
152+
}
160153

161-
let ctx = request.pool().allocate(RequestCTX::default());
162-
if ctx.is_null() {
163-
return core::Status::NGX_ERROR;
154+
return Some(core::Status::NGX_OK.into());
155+
}
156+
157+
let ctx = request.pool().allocate(RequestCTX::default());
158+
if ctx.is_null() {
159+
return Some(core::Status::NGX_ERROR.into());
160+
}
161+
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });
162+
163+
let ctx = unsafe { &mut *ctx };
164+
ctx.event.handler = Some(check_async_work_done);
165+
ctx.event.data = request.connection().cast();
166+
ctx.event.log = unsafe { (*request.connection()).log };
167+
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };
168+
169+
// Request is no longer needed and can be converted to something movable to the async block
170+
let req = AtomicPtr::new(request.into());
171+
let done_flag = ctx.done.clone();
172+
173+
let rt = ngx_http_async_runtime();
174+
ctx.task = Some(rt.spawn(async move {
175+
let start = Instant::now();
176+
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
177+
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
178+
// not really thread safe, we should apply all these operation in nginx thread
179+
// but this is just an example. proper way would be storing these headers in the request ctx
180+
// and apply them when we get back to the nginx thread.
181+
req.add_header_out(
182+
"X-Async-Time",
183+
start.elapsed().as_millis().to_string().as_str(),
184+
);
185+
186+
done_flag.store(true, Ordering::Release);
187+
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
188+
// in the nginx event loop. To workaround it we can notify the event loop using
189+
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
190+
// and use the same trick as the thread pool)
191+
}));
192+
193+
Some(core::Status::NGX_AGAIN.into())
164194
}
165-
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });
166-
167-
let ctx = unsafe { &mut *ctx };
168-
ctx.event.handler = Some(check_async_work_done);
169-
ctx.event.data = request.connection().cast();
170-
ctx.event.log = unsafe { (*request.connection()).log };
171-
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };
172-
173-
// Request is no longer needed and can be converted to something movable to the async block
174-
let req = AtomicPtr::new(request.into());
175-
let done_flag = ctx.done.clone();
176-
177-
let rt = ngx_http_async_runtime();
178-
ctx.task = Some(rt.spawn(async move {
179-
let start = Instant::now();
180-
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
181-
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
182-
// not really thread safe, we should apply all these operation in nginx thread
183-
// but this is just an example. proper way would be storing these headers in the request ctx
184-
// and apply them when we get back to the nginx thread.
185-
req.add_header_out(
186-
"X-Async-Time",
187-
start.elapsed().as_millis().to_string().as_str(),
188-
);
189-
190-
done_flag.store(true, Ordering::Release);
191-
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
192-
// in the nginx event loop. To workaround it we can notify the event loop using
193-
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
194-
// and use the same trick as the thread pool)
195-
}));
196-
197-
core::Status::NGX_AGAIN
198-
});
195+
}
199196

200197
extern "C" fn ngx_http_async_commands_set_enable(
201198
cf: *mut ngx_conf_t,

examples/awssig.rs

Lines changed: 82 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ use std::ffi::{c_char, c_void};
33
use http::HeaderMap;
44
use ngx::core;
55
use ngx::ffi::{
6-
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_http_handler_pt, ngx_http_module_t,
7-
ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t,
6+
ngx_command_t, ngx_conf_t, ngx_http_module_t, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t,
87
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE,
98
NGX_HTTP_SRV_CONF, NGX_LOG_EMERG,
109
};
1110
use ngx::http::*;
12-
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
11+
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};
1312

1413
struct Module;
1514

@@ -20,18 +19,10 @@ impl HttpModule for Module {
2019

2120
unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
2221
// SAFETY: this function is called with non-NULL cf always
23-
let cf = &mut *cf;
24-
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");
25-
26-
let h = ngx_array_push(
27-
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE as usize].handlers,
28-
) as *mut ngx_http_handler_pt;
29-
if h.is_null() {
30-
return core::Status::NGX_ERROR.into();
31-
}
32-
// set an phase handler
33-
*h = Some(awssigv4_header_handler);
34-
core::Status::NGX_OK.into()
22+
let cf = unsafe { &mut *cf };
23+
ngx::http::add_phase_handler::<AwsSigV4HeaderHandler, _>(cf)
24+
.map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK)
25+
.into()
3526
}
3627
}
3728

@@ -261,82 +252,89 @@ extern "C" fn ngx_http_awssigv4_commands_set_s3_endpoint(
261252
ngx::core::NGX_CONF_OK
262253
}
263254

264-
http_request_handler!(awssigv4_header_handler, |request: &mut Request| {
265-
// get Module Config from request
266-
let conf = Module::location_conf(request).expect("module conf");
267-
ngx_log_debug_http!(request, "AWS signature V4 module {}", {
268-
if conf.enable {
269-
"enabled"
270-
} else {
271-
"disabled"
272-
}
273-
});
274-
if !conf.enable {
275-
return core::Status::NGX_DECLINED;
276-
}
255+
struct AwsSigV4HeaderHandler;
277256

278-
// TODO: build url properly from the original URL from client
279-
let method = request.method();
280-
if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) {
281-
return HTTPStatus::FORBIDDEN.into();
282-
}
257+
impl HttpRequestHandler for AwsSigV4HeaderHandler {
258+
const PHASE: Phases = Phases::PreContent;
259+
type ReturnType = Option<ngx_int_t>;
283260

284-
let datetime = chrono::Utc::now();
285-
let uri = match request.unparsed_uri().to_str() {
286-
Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v),
287-
Err(_) => return core::Status::NGX_DECLINED,
288-
};
261+
fn handler(request: &mut Request) -> Option<ngx_int_t> {
262+
// get Module Config from request
263+
let conf = Module::location_conf(request).expect("module conf");
264+
ngx_log_debug_http!(request, "AWS signature V4 module {}", {
265+
if conf.enable {
266+
"enabled"
267+
} else {
268+
"disabled"
269+
}
270+
});
271+
if !conf.enable {
272+
return Some(core::Status::NGX_DECLINED.into());
273+
}
274+
275+
// TODO: build url properly from the original URL from client
276+
let method = request.method();
277+
if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) {
278+
return Some(HTTPStatus::FORBIDDEN.into());
279+
}
289280

290-
let datetime_now = datetime.format("%Y%m%dT%H%M%SZ");
291-
let datetime_now = datetime_now.to_string();
281+
let datetime = chrono::Utc::now();
282+
let uri = match request.unparsed_uri().to_str() {
283+
Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v),
284+
Err(_) => return Some(core::Status::NGX_DECLINED.into()),
285+
};
292286

293-
let signature = {
294-
// NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap.
295-
// Iterate over requests headers_in and copy into HeaderMap
296-
// Copy only headers that will be used to sign the request
297-
let mut headers = HeaderMap::new();
298-
for (name, value) in request.headers_in_iterator() {
299-
if let Ok(name) = name.to_str() {
300-
if name.to_lowercase() == "host" {
301-
if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) {
302-
headers.insert(http::header::HOST, value);
303-
} else {
304-
return core::Status::NGX_DECLINED;
287+
let datetime_now = datetime.format("%Y%m%dT%H%M%SZ");
288+
let datetime_now = datetime_now.to_string();
289+
290+
let signature = {
291+
// NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap.
292+
// Iterate over requests headers_in and copy into HeaderMap
293+
// Copy only headers that will be used to sign the request
294+
let mut headers = HeaderMap::new();
295+
for (name, value) in request.headers_in_iterator() {
296+
if let Ok(name) = name.to_str() {
297+
if name.to_lowercase() == "host" {
298+
if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) {
299+
headers.insert(http::header::HOST, value);
300+
} else {
301+
return Some(core::Status::NGX_DECLINED.into());
302+
}
305303
}
304+
} else {
305+
return Some(core::Status::NGX_DECLINED.into());
306306
}
307-
} else {
308-
return core::Status::NGX_DECLINED;
309307
}
310-
}
311-
headers.insert("X-Amz-Date", datetime_now.parse().unwrap());
312-
ngx_log_debug_http!(request, "headers {:?}", headers);
313-
ngx_log_debug_http!(request, "method {:?}", method);
314-
ngx_log_debug_http!(request, "uri {:?}", uri);
315-
ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now);
316-
317-
let s = aws_sign_v4::AwsSign::new(
318-
method.as_str(),
319-
&uri,
320-
&datetime,
321-
&headers,
322-
"us-east-1",
323-
conf.access_key.as_str(),
324-
conf.secret_key.as_str(),
325-
"s3",
326-
"",
327-
);
328-
s.sign()
329-
};
308+
headers.insert("X-Amz-Date", datetime_now.parse().unwrap());
309+
ngx_log_debug_http!(request, "headers {:?}", headers);
310+
ngx_log_debug_http!(request, "method {:?}", method);
311+
ngx_log_debug_http!(request, "uri {:?}", uri);
312+
ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now);
313+
314+
let s = aws_sign_v4::AwsSign::new(
315+
method.as_str(),
316+
&uri,
317+
&datetime,
318+
&headers,
319+
"us-east-1",
320+
conf.access_key.as_str(),
321+
conf.secret_key.as_str(),
322+
"s3",
323+
"",
324+
);
325+
s.sign()
326+
};
330327

331-
request.add_header_in("authorization", signature.as_str());
332-
request.add_header_in("X-Amz-Date", datetime_now.as_str());
328+
request.add_header_in("authorization", signature.as_str());
329+
request.add_header_in("X-Amz-Date", datetime_now.as_str());
333330

334-
for (name, value) in request.headers_out_iterator() {
335-
ngx_log_debug_http!(request, "headers_out {name}: {value}",);
336-
}
337-
for (name, value) in request.headers_in_iterator() {
338-
ngx_log_debug_http!(request, "headers_in {name}: {value}",);
339-
}
331+
for (name, value) in request.headers_out_iterator() {
332+
ngx_log_debug_http!(request, "headers_out {name}: {value}",);
333+
}
334+
for (name, value) in request.headers_in_iterator() {
335+
ngx_log_debug_http!(request, "headers_in {name}: {value}",);
336+
}
340337

341-
core::Status::NGX_OK
342-
});
338+
Some(core::Status::NGX_OK.into())
339+
}
340+
}

0 commit comments

Comments
 (0)