-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtinycomet.py
executable file
·214 lines (176 loc) · 6 KB
/
tinycomet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# -*- coding:utf-8 -*-
from eventlet import wsgi
from eventlet.green import socket, threading
from eventlet.corolocal import local
import urllib
import wsgiref
import functools
import time
import json
import binascii
ERRORS = {
400: 'Bad Request',
404: 'Not Found',
408: 'Request Timeout'
}
WAIT_INTERVAL = 2.5
_comet_storage = {}
_update_lock = threading.Condition()
class TimeoutException(Exception):
pass
class LogicalTimer(object):
def __init__(self):
self._time = 1
def get_global(self):
return self._time
def get_local(self):
return local.logical_time
def inc(self):
self._time += 1
return self._time
def timer_middleware(self, app):
def middleware(env, start_response):
local.logical_time = self.inc()
return app(env, start_response)
return middleware
_logical_timer = LogicalTimer()
class CometData(object):
def __init__(self, payload, last_update, content_type='application/octet-stream', finished=False):
self.payload = payload
self.last_update = last_update
self.content_type = content_type
self.finished = finished
def __repr__(self):
return '<CometData (%d):`%r`:%s>' % (self.last_update, self.payload, self.content_type)
def make_dispatch_middleware(urlmap):
def app(env, start_response):
path = env['PATH_INFO']
for k, v in urlmap.iteritems():
if path.startswith(k):
env['SHIFT_PATH_INFO'] = path[len(k):]
return v(env, start_response)
return error_response(start_response, 404)
return app
def error_response(start_response, code):
msg = ERRORS.get(code, '%d' % code)
start_response(
'%d %s' % (code, msg),
[('Content-Type', 'text/plain')]
)
return [msg, '\r\n']
def parse_query(env):
query = env['QUERY_STRING']
if not query:
return {}
query = [x.split('=', 1) for x in query.split('&')]
query = dict((x[0], urllib.unquote(x[1])) for x in query)
return query
def make_comet_receiver(func):
@functools.wraps(func)
def receiver(env, start_response):
path = env['SHIFT_PATH_INFO']
if not path:
return error_response(start_response, 404)
return func(path, env, start_response)
return receiver
@make_comet_receiver
def wait_receiver(uuid, env, start_response):
query = parse_query(env)
since = int(query['since'], 10) if 'since' in query else None
if since:
timeout = float(query['timeout']) if 'timeout' in query else None
last = time.time() + timeout if timeout else None
try:
# もっと綺麗にかけないかしら
with _update_lock:
while True:
data = _comet_storage[uuid]
if not since or data.last_update > since:
break
# wait for update
_update_lock.wait(timeout)
timeout = last - time.time() if last else None
if timeout is not None and timeout < 0:
raise TimeoutException
except KeyError:
return error_response(start_response, 404)
except TimeoutException:
return error_response(start_response, 408)
if data.finished:
# remove from storage
del _comet_storage[uuid]
jsonp = query.get('callback')
headers = [
('X-TC-Timestamp', '%d' % data.last_update)
]
if jsonp is None:
headers.append(('Content-Type', data.content_type))
if data.finished:
headers.append(('X-TC-Removed', 'removed'))
start_response('200 OK', headers)
return [data.payload]
else:
obj = {
'last_update': data.last_update,
'content_type': data.content_type,
'finished': data.finished
}
if data.content_type in ['application/json']:
obj['payload'] = json.loads(data.payload)
elif data.content_type.startswith('text/'):
obj['payload_text'] = data.payload
else:
obj['payload_base64'] = binascii.b2a_base64(data.payload)
headers.append(('Content-Type', 'text/javascript'))
print json.dumps(obj)
start_response('200 OK', headers)
return [jsonp, '(', json.dumps(obj), ')']
@make_comet_receiver
def update_receiver(uuid, env, start_response):
if env['REQUEST_METHOD'] != 'POST':
return error_response(start_response, 400)
query = parse_query(env)
last_update = _logical_timer.get_local()
with _update_lock:
_comet_storage[uuid] = CometData(
env['wsgi.input'].read(),
last_update,
content_type=query.get('content_type', 'application/octet-stream'),
finished=query.get('finished', '') == '1'
)
_update_lock.notify_all()
headers = [
# ('Content-Type', data.content_type),
('X-TC-Timestamp', '%d' % last_update)
]
start_response('201 Created', headers)
return []
def get_options(args):
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-b", "--bind", dest="bind",
help="bind address",
default='127.0.0.1:8090')
options, args = parser.parse_args(args)
return options
def main(args=''):
options = get_options(args)
# bind and listen
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
bind = options.bind.split(':')
if len(bind) == 1:
bind = t[0], '8090'
sock.bind((bind[0], int(bind[1], 10)))
sock.listen(500)
# launch wsgi app
app = make_dispatch_middleware({
'/wait/': wait_receiver,
'/update/': update_receiver,
# '/wait_multi': wait_multi_receiver
})
app = _logical_timer.timer_middleware(app)
wsgi.server(sock, app)
if __name__ == '__main__':
import sys
main(sys.argv[1:])