Skip to content

Commit

Permalink
keep alive layerand shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolayBlagoev committed Apr 12, 2024
1 parent 10d1915 commit 7c2e5d8
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 21 deletions.
12 changes: 11 additions & 1 deletion deccom/protocols/abstractprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def check_if_have(submodule, attr, name = None)->bool:


return True
@bindto("stop")
async def _lower_stop(self):
return
async def stop(self):
self.started = False

return await self._lower_stop()
def get_if_have(submodule: Any, attr, name = None)->bool:
if name != None and submodule.__class__.__name__ != name:
return AbstractProtocol.get_if_have(submodule.submodule,attr,name)
Expand All @@ -62,7 +69,8 @@ def process_datagram(self, addr, data):
return self.callback(addr,data)

async def send_datagram(self, msg: bytes, addr: tuple[str,int]):

if not self.started:
return
await self._lower_sendto(self.uniqueid + msg, addr)


Expand Down Expand Up @@ -103,6 +111,8 @@ def __init__(self, submodule = None, callback: Callable[[tuple[str, int], bytes]

@bindfrom("callback")
def datagram_received(self, addr:tuple[str,int],data:bytes):
if not self.started:
return
if data[:8] == self.uniqueid:
return self.process_datagram(addr, data[8:])
else:
Expand Down
8 changes: 7 additions & 1 deletion deccom/protocols/defaultprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class DefaultProtocol(asyncio.DatagramProtocol):
PONG = int.from_bytes(PONG_b, byteorder="big")
def __init__(self, callback: Callable[[tuple[str,int], bytes], None] = lambda addr, data: ...):
self.transport = None
self.started = False
self.callback = callback
self.pings = dict()
self._taken = dict()
Expand Down Expand Up @@ -46,7 +47,8 @@ def process_datagram(self, data, addr):
elif data[0] == DefaultProtocol.PONG:
loop.create_task(self.handle_pong(addr,data[1:]))
def datagram_received(self, data, addr):

if not self.started:
return
# print("from:", addr, "data", data)

if data[:8] == self.uniqueid:
Expand All @@ -64,9 +66,13 @@ async def call_callback(self, addr,data):
traceback.print_exc(file=log)


async def stop(self):
self.started = False

return
async def start(self, p: Peer, *args):
self.p = p
self.started = True
return
def timeout(self, addr, error, msg_id):
if self.pings.get(msg_id) is None:
Expand Down
7 changes: 6 additions & 1 deletion deccom/protocols/faultytransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def process_datagram(self, data, addr):
elif data[0] == FaultyProtocol.PONG:
loop.create_task(self.handle_pong(addr,data[1:]))
def datagram_received(self, data, addr):

if not self.started:
return
# print("from:", addr, "data", data)

if data[:8] == self.uniqueid:
Expand All @@ -65,9 +66,13 @@ async def call_callback(self, addr,data):
traceback.print_exc(file=log)


async def stop(self):
self.started = False

return
async def start(self, p: Peer, *args):
self.p = p
self.started = True
return
def timeout(self, addr, error, msg_id):
if self.pings.get(msg_id) is None:
Expand Down
10 changes: 8 additions & 2 deletions deccom/protocols/holepuncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ def __init__(self, submodule=None, callback: Callable[[tuple[str, int], bytes],
self.successful = set()
self.outstanding: dict[tuple[str,int], tuple[int, list[bytes]]] = dict()


async def stop(self):
self.outstanding.clear()
self.heard_data.clear()
for _,v in self.outstanding.items():
v[1].clear()
self.outstanding.clear()
return await super().stop()
def heard_from(self,add1, add2):
print("adding", add2,"from",add1)
if self.heard_data.get(add2) == None:
self.heard_data[add2] = []
self.heard_data[add2].append(add1)
self.heard_data[add2] = self.heard_data[add2][-5:]

def datagram_received(self, addr:tuple[str,int],data:bytes):
self.successful.add(addr)
if self.outstanding.get(addr) != None:
Expand Down
49 changes: 49 additions & 0 deletions deccom/protocols/keepalive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import asyncio
from typing import Callable
from deccom.protocols.abstractprotocol import AbstractProtocol
from deccom.protocols.wrappers import *


class KeepAlive(AbstractProtocol):
def __init__(self, interval = 20, timeout = 10, submodule=None, callback: Callable[[tuple[str, int], bytes], None] = lambda addr,msg: ...):
assert timeout < interval
self.keep_alives: dict[bytes,tuple[str,int]] = dict()
self.disconnected_callback = lambda *args: ...
self.interval = interval
self.timeout = timeout
self.refresh_loop = None
super().__init__(submodule, callback)

async def start(self, p):
await super().start(p)
loop = asyncio.get_event_loop()
self.refresh_loop = loop.call_later(self.interval, self.check_each)

async def stop(self):
self.keep_alives.clear()
if self.refresh_loop != None:
self.refresh_loop.cancel()
return await super().stop()
def register_keep_alive(self, addr,node_id):
self.keep_alives[node_id] = addr

def register_keep_alive(self, node_id):
if self.keep_alives.get(node_id) != None:
del self.keep_alives[node_id]

def remove_peer(self, addr: tuple[str, int], node_id: bytes):
if not self.started:
return
self.disconnected_callback(addr,node_id)
@bindto("send_ping")
async def _lower_ping(self, addr, success, failure, timeout):
return
async def send_ping(self, addr, success, fail, timeout):
await self._lower_ping(addr, success, fail, timeout)
def check_each(self):
loop = asyncio.get_event_loop()
for k,v in self.keep_alives.items():
loop.create_task(self.send_ping(v,lambda *args: ..., lambda addr, id_node=k, self=self: self.remove_peer(addr, id_node),self.timeout))

self.refresh_loop = loop.call_later(self.interval, self.check_each)

5 changes: 4 additions & 1 deletion deccom/protocols/peerdiscovery/_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ def add_peer(self, peers: list[Peer]):
pi = NodeaAbstraction(p.id_node,p,int.from_bytes(p.id_node, byteorder="big"))
heapq.heappush(self.peers, (pi.idint ^ self.look_for_int, pi))


def stop(self):
self.contacted.clear()
self.peers.clear()

9 changes: 8 additions & 1 deletion deccom/protocols/peerdiscovery/_kademlia_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def remove_peer(self, dist):
dist,peer = self.toadd.pop()
self.success_call(dist, peer)
self.peers[dist] = peer
def clear(self):
self.peers.clear()
self.toadd.clear()

def get_peer(self, dist):

Expand Down Expand Up @@ -149,7 +152,11 @@ def update_peer(self, id, node) -> Peer:
indx = self._get_index(dist)
return self.buckets[indx].update_peer(dist, node)


def clear(self):
for b in self.buckets:
b.clear()
self.buckets.clear()

def add_peer(self,id,node, lv=0):
if self.get_peer(id) != None:
return None
Expand Down
11 changes: 5 additions & 6 deletions deccom/protocols/peerdiscovery/abstractpeerdiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ def __init__(self, bootstrap_peers: list[Peer] = [], interval: int = 10, submodu
self.peers: dict[bytes, Peer] = dict()


def set_connected_callback(self, callback: Callable[[Peer], None]):
self.connected_callback = callback

def set_disconnected_callback(self, callback):
self.disconnected_callback = callback

@bindfrom("connected_callback")
def add_peer(self, addr: tuple[str,int], p: Peer):
self.connected_callback(addr, p)

async def stop(self):
self.peers.clear()

return await super().stop()
def ban_peer(self, addr: tuple[str,int], p: Peer):
return
@bindfrom("disconnected_callback")
Expand Down
13 changes: 11 additions & 2 deletions deccom/protocols/peerdiscovery/biggossip.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, bootstrap_peers: list[Peer] = [], interval: int = 60, k: int
self.peer_crawls = dict()
self.sent_finds = dict()
self.warmup = 0
self.refresh_loop = None
self.max_warmup = 60
self.searches: dict[bytes,bytes] = dict()

Expand All @@ -37,7 +38,7 @@ async def start(self, p: Peer):
msg = bytearray([BigGossip.ASK_FOR_ID])
await self.send_datagram(msg,p.addr)
loop = asyncio.get_event_loop()
loop.call_later(2, self.refresh_table)
self.refresh_loop = loop.call_later(2, self.refresh_table)

def refresh_table(self):

Expand Down Expand Up @@ -66,7 +67,15 @@ async def _refresh_table(self):
msg = bytearray([BigGossip.PULL])
await self.send_datagram(msg,v.addr)
self.refresh_loop = loop.call_later(self.interval, self.refresh_table)


async def stop(self):
if self.refresh_loop != None:
self.refresh_loop.cancel()
self.refresh_loop = None
self.searches.clear()
self.peer_crawls.clear()
self.sent_finds.clear()
return await super().stop()
def remove_peer(self, addr: tuple[str, int], node_id: bytes):
if self.peers.get(node_id) == None:
return
Expand Down
7 changes: 7 additions & 0 deletions deccom/protocols/peerdiscovery/fixedpeers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def introduce_to_others(self):
for a in self.a_to_p:
# self.introduced.append(a)
loop.create_task(self.introduction(a))
async def stop(self):
self.p_to_a.clear()
self.a_to_p.clear()
self.sent_finds.clear()
self.peer_crawls.clear()
self.introduced.clear()
return await super().stop()
async def introduction(self, addr):
msg = bytearray([1])
# print("introducing to ",addr)
Expand Down
9 changes: 7 additions & 2 deletions deccom/protocols/peerdiscovery/gossipdiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, bootstrap_peers: list[Peer] = [], interval: int = 10, submodu
super().__init__(bootstrap_peers, interval, submodule, callback, disconnected_callback, connected_callback)
self.peer_crawls = dict()
self.sent_finds = dict()

self.refresh_loop = None

async def start(self, p: Peer):
await super().start(p)
Expand Down Expand Up @@ -86,7 +86,12 @@ def remove_peer(self, addr: tuple[str, int], node_id: bytes):
if self.peers.get(node_id) != None:
del self.peers[node_id]
return super().remove_peer(addr, node_id)

async def stop(self):
if self.refresh_loop != None:
self.refresh_loop.cancel()
self.peer_crawls.clear()
self.sent_finds.clear()
return await super().stop()
async def introduce_to_peer(self, peer: Peer):
# print("introducing to", peer.id_node)
msg = bytearray([GossipDiscovery.INTRODUCTION])
Expand Down
17 changes: 15 additions & 2 deletions deccom/protocols/peerdiscovery/kademliadiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ def __init__(self, bootstrap_peers: list[Peer] = [], interval: int = 60, k: int
self.sent_finds = dict()
self.warmup = 0
self.max_warmup = 30 * (k//10)
self.refresh_loop = None
self.always_split = always_split
self.searches: dict[bytes,bytes] = dict()
self.finders: dict[bytes, Finder] = dict()
self.bucket_manager = None
async def start(self, p: Peer):
await super().start(p)
self.bucket_manager = BucketManager(self.peer.id_node,self.k,self._add,always_split = self.always_split)
Expand All @@ -39,8 +41,19 @@ async def start(self, p: Peer):
msg = bytearray([KademliaDiscovery.ASK_FOR_ID])
await self.send_datagram(msg,p.addr)
loop = asyncio.get_event_loop()
loop.call_later(2, self.refresh_table)

self.refresh_loop = loop.call_later(2, self.refresh_table)
async def stop(self):
self.searches.clear()
for k,v in self.finders.items():
v.stop()
self.finders.clear()
self.peer_crawls.clear()
self.sent_finds.clear()
if self.bucket_manager != None:
self.bucket_manager.clear()
if self.refresh_loop != None:
self.refresh_loop.cancel()
return await super().stop()
def refresh_table(self):

loop = asyncio.get_event_loop()
Expand Down
17 changes: 16 additions & 1 deletion deccom/protocols/reliableudp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ def __init__(self, connection, data = None, max_size = 3084, max_packets = 2, se
self.send_datagram = send_datagram
self.complete = False
self.desired = -1
def stop(self, clear_queue = True):
if self.refresh_loop != None:
self.refresh_loop.cancel()
if clear_queue and isinstance(self.data, asyncio.Queue):
while not self.data.empty():
self.data.get_nowait()
self.missing.clear()


def send(self, head: int):
if head * self.max_size >= self.ln:
self.done = True
Expand Down Expand Up @@ -254,7 +262,14 @@ def __init__(self, max_size = 3084, max_packets = 10, max_connections = 20, subm
self.receives: dict[tuple[tuple[str,int], bytes], Connection] = dict()
self.sends: dict[tuple[tuple[str,int], bytes], Connection] = dict()
self.connections = 0

async def stop(self):
for _,v in self.receives.items():
v.stop()
self.receives.clear()
for _,v in self.sends.items():
v.stop()
self.sends.clear()
return await super().stop()
def process_datagram(self, addr, data):

connection = data[0:4]
Expand Down
8 changes: 8 additions & 0 deletions deccom/protocols/securityprotocols/noiseprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,15 @@ def send_challenge(self, addr, peer: Peer, success, failure):
msg += sign(self.peer.key, SHA256(shared))
self.awaiting_approval[(addr,peer.id_node)] = (shared,peer,addr,success,failure)
loop.create_task(self.send_datagram(msg,addr))
async def stop(self):

for _,v in self.message_buffer.items():
v.clear()
self.message_buffer.clear()
self.keys.clear()
self.approved_connections.clear()
self.awaiting_approval.clear()
return await super().stop()
async def broadcast(self, msg):
for k,v in self.keys.items():
await self.sendto(msg, k)
Expand Down
11 changes: 10 additions & 1 deletion deccom/protocols/streamprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,16 @@ def __init__(self, always_connect: bool, submodule=None, callback: Callable[[tup


# self.lock = asyncio.Lock()

async def stop(self):
for k,v in self.connections.items():
async with self.locks[k]:
v.writer.close()
if v.fut != None:
v.fut.cancel()
self.connections.clear()
self.locks.clear()
self.await_connections.clear()
return await super().stop()

async def handle_connection(self, reader: asyncio.StreamReader,writer: asyncio.StreamWriter, node_id: Any = None, addr: Any = None):
#print("CONNECTION FROM PEER", writer.get_extra_info('peername'))
Expand Down
Loading

0 comments on commit 7c2e5d8

Please sign in to comment.