udpadvertiseserver.repy
"""
Author: Justin Cappos
Start Date: Oct 30th, 2011
Description:
Advertisements to a central server (similar to openDHT)
This is mostly derived from the old advertise server
"""
dy_import_module_symbols("session")
dy_import_module_symbols("serialize")
serverport = 10102
servername = getmyip() # IP of the local box (our production udpadvertiseserver is located on blackbox)
# These are storage locations for data pertinent to logging.
mycontext["get_times"] = []
mycontext["put_times"] = []
# This is the file object for the query log. It is initialized in the main
# code, and used only in the _perform_logging function.
mycontext["query_log"] = None
# How frequently should we purge our log data?
mycontext["log_spacing"] = 150 # How about every two and a half minutes?
hashtable = {}
# Used for logging.
logfiledata = dict()

# logfiledata["call_trace"] lists the GET/PUT requests that have completed;
# failed requests are not logged here. Each call_trace entry has the format:
# <timestamp> <PUT/GET> <request identification key> <first 16 characters of key> <value> <expiration interval for a PUT, blank for a GET>
logfiledata["call_trace"] = None

# logfiledata["expanded_log"] holds information logged while each request is
# processed, for all requests that are received.
logfiledata["expanded_log"] = None
log_lock = createlock()
# Safely write to the given log file.
def write_to_log(log_file, message):
  # This has been commented out to prevent logging.
  # Log the request information:
  # log_lock.acquire()
  # log_file.write(message)
  # log_file.flush()
  # log_lock.release()
  pass
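
# Sweep the whole table and discard every (value, expiretime) pair whose
# expire time has passed. handlerequest triggers this at most once per
# EXPIRE_FREQUENCY seconds under expire_hashtable_items_lock; a concurrent
# insert_hashtable may remove an entry first, hence the ValueError handling.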
def expire_hashtable_items():
  now = getruntime()
  for key in hashtable.copy():
    # copying the list, so I can remove elements...
    for valuetimepair in hashtable[key][:]:
      if valuetimepair[1] < now:
        try:
          # if time has expired, remove it...
          hashtable[key].remove(valuetimepair)
        except ValueError:
          # This is okay. It means another instance of expire_hashtable_items
          # or an insert_hashtable removed the entry.
          pass
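
# Store (value, expiretime) under key, first removing any existing pair with
# the same value so that re-advertising refreshes the TTL rather than
# duplicating the entry.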
def insert_hashtable(key, value, expiretime):
  if key not in hashtable:
    hashtable[key] = []

  # copying the list, so I can remove elements...
  for valuetimepair in hashtable[key][:]:
    if valuetimepair[0] == value:
      # if there is already a value, remove it...
      try:
        hashtable[key].remove(valuetimepair)
      except ValueError:
        # This is okay. It means an instance of expire_hashtable_items or
        # another insert_hashtable instance removed the entry.
        pass

  hashtable[key].append((value, expiretime))
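
# Return up to maxkeys values advertised under key, with the stored expire
# times stripped. Entries are not filtered for expiry here; stale values may
# appear until the next expire_hashtable_items sweep removes them.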
def read_hashtable(key, maxkeys):
  if key not in hashtable:
    # log the hash table miss
    expanded_message = str(getruntime()) + ": unable to find key " + str(key)[0:16] + "... in hashtable.\n"
    write_to_log(logfiledata["expanded_log"], expanded_message)
    return []

  retlist = []
  for item in hashtable[key]:
    retlist.append(item[0])

  return retlist[:maxkeys]
# This will receive requests that look like this:
# ('PUT', 'key', 'value', 100, requestid) # put 'value' under 'key' for
# # 100 seconds
# ('GET', 'key', 10, requestid) # get at most 10 values stored under 'key'
#
# The responses returned for a PUT look like the following:
# ('OK',requestid) (in response to put)
# ('An error foobar occurred when performing a PUT',requestid)
#
# The responses returned for a GET look like the following:
# ('OK',[],requestid) # (no items under key)
# ('OK',['abc','123'],requestid) # ('abc' and '123' are under the key)
# ('An error foobar occurred when performing a GET',requestid)
# The requestid items listed above are merely a way for the caller to know
# which response corresponds to a request. It is opaque and is not assumed
# to be of any specific type, etc.
# I know it's a little wonky to return data of different types. I also know
# I could have used dicts instead of tuples. After consideration I decided
# not to do either because I'm not sure this would have added much clarity.
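
# As an illustration (not part of this server), a client built on the same
# serialize library could issue a request along these lines. This is only a
# sketch: serverip, localport, and the literal request id are assumptions,
# and any opaque requestid value would do.
#
#   putrequest = serialize_serializedata(('PUT', 'mykey', 'myvalue', 100, 12345))
#   sendmessage(serverip, serverport, putrequest, getmyip(), localport)
#   # ...then deserialize the UDP reply and check that it is ('OK', 12345).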
# let's expire every 10 seconds
EXPIRE_FREQUENCY = 10
expire_hashtable_items_lock = createlock()
# This is the new handler. This works on serialized requests...
def handlerequest(remoteIP, remoteport, rawrequestdata):
  start_time = getruntime()

  # This is when I last ran expire_hashtable_items
  if getruntime() > mycontext['last_expire_hashtable_items_time'] + EXPIRE_FREQUENCY:
    # if we can get the lock, great! If not, continue...
    if expire_hashtable_items_lock.acquire(False):
      # expire the hashtable items that are obsolete and update the time...
      expire_hashtable_items()
      mycontext['last_expire_hashtable_items_time'] = getruntime()
      expire_hashtable_items_lock.release()

  # Now let's process the request...
  try:
    try:
      requesttuple = serialize_deserializedata(rawrequestdata)
    except ValueError, e:
      print 'serialize_deserializedata:', e, ' with string "' + rawrequestdata + '"'
      return

    if type(requesttuple) is not tuple:
      print 'Request is ' + str(type(requesttuple)) + ' not tuple.'
      return

    if requesttuple[0] == 'PUT':

      ############# START Tons of type checking
      try:
        (key, value, ttlval, requestid) = requesttuple[1:]
      except ValueError, e:
        print 'Incorrect format for request tuple: ', str(requesttuple)
        return

      if type(key) is not str:
        print 'Key type for PUT must be str, not', type(key)
        return

      if type(value) is not str:
        print 'Value type must be str, not', type(value)
        return

      if type(ttlval) is not int and type(ttlval) is not long:
        print 'TTL type must be int or long, not', type(ttlval)
        return

      if ttlval <= 0:
        print 'TTL must be positive, not ', ttlval
        return
      ############# END Tons of type checking

      now = getruntime()

      # insert the value under its key, and also under the special '%all'
      # key so that a GET for '%all' returns every advertised value...
      insert_hashtable(key, value, ttlval + now)
      insert_hashtable('%all', value, ttlval + now)

      senddata = serialize_serializedata(("OK", requestid))

      # all is well...
      sendmessage(remoteIP, remoteport, senddata, servername, serverport)
      mycontext["put_times"].append(getruntime() - start_time)
      return

    elif requesttuple[0] == 'GET':

      ############# START Tons of type checking (similar to above)
      try:
        (key, maxvals, requestid) = requesttuple[1:]
      except ValueError, e:
        print 'Incorrect format for request tuple: ', str(requesttuple)
        return

      if type(key) is not str:
        print 'Key type for GET must be str, not', type(key)
        return

      if type(maxvals) is not int and type(maxvals) is not long:
        print 'Maximum value count must be int or long, not', type(maxvals)
        return

      if maxvals <= 0:
        print 'Maximum value count must be positive, not ', maxvals
        return
      ############# END Tons of type checking

      # do the actual work... read the values and send the response...
      readlist = read_hashtable(key, maxvals)

      senddata = serialize_serializedata(("OK", readlist, requestid))
      sendmessage(remoteIP, remoteport, senddata, servername, serverport)
      mycontext["get_times"].append(getruntime() - start_time)
      return

    else:
      print 'Unknown request:', str(requesttuple)
      return

  except Exception, e:
    print "While handling request, received: ", e
def _calcmean(collection):
  if len(collection) == 0:
    return 0
  return sum(collection) / len(collection)
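
# Background thread body: every mycontext["log_spacing"] seconds, write one
# summary record (request counts and mean handling times since the previous
# record) to the query log. The helper_string built below is a run of spaces
# the width of the timestamp, used to indent the AVG lines under the
# bracketed header.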
def _perform_logging():
  query_log_index = 0
  while True:
    # It's a shame, but this is a MESS. Suffice it to say, it constructs a
    # data string that looks something like this when written:
    # [3456785.2432] PUT: 120 GET: 35
    #                AVG PUT: 0.001341
    #                AVG GET: 0.002340
    # I do not know offhand how to timestamp queries in a way legible to the
    # uninitiated, so this will have to do for now.
    helper_string = ""
    for i in range(len(str(getruntime()))):
      helper_string = helper_string + " "

    query_log_str = "[" + str(getruntime()) + "] PUT: "
    query_log_str = query_log_str + str(len(mycontext["put_times"]))
    query_log_str = query_log_str + " GET: " + str(len(mycontext["get_times"])) + "\n"
    query_log_str = query_log_str + " " + helper_string
    query_log_str = query_log_str + "AVG PUT: " + str(_calcmean(mycontext["put_times"])) + "\n"
    query_log_str = query_log_str + " " + helper_string
    query_log_str = query_log_str + "AVG GET: " + str(_calcmean(mycontext["get_times"])) + "\n"

    # Reset counts
    mycontext["put_times"] = []
    mycontext["get_times"] = []

    # Flush the data and update index.
    mycontext["query_log"].writeat(query_log_str, query_log_index)
    query_log_index += len(query_log_str)

    # Wait for another log spacing.
    sleep(mycontext["log_spacing"])
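
# Main body: open the query log, start the logging thread, then receive UDP
# datagrams forever, handing each one to handlerequest.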
if callfunc == 'initialize':
  print "Started"
  # time_updatetime(34612)
  mycontext["query_log"] = openfile("log.volume", True)
  createthread(_perform_logging)

  # This is when I last ran expire_hashtable_items
  mycontext['last_expire_hashtable_items_time'] = getruntime()

  # initialize the log files
  # logfiledata["call_trace"] = open("log.calltrace",'a')
  # logfiledata["expanded_log"] = open("log.expanded",'a')
  print "Detailed logging has been disabled, see write_to_log to enable it"

  # recvmess(servername,serverport,handlerequest)
  udpserversock = listenformessage(servername, serverport)
  while True:
    try:
      remoteIP, remoteport, message = udpserversock.getmessage()
      handlerequest(remoteIP, remoteport, message)
    except SocketWouldBlockError, e:
      # No message is waiting; sleep briefly instead of busy-spinning on
      # the nonblocking socket.
      sleep(0.01)