From ba621ffb9cb63a01053854bb270786c470c90392 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 17 May 2023 16:53:02 +0200 Subject: [PATCH 1/4] test: improve debug log message from P2PConnection::connection_made() This is used in both cases - TCP server (accept) and TCP client (connect). The message "Connected & Listening address:port" is confusing. Print both ends of the TCP connection. --- test/functional/test_framework/p2p.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py index 4f1265eb54889..adabbcf22f4ac 100755 --- a/test/functional/test_framework/p2p.py +++ b/test/functional/test_framework/p2p.py @@ -217,7 +217,12 @@ def peer_disconnect(self): def connection_made(self, transport): """asyncio callback when a connection is opened.""" assert not self._transport - logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) + info = transport.get_extra_info("socket") + us = info.getsockname() + them = info.getpeername() + logger.debug(f"Connected: us={us[0]}:{us[1]}, them={them[0]}:{them[1]}") + self.dstaddr = them[0] + self.dstport = them[1] self._transport = transport # in an inbound connection to the TestNode with P2PConnection as the initiator, [TestNode <---- P2PConnection] # send the initial handshake immediately From ebe42c00aa4a7a16900eff3aec45604c86b2dbf5 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Fri, 19 May 2023 12:31:22 +0200 Subject: [PATCH 2/4] test: extend the SOCKS5 Python proxy to actually connect to a destination If requested, make the SOCKS5 Python proxy redirect each connection to a given destination. Actually act as a real proxy, connecting the client to a destination, except that the destination is not what the client asked for. This would enable us to "connect" to Tor addresses from the functional tests. --- test/functional/test_framework/netutil.py | 7 +++ test/functional/test_framework/socks5.py | 70 +++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/test/functional/test_framework/netutil.py b/test/functional/test_framework/netutil.py index 08d41fe97fdf5..f6acf926fa8c1 100644 --- a/test/functional/test_framework/netutil.py +++ b/test/functional/test_framework/netutil.py @@ -167,3 +167,10 @@ def test_unix_socket(): return False else: return True + +def format_addr_port(addr, port): + '''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.''' + if ":" in addr: + return f"[{addr}]:{port}" + else: + return f"{addr}:{port}" diff --git a/test/functional/test_framework/socks5.py b/test/functional/test_framework/socks5.py index 0ca06a7396165..3387c8a1fef11 100644 --- a/test/functional/test_framework/socks5.py +++ b/test/functional/test_framework/socks5.py @@ -4,11 +4,16 @@ # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Dummy Socks5 server for testing.""" +import select import socket import threading import queue import logging +from .netutil import ( + format_addr_port +) + logger = logging.getLogger("TestFramework.socks5") # Protocol constants @@ -32,6 +37,42 @@ def recvall(s, n): n -= len(d) return rv +def sendall(s, data): + """Send all data to a socket, or fail.""" + sent = 0 + while sent < len(data): + _, wlist, _ = select.select([], [s], []) + if len(wlist) > 0: + n = s.send(data[sent:]) + if n == 0: + raise IOError('send() on socket returned 0') + sent += n + +def forward_sockets(a, b): + """Forward data received on socket a to socket b and vice versa, until EOF is received on one of the sockets.""" + # Mark as non-blocking so that we do not end up in a deadlock-like situation + # where we block and wait on data from `a` while there is data ready to be + # received on `b` and forwarded to `a`. And at the same time the application + # at `a` is not sending anything because it waits for the data from `b` to + # respond. + a.setblocking(False) + b.setblocking(False) + sockets = [a, b] + done = False + while not done: + rlist, _, xlist = select.select(sockets, [], sockets) + if len(xlist) > 0: + raise IOError('Exceptional condition on socket') + for s in rlist: + data = s.recv(4096) + if data is None or len(data) == 0: + done = True + break + if s == a: + sendall(b, data) + else: + sendall(a, data) + # Implementation classes class Socks5Configuration(): """Proxy configuration.""" @@ -41,6 +82,19 @@ def __init__(self): self.unauth = False # Support unauthenticated self.auth = False # Support authentication self.keep_alive = False # Do not automatically close connections + # This function is called whenever a new connection arrives to the proxy + # and it decides where the connection is redirected to. It is passed: + # - the address the client requested to connect to + # - the port the client requested to connect to + # It is supposed to return an object like: + # { + # "actual_to_addr": "127.0.0.1" + # "actual_to_port": 28276 + # } + # or None. + # If it returns an object then the connection is redirected to actual_to_addr:actual_to_port. + # If it returns None, or destinations_factory itself is None then the connection is closed. + self.destinations_factory = None class Socks5Command(): """Information about an incoming socks5 command.""" @@ -117,6 +171,22 @@ def handle(self): cmdin = Socks5Command(cmd, atyp, addr, port, username, password) self.serv.queue.put(cmdin) logger.debug('Proxy: %s', cmdin) + + requested_to_addr = addr.decode("utf-8") + requested_to = format_addr_port(requested_to_addr, port) + + if self.serv.conf.destinations_factory is not None: + dest = self.serv.conf.destinations_factory(requested_to_addr, port) + if dest is not None: + logger.debug(f"Serving connection to {requested_to}, will redirect it to " + f"{dest['actual_to_addr']}:{dest['actual_to_port']} instead") + with socket.create_connection((dest["actual_to_addr"], dest["actual_to_port"])) as conn_to: + forward_sockets(self.conn, conn_to) + else: + logger.debug(f"Closing connection to {requested_to}: the destinations factory returned None") + else: + logger.debug(f"Closing connection to {requested_to}: no destinations factory") + # Fall through to disconnect except Exception as e: logger.exception("socks5 request handling failed.") From 22cd0e888c71b0f56171a524251c1557bcb6237b Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 24 May 2023 08:34:57 +0200 Subject: [PATCH 3/4] test: support WTX INVs from P2PDataStore and fix a comment --- test/functional/test_framework/p2p.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py index adabbcf22f4ac..9b791180c38a2 100755 --- a/test/functional/test_framework/p2p.py +++ b/test/functional/test_framework/p2p.py @@ -808,12 +808,13 @@ def __init__(self): self.getdata_requests = [] def on_getdata(self, message): - """Check for the tx/block in our stores and if found, reply with an inv message.""" + """Check for the tx/block in our stores and if found, reply with MSG_TX or MSG_BLOCK.""" for inv in message.inv: self.getdata_requests.append(inv.hash) - if (inv.type & MSG_TYPE_MASK) == MSG_TX and inv.hash in self.tx_store.keys(): + invtype = inv.type & MSG_TYPE_MASK + if (invtype == MSG_TX or invtype == MSG_WTX) and inv.hash in self.tx_store.keys(): self.send_message(msg_tx(self.tx_store[inv.hash])) - elif (inv.type & MSG_TYPE_MASK) == MSG_BLOCK and inv.hash in self.block_store.keys(): + elif invtype == MSG_BLOCK and inv.hash in self.block_store.keys(): self.send_message(msg_block(self.block_store[inv.hash])) else: logger.debug('getdata message type {} received.'.format(hex(inv.type))) From 57529ac4dbb2721c1ad0a3566f0299dbdb5ca5c0 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Mon, 5 Feb 2024 15:17:52 +0100 Subject: [PATCH 4/4] test: set P2PConnection.p2p_connected_to_node in peer_connect_helper() Set `P2PConnection.p2p_connected_to_node` in `P2PConnection.peer_connect_helper()` instead of `TestNode.add_p2p_connection()` and `TestNode.add_outbound_p2p_connection()`. This way tests can create an instance of `P2PConnection` and use `P2PConnection.peer_connect_helper()` directly. --- test/functional/test_framework/p2p.py | 1 + test/functional/test_framework/test_node.py | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py index 9b791180c38a2..4a03fdd16d8fe 100755 --- a/test/functional/test_framework/p2p.py +++ b/test/functional/test_framework/p2p.py @@ -188,6 +188,7 @@ def peer_connect_helper(self, dstaddr, dstport, net, timeout_factor): self.on_connection_send_msg = None self.recvbuf = b"" self.magic_bytes = MAGIC_BYTES[net] + self.p2p_connected_to_node = dstport != 0 def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p): self.peer_connect_helper(dstaddr, dstport, net, timeout_factor) diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 60ca9269a5b9f..f12c2d550d175 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -714,7 +714,6 @@ def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=Tru if supports_v2_p2p is None: supports_v2_p2p = self.use_v2transport - p2p_conn.p2p_connected_to_node = True if self.use_v2transport: kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 supports_v2_p2p = self.use_v2transport and supports_v2_p2p @@ -781,7 +780,6 @@ def addconnection_callback(address, port): self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p) - p2p_conn.p2p_connected_to_node = False if supports_v2_p2p is None: supports_v2_p2p = self.use_v2transport if advertise_v2_p2p is None: