diff --git a/example/server/main.cpp b/example/server/main.cpp index 794c71f..47cbc73 100644 --- a/example/server/main.cpp +++ b/example/server/main.cpp @@ -49,33 +49,22 @@ void install_services(capy::application& app) http::serializer::config()); } -/* - -struct json_rpc; - -// Handle POST by parsing JSON-RPC, -// storing `json_rpc` in `rp.request_data` -// This validates the JSON and can respond with an error -// -app.use("/rpc", json_rpc_post()) - -// Process the JSON-RPC command -app.post( - "/rpc", - json_post(), - do_json_rpc() - ); - -*/ - class json_sink : public http::sink { public: - explicit + json_sink(json_sink&&) = default; + json_sink( - json::value& jv) : jv_(jv) + json::storage_ptr sp = {}) + : pr_(new json::parser(std::move(sp))) { } + + auto release() -> json::value + { + return pr_->release(); + } + private: results on_write( @@ -85,29 +74,25 @@ class json_sink : public http::sink results rv; if(more) { - rv.bytes = pr_.write_some( + rv.bytes = pr_->write_some( static_cast( b.data()), b.size(), rv.ec); } else { - rv.bytes = pr_.write( + rv.bytes = pr_->write( static_cast(b.data()), b.size(), rv.ec); } if(! rv.ec.failed()) - { - jv_ = pr_.release(); return rv; - } return rv; } - json::value& jv_; - json::parser pr_; + std::unique_ptr pr_; }; -struct post_json_rpc +struct do_json_rpc { auto operator()( http::route_params& rp) const -> @@ -115,36 +100,26 @@ struct post_json_rpc { if(! rp.is_method(http::method::post)) return http::route::next; - BOOST_ASSERT(rp.parser.is_complete()); - auto& jv = rp.route_data.emplace(); - rp.parser.set_body(jv); - system::error_code ec; - rp.parser.parse(ec); - if(ec.failed()) - return ec; - return http::route::next; + return rp.read_body( + json_sink(), + [this, &rp]( + json::value jv) -> + http::route_result + { + return dispatch(std::move(jv)); + }); } -}; -struct do_json_rpc -{ - auto operator()( - http::route_params&) const -> + // process the JSON-RPC request + auto dispatch( + json::value jv) const -> http::route_result { + (void)jv; return http::route::next; } -}; -#ifdef BOOST_CAPY_HAS_CORO -auto -do_request( - http::route_params& rp) -> - capy::task -{ - co_return http::route::next; -} -#endif +}; int server_main( int argc, char* argv[] ) { @@ -170,23 +145,11 @@ int server_main( int argc, char* argv[] ) http::cors_options opts; opts.allowedHeaders = "Content-Type"; -#ifdef BOOST_CAPY_HAS_CORO - srv.wwwroot.use( http::co_route( do_request ) ); -#endif - - srv.wwwroot.use("/rpc", + srv.wwwroot.use( + "/rpc", http::cors(opts), - post_json_rpc(), - []( http::route_params& rp) -> - http::route_result - { - if(rp.parser.is_complete()) - { - // auto s = rp.parser.body(); - return http::route::next; - } - return http::route::next; - }); + do_json_rpc() + ); srv.wwwroot.use("/", serve_static( argv[3] )); app.start(); diff --git a/include/boost/beast2/impl/read.hpp b/include/boost/beast2/impl/read.hpp index 151d593..483030a 100644 --- a/include/boost/beast2/impl/read.hpp +++ b/include/boost/beast2/impl/read.hpp @@ -143,6 +143,23 @@ is_complete_condition(http::parser& pr) //------------------------------------------------ +/** Asynchronously reads some data into the parser. + + This function is used to asynchronously read data from a + stream into the parser's input sequence. This function will always + keep reading until a complete header is obtained. + The function call will invoke the completion token + with the following signature: + @code + void(system::error_code ec + std::size_t bytes_transferred); + @endcode + @note The parser's input sequence may contain additional data + beyond what was required to complete the header. + @param s The stream to read from. + @param pr The parser to read data into. + @param token The completion token. +*/ template< class AsyncReadStream, BOOST_ASIO_COMPLETION_TOKEN_FOR( @@ -163,6 +180,22 @@ async_read_some( s); } +/** Asynchronously reads data into the parser until the header is complete. + This function is used to asynchronously read data from a + stream into the parser's input sequence until the parser's + header is complete. + The function call will invoke the completion token + with the following signature: + @code + void(system::error_code ec + std::size_t bytes_transferred); + @endcode + @note The parser's input sequence may contain additional data + beyond what was required to complete the header. + @param s The stream to read from. + @param pr The parser to read data into. + @param token The completion token. +*/ template< class AsyncReadStream, BOOST_ASIO_COMPLETION_TOKEN_FOR( @@ -179,6 +212,22 @@ async_read_header( return async_read_some(s, pr, std::move(token)); } +/** Asynchronously reads data into the parser until the message is complete. + This function is used to asynchronously read data from a + stream into the parser's input sequence until the parser's + message is complete. + The function call will invoke the completion token + with the following signature: + @code + void(system::error_code ec + std::size_t bytes_transferred); + @endcode + @note The parser's input sequence may contain additional data + beyond what was required to complete the message. + @param s The stream to read from. + @param pr The parser to read data into. + @param token The completion token. +*/ template< class AsyncReadStream, BOOST_ASIO_COMPLETION_TOKEN_FOR( diff --git a/include/boost/beast2/server/http_stream.hpp b/include/boost/beast2/server/http_stream.hpp index a2a1f3b..4c3ae49 100644 --- a/include/boost/beast2/server/http_stream.hpp +++ b/include/boost/beast2/server/http_stream.hpp @@ -90,6 +90,10 @@ class http_stream std::size_t bytes_transferred); void on_headers(); void do_dispatch(http::route_result rv = {}); + void do_read_body(); + void on_read_body( + system::error_code ec, + std::size_t bytes_transferred); void do_respond(http::route_result rv); void do_write(); void on_write( @@ -121,7 +125,7 @@ class http_stream using work_guard = asio::executor_work_guard().get_executor())>; std::unique_ptr pwg_; - asio_route_params p_; + asio_route_params rp_; }; //------------------------------------------------ @@ -172,12 +176,12 @@ http_stream( , stream_(stream) , routes_(std::move(routes)) , close_(close) - , p_(stream_) + , rp_(stream_) { - p_.parser = http::request_parser(app); + rp_.parser = http::request_parser(app); - p_.serializer = http::serializer(app); - p_.suspend = http::suspender(*this); + rp_.serializer = http::serializer(app); + rp_.suspend = http::suspender(*this); } // called to start a new HTTP session. @@ -190,8 +194,8 @@ on_stream_begin( { pconfig_ = &config; - p_.parser.reset(); - p_.session_data.clear(); + rp_.parser.reset(); + rp_.session_data.clear(); do_read(); } @@ -201,9 +205,11 @@ void http_stream:: do_read() { - p_.parser.start(); + rp_.parser.start(); - beast2::async_read(stream_, p_.parser, + beast2::async_read_some( + stream_, + rp_.parser, call_mf(&http_stream::on_read, this)); } @@ -224,8 +230,6 @@ on_read( "{} http_stream::on_read bytes={}", this->id(), bytes_transferred); - BOOST_ASSERT(p_.parser.is_complete()); - on_headers(); } @@ -237,25 +241,25 @@ on_headers() { // set up Request and Response objects // VFALCO HACK for now we make a copy of the message - p_.req = p_.parser.get(); - p_.route_data.clear(); - p_.res.set_start_line( // VFALCO WTF - http::status::ok, p_.req.version()); - p_.res.set_keep_alive(p_.req.keep_alive()); - p_.serializer.reset(); + rp_.req = rp_.parser.get(); + rp_.route_data.clear(); + rp_.res.set_start_line( // VFALCO WTF + http::status::ok, rp_.req.version()); + rp_.res.set_keep_alive(rp_.req.keep_alive()); + rp_.serializer.reset(); // parse the URL { - auto rv = urls::parse_uri_reference(p_.req.target()); + auto rv = urls::parse_uri_reference(rp_.req.target()); if(rv.has_error()) { // error parsing URL - p_.status(http::status::bad_request); - p_.set_body("Bad Request: " + rv.error().message()); + rp_.status(http::status::bad_request); + rp_.set_body("Bad Request: " + rv.error().message()); return do_respond(rv.error()); } - p_.url = rv.value(); + rp_.url = rv.value(); } // invoke handlers for the route @@ -273,16 +277,48 @@ do_dispatch( { BOOST_ASSERT(! pwg_); // can't be suspended rv = routes_.dispatch( - p_.req.method(), p_.url, p_); + rp_.req.method(), rp_.url, rp_); } else { - rv = routes_.resume(p_, rv); + rv = routes_.resume(rp_, rv); } do_respond(rv); } +// finish reading the body +template +void +http_stream:: +do_read_body() +{ + beast2::async_read( + stream_, + rp_.parser, + call_mf(&http_stream::on_read_body, this)); +} + +// called repeatedly when reading the body +template +void +http_stream:: +on_read_body( + system::error_code ec, + std::size_t bytes_transferred) +{ + if(ec.failed()) + return do_fail("http_stream::on_read_body", ec); + + LOG_TRC(this->sect_)( + "{} http_stream::on_read_body bytes={}", + this->id(), bytes_transferred); + + BOOST_ASSERT(rp_.parser.is_complete()); + + rp_.do_finish(); +} + // called after obtaining a route result template void @@ -300,8 +336,8 @@ do_respond( if(rv == http::route::complete) { // VFALCO what if the connection was closed or keep-alive=false? - // handler sendt the response? - BOOST_ASSERT(p_.serializer.is_done()); + // handler sent the response? + BOOST_ASSERT(rp_.serializer.is_done()); return on_write(system::error_code(), 0); } @@ -310,6 +346,8 @@ do_respond( // didn't call suspend()? if(! pwg_) detail::throw_logic_error(); + if(rp_.parser.is_body_set()) + return do_read_body(); return; } @@ -317,19 +355,19 @@ do_respond( { // unhandled request auto const status = http::status::not_found; - p_.status(status); - p_.set_body(http::to_string(status)); + rp_.status(status); + rp_.set_body(http::to_string(status)); } else if(rv != http::route::send) { // error message of last resort BOOST_ASSERT(rv.failed()); BOOST_ASSERT(! http::is_route_result(rv)); - p_.status(http::status::internal_server_error); + rp_.status(http::status::internal_server_error); std::string s; format_to(s, "An internal server error occurred: {}", rv.message()); - p_.res.set_keep_alive(false); // VFALCO? - p_.set_body(s); + rp_.res.set_keep_alive(false); // VFALCO? + rp_.set_body(s); } do_write(); @@ -341,8 +379,8 @@ void http_stream:: do_write() { - BOOST_ASSERT(! p_.serializer.is_done()); - beast2::async_write(stream_, p_.serializer, + BOOST_ASSERT(! rp_.serializer.is_done()); + beast2::async_write(stream_, rp_.serializer, call_mf(&http_stream::on_write, this)); } @@ -359,13 +397,13 @@ on_write( if(ec.failed()) return do_fail("http_stream::on_write", ec); - BOOST_ASSERT(p_.serializer.is_done()); + BOOST_ASSERT(rp_.serializer.is_done()); LOG_TRC(this->sect_)( "{} http_stream::on_write bytes={}", this->id(), bytes_transferred); - if(p_.res.keep_alive()) + if(rp_.res.keep_alive()) return do_read(); do_close(); @@ -416,8 +454,8 @@ do_fail( LOG_TRC(this->sect_)("{}: {}", s, ec.message()); // tidy up lingering objects - p_.parser.reset(); - p_.serializer.reset(); + rp_.parser.reset(); + rp_.serializer.reset(); close_(ec); } @@ -438,9 +476,9 @@ void http_stream:: clear() noexcept { - p_.parser.reset(); - p_.serializer.reset(); - p_.res.clear(); + rp_.parser.reset(); + rp_.serializer.reset(); + rp_.res.clear(); } } // beast2 diff --git a/include/boost/beast2/server/route_handler_asio.hpp b/include/boost/beast2/server/route_handler_asio.hpp index a6ec8a1..9dfde61 100644 --- a/include/boost/beast2/server/route_handler_asio.hpp +++ b/include/boost/beast2/server/route_handler_asio.hpp @@ -39,6 +39,20 @@ class asio_route_params } private: +public: + // VFALCO This needs to be private + void do_finish() + { + if(finish_) + { + auto f = std::move(finish_); + finish_ = {}; + f(); + } + } + +private: + #ifdef BOOST_CAPY_HAS_CORO auto spawn(