-
Notifications
You must be signed in to change notification settings - Fork 1
/
Server.py
151 lines (131 loc) · 5.32 KB
/
Server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import asyncio
import socket
import threading
from config import DISCOVERY_PORT, DELIVERY_PORT, SELF_IP, OFFER_TIMEOUT, SERVER_WAIT_FOR_SCRIPT_TOLERANCE, MESSAGE_TIMEOUT, OFFER_PORT
from handlers import execute_script, send_probe_response
import select
from utils import change_style
class Task:
def __init__(self, hash):
self.hash = hash
self.script = None
self.limit = None
self.offset = None
self.result = None
def get_result_message(self):
message = "|".join([self.hash, self.result, self.offset, self.limit])
return message.encode('utf_8')
class Server:
def __init__(self,quant):
self.busy = False
self.loop = None
self.task = None
self.quant= quant
self.lock = threading.Lock()
self.task = None
def set_busy(self):
self.busy = True
def timeout_after_offer(self, hash):
if self.task is not None:
if self.task.script is not None:
return
if self.task.hash == hash:
self.busy = False
self.task = None
def timeout_after_execution(self, hash):
if self.task is None:
return
if self.task.hash == hash:
self.busy = False
self.task = None
def start_discovery_handler(self):
disc_thread = DiscoveryHandler(self.send_probe_response)
disc_thread.setDaemon(True)
disc_thread.start()
def send_probe_response(self, receiver_ip):
msg = "HERE|"
msg += SELF_IP + "|" + str(self.quant)
msg = msg.encode('utf_8')
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
try:
server.settimeout(MESSAGE_TIMEOUT)
server.connect((receiver_ip, DISCOVERY_PORT))
server.sendall(msg)
except Exception as ex:
print("Error occured while sending the discovery response", ex)
return
finally:
server.close()
def serve(self):
#t = threading.Thread(target = self.start_server)
#t.setDaemon(True)
#t.start()
asyncio.run(self.start_server())
async def start_server(self):
self.loop = asyncio.get_running_loop()
server = await self.loop.create_server(
lambda: OfferTakerProtocol(self),
SELF_IP, OFFER_PORT)
print("Started Offer Server")
async with server:
await server.serve_forever()
class OfferTakerProtocol(asyncio.Protocol):
def __init__(self, server):
self.server = server
self.loop = server.loop
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
self.server.lock.acquire()
try:
if len(data.decode('utf_8').split("|")) == 2:
msg = ""
if self.server.busy:
msg += "BUSY|"
else:
msg += "OK|"
self.server.set_busy()
self.loop.call_later(SERVER_WAIT_FOR_SCRIPT_TOLERANCE, self.server.timeout_after_offer,(hash,))
self.server.task = Task(data.decode('utf_8').split("|")[1])
msg = msg + SELF_IP + "|" + str(self.server.quant)
print("Received Script Offer")
self.transport.write(msg.encode('utf_8'))
self.transport.close()
else:
if self.server.task is None:
return
print("Received Script Content")
self.loop.call_later(self.server.quant + SERVER_WAIT_FOR_SCRIPT_TOLERANCE, self.server.timeout_after_execution,(self.server.task.hash,))
self.server.task.hash, self.server.task.script, self.server.task.offset, self.server.task.limit = data.decode('utf_8').split("|")
self.server.task.result = execute_script(self.server.task, self.server.quant)
message = self.server.task.get_result_message()
self.transport.write(message)
self.server.task = None
self.server.busy = False
self.transport.close()
except Exception as ex:
print("Exception occured during data receive on Server: " + str(ex))
finally:
self.server.lock.release()
class DiscoveryHandler(threading.Thread):
def __init__(self, discovery_cb):
threading.Thread.__init__(self)
self.discovery_cb = discovery_cb
self.discovery_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print('initializing discovery thread')
def run(self):
self.discovery_socket.bind(('', DISCOVERY_PORT))
self.discovery_socket.setblocking(0)
while True:
result = select.select([self.discovery_socket], [], [])
msg = result[0][0].recv(1024)
msg = msg.decode('utf_8')
if len(msg.split('|')) > 2:
continue
probe,received_from = msg.split('|')
print("Received discovery request from:" + received_from)
self.discovery_cb(received_from)
capacity = input("\n" + change_style("Enter the number of seconds that this server should spend for each execution", 'underline') + ": ")
s = Server(int(capacity))
s.start_discovery_handler()
s.serve()