#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Python 3.5 or 3.6
import json
import itertools
import random
import logging
import traceback
import io
import os
import collections
import tempfile
import time
import shutil
import erequests
import cfscrape
import eventlet
import eventlet.queue
import eventlet.db_pool
import eventlet.backdoor
import eventlet.semaphore
import eventlet.event
import eventlet.greenthread
from eventlet.green import MySQLdb
import config
import utils

boards = []

# concurrency control
connectionPool = eventlet.db_pool.ConnectionPool(MySQLdb, host=config.host, user=config.user, passwd=config.passwd, db=config.db, charset='utf8mb4', sql_mode='ANSI', max_idle=10, max_size=8)
ratelimitSemaphore = eventlet.semaphore.BoundedSemaphore()

# network objects
fourChanSession = erequests.session()
cfScraper = cfscrape.create_scraper(sess=fourChanSession)

# namedtuples
Request = collections.namedtuple('Request', ['url', 'event'])
MediaRow = collections.namedtuple('MediaRow',
                                  ["media_id",
                                   "media_hash",     # base64 encoded MD5 or something?
                                   "media",          # full size filename?
                                   "preview_op",     # OP preview filename
                                   "preview_reply",  # reply preview filename
                                   "total",          # number of instances?
                                   "banned"])
# SQL queries
insertQuery = ("INSERT INTO `{board}`"
               " (poster_ip, num, subnum, thread_num, op, timestamp, timestamp_expired, preview_orig, preview_w, preview_h, "
               " media_filename, media_w, media_h, media_size, media_hash, media_orig, spoiler, deleted, "
               " capcode, email, name, trip, title, comment, delpass, sticky, locked, poster_hash, poster_country, exif) "
               " SELECT 0,%s,0,%s,%s,%s,0,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s "
               " FROM DUAL WHERE NOT EXISTS (SELECT 1 FROM `{board}` WHERE num = %s AND subnum = 0)"
               " AND NOT EXISTS (SELECT 1 FROM `{board}_deleted` WHERE num = %s AND subnum = 0);\n")
updateQuery = "UPDATE `{board}` SET comment = %s, deleted = %s, media_filename = COALESCE(%s, media_filename), sticky = (%s OR sticky), locked = (%s OR locked) WHERE num = %s AND subnum = %s"
updateDeletedQuery = "UPDATE `{board}` SET deleted = 1, timestamp_expired = %s WHERE num = %s AND subnum = 0"
selectMediaQuery = 'SELECT * FROM `{board}_images` WHERE `media_hash` = %s'

with open('create board.sql') as f:
    createTablesQuery = f.read()
# logger stuff
logger = logging.getLogger("eve")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s:%(message)s')
if getattr(config, "logToFile", True):
    fh = logging.FileHandler(getattr(config, "logFile", "eve.log"))
    fh.setLevel(logging.WARNING)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.info("file logging initialized")
if getattr(config, "logToStdout", False):
    ch = logging.StreamHandler()
    ch.setLevel(getattr(config, "stdoutLogLevel", logging.INFO))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info("stdout logging initialized")
# https://stackoverflow.com/a/9929970/432690
def add_custom_print_exception():
    old_print_exception = traceback.print_exception
    def custom_print_exception(etype, value, tb, limit=None, file=None):
        logger.error(traceback.format_exc())
        old_print_exception(etype, value, tb, limit=limit, file=file)
    traceback.print_exception = custom_print_exception

add_custom_print_exception()
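# Why patch this at all: eventlet reports exceptions that escape a green thread
# through the traceback module, so hooking traceback.print_exception routes
# crashes in any of the spawned workers into the log instead of losing them on
# a detached console.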
def ratelimit():
    ratelimitSemaphore.acquire()
    eventlet.greenthread.spawn_after(config.ratelimitRate, ratelimitSemaphore.release)
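# Usage sketch: each call takes the single semaphore slot, and a timer green
# thread returns it config.ratelimitRate seconds later, capping throughput at
# one request per ratelimitRate seconds no matter how many workers are waiting:
#   ratelimit()                    # blocks until the slot is free
#   response = cfScraper.get(url)  # the rate-limited work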

class Board(object):
    """Each Board polls the site, fetches updated threads, and
    queues DB updates and media downloads."""
    def __init__(self, board):
        super(Board, self).__init__()
        self.board = board
        self.threads = {}
        self.insertQueue = eventlet.queue.Queue()
        self.insertQuery = insertQuery.format(board=board)
        self.updateQuery = updateQuery.format(board=board)
        self.threadUpdateQueue = eventlet.queue.PriorityQueue()
        self.mediaFetcher = MediaFetcher(board)
        # check for the board tables and create them if necessary
        with connectionPool.item() as conn:
            c = conn.cursor()
            c.execute("SHOW TABLES LIKE '{board}'".format(board=board))
            if c.rowcount == 0:
                self.createTables()
            elif c.rowcount == 1:
                pass
            else:
                logger.error("Something weird happened when checking whether the board tables exist!")
                logger.error("board: {} rowcount: {}".format(board, c.rowcount))
        eventlet.spawn(self.threadListUpdater)
        eventlet.spawn(self.threadUpdateQueuer)
        eventlet.spawn(self.inserter)
    def createTables(self):
        logger.warning("creating tables for " + self.board)
        with connectionPool.item() as conn:
            c = conn.cursor()
            for statement in createTablesQuery.split('\n\n\n'):
                c.execute(statement.format(board=self.board))
            conn.commit()

    def markDeleted(self, postID):
        logger.debug("post {}/{} deleted".format(self.board, postID))
        with connectionPool.item() as conn:
            c = conn.cursor()
            c.execute(updateDeletedQuery.format(board=self.board), (int(time.time()), postID))
            conn.commit()
    def threadListUpdater(self):
        logger.debug('threadListUpdater for {} started'.format(self.board))
        while True:
            evt = eventlet.event.Event()
            # high, but not maximum, priority: threads that are about to be deleted can still be fetched first
            scraper.get(2, "https://a.4cdn.org/{}/threads.json".format(self.board), evt)
            threadsJson = evt.wait().json()
            utils.status('fetched {}/threads.json'.format(self.board), linefeed=True)
            tmp = []
            for page in threadsJson:
                for thread in page['threads']:
                    tmp.append(thread)
            for priority, thread in enumerate(tmp[::-1]):  # fetch oldest threads first
                if thread['no'] not in self.threads:
                    logger.debug("Thread %s is new, queueing", thread['no'])
                    self.threads[thread['no']] = thread
                    self.threads[thread['no']]['posts'] = {}  # used to track seen posts
                    self.threads[thread['no']]['update_queued'] = True
                    self.threadUpdateQueue.put((priority, thread['no']))
                elif thread['last_modified'] != self.threads[thread['no']]['last_modified']:  # thread updated
                    if not self.threads[thread['no']].get('update_queued', False):
                        logger.debug("Thread %s is updated, queueing", thread['no'])
                        self.threadUpdateQueue.put((priority, thread['no']))
                    self.threads[thread['no']]['last_modified'] = thread['last_modified']
                    self.threads[thread['no']]['update_queued'] = True
            # clear old threads from memory
            newThreads = [x['no'] for x in tmp]
            for thread in self.threads:
                if thread not in newThreads:
                    logger.debug("thread {}/{} archived".format(self.board, thread))
                    eventlet.greenthread.spawn_after(1, self.threads.pop, thread)  # can't modify a dict while iterating over it - lazy solution
            eventlet.sleep(config.boardUpdateDelay)
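
    # Priority semantics: both queues are priority queues, and lower numbers win,
    # so enumerate(tmp[::-1]) gives the oldest (closest-to-pruning) thread
    # priority 0 while threads.json itself is requested at priority 2. A sketch of
    # the scraper's request queue on a full board (illustrative ordering only):
    #   (0, oldest thread)    # fetched first, before it can fall off the board
    #   (1, next oldest)
    #   (2, threads.json)     # the index refresh still jumps most of the line
    #   ...
    #   (149, newest thread)  # fetched last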
    def threadUpdateQueuer(self):
        logger.debug('threadUpdateQueuer for {} started'.format(self.board))
        while True:
            task = self.threadUpdateQueue.get()
            eventlet.greenthread.spawn_n(self.updateThread, task)
            self.threadUpdateQueue.task_done()
    def updateThread(self, task):
        '''Fetch a thread and queue any changes.'''
        priority, thread = task
        while True:
            evt = eventlet.event.Event()
            scraper.get(priority, "https://a.4cdn.org/{}/thread/{}.json".format(self.board, thread), evt)
            r = evt.wait()
            if r.status_code == 404:
                utils.status("404'd: {}/{}".format(self.board, thread), linefeed=True)
                try:
                    del self.threads[thread]
                except KeyError:
                    # threadListUpdater may delete threads from the internal thread list before this runs
                    # having the updater unqueue the request would save a request, but be harder to implement
                    # well thought out pull requests that don't shit up the codebase welcome
                    pass
                return
            else:
                utils.status("fetched {}/{}".format(self.board, thread), linefeed=True)
                try:
                    r = r.json()
                except json.decoder.JSONDecodeError:
                    continue  # 4chan/CloudFlare sometimes sends invalid JSON; try again
                break
        self.threads[thread]['update_queued'] = False
        logger.debug("adding {} {} posts to queue".format(len(r['posts']), self.board))
        for post in r['posts']:
            post['board'] = self.board
            oldPost = self.threads[thread]['posts'].get(post['no'])
            if post != oldPost:  # post is new or has been modified since we last saw it
                self.threads[thread]['posts'][post['no']] = post
                self.insertQueue.put(post)
        for postID in self.threads[thread]['posts']:  # check for deletions
            if postID not in [post['no'] for post in r['posts']]:
                self.markDeleted(postID)
    def inserter(self):
        logger.debug('inserter for {} started'.format(self.board))
        while True:
            post = self.insertQueue.get()
            with connectionPool.item() as conn:
                utils.status("processing post {}:{}".format(post['board'], post['no']))
                result = conn.cursor().execute(insertQuery.format(board=post['board']),
                    (post['no'],  # post number
                     post['resto'] if post['resto'] != 0 else post['no'],  # resto is RESponse TO (thread number)
                     0 if post['resto'] != 0 else 1,  # op flag
                     post.get('time', None),
                     str(post.get('tim')) + "s.jpg" if post.get('tim') else None,
                     post.get('tn_w', 0),
                     post.get('tn_h', 0),
                     post['filename'] + post['ext'] if 'md5' in post else None,
                     post.get('w', 0),
                     post.get('h', 0),
                     post.get('fsize', 0),
                     post.get('md5', None),
                     str(post['tim']) + post['ext'] if 'md5' in post else None,
                     post.get('spoiler', 0),
                     0,  # deleted
                     post.get('capcode', "N")[0].upper(),
                     None,  # email
                     utils.doClean(post.get('name', 'Anonymous')),
                     post.get('trip', None),
                     utils.doClean(post.get('sub', None)),
                     utils.doCleanFull(post.get('com', None)),
                     None,  # delpass - no idea if this is right
                     post.get('sticky', 0),
                     post.get('closed', 0),
                     "Dev" if post.get('id', None) == "Developer" else post.get('id', None),
                     post.get('country', None),
                     None,  # exif - the setter for this in Asagi is never referenced anywhere, so this should always be null, right?
                     post['no'],  # post number, for the `{board}` NOT EXISTS guard
                     post['no'],  # post number, for the `{board}_deleted` NOT EXISTS guard
                     ))
                result = conn.cursor().execute(updateQuery.format(board=post['board']),
                    (post.get('com', None),
                     0,  # deleted
                     post.get('filename', None),
                     post.get('sticky', 0),
                     post.get('closed', 0),
                     post['no'],  # post number
                     0,  # subnum is always 0 for posts straight from the 4chan API
                     ))
                conn.commit()
            if post.get('md5', False) and (getattr(config, "downloadMedia", False) or getattr(config, "downloadThumbs", False)):  # queue media download
                self.mediaFetcher.put(post)
            self.insertQueue.task_done()
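
    # Each post is written in two steps: the guarded INSERT above adds the row
    # only if the post has never been seen, then the UPDATE refreshes the fields
    # that can legitimately change later (comment, media_filename, sticky/locked).
    # Running both unconditionally keeps the logic branch-free: for a brand-new
    # post the UPDATE just rewrites the values that were inserted a moment earlier.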

class Scraper(object):
    """Manages access to the 4chan API. Satisfies requests to
    the API in priority order without violating the ratelimit."""
    def __init__(self):
        super(Scraper, self).__init__()
        self.requestQueue = eventlet.queue.PriorityQueue()
        eventlet.spawn(self.fetcher)

    def get(self, priority, url, evt):
        self.requestQueue.put((priority, Request(url, evt)))

    def fetcher(self):
        logger.debug('scraper loop started')
        while True:
            ratelimit()
            request = self.requestQueue.get()[1]
            response = self.download(request)
            request.event.send(response)
            self.requestQueue.task_done()

    def download(self, request):
        delay = 5  # initial retry delay; must live outside the loop so the backoff below can grow
        while True:
            response = None
            try:
                logger.debug('fetching url %s', request.url)
                response = cfScraper.get(request.url)
                response.raise_for_status()
                return response
            except Exception as e:
                if isinstance(e, erequests.HTTPError):
                    if response.status_code == 404:  # let the caller handle 404s
                        return response
                # for everything else, log it and try again
                logger.warning('{} while fetching {}, will try again in {} seconds'.format(e.__class__.__name__, request.url, delay))
                logger.warning('exception args: ' + repr(e.args))  # not sure how useful this will be
                eventlet.sleep(delay)
                delay = min(delay + 5, 300)
                continue
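
    # Retry timeline sketch: delay starts at 5 seconds and grows by 5 per failure,
    # capped at 300 - i.e. 5, 10, 15, ..., 295, 300, 300, ... - so a transient
    # CloudFlare hiccup is retried quickly while a prolonged outage settles into
    # one attempt every five minutes.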

class MediaFetcher(object):
    """Handles media downloads for a single board. Instantiated by each Board.
    Doesn't support the old directory structure; does anyone care?"""
    def __init__(self, board):
        super(MediaFetcher, self).__init__()
        self.mediaDLQueue = eventlet.queue.Queue()
        self.selectMediaQuery = selectMediaQuery.format(board=board)
        self.board = board
        eventlet.spawn(self.fetcher)

    def put(self, post):
        self.mediaDLQueue.put(post)

    def fetcher(self):
        while True:
            post = self.mediaDLQueue.get()
            logger.debug('fetching media %s', post['md5'])
            if getattr(config, "downloadMedia", False):
                self.download(post['no'], post['resto'] == 0, False, post['tim'], post['ext'], post['md5'])
            if getattr(config, "downloadThumbs", False):
                self.download(post['no'], post['resto'] == 0, True, post['tim'], post['ext'], post['md5'])
            self.mediaDLQueue.task_done()
            utils.status()

    def download(self, postNum, isOp, isPreview, tim, ext, mediaHash):
        # Local.java:198
        # get metadata from the DB
        with connectionPool.item() as conn:
            c = conn.cursor()
            result = c.execute(self.selectMediaQuery, (mediaHash,))
            assert result == 1
            mediaRow = MediaRow(*c.fetchone())
        if mediaRow.banned:
            logger.info('Skipping download of banned file %s', mediaHash)
            return
        # determine the filename
        # added to the DB by the insert_image_<board> procedure - triggered by before-ins-<board>
        if isPreview:
            filename = mediaRow.preview_op if isOp else mediaRow.preview_reply
        else:
            filename = mediaRow.media
        # if(filename == null) return;
        if filename is None:
            logger.warning("media download failed to determine destination filename")
            logger.warning("post {} hash {}".format(postNum, mediaHash))
            return
        # make directories
        subdirs = (filename[:4], filename[4:6])
        destinationFolder = "{}/{}/{}/{}".format(config.imageDir + "/" + self.board, "thumb" if isPreview else "image", *subdirs)  # FIXME use os.path.join
        os.makedirs(destinationFolder, exist_ok=True)  # TODO maybe just skip this and use os.renames at the end?
        # set perms on directories
        # TODO
        # determine the final file path, and bail if it already exists
        destinationPath = destinationFolder + os.sep + filename
        if os.path.exists(destinationPath):
            logger.debug('skipping download of already downloaded media')
            logger.debug("post {} hash {}".format(postNum, mediaHash))
            return
        # download the URL into a tempfile
        tmp = tempfile.NamedTemporaryFile(delete=False, dir=destinationFolder, suffix="_tmp")  # FIXME handle leaks on error
        url = "https://i.4cdn.org/{}/{}{}{}".format(self.board, tim, "s" if isPreview else "", ".jpg" if isPreview else ext)
        delay = 5
        while True:
            try:
                logger.debug('fetching media: post {} hash {}'.format(postNum, mediaHash))
                request = cfScraper.get(url)
                request.raise_for_status()
                break
            except Exception as e:
                if isinstance(e, erequests.HTTPError):
                    if request.status_code == 404:  # 404s are to be expected, just bail when they happen
                        logger.info("404 when downloading media")
                        logger.info("post {} hash {}".format(postNum, mediaHash))
                        return
                # log everything else and try again
                logger.warning('{} while fetching media post {} hash {}, will try again in {} seconds'.format(e.__class__.__name__, postNum, mediaHash, delay))
                logger.warning('exception args: ' + repr(e.args))  # not sure how useful this will be
                eventlet.sleep(delay)
                delay = min(delay + 5, 300)
                continue
        for chunk in request.iter_content(chunk_size=1024 * 512):
            tmp.write(chunk)
        tmp.close()
        # move the tempfile to the final file path
        # the temp file and destination are on the same device, so the rename is atomic
        os.rename(tmp.name, destinationPath)
        # set permissions on the file path
        # webGroupId is never set in asagi, so should we even do this? Is this even relevant today?
        # os.chmod(destinationPath, 0o644)
        # posix.chown(outputFile.getCanonicalPath(), -1, this.webGroupId);
        logger.debug('downloaded media: {}/{}'.format(self.board, filename))

scraper = Scraper()

if config.boardUpdateDelay < len(config.boards) * 2:
    newDelay = len(config.boards) * 2
    logger.warning("boardUpdateDelay is too low for the number of configured boards! Increasing delay to %s", newDelay)
    config.boardUpdateDelay = newDelay

for board in config.boards:
    boards.append(Board(board))
    logger.debug("created Board %s", board)
utils.setObjects(boards, scraper, config)  # pass these to utils for easy referencing in status code

if __name__ == "__main__":
    eventlet.spawn(eventlet.backdoor.backdoor_server, eventlet.listen(('localhost', 3000)), locals())
    while True:
        eventlet.sleep(1)  # this busy loop keeps all the threads running - this can't possibly be how I'm supposed to do things!