Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-800: Drain the Pipeline #450

Draft
wants to merge 36 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4cfa253
GH-800: Write the most basic TODOs for this card
utkarshg6 Jun 13, 2024
07cfef9
GH-800: Add todos for renaming purposes
utkarshg6 Jun 25, 2024
62db40f
GH-800: Add a message scheduler for purging the stream key
utkarshg6 Jun 27, 2024
e4199ae
GH-800: Improve the test handle_client_response_payload_purges_stream…
utkarshg6 Jun 28, 2024
ee8f897
GH-800: Make delay configurable via constant
utkarshg6 Jun 28, 2024
91323d4
GH-800: introduce StreamSenders
utkarshg6 Jul 2, 2024
7cc2fbb
GH-800: introduce channel for shutdown signal
utkarshg6 Jul 4, 2024
d784157
GH-800: add test connection_shutdown_test.rs
utkarshg6 Jul 6, 2024
14a5de3
GH-800: test drive the shutdown signal in StreamReader
utkarshg6 Jul 10, 2024
d8d1f10
GH-800: refactor test stream_reader_shuts_down_when_it_receives_the_s…
utkarshg6 Jul 10, 2024
ddf7d9c
GH-800: some refactor inside the file stream_reader
utkarshg6 Jul 10, 2024
a6f2538
GH-800: some more refactoring
utkarshg6 Jul 10, 2024
130a37c
GH-800: trying_to_write_a_test_for_stream_senders was a success
utkarshg6 Jul 11, 2024
6db4c0c
GH-800: did some refactoring for trying_to_write_a_test_for_stream_se…
utkarshg6 Jul 12, 2024
c3cc6a8
GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_t…
utkarshg6 Jul 18, 2024
75edd8f
GH-800: Minor refactor for the test stream_handler_pool_sends_shutdow…
utkarshg6 Jul 18, 2024
296f449
GH-800: more refactoring of the test stream_handler_pool_sends_shutdo…
utkarshg6 Jul 18, 2024
a2c763d
GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_t…
utkarshg6 Jul 18, 2024
3c223ea
GH-800: some more todos removed
utkarshg6 Jul 18, 2024
4605d83
GH-800: clean_up_dead_streams_logs_when_the_shutdown_channel_is_down …
utkarshg6 Jul 19, 2024
e1417c8
GH-800: tests are passing in StreamHandlerPool
utkarshg6 Jul 19, 2024
6e28ff1
GH-800: Wrote a test for the case when the shutdown signal channel is…
utkarshg6 Jul 19, 2024
6849db6
GH-800: make it easier to understand who experiences the write error
utkarshg6 Jul 22, 2024
af09d25
GH-800: trying to write test while_housekeeping_the_stream_senders_ar…
utkarshg6 Jul 22, 2024
b896db6
GH-800: add test add_new_streams_works
utkarshg6 Jul 26, 2024
acf2c89
GH-800: wip: add the log
utkarshg6 Jul 26, 2024
deb1535
GH-800: test proxy_client_stream_reader_dies_when_client_stream_is_ki…
utkarshg6 Jul 31, 2024
2ecfebc
GH-800: change the channel from crossbeam to tokio
utkarshg6 Jul 31, 2024
3b387a7
GH-800: Add fn send_shutdown_signal_to_stream_reader
utkarshg6 Aug 6, 2024
f94c8d3
GH-800: wip: fixing 2 tests
utkarshg6 Aug 7, 2024
54040ae
GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_t…
utkarshg6 Aug 7, 2024
31db7c4
GH-800: all tests in stream_handler_pool.rs are passing
utkarshg6 Aug 16, 2024
4f8c141
GH-800: add test for the logs; all tests passing
utkarshg6 Aug 27, 2024
1880006
GH-800: remove lookup_ip() mock fns from a test
utkarshg6 Aug 27, 2024
d81c2f0
GH-800: remove warnings
utkarshg6 Aug 27, 2024
ae08930
GH-800: use fn send_shutdown_signal_to_stream_reader in clean_up_dead…
utkarshg6 Aug 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions node/src/proxy_client/stream_establisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use crate::proxy_client::stream_handler_pool::StreamSenders;
use crate::proxy_client::stream_reader::StreamReader;
use crate::proxy_client::stream_writer::StreamWriter;
use crate::sub_lib::channel_wrappers::FuturesChannelFactory;
Expand All @@ -14,16 +15,18 @@ use crate::sub_lib::stream_connector::StreamConnectorReal;
use crate::sub_lib::stream_key::StreamKey;
use crate::sub_lib::tokio_wrappers::ReadHalfWrapper;
use actix::Recipient;
use crossbeam_channel::Sender;
use crossbeam_channel::{unbounded, Receiver, Sender};
use masq_lib::logger::Logger;
use std::io;
use std::net::IpAddr;
use std::net::SocketAddr;
use tokio::sync::mpsc::UnboundedReceiver;

pub struct StreamEstablisher {
pub cryptde: &'static dyn CryptDE,
pub stream_adder_tx: Sender<(StreamKey, Box<dyn SenderWrapper<SequencedPacket>>)>,
pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>,
pub stream_killer_tx: Sender<(StreamKey, u64)>,
pub shutdown_signal_rx: Receiver<()>,
pub stream_connector: Box<dyn StreamConnector>,
pub proxy_client_sub: Recipient<InboundServerData>,
pub logger: Logger,
Expand All @@ -36,6 +39,7 @@ impl Clone for StreamEstablisher {
cryptde: self.cryptde,
stream_adder_tx: self.stream_adder_tx.clone(),
stream_killer_tx: self.stream_killer_tx.clone(),
shutdown_signal_rx: unbounded().1,
stream_connector: Box::new(StreamConnectorReal {}),
proxy_client_sub: self.proxy_client_sub.clone(),
logger: self.logger.clone(),
Expand All @@ -57,11 +61,13 @@ impl StreamEstablisher {
payload.target_port,
&self.logger,
)?;
let (shutdown_signal_tx, shutdown_signal_rx) = tokio::sync::mpsc::unbounded_channel();

self.spawn_stream_reader(
&payload.clone(),
connection_info.reader,
connection_info.peer_addr,
shutdown_signal_rx,
);

let (tx_to_write, rx_to_write) = self.channel_factory.make(connection_info.peer_addr);
Expand All @@ -73,8 +79,13 @@ impl StreamEstablisher {
);
tokio::spawn(stream_writer);

let stream_senders = StreamSenders {
writer_data: tx_to_write.clone(),
reader_shutdown_tx: shutdown_signal_tx,
};

self.stream_adder_tx
.send((payload.stream_key, tx_to_write.clone()))
.send((payload.stream_key, stream_senders))
.expect("StreamHandlerPool died");
Ok(tx_to_write)
}
Expand All @@ -84,12 +95,14 @@ impl StreamEstablisher {
payload: &ClientRequestPayload_0v1,
read_stream: Box<dyn ReadHalfWrapper>,
peer_addr: SocketAddr,
shutdown_signal: UnboundedReceiver<()>,
) {
let stream_reader = StreamReader::new(
payload.stream_key,
self.proxy_client_sub.clone(),
read_stream,
self.stream_killer_tx.clone(),
shutdown_signal,
peer_addr,
);
debug!(self.logger, "Spawning StreamReader for {}", peer_addr);
Expand All @@ -103,7 +116,7 @@ pub trait StreamEstablisherFactory: Send {

pub struct StreamEstablisherFactoryReal {
pub cryptde: &'static dyn CryptDE,
pub stream_adder_tx: Sender<(StreamKey, Box<dyn SenderWrapper<SequencedPacket>>)>,
pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>,
pub stream_killer_tx: Sender<(StreamKey, u64)>,
pub proxy_client_subs: ProxyClientSubs,
pub logger: Logger,
Expand All @@ -115,6 +128,7 @@ impl StreamEstablisherFactory for StreamEstablisherFactoryReal {
cryptde: self.cryptde,
stream_adder_tx: self.stream_adder_tx.clone(),
stream_killer_tx: self.stream_killer_tx.clone(),
shutdown_signal_rx: unbounded().1,
stream_connector: Box::new(StreamConnectorReal {}),
proxy_client_sub: self.proxy_client_subs.inbound_server_data.clone(),
logger: self.logger.clone(),
Expand All @@ -140,6 +154,7 @@ mod tests {
use std::str::FromStr;
use std::thread;
use tokio::prelude::Async;
use tokio::sync::mpsc::unbounded_channel;

#[test]
fn spawn_stream_reader_handles_data() {
Expand Down Expand Up @@ -171,6 +186,7 @@ mod tests {
cryptde: main_cryptde(),
stream_adder_tx,
stream_killer_tx,
shutdown_signal_rx: unbounded().1,
stream_connector: Box::new(StreamConnectorMock::new()), // only used in "establish_stream"
proxy_client_sub,
logger: Logger::new("ProxyClient"),
Expand All @@ -191,6 +207,7 @@ mod tests {
},
read_stream,
SocketAddr::from_str("1.2.3.4:5678").unwrap(),
unbounded_channel().1,
);

proxy_client_awaiter.await_message_count(1);
Expand Down
Loading