Skip to content

Commit 6716623

Browse files
Support for v3 protocol (#89)
* implementation for simultaneous tcp connections
1 parent 3ab22c7 commit 6716623

File tree

11 files changed

+961
-299
lines changed

11 files changed

+961
-299
lines changed

V3WebSocketProtocolGuide.md

Lines changed: 327 additions & 0 deletions
Large diffs are not rendered by default.

resources/Message.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ message Message {
1414
bytes payload = 4;
1515
string serviceId = 5;
1616
repeated string availableServiceIds = 6;
17+
uint32 connectionId = 7;
1718

1819
enum Type {
1920
UNKNOWN = 0;
@@ -22,5 +23,7 @@ message Message {
2223
STREAM_RESET = 3;
2324
SESSION_RESET = 4;
2425
SERVICE_IDS = 5;
26+
CONNECTION_START = 6;
27+
CONNECTION_RESET = 7;
2528
}
2629
}

src/LocalproxyConfig.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ namespace aws {
5757
/**
5858
* The web proxy endpoint port. This will be set only if a web proxy is necessary. defaults to 3128.
5959
*/
60-
std::uint16_t web_proxy_port {0 };
60+
std::uint16_t web_proxy_port { 0 };
6161
/**
6262
* The web proxy authN. This will be set only if an web proxy is necessary and it requires authN.
6363
*/
@@ -105,6 +105,10 @@ namespace aws {
105105
* If this is set to true, it means that v2 local proxy won't validate service id field.
106106
*/
107107
bool is_v1_message_format {false};
108+
/**
109+
* A flag to judge if v3 local proxy needs to fallback to communicate using v2 local proxy message format.
110+
*/
111+
bool is_v2_message_format {false};
108112
};
109113
}
110114
}

src/ProxySettings.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
2323
std::size_t const DEFAULT_MAX_DATA_FRAME_SIZE = DEFAULT_MESSAGE_MAX_SIZE + DEFAULT_DATA_LENGTH_SIZE;
2424

2525
char const * const KEY_TCP_CONNECTION_RETRY_COUNT = "tunneling.proxy.tcp.connection_retry_count";
26-
std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = 5;
26+
std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = -1;
2727

2828
char const * const KEY_TCP_CONNECTION_RETRY_DELAY_MS = "tunneling.proxy.tcp.connection_retry_delay_ms";
2929
std::uint32_t const DEFAULT_TCP_CONNECTION_RETRY_DELAY_MS = 1000;
@@ -34,6 +34,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
3434

3535
char const * const KEY_MESSAGE_MAX_SIZE = "tunneling.proxy.message.max_size";
3636
std::size_t const DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;
37+
38+
char const * const KEY_MAX_ACTIVE_CONNECTIONS = "tunneling.proxy.tcp.max_active_connections";
39+
std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS = 128;
3740

3841
char const * const KEY_WEB_SOCKET_PING_PERIOD_MS = "tunneling.proxy.websocket.ping_period_ms";
3942
std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS = 5000;
@@ -48,7 +51,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
4851
bool const DEFAULT_WEB_SOCKET_DATA_ERROR_RETRY = true;
4952

5053
char const * const KEY_WEB_SOCKET_SUBPROTOCOL = "tunneling.proxy.websocket.subprotocol";
51-
std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-2.0";
54+
std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-3.0";
5255

5356
char const * const KEY_WEB_SOCKET_MAX_FRAME_SIZE = "tunneling.proxy.websocket.max_frame_size";
5457
std::size_t const DEFAULT_WEB_SOCKET_MAX_FRAME_SIZE = DEFAULT_MAX_DATA_FRAME_SIZE * 2;
@@ -83,6 +86,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
8386
ADD_SETTING_DEFAULT(settings, TCP_READ_BUFFER_SIZE);
8487
ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_PAYLOAD_SIZE);
8588
ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_SIZE);
89+
ADD_SETTING_DEFAULT(settings, MAX_ACTIVE_CONNECTIONS);
8690
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_PING_PERIOD_MS);
8791
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_DELAY_MS);
8892
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_COUNT);

src/ProxySettings.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
3232
extern char const * const KEY_MESSAGE_MAX_SIZE;
3333
extern std::size_t const DEFAULT_MESSAGE_MAX_SIZE;
3434

35+
extern char const * const KEY_MAX_ACTIVE_CONNECTIONS;
36+
extern std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS;
37+
3538
extern char const * const KEY_WEB_SOCKET_PING_PERIOD_MS;
3639
extern std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS;
3740

src/TcpAdapterProxy.cpp

Lines changed: 563 additions & 263 deletions
Large diffs are not rendered by default.

src/TcpAdapterProxy.h

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,14 @@ namespace aws { namespace iot { namespace securedtunneling {
8080
wss{ nullptr },
8181
wss_resolver{ io_ctx },
8282
wss_response{ },
83+
num_active_connections{ 0 },
8384
stream_id{ -1 },
8485
service_id{ "" },
8586
serviceId_to_streamId_map{},
8687
serviceId_to_tcp_server_map{},
8788
serviceId_to_tcp_client_map{},
89+
serviceId_to_control_message_handler_map{},
90+
serviceId_to_data_message_handler_map{},
8891
bind_address_actual{ },
8992
is_web_socket_reading{ false },
9093
is_service_ids_received{ false },
@@ -105,6 +108,8 @@ namespace aws { namespace iot { namespace securedtunneling {
105108
//debuggability.
106109
boost::beast::websocket::response_type wss_response;
107110

111+
std::atomic_uint16_t num_active_connections;
112+
108113
//represents the current stream ID to expect data from
109114
//care should be taken how(if) this is updated directly
110115
// To be deleted
@@ -113,6 +118,8 @@ namespace aws { namespace iot { namespace securedtunneling {
113118
std::unordered_map<std::string, std::int32_t> serviceId_to_streamId_map;
114119
std::unordered_map<std::string, tcp_server::pointer> serviceId_to_tcp_server_map;
115120
std::unordered_map<std::string, tcp_client::pointer> serviceId_to_tcp_client_map;
121+
std::unordered_map<std::string, std::function<bool(message const &)>> serviceId_to_control_message_handler_map;
122+
std::unordered_map<std::string, std::function<bool(message const &)>> serviceId_to_data_message_handler_map;
116123
std::string bind_address_actual;
117124
//flag set to true while web socket data is being drained
118125
//necessary for better TCP socket recovery rather than destroying
@@ -152,15 +159,15 @@ namespace aws { namespace iot { namespace securedtunneling {
152159

153160
int run_proxy();
154161
private:
162+
void update_message_handlers(tcp_adapter_context &tac, std::function<bool(message const &)> handler);
155163
void setup_tcp_socket(tcp_adapter_context &tac, std::string const & service_id);
156164
void setup_tcp_sockets(tcp_adapter_context &tac);
157165
//setup async io flow to connect tcp socket to the adapter config's data host/port
158-
void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id);
159-
void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id);
166+
void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection);
167+
void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection);
160168
void async_setup_source_tcp_sockets(tcp_adapter_context &tac);
161169
void async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string service_id);
162-
void initialize_tcp_clients(tcp_adapter_context &tac);
163-
void initialize_tcp_servers(tcp_adapter_context &tac);
170+
void do_accept_tcp_connection(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string service_id, std::uint16_t local_port, bool is_first_connection);
164171
void setup_web_socket(tcp_adapter_context &tac);
165172
//setup async web socket, and as soon as connection is up, setup async ping schedule
166173
void async_setup_web_socket(tcp_adapter_context &tac);
@@ -169,10 +176,13 @@ namespace aws { namespace iot { namespace securedtunneling {
169176
//then the reset is intentionally reset via web socket, and retries
170177
//occur definitely (regardless of retry configuration)
171178
void tcp_socket_reset_all(tcp_adapter_context &tac, std::function<void()> post_reset_operation);
172-
void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, std::function<void()> post_reset_operation);
173-
tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id);
179+
void tcp_socket_reset_init(tcp_adapter_context &tac, std::string service_id, std::function<void()> post_reset_operation);
180+
void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id, std::function<void()> post_reset_operation);
181+
void tcp_socket_close(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);
182+
tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);
174183

175-
void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id);
184+
void delete_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
185+
void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id, uint32_t const & connection_id);
176186

177187
//sets up a web socket read loop that will read, and ignore most messages until a stream start
178188
//is read and then do something with it (likely, connect to configured endpoint)
@@ -197,22 +207,21 @@ namespace aws { namespace iot { namespace securedtunneling {
197207
//invokes after_setup_web_socket_read_until_stream_start() after stream start is encountered
198208
bool async_wait_for_stream_start(tcp_adapter_context &tac, message const &message);
199209
bool async_wait_for_service_ids(tcp_adapter_context &tac);
200-
void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id);
210+
void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
201211

202212
//below loop does continuous writes to TCP socket from the TCP adapter
203213
//context's tcp_write_buffer. After consuming chunks out of the buffer
204-
//the behavior will be to check
205-
void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id);
214+
//the behavior will be to check
215+
void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);
206216

207-
void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id);
208-
void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id);
217+
void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
218+
void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
209219

210220
//returns a boolean that indicates if another web socket data read message can be put
211221
//onto the tcp write buffer. We have no way of knowing what the next message is and if
212222
//it will be too big to process, thus we don't do the read applying back pressure on
213223
//the socket. Implicitly, this means that an async_read is not happening on the web socket
214224
bool tcp_has_enough_write_buffer_space(tcp_connection::pointer connection);
215-
bool tcp_has_enough_write_buffer_space(tcp_adapter_context const &tac);
216225

217226
//returns a boolean that indicates if another tcp socket read's data can be put on the
218227
//web socket write buffer. It's a bit different from tcp write buffer space requirements
@@ -226,8 +235,11 @@ namespace aws { namespace iot { namespace securedtunneling {
226235
bool is_valid_stream_id(tcp_adapter_context const& tac, message const &message);
227236

228237
void async_send_message(tcp_adapter_context &tac, message const &message);
229-
void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id);
230-
void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id);
238+
void async_send_message(tcp_adapter_context &tac, message const &message, std::string const & service_id, uint32_t const & connection_id);
239+
void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
240+
void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
241+
void async_send_connection_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
242+
void async_send_connection_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
231243

232244
//handler for successfully sent ping will delay the next one
233245
void async_ping_handler_loop(tcp_adapter_context &tac,
@@ -239,16 +251,14 @@ namespace aws { namespace iot { namespace securedtunneling {
239251
void clear_ws_buffers(tcp_adapter_context &tac);
240252
void clear_tcp_connection_buffers(tcp_connection::pointer connection);
241253

242-
void tcp_socket_ensure_closed(boost::asio::ip::tcp::socket & tcp_socket);
243-
244254
//closes the websocket connection
245255
//1 - shutdown the receive side of TCP
246256
//2 - drain the web socket write buffer
247257
//3 - send a web socket close frame
248258
//4 - perform teardown procedure on websocket
249259
void web_socket_close_and_stop(tcp_adapter_context &tac);
250260

251-
void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, boost::system::error_code const &ec, tcp::resolver::results_type results);
261+
void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, uint32_t const & connection_id, boost::system::error_code const &ec, tcp::resolver::results_type results);
252262

253263
bool process_incoming_websocket_buffer(tcp_adapter_context &tac, boost::beast::multi_buffer &message_buffer);
254264

@@ -264,7 +274,7 @@ namespace aws { namespace iot { namespace securedtunneling {
264274

265275
bool fall_back_to_v1_message_format(std::unordered_map<std::string, std::string> const & serviceId_to_endpoint_map);
266276

267-
void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& ss, std::string const & service_id);
277+
void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& ss, std::string const & service_id, uint32_t const & connection_id);
268278

269279
void async_setup_destination_tcp_sockets(tcp_adapter_context &tac);
270280

src/TcpClient.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,20 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
1414
tcp_client(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size)
1515
: resolver_(io_context)
1616
{
17-
connection_ =
18-
tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size);
17+
1918
}
2019
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size)
2120
{
2221
return pointer(new tcp_client(io_context, write_buf_size, read_buf_size, ws_write_buf_size));
2322
}
2423

25-
tcp_connection::pointer connection_;
2624
tcp::resolver resolver_;
25+
26+
std::unordered_map<uint32_t, tcp_connection::pointer> connectionId_to_tcp_connection_map;
27+
2728
// function object defines what to do after set up a tcp socket
2829
std::function<void()> after_setup_tcp_socket = nullptr;
30+
2931
// function object defines what to do receiving control message: stream start
3032
std::function<void()> on_receive_stream_start = nullptr;
3133
};

src/TcpConnection.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,23 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
1717
public:
1818
typedef boost::shared_ptr<tcp_connection> pointer;
1919

20-
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size)
20+
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id)
2121
{
22-
return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size));
22+
return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size, connection_id));
2323
}
2424

2525
tcp::socket& socket()
2626
{
2727
return socket_;
2828
}
2929

30-
tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size)
30+
tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id)
3131
: socket_(io_context)
3232
, tcp_write_buffer_(write_buf_size)
3333
, tcp_read_buffer_(read_buf_size)
3434
, web_socket_data_write_buffer_(ws_write_buf_size)
35+
, connection_id_(connection_id)
36+
3537
{
3638
}
3739

@@ -51,6 +53,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
5153
//condense smaller TCP read chunks to bigger web socket writes. It also makes
5254
//it impossible to "inject" a non-data message in data sequence order
5355
boost::beast::multi_buffer web_socket_data_write_buffer_;
56+
57+
uint32_t connection_id_; // assigned connection_id for tcp connection
58+
5459
// Is this tcp socket currently writing
5560
bool is_tcp_socket_writing_{ false };
5661
// Is this tcp socket currently reading

src/TcpServer.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <boost/beast/core/flat_buffer.hpp>
55
#include <boost/asio.hpp>
66
#include <boost/asio/ip/tcp.hpp>
7+
#include <unordered_map>
78
#include "TcpConnection.h"
89

910
namespace aws { namespace iot { namespace securedtunneling { namespace connection {
@@ -16,8 +17,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
1617
: acceptor_(io_context)
1718
, resolver_(io_context)
1819
{
19-
connection_ =
20-
tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size);
20+
highest_connection_id = 0;
2121
}
2222

2323
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size)
@@ -32,7 +32,11 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
3232

3333
tcp::acceptor acceptor_;
3434
tcp::resolver resolver_;
35-
tcp_connection::pointer connection_;
35+
36+
std::unordered_map<uint32_t, tcp_connection::pointer> connectionId_to_tcp_connection_map;
37+
38+
std::atomic_uint32_t highest_connection_id;
39+
3640
// function object defines what to do after set up a tcp socket
3741
std::function<void()> after_setup_tcp_socket = nullptr;
3842
};

0 commit comments

Comments
 (0)