From 29a2549ee8b9d1e18003ba9897556b2fcd7effae Mon Sep 17 00:00:00 2001
From: eukaryote
Date: Sat, 9 Mar 2019 10:47:04 -0700
Subject: [PATCH] Add sqlite

---
 download.py | 227 +++++++++++++++++++++++++++++++++++++---------------
 scrapers.py |  32 +++++---
 utils.py    |  37 ++++++++-
 3 files changed, 217 insertions(+), 79 deletions(-)

diff --git a/download.py b/download.py
index 721b52e..6108ef3 100644
--- a/download.py
+++ b/download.py
@@ -5,16 +5,21 @@
 import json
 import tarfile
 import warnings
-import argparse
+import argparse 
+import os
 import os.path as op
 from glob import glob
 from hashlib import md5
 import multiprocessing as mpl
+from tqdm import tqdm
+import sqlite3
+import tldextract
+import math
 
 # for backward compatibility
 from six.moves.urllib.request import urlopen
-
-from utils import mkdir, chunks, extract_month
+ 
+from utils import mkdir, chunks, extract_month, linecount
 from scrapers import bs4_scraper, newspaper_scraper, raw_scraper
 
 parser = argparse.ArgumentParser()
@@ -33,8 +38,8 @@
 )
 parser.add_argument(
     "--n_procs",
-    type=int,
-    default=1,
+    type=int, 
+    default=1, 
     help="how many processes (cores) to use for parallel scraping",
 )
 parser.add_argument(
@@ -87,20 +92,26 @@
     default=False,
     help="whether to show warnings in general during scraping",
 )
+
+parser.add_argument(
+    "--sqlite_meta",
+    action="store_true",
+    default=False,
+    help="whether to use sqlite for storing metadata",
+)
 args = parser.parse_args()
 
 if not args.show_warnings:
     # avoid lots of datetime warnings
     warnings.filterwarnings("ignore")
 
-
-def load_urls(url_file, completed_fids, max_urls=-1):
-    with open(url_file) as fh:
-        url_entries = [
-            (fid, url) for (fid, url) in enumerate(fh) if fid not in completed_fids
-        ]
-        if max_urls != -1:
-            url_entries = url_entries[:max_urls]
+ 
+def load_urls(fh, completed_fids, max_urls=-1):
+    url_entries = (
+        (fid, url) for (fid, url) in enumerate(fh) if fid not in completed_fids
+    )
+    if max_urls != -1:
+        url_entries = list(url_entries)[:max_urls]
     return url_entries
 
 
@@ -127,12 +138,25 @@ def download(
     url_entry,
     scraper=args.scraper,
     save_uncompressed=args.save_uncompressed,
-    memoize=args.scraper_memoize,
+    memoize=args.scraper_memoize, 
+    arch_meta=not args.sqlite_meta
 ):
+
     uid, url = url_entry
     url = url.strip()
     fid = "{:07d}-{}".format(uid, md5(url.encode()).hexdigest())
+    data_dir = mkdir(op.join(args.output_dir, "data"))
+    text_fp = op.join(data_dir, "{}.txt".format(fid))
+
+    if arch_meta:
+        meta_dir = mkdir(op.join(args.output_dir, "meta"))
+        meta_fp = op.join(meta_dir, "{}.json".format(fid))
+
+    # already downloaded!
+    if op.exists(text_fp):
+        return
+
 
     # is_good_link, link_type = vet_link(url)
     # if not is_good_link:
     #     return
@@ -145,31 +169,41 @@ def download(
         scrape = raw_scraper
 
     text, meta = scrape(url, memoize)
+
+    # add domain column
+    ext = tldextract.extract(url)
+    domain = '.'.join([x for x in ext if x])
+    meta["domain"] = domain
+
     if text is None or text.strip() == "":
-        return ("", "", fid, uid)
+        return ("", meta, fid, uid)
 
     if save_uncompressed:
         month = extract_month(args.url_file)
         data_dir = mkdir(op.join(args.output_dir, "data", month))
-        meta_dir = mkdir(op.join(args.output_dir, "meta", month))
        text_fp = op.join(data_dir, "{}.txt".format(fid))
-        meta_fp = op.join(meta_dir, "{}.json".format(fid))
 
         with open(text_fp, "w") as out:
             out.write(text)
-        with open(meta_fp, "w") as out:
-            json.dump(meta, out)
+        if arch_meta:
+            meta_dir = mkdir(op.join(args.output_dir, "meta", month))
+            meta_fp = op.join(meta_dir, "{}.json".format(fid))
+            with open(meta_fp, "w") as out:
+                json.dump(meta, out)
 
     return (text, meta, fid, uid)
-
-
-def archive_chunk(month, cid, cdata, out_dir, fmt):
+ 
+ 
+def archive_chunk(month, cid, cdata, out_dir, fmt, arch_meta):
     mkdir(out_dir)
 
     texts, metas, fids, uids = zip(*cdata)
     data_tar = op.join(out_dir, "{}-{}_data.{}".format(month, cid, fmt))
-    meta_tar = op.join(out_dir, "{}-{}_meta.{}".format(month, cid, fmt))
-    tar_fps, texts, exts = [data_tar, meta_tar], [texts, metas], ["txt", "json"]
+    if arch_meta:
+        meta_tar = op.join(out_dir, "{}-{}_meta.{}".format(month, cid, fmt))
+        tar_fps, texts, exts = [data_tar, meta_tar], [texts, metas], ["txt", "json"]
+    else:
+        tar_fps, texts, exts = [data_tar], [texts], ["txt"]
 
     doc_count = 0
     docs_counted = False
@@ -193,7 +227,7 @@ def archive_chunk(month, cid, cdata, out_dir, fmt):
     return doc_count
 
-
+ 
 #######################################################################
 #                            Util functions                           #
 #######################################################################
 
@@ -219,51 +253,114 @@ def set_state(state_fp, cdata):
             handle.write("{}\n".format(uid))
 
 
+def sqlite_conn():
+    conn = sqlite3.connect('metadata.db')
+    conn.execute('''
+        CREATE TABLE IF NOT EXISTS metadata (
+            fid char(64) not null primary key,
+            month char(32) null,
+            url varchar(2048) not null,
+            domain varchar(255) not null,
+            word_count int null,
+            elapsed int null,
+            scraper varchar(255) not null,
+            success boolean not null
+        );
+    ''')
+    conn.execute('''
+        CREATE INDEX IF NOT EXISTS ix_meta_url ON metadata(url);
+    ''')
+    conn.execute('''
+        CREATE INDEX IF NOT EXISTS ix_meta_domain ON metadata(domain);
+    ''')
+    conn.execute('''
+        CREATE INDEX IF NOT EXISTS ix_meta_month ON metadata(month);
+    ''')
+
+    return conn
+
+
 if __name__ == "__main__":
+    if args.sqlite_meta:
+        conn = sqlite_conn()
+        cur = conn.cursor()
+
     month = extract_month(args.url_file)
 
     # in case we are resuming from a previous run
     completed_uids, state_fp, prev_cid = get_state(month, args.output_dir)
 
-    # URLs we haven't scraped yet (if first run, all URLs in file)
-    url_entries = load_urls(args.url_file, completed_uids, args.max_urls)
-
-    pool = mpl.Pool(args.n_procs)
-
-    # process one "chunk" of args.chunk_size URLs at a time
-    for i, chunk in enumerate(chunks(url_entries, args.chunk_size)):
-        cid = prev_cid + i + 1
-
-        print("Downloading chunk {}".format(cid))
-        t1 = time.time()
-
-        if args.timeout > 0:
-            # imap as iterator allows .next() w/ timeout.
-            # ordered version doesn't seem to work correctly.
-            # for some reason, you CANNOT track j or chunk[j] in the loop,
-            # so don't add anything else to the loop below!
-            # confusingly, chunksize below is unrelated to our chunk_size
-            chunk_iter = pool.imap_unordered(download, chunk, chunksize=1)
-            cdata = []
-            for j in range(len(chunk)):
-                try:
-                    result = chunk_iter.next(timeout=args.timeout)
-                    cdata.append(result)
-                except mpl.TimeoutError:
-                    print(" --- Timeout Error --- ")
-        else:
-            cdata = list(pool.imap(download, chunk, chunksize=1))
-
-        set_state(state_fp, cdata)
-        print("{} / {} downloads timed out".format(len(chunk) - len(cdata), len(chunk)))
-        print("Chunk time: {} seconds".format(time.time() - t1))
-
-        # archive and save this chunk to file
-        if args.compress:
-            print("Compressing...")
-            t2 = time.time()
-            count = archive_chunk(month, cid, cdata, args.output_dir, args.compress_fmt)
-            print("Archive created in {} seconds".format(time.time() - t2))
-            print("{} out of {} URLs yielded content\n".format(count, len(chunk)))
+    with open(args.url_file) as fh:
+        # URLs we haven't scraped yet (if first run, all URLs in file)
+        url_entries = load_urls(fh, completed_uids, args.max_urls)
+
+        pool = mpl.Pool(args.n_procs)
+
+        # process one "chunk" of args.chunk_size URLs at a time
+        total = math.ceil(linecount(args.url_file) / args.chunk_size)
+        print('Total chunks: ', total)
+
+        for i, chunk in tqdm(enumerate(chunks(url_entries, args.chunk_size)),
+                             total=total):
+            cid = prev_cid + i + 1
+
+            tqdm.write("Downloading chunk {}".format(cid))
+            t1 = time.time()
+
+            if args.timeout > 0:
+                # imap as iterator allows .next() w/ timeout.
+                # ordered version doesn't seem to work correctly.
+                # for some reason, you CANNOT track j or chunk[j] in the loop,
+                # so don't add anything else to the loop below!
+                # confusingly, chunksize below is unrelated to our chunk_size
+                chunk_iter = pool.imap_unordered(download, chunk, chunksize=1)
+                cdata = []
+                for j in range(len(chunk)):
+                    try:
+                        result = chunk_iter.next(timeout=args.timeout)
+                        cdata.append(result)
+                    except mpl.TimeoutError:
+                        tqdm.write(" --- Timeout Error --- ")
+            else:
+                cdata = list(pool.imap(download, chunk, chunksize=1))
+
+            set_state(state_fp, cdata)
+            tqdm.write("{} / {} downloads timed out".format(len(chunk) - len(cdata), len(chunk)))
+            tqdm.write("Chunk time: {} seconds".format(time.time() - t1))
+
+            # write metadata to sqlite
+            if args.sqlite_meta:
+                for text, meta, fid, _ in filter(lambda x: x, cdata):
+                    if text:
+                        params = (
+                            fid,
+                            month,
+                            meta["url"],
+                            meta["domain"],
+                            meta["elapsed"],
+                            meta["word_count"],
+                            meta["scraper"],
+                            True
+                        )
+                    else:
+                        params = (
+                            fid,
+                            month,
+                            meta["url"],
+                            meta["domain"],
+                            None,
+                            None,
+                            meta["scraper"],
+                            False
+                        )
+                    cur.execute("insert or ignore into metadata (fid, month, url, domain, elapsed, word_count, scraper, success) values (?, ?, ?, ?, ?, ?, ?, ?)", params)
+                conn.commit()
+            # archive and save this chunk to file
+            if args.compress:
+                tqdm.write("Compressing...")
+                t2 = time.time()
+                count = archive_chunk(month, cid, cdata, args.output_dir, args.compress_fmt, not args.sqlite_meta)
+                tqdm.write("Archive created in {} seconds".format(time.time() - t2))
+                tqdm.write("{} out of {} URLs yielded content\n".format(len(list(filter(lambda x: x and x[0], cdata))), len(chunk)))
 
     print("Done!")
diff --git a/scrapers.py b/scrapers.py
index 7ca186b..927fd71 100644
--- a/scrapers.py
+++ b/scrapers.py
@@ -28,7 +28,7 @@ def find_and_filter_tag(tag, soup):
 
 
 def raw_scraper(url, memoize):
-    t1 = time.time()
+    t1 = time.time() 
 
     try:
         cleaner = Cleaner()
@@ -39,17 +39,23 @@ def raw_scraper(url, memoize):
         html = minify(article.html)
         html = cleaner.clean_html(html)
         article.parse()
-    except:
-        return None, None
+    except:
+        return None, {
+            "url": url,
+            "scraper": "raw",
+        }
     if article.text == "":
-        return None, None
+        return None, {
+            "url": url,
+            "scraper": "raw",
+        }
 
     metadata = {"url": url, "elapsed": time.time() - t1, "scraper": "raw"}
     return html, metadata
 
 
 def newspaper_scraper(url, memoize):
-    t1 = time.time()
+    t1 = time.time() 
 
     try:
         article = newspaper.Article(url, fetch_images=False, memoize_articles=memoize)
@@ -57,8 +63,11 @@ def newspaper_scraper(url, memoize):
         article.parse()
         text = article.text
         count = len(text.split())
-    except:
-        return None, None
+    except:
+        return None, {
+            "url": url,
+            "scraper": "newspaper",
+        }
 
     metadata = {
         "url": url,
@@ -70,7 +79,7 @@ def newspaper_scraper(url, memoize):
 
 
 def bs4_scraper(url, memoize):
-    t1 = time.time()
+    t1 = time.time() 
 
     try:
         article = newspaper.Article(url, fetch_images=False, memoize_articles=memoize)
@@ -81,8 +90,11 @@ def bs4_scraper(url, memoize):
         # DDB: keep text as a single string for consistency with
         # newspaper_scraper
         text = " ".join(text)
-    except:
-        return None, None
+    except:
+        return None, {
+            "url": url,
+            "scraper": "bs4",
+        }
 
     metadata = {
         "url": url,
diff --git a/utils.py b/utils.py
index e802fb3..a10ef71 100644
--- a/utils.py
+++ b/utils.py
@@ -2,6 +2,7 @@
 import os.path as op
 import tarfile
 import re
+import collections
 
 
 def extract_month(url_file_name):
@@ -11,10 +12,24 @@ def extract_month(url_file_name):
     return month
 
 
-def chunks(l, n):
-    """Yield successive n-sized chunks from l."""
-    for i in range(0, len(l), n):
-        yield l[i : i + n]
+def chunks(l, n, s=0):
+    """Yield successive n-sized chunks from l, skipping the first s chunks."""
+    if isinstance(l, collections.Iterable):
+        chnk = []
+        for i, elem in enumerate(l):
+            if i < s:
+                continue
+
+            chnk.append(elem)
+            if len(chnk) == n:
+                yield chnk
+                chnk = []
+        if len(chnk) != 0:
+            yield chnk
+
+    else:
+        for i in range(s, len(l), n):
+            yield l[i : i + n]
 
 
 def extract_archive(archive_fp, outdir="."):
@@ -27,3 +42,17 @@ def mkdir(fp):
     if not op.exists(fp):
         os.makedirs(fp)
     return fp
+
+
+def linecount(filename):
+    f = open(filename, 'rb')
+    lines = 0
+    buf_size = 1024 * 1024
+    read_f = f.raw.read
+
+    buf = read_f(buf_size)
+    while buf:
+        lines += buf.count(b'\n')
+        buf = read_f(buf_size)
+
+    return lines
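
Usage sketch (not part of the patch): after a run with --sqlite_meta, the per-URL records land in the metadata.db file that sqlite_conn() creates in the working directory. The table and column names below come from the CREATE TABLE statement in download.py; the aggregate query itself is only an illustration of how the new store can be inspected.

    import sqlite3

    # 'metadata.db' is the filename hard-coded in sqlite_conn() above.
    conn = sqlite3.connect("metadata.db")

    # Success rate and average word count per domain; the column names
    # (domain, success, word_count) match the schema added in this patch,
    # while the query itself is just an example.
    query = """
        SELECT domain,
               COUNT(*)        AS attempts,
               SUM(success)    AS successes,
               AVG(word_count) AS avg_words
        FROM metadata
        GROUP BY domain
        ORDER BY attempts DESC
        LIMIT 10
    """
    for domain, attempts, successes, avg_words in conn.execute(query):
        print(domain, attempts, successes, avg_words)

    conn.close()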
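
A second sketch, covering the reworked utils.chunks(): because load_urls() now returns a generator, chunks() buffers any iterable into n-sized lists instead of slicing it. This assumes the patched utils.py is importable and a Python version where collections.Iterable still exists (it lives in collections.abc since 3.3 and the old alias was removed in 3.10); the example URLs are made up.

    from utils import chunks

    # A generator like the one load_urls() now returns; elements are
    # buffered into lists of size n as they stream past.
    entries = ((fid, "http://example.com/{}".format(fid)) for fid in range(5))
    for batch in chunks(entries, 2):
        print(batch)
    # [(0, 'http://example.com/0'), (1, 'http://example.com/1')]
    # [(2, 'http://example.com/2'), (3, 'http://example.com/3')]
    # [(4, 'http://example.com/4')]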