Skip to content

Commit

Permalink
major revamps
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolayBlagoev committed Apr 16, 2024
1 parent 98d2a3f commit 996f535
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 35 deletions.
12 changes: 8 additions & 4 deletions deccom/protocols/defaultprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ def set_callback(self, callback):
async def send_datagram(self, msg: bytes, addr: tuple[str,int]):
    """Send *msg* to *addr*, prefixed with this protocol's unique id byte(s).

    The prefix lets the receiving side route the datagram back to this
    protocol layer; the actual transmission is delegated to ``self.sendto``.
    """
    payload = self.uniqueid + msg
    await self.sendto(payload, addr)



def process_datagram(self, data, addr):
def process_datagram(self, addr, data):
loop = asyncio.get_event_loop()

if len(data) < 2:
print("invalid msg received")
return
if data[0] == DefaultProtocol.PING:

loop.create_task(self.handle_ping(addr, data[1:]))
elif data[0] == DefaultProtocol.PONG:
loop.create_task(self.handle_pong(addr,data[1:]))
Expand Down Expand Up @@ -86,7 +89,7 @@ async def send_ping(self, addr, success, error, dt = 10):
while self.pings.get(msg_id) != None:
bts = os.urandom(4)
msg_id = int.from_bytes(bts, "big")
# print("sending ping",addr)

timeout = loop.call_later(dt+2,
self.timeout, addr,error,msg_id)
self.pings[msg_id] = (success, timeout)
Expand All @@ -99,13 +102,14 @@ async def send_ping(self, addr, success, error, dt = 10):
async def handle_ping(self, addr, data):
    """Answer an incoming PING by echoing its payload back as a PONG.

    The reply keeps the sender's message id (``data``) intact so the peer
    can match the PONG to its pending ping.
    """
    reply = bytearray([DefaultProtocol.PONG])
    reply.extend(data)
    await self.send_datagram(reply, addr=addr)
    return

async def handle_pong(self, addr, data):
msg_id = int.from_bytes(data, "big")
# print("received pong",addr )

if self.pings.get(msg_id) is None:
return
success, timeout = self.pings[msg_id]
Expand Down
19 changes: 11 additions & 8 deletions deccom/protocols/delayprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,26 @@

class DelayProtocol(AbstractProtocol):
def __init__(self, delay_map, submodule=None, callback: Callable[[tuple[str, int], bytes], None] = ...):
self.stream_callback = lambda data, node_id,addr: ...

self.delay_map = delay_map
super().__init__(submodule, callback)

@bindfrom("stream_callback")
def process_data(self,data,node_id,addr):

#self.stream_callback(data,node_id,addr)
async def send_stream(self,node_id,data):
print("delay...")
p = self.get_peer(node_id)
print(p)
loop = asyncio.get_event_loop()
dl = self.delay_map(p.pub_key, self.peer.pub_key)
print(dl)
print("will receive in ",dl[0]/1000 + len(data)/(1024**3*dl[1]))
loop.call_later(dl[0]/1000 + len(data)/(1024**3*dl[1]), self.stream_callback, data, node_id, addr)
#self.stream_callback(data,node_id,addr)


print("will send in ",dl[0]/1000 + 100*len(data)/(1024**3*dl[1]))
await asyncio.sleep(dl[0]/1000 + 100*len(data)/(1024**3*dl[1]))
if self.started:
return await self._lower_send_to(node_id,data)
@bindto("send_stream")
async def _lower_send_to(self, nodeid, data):
    # Stub body: @bindto("send_stream") presumably rebinds this at runtime to
    # the lower submodule's send_stream — TODO confirm the binding semantics.
    return
@bindto("get_peer")
def get_peer(self, id: bytes) -> Union[Peer,None]:
    # Stub body: @bindto("get_peer") presumably rebinds this at runtime to the
    # submodule's get_peer lookup; returns None until bound — TODO confirm.
    return None
1 change: 1 addition & 0 deletions deccom/protocols/keepalive.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def deregister_keep_alive(self, node_id):
def remove_peer(self, addr: tuple[str, int], node_id: bytes):
if not self.started:
return
print("PEER TIMED OUTs")
if self.keep_alives.get(node_id) != None:
del self.keep_alives[node_id]
self.disconnected_callback(addr,node_id)
Expand Down
2 changes: 1 addition & 1 deletion deccom/protocols/peerdiscovery/kademliadiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def _refresh_table(self):

def remove_peer(self, addr: tuple[str, int], node_id: bytes):
if self.bucket_manager.get_peer(node_id) == None:
return
return super().remove_peer(addr, node_id)
del self.peers[node_id]
print(self.peer.pub_key, "removing peer.", self.bucket_manager.get_peer(node_id).pub_key)
self.bucket_manager.remove_peer(node_id)
Expand Down
81 changes: 59 additions & 22 deletions deccom/protocols/streamprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,43 @@ async def stop(self):

async def handle_connection(self, reader: asyncio.StreamReader,writer: asyncio.StreamWriter, node_id: Any = None, addr: Any = None):
#print("CONNECTION FROM PEER", writer.get_extra_info('peername'))
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"connection from {writer.get_extra_info('peername')}\n")
addr = writer.get_extra_info('peername')
try:
data = await asyncio.wait_for(reader.readline(), timeout=10)
data = await asyncio.wait_for(reader.readexactly(36), timeout=10)

except exceptions.TimeoutError:
print('PEER DID NOT INTRODUCE THEMSELVES')
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"peer {addr} did not introduce themselves\n")
writer.close()
return
if len(data)<5 or data[0:4] != b'\xe4\xe5\xf3\xc6':
print("wrong introduction")
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"wrong introduction \n")
writer.close()
return
node_id = data[4:-1]
node_id = data[4:]
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"connection is from {addr} {node_id}\n")
if self.locks.get(node_id) == None:
self.locks[node_id] = asyncio.Lock()
# print("connection from",node_id)
if self.connections.get(node_id) != None:
print("duplicate connection RECEIVED")
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"duplicate connection from {addr} {node_id}\n")
#rint(self.connections.get(node_id).opened_by_me,ternary_comparison(self.peer.id_node, node_id))
if self.connections.get(node_id).opened_by_me * ternary_comparison(self.peer.id_node, node_id) == -1:
print("closing previous, keeping new")
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"closing previous with {addr} {node_id}\n")
self.remove_from_dict(node_id)
else:
print("keeping old one")
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"keeping old one with {addr} {node_id}\n")
writer.close()
return
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"listening from {addr} {node_id}\n")
self.connections[node_id] = DictItem(reader,writer,None, -1)
self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,addr))
return
Expand Down Expand Up @@ -153,29 +164,34 @@ async def open_connection(self, remote_ip, remote_port, node_id: bytes, port_lis

s.connect((remote_ip,remote_port))
reader, writer = await asyncio.wait_for(asyncio.open_connection(sock = s), timeout=10)

if self.locks.get(node_id) == None:
self.locks[node_id] = asyncio.Lock()
except ConnectionRefusedError as e:
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write("connection refused\n")
self.await_connections[node_id].set_result(False)

s.close()
s.shutdown(0)

print("BROKEN SOMETHING ?")
return False
except asyncio.TimeoutError as e:
print("timed out...")
print(e.with_traceback())

s.close()
s.shutdown(0)

return False
self.connections[node_id] = DictItem(reader,writer,None,1)
self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,(remote_ip,remote_port)))
#print("introducing myself :)")
async with self.locks[node_id]:
writer.write(b'\xe4\xe5\xf3\xc6')
writer.write(self.peer.id_node)
writer.write(b'\n')

with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"introducing with {len(self.peer.id_node)} \n ")

await writer.drain()
self.await_connections[node_id].set_result(True)
Expand Down Expand Up @@ -206,6 +222,9 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad

# seqrand = random.randint(1,40000)
#print("listening for data")
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(f"listening for data {node_id} \n")

try:
data = await reader.read(32)
except (ConnectionResetError, BrokenPipeError) as e:
Expand All @@ -221,7 +240,7 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad
self.remove_from_dict(node_id)
self.closed_stream(node_id, addr)
return


if data == b'':
async with self.locks[node_id]:
Expand Down Expand Up @@ -257,21 +276,39 @@ async def listen_for_data(self, reader: asyncio.StreamReader, node_id = None, ad
asyncio.run_coroutine_threadsafe(self._caller(buffer,node_id,addr), loop)

self.connections[node_id].fut = asyncio.ensure_future(self.listen_for_data(reader,node_id,addr))
async def send_stream(self, node_id, data):
async def send_stream(self, node_id, data, lvl = 0):

if self.connections.get(node_id) == None:
print("CANT FIND???")
return
async with self.locks[node_id]:
if self.connections.get(node_id) == None or lvl >= 3:

return False
try:
async with self.locks[node_id]:
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
log.write(f" sending to {self.get_peer(node_id).pub_key} {len(data)}\n")

self.connections[node_id].writer.write(len(data).to_bytes(32,byteorder="big"))
await self.connections[node_id].writer.drain()
self.connections[node_id].writer.write(data)
await self.connections[node_id].writer.drain()
except ConnectionResetError:
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
log.write(f" sending to {self.get_peer(node_id).pub_key} {len(data)}\n")

self.connections[node_id].writer.write(len(data).to_bytes(32,byteorder="big"))
await self.connections[node_id].writer.drain()
self.connections[node_id].writer.write(data)
await self.connections[node_id].writer.drain()
log.write(f" cannot send to {self.get_peer(node_id).pub_key} {len(data)}\n")
self.remove_from_dict(node_id)
await asyncio.sleep(3)
p: Peer = self.get_peer(node_id)
if p == None:
return False
ret = await self.open_connection(p.addr[0],p.tcp, p.id_node, port_listen = 0)
if ret == False:
return False
return await self.send_stream(node_id,data, lvl=lvl+1)
with open(f"log{self.peer.pub_key}.txt", "a") as log:
log.write(datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
log.write(f" finished sending to {self.get_peer(node_id).pub_key} {len(data)}\n")
# print("done srream")
return True
def set_stream_close_callback(self, callback):
self.stream_close_callback = callback
async def _caller(self,data,node_id,addr):
Expand Down

0 comments on commit 996f535

Please sign in to comment.