Skip to content

Commit

Permalink
feat: basic OpenAPI spec generation
Browse files Browse the repository at this point in the history
  • Loading branch information
ABeltramo committed Sep 10, 2024
1 parent 2470d48 commit 858a9a7
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 163 deletions.
301 changes: 140 additions & 161 deletions src/moonlight-server/api/api.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <api/api.hpp>
#include <boost/asio.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <events/events.hpp>
#include <events/reflectors.hpp>
#include <helpers/tsqueue.hpp>
#include <memory>
Expand All @@ -13,192 +12,172 @@ namespace wolf::api {

using namespace wolf::core;

struct UnixSocket {
boost::asio::local::stream_protocol::socket socket;
bool is_alive = true;
};

struct HTTPRequest {
std::string method{};
std::string path{};
std::string query_string{};
std::string http_version{};
SimpleWeb::CaseInsensitiveMultimap headers{};
std::string body{};
};

std::string to_str(boost::asio::streambuf &streambuf) {
return {buffers_begin(streambuf.data()), buffers_end(streambuf.data())};
}

class UnixSocketServer {
public:
UnixSocketServer(boost::asio::io_context &io_context,
const std::string &socket_path,
immer::box<state::AppState> app_state)
: io_context_(io_context), app_state(app_state),
acceptor_(io_context, boost::asio::local::stream_protocol::endpoint(socket_path)) {
start_accept();
}

void broadcast_event(const std::string &event_json) {
logs::log(logs::trace, "[API] Sending event: {}", event_json);
for (auto &socket : sockets_) {
boost::asio::async_write(socket->socket,
boost::asio::buffer(event_json + "\n"),
[this, socket](const boost::system::error_code &ec, std::size_t /*length*/) {
if (ec) {
logs::log(logs::error, "[API] Error sending event: {}", ec.message());
close(*socket);
}
});
}
}

void cleanup_sockets() {
sockets_.erase(
std::remove_if(sockets_.begin(), sockets_.end(), [](const auto &socket) { return !socket->is_alive; }),
sockets_.end());
}
UnixSocketServer::UnixSocketServer(boost::asio::io_context &io_context,
const std::string &socket_path,
immer::box<state::AppState> app_state)
: io_context_(io_context), app_state(app_state),
acceptor_(io_context, boost::asio::local::stream_protocol::endpoint(socket_path)) {

http.add(HTTPMethod::GET,
"/api/v1/events",
{.summary = "Subscribe to events",
.description = "This endpoint allows clients to subscribe to events using HTTP 0.9. \n"
"Events are sent as JSON objects, one per line. Clients should expect to receive a newline "
"character after each event. The connection will be kept open until the client closes it.",
.handler = [this](auto req, auto socket) { endpoint_Events(req, socket); }});

http.add(
HTTPMethod::GET,
"/api/v1/pending-pair-requests",
{
.summary = "Get pending pair requests",
.description = "This endpoint returns a list of Moonlight clients that are currently waiting to be paired.",
.response_description = {{200, {.json_schema = rfl::json::to_schema<PendingPairRequestsResponse>()}}},
.handler = [this](auto req, auto socket) { endpoint_PendingPairRequest(req, socket); },
});

http.add(HTTPMethod::POST,
"/api/v1/pair-client",
{
.summary = "Pair a client",
.request_description = APIDescription{.json_schema = rfl::json::to_schema<PairRequest>()},
.response_description = {{200, {.json_schema = rfl::json::to_schema<PairResponse>()}},
{500, {.json_schema = rfl::json::to_schema<PairResponseError>()}}},
.handler = [this](auto req, auto socket) { endpoint_Pair(req, socket); },
});

auto openapi_schema = http.openapi_schema();
http.add(HTTPMethod::GET, "/api/v1/openapi-schema", {.handler = [this, openapi_schema](auto req, auto socket) {
send_http(socket, 200, openapi_schema);
}});

start_accept();
}

private:
void send_http(std::shared_ptr<UnixSocket> socket, int status_code, std::string_view body) {
auto http_reply = fmt::format("HTTP/1.0 {} OK\r\nContent-Length: {}\r\n\r\n{}", status_code, body.size(), body);
void UnixSocketServer::broadcast_event(const std::string &event_json) {
logs::log(logs::trace, "[API] Sending event: {}", event_json);
for (auto &socket : sockets_) {
boost::asio::async_write(socket->socket,
boost::asio::buffer(http_reply),
boost::asio::buffer(event_json + "\n"),
[this, socket](const boost::system::error_code &ec, std::size_t /*length*/) {
if (ec) {
logs::log(logs::error, "[API] Error sending HTTP: {}", ec.message());
logs::log(logs::error, "[API] Error sending event: {}", ec.message());
close(*socket);
}
});
}
}

void handle_request(HTTPRequest &req, std::shared_ptr<UnixSocket> socket) {
logs::log(logs::debug, "[API] Received request: {} {} - {}", req.method, req.path, req.body);
if (req.method == "GET" && req.path == "/api/v1/events") {
// curl -v --http0.9 --unix-socket /tmp/wolf.sock http://localhost/api/v1/events
sockets_.push_back(socket);
return; // We don't send a reply, and we keep the connection open
}

if (req.method == "GET" && req.path == "/api/v1/pending-pair-requests") {
// curl -v --http1.0 --unix-socket /tmp/wolf.sock http://localhost/api/v1/pending-pair-requests
auto res = rfl::Generic::Object();
res["success"] = true;
auto requests = std::vector<rfl::Generic>();
for (auto [secret, pair_request] : *app_state->pairing_atom->load()) {
auto pair_request_obj = rfl::Generic::Object();
pair_request_obj["pair_secret"] = secret;
pair_request_obj["client_ip"] = pair_request->client_ip;
requests.push_back(pair_request_obj);
}
res["requests"] = requests;
send_http(socket, 200, rfl::json::write(res));
return;
}
void UnixSocketServer::cleanup_sockets() {
sockets_.erase(std::remove_if(sockets_.begin(), sockets_.end(), [](const auto &socket) { return !socket->is_alive; }),
sockets_.end());
}

if (req.method == "POST" && req.path == "/api/v1/pair-client") {
// curl -v --http1.0 --unix-socket /tmp/wolf.sock -d '{"pair_secret": "xxxx", "pin": "1234"}'
// http://localhost/api/v1/pair-client
if (auto event = rfl::json::read<PairClient>(req.body)) {
if (auto pair_request = app_state->pairing_atom->load()->find(event.value().pair_secret)) {
pair_request->get().user_pin->set_value(event.value().pin); // Resolve the promise
auto res = rfl::Generic::Object();
res["success"] = true;
send_http(socket, 200, rfl::json::write(res));
} else {
logs::log(logs::warning, "[API] Invalid pair secret: {}", event.value().pair_secret);
send_http(socket, 404, "");
}
} else {
logs::log(logs::warning, "[API] Invalid event: {}", req.body);
send_http(socket, 500, "");
}
void UnixSocketServer::send_http(std::shared_ptr<UnixSocket> socket, int status_code, std::string_view body) {
auto http_reply = fmt::format("HTTP/1.0 {} OK\r\nContent-Length: {}\r\n\r\n{}", status_code, body.size(), body);
boost::asio::async_write(socket->socket,
boost::asio::buffer(http_reply),
[this, socket](const boost::system::error_code &ec, std::size_t /*length*/) {
if (ec) {
logs::log(logs::error, "[API] Error sending HTTP: {}", ec.message());
close(*socket);
}
});
}

return;
}
void UnixSocketServer::handle_request(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket) {
logs::log(logs::debug, "[API] Received request: {} {} - {}", rfl::enum_to_string(req.method), req.path, req.body);

logs::log(logs::warning, "[API] Invalid request: {} {}", req.method, req.path);
if (!http.handle_request(req, socket)) {
send_http(socket, 404, "");
close(*socket);
}
}

void start_connection(std::shared_ptr<UnixSocket> socket) {
auto request_buf = std::make_shared<boost::asio::streambuf>(std::numeric_limits<std::size_t>::max());
boost::asio::async_read_until(
socket->socket,
*request_buf,
"\r\n\r\n",
[this, socket, request_buf](const boost::system::error_code &ec, std::size_t bytes_transferred) {
if (ec) {
logs::log(logs::error, "[API] Error reading request: {}", ec.message());
close(*socket);
return;
}
HTTPRequest req = {};
std::istream is(request_buf.get());
SimpleWeb::RequestMessage::parse(is, req.method, req.path, req.query_string, req.http_version, req.headers);

if (req.headers.contains("Transfer-Encoding") && req.headers.find("Transfer-Encoding")->second == "chunked") {
logs::log(logs::error, "[API] Chunked encoding not supported, use HTTP/1.0 instead");
close(*socket);
return;
}
void UnixSocketServer::start_connection(std::shared_ptr<UnixSocket> socket) {
auto request_buf = std::make_shared<boost::asio::streambuf>(std::numeric_limits<std::size_t>::max());
boost::asio::async_read_until(
socket->socket,
*request_buf,
"\r\n\r\n",
[this, socket, request_buf](const boost::system::error_code &ec, std::size_t bytes_transferred) {
if (ec) {
logs::log(logs::error, "[API] Error reading request: {}", ec.message());
close(*socket);
return;
}
HTTPRequest req = {};
std::string method;
std::istream is(request_buf.get());
SimpleWeb::RequestMessage::parse(is, method, req.path, req.query_string, req.http_version, req.headers);
if (method == "GET")
req.method = HTTPMethod::GET;
else if (method == "POST")
req.method = HTTPMethod::POST;
else if (method == "PUT")
req.method = HTTPMethod::PUT;
else if (method == "DELETE")
req.method = HTTPMethod::DELETE;
else
req.method = HTTPMethod::GET;

if (req.headers.contains("Transfer-Encoding") && req.headers.find("Transfer-Encoding")->second == "chunked") {
logs::log(logs::error, "[API] Chunked encoding not supported, use HTTP/1.0 instead");
close(*socket);
return;
}

// Get the body payload
if (req.headers.contains("Content-Length")) {
auto content_length = std::stoul(req.headers.find("Content-Length")->second);
std::size_t num_additional_bytes = request_buf->size() - bytes_transferred;
if (content_length > num_additional_bytes) {
boost::asio::async_read(socket->socket,
*request_buf,
boost::asio::transfer_exactly(content_length - bytes_transferred),
[this, socket, request_buf, req = std::make_unique<HTTPRequest>(req)](
const boost::system::error_code &ec,
std::size_t /*bytes_transferred*/) {
if (ec) {
logs::log(logs::error, "[API] Error reading request body: {}", ec.message());
close(*socket);
return;
}
req->body = to_str(*request_buf);
handle_request(*req, socket);
});
} else {
req.body = to_str(*request_buf);
handle_request(req, socket);
}
// Get the body payload
if (req.headers.contains("Content-Length")) {
auto content_length = std::stoul(req.headers.find("Content-Length")->second);
std::size_t num_additional_bytes = request_buf->size() - bytes_transferred;
if (content_length > num_additional_bytes) {
boost::asio::async_read(socket->socket,
*request_buf,
boost::asio::transfer_exactly(content_length - bytes_transferred),
[this, socket, request_buf, req = std::make_unique<HTTPRequest>(req)](
const boost::system::error_code &ec,
std::size_t /*bytes_transferred*/) {
if (ec) {
logs::log(logs::error, "[API] Error reading request body: {}", ec.message());
close(*socket);
return;
}
req->body = to_str(*request_buf);
handle_request(*req, socket);
});
} else {
req.body = to_str(*request_buf);
handle_request(req, socket);
}
});
}

void start_accept() {
auto socket =
std::make_shared<UnixSocket>(UnixSocket{.socket = boost::asio::local::stream_protocol::socket(io_context_)});
acceptor_.async_accept(socket->socket, [this, socket](const boost::system::error_code &ec) {
if (!ec) {
start_accept(); // Immediately start accepting a new connection
start_connection(socket); // Start reading the request from the new connection
} else {
logs::log(logs::error, "[API] Error accepting connection: {}", ec.message());
close(*socket);
}
});
}
} else {
handle_request(req, socket);
}
});
}

void close(UnixSocket &socket) {
socket.socket.close();
socket.is_alive = false;
}
void UnixSocketServer::start_accept() {
auto socket =
std::make_shared<UnixSocket>(UnixSocket{.socket = boost::asio::local::stream_protocol::socket(io_context_)});
acceptor_.async_accept(socket->socket, [this, socket](const boost::system::error_code &ec) {
if (!ec) {
start_accept(); // Immediately start accepting a new connection
start_connection(socket); // Start reading the request from the new connection
} else {
logs::log(logs::error, "[API] Error accepting connection: {}", ec.message());
close(*socket);
}
});
}

boost::asio::io_context &io_context_;
boost::asio::local::stream_protocol::acceptor acceptor_;
std::vector<std::shared_ptr<UnixSocket>> sockets_;
immer::box<state::AppState> app_state;
};
void UnixSocketServer::close(UnixSocket &socket) {
socket.socket.close();
socket.is_alive = false;
}

void start_server(immer::box<state::AppState> app_state) {
auto socket_path = "/tmp/wolf.sock";
Expand Down
Loading

0 comments on commit 858a9a7

Please sign in to comment.