#!/usr/bin/env python
"Load and display many feeds"
import functools
import optparse
import sys
import feedparser
# Helper functions shared by the main loops
def load_urls(filename):
"load all the feeds from a .lst file"
for line in open(filename):
if line.strip():
yield line.strip()
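# Example .lst file contents (an illustration; one feed URL per line,
# blank lines are skipped):
#   http://example.com/atom.xml
#   http://example.com/rss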
def load_feed(feed_url):
"This wrapper exists purely to silence feed loading errors"
try:
return feedparser.parse(feed_url)
except Exception, e:
print >> sys.stderr, "load", type(e)
return None
# handler methods
def handler_decorator(func):
"Decorator to make safe handling easier"
@functools.wraps(func)
def handler(feed):
"Handle any feed, ignoring empty feeds and errors"
if feed:
try:
func(feed)
except Exception, e:
print >> sys.stderr, "display", type(e), e
return handler
@handler_decorator
def display_feed(feed):
"""Dispaly every item in this feed to the termial"""
for item in feed.entries:
print item.updated, feed.feed.link, item.title
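# Illustrative output, one line per entry (field values are made up):
#   2010-06-01T12:00:00Z http://example.com/ Example post title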
# single-threaded synchronous code
def main(filename, handler, count=None):
"""For all feeds in the file handle all
elements with the handler function"""
for feed_url in load_urls(filename):
handler(load_feed(feed_url))
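# Usage sketch ("feeds.lst" is a hypothetical file name):
#   main("feeds.lst", display_feed)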
# parallel versions of the code
def fork_main(filename, handler, count=3):
""" Capped forked child processed based off
http://jacobian.org/writing/python-is-unix/
"""
import os
urls = list(load_urls(filename))
running = 0
while urls:
if running < count:
# if we should make more processes pick an item
# and fork a process to download it
running += 1
feed_url = urls.pop()
pid = os.fork()
            if pid == 0:  # child process - handle one feed and exit
                handler(load_feed(feed_url))
                os._exit(0)  # exit here so the child never re-runs the caller's code
            else:
                # if we have enough processes already,
                # wait for one to end
                os.wait()
                running -= 1
else:
# have the main process wait for all children to finish
while running > 0:
os.wait()
running -= 1
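# Note: each forked child works on a copy of the parent's memory, so feeds are
# handled in separate address spaces and nothing is shared back to the parent
# beyond the exit status collected by os.wait().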
def multyprocess_main(filename, handler, count=3):
"""load all feeds over count processes
then handle all results with the handler function
Delays untill all feeds are read before handling
"""
import multiprocessing
pool = multiprocessing.Pool(count)
for feed in pool.map(load_feed, load_urls(filename)):
handler(feed)
pool.close()
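# A hedged variant sketch (not one of the original command-line options):
# Pool.imap_unordered yields each result as soon as its worker finishes, so
# handling overlaps loading instead of waiting for the whole batch the way
# multyprocess_main does above.
def multyprocess_imap_main(filename, handler, count=3):
    import multiprocessing
    pool = multiprocessing.Pool(count)
    for feed in pool.imap_unordered(load_feed, load_urls(filename)):
        handler(feed)
    pool.close()
    pool.join()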
def threaded_main(filename, handler, count=3):
"""load all feeds using count threads
results handled with the handler function
Used the fact that list.pop is attomic to avoid locking
"""
import threading
urls = list(load_urls(filename))
def thread_of_control():
while urls:
try:
handler(load_feed(urls.pop()))
            except IndexError:
                # another thread emptied the list between the check and the pop
                pass
for i in range(count):
threading.Thread(target=thread_of_control).start()
    # as no threads are daemons, execution continues until completion
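# A hedged variant sketch (an addition, not in the original flag set): keeping
# the Thread objects and join()ing them makes the call block until every feed
# has been handled, rather than relying on non-daemon threads to keep the
# interpreter alive after the function returns.
def threaded_join_main(filename, handler, count=3):
    import threading
    urls = list(load_urls(filename))
    def worker():
        while True:
            try:
                url = urls.pop()  # atomic under the GIL
            except IndexError:
                return  # list exhausted, thread finished
            handler(load_feed(url))
    threads = [threading.Thread(target=worker) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()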
def eventlet_main(filename, handler, count=3):
    """load all feeds using count green threads (eventlet)"""
    import eventlet
    # import_patched re-imports feedparser with eventlet's cooperative,
    # green-thread-aware versions of the blocking standard library modules
    feedparser = eventlet.import_patched('feedparser')
    urls = list(load_urls(filename))
    def load_feed(feed_url):
        "This wrapper exists purely to silence feed loading errors"
try:
return feedparser.parse(feed_url)
except Exception, e:
print >> sys.stderr, "load", type(e)
return None
pool = eventlet.GreenPool(count)
for feed in pool.imap(load_feed, urls):
handler(feed)
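# Note: the pool's green threads yield to one another whenever one blocks on
# network I/O, so up to count downloads are in flight at once inside a single
# OS thread.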
def queued_main(filename, handler, count=3):
"""load all feeds using count threads
communicate through queues to avoid both locking and waits
"""
import threading
import Queue
def start_demon(func):
t = threading.Thread(target=func)
t.daemon = True
t.start()
in_queue = Queue.Queue()
out_queue = Queue.Queue()
cap = threading.BoundedSemaphore(count)
    def loader():
        while True:
            # block for work before taking a permit, so an idle loader
            # never sits on the semaphore and starves the writer
            item = in_queue.get()
            cap.acquire()
            out_queue.put(load_feed(item))
            in_queue.task_done()
            cap.release()
    def writer():
        while True:
            item = out_queue.get()
            cap.acquire()
            handler(item)
            out_queue.task_done()
            cap.release()
for i in range(count):
start_demon(loader)
start_demon(writer)
for item in load_urls(filename):
in_queue.put(item)
# wait till all feeds are fully processed
in_queue.join()
out_queue.join()
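# Note: Queue.join() returns only once task_done() has been called for every
# item ever put on the queue, which is what lets the main thread block on the
# two joins above while the daemon threads keep running.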
def queued_main_two(filename, handler, count=3):
"""load all feeds using count threads
communicate through queues to avoid both locking and delays
this version uses a higher abstraction (queue consumer thread)
"""
import threading
import Queue
    # this would be better at the main level or in a library;
    # it's only here to make it clear which block of code
    # it belongs to
    def consumer_demon(source, consumer, cap):
        """Build a daemon thread that consumes from source, passing each
        item to the consumer"""
        def server():
            """Consume items from source, passing each to the consumer;
            each task is marked as done before looking for another
            """
while True:
item = source.get()
cap.acquire()
try:
consumer(item)
except Exception, e:
print >> sys.stderr, type(e)
print >> sys.stderr, e
finally:
source.task_done()
cap.release()
t = threading.Thread(target=server)
t.daemon = True
return t
in_queue = Queue.Queue()
out_queue = Queue.Queue()
cap = threading.Semaphore(count)
download = lambda item: out_queue.put(load_feed(item))
for i in range(count):
consumer_demon(in_queue, download, cap).start()
consumer_demon(out_queue, handler, cap).start()
for item in load_urls(filename):
in_queue.put(item)
# wait till all feeds are fully processed
in_queue.join()
out_queue.join()
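# Note: a plain Semaphore (used here) silently permits more release() calls
# than acquire() calls, whereas the BoundedSemaphore used elsewhere in this
# file raises ValueError in that case; either works for capping concurrency.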
def queued_main_three(filename, handler, count=3):
"""load all feeds using count threads
communicate through queues to avoid both locking and delays
this version uses a class as the daemon controller
"""
import threading
import Queue
class Consumer(threading.Thread):
"""A thread to read from a queue handling
each item taken with the consumer method or function
"""
def __init__(self, source, consumer=None, cap=None, daemon=True):
super(Consumer, self).__init__()
self.source = source
self.daemon = daemon
self.cap = cap or threading.BoundedSemaphore()
if consumer:
self.consumer = consumer
def __enter__(self):
            # get() before acquire(): a thread never holds the semaphore
            # while waiting for work, at the cost of briefly allowing
            # slightly more than the requested number of threads to run
item = self.source.get()
self.cap.acquire()
return item
        def __exit__(self, type, value, traceback):
            if type is not None:
                print >> sys.stderr, type, value
            self.source.task_done()
            self.cap.release()
            return True  # suppress any exception raised by the consumer
def run(self):
while True:
with self as item:
self.consumer(item)
def consumer(self, item):
pass
in_queue = Queue.Queue()
out_queue = Queue.Queue()
cap = threading.BoundedSemaphore(count)
download = lambda item: out_queue.put(load_feed(item))
for i in range(count):
Consumer(in_queue, download, cap).start()
Consumer(out_queue, handler, cap).start()
for item in load_urls(filename):
in_queue.put(item)
# wait till all feeds are fully processed
in_queue.join()
out_queue.join()
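# Design note: each Consumer thread doubles as its own context manager, so the
# get()/acquire() bookkeeping in __enter__ and the task_done()/release()
# bookkeeping in __exit__ wrap every call to consumer() exactly once per item.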
if __name__ == '__main__':
parser = optparse.OptionParser(usage="usage: %prog [options] file1 [file2 ...]", version="%prog 0.1")
parser.add_option("-n", dest="processes", help="Number of parrallel 'threads'", default=3)
parser.add_option("-s", dest="main", action="store_const", const=main, help="Simple, non-threaded execution", default=main)
parser.add_option("-f", dest="main", action="store_const", const=fork_main, help="forked execution")
parser.add_option("-m", dest="main", action="store_const", const=multyprocess_main, help="multy-proccess execution")
parser.add_option("-t", dest="main", action="store_const", const=threaded_main, help="threaded execution")
parser.add_option("-e", dest="main", action="store_const", const=eventlet_main, help="asynchronously (eventlet) execution")
parser.add_option("-q", dest="main", action="store_const", const=queued_main, help="queued threaded execution")
parser.add_option("-Q", dest="main", action="store_const", const=queued_main_two, help="cleaner queued threaded execution")
parser.add_option("-O", dest="main", action="store_const", const=queued_main_three, help="cleaner queued threaded execution")
parser.add_option("-o", dest="handling", action="store_const", const=display_feed, help="Print breaf descriptions to stdout", default=display_feed)
    options, args = parser.parse_args()
    if not args:
        parser.error("provide at least one source file")
    for link in args:
        options.main(link, options.handling, options.processes)