227 changes: 162 additions & 65 deletions download.py
@@ -5,16 +5,21 @@
import json
import tarfile
import warnings
import argparse
import argparse
import os
import os.path as op
from glob import glob
from hashlib import md5
import multiprocessing as mpl
from tqdm import tqdm
import sqlite3
import tldextract
import math

# for backward compatibility
from six.moves.urllib.request import urlopen
from utils import mkdir, chunks, extract_month

from utils import mkdir, chunks, extract_month, linecount
from scrapers import bs4_scraper, newspaper_scraper, raw_scraper

parser = argparse.ArgumentParser()
@@ -33,8 +38,8 @@
)
parser.add_argument(
"--n_procs",
type=int,
default=1,
type=int,
default=1,
help="how many processes (cores) to use for parallel scraping",
)
parser.add_argument(
@@ -87,20 +92,26 @@
default=False,
help="whether to show warnings in general during scraping",
)

parser.add_argument(
"--sqlite_meta",
action="store_true",
default=False,
help="whether to use sqlite for storing metadata",
)
args = parser.parse_args()

if not args.show_warnings:
# avoid lots of datetime warnings
warnings.filterwarnings("ignore")


def load_urls(url_file, completed_fids, max_urls=-1):
with open(url_file) as fh:
url_entries = [
(fid, url) for (fid, url) in enumerate(fh) if fid not in completed_fids
]
if max_urls != -1:
url_entries = url_entries[:max_urls]

def load_urls(fh, completed_fids, max_urls=-1):
url_entries = (
(fid, url) for (fid, url) in enumerate(fh) if fid not in completed_fids
)
if max_urls != -1:
url_entries = list(url_entries)[:max_urls]
return url_entries
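
The reworked load_urls takes an already-open file handle and yields (fid, url) pairs lazily instead of materializing the whole URL file. A minimal illustration of the new contract (the StringIO input and completed ids below are made up):

    from io import StringIO

    fake_urls = StringIO("http://a.example\nhttp://b.example\nhttp://c.example\n")
    completed = {1}  # pretend the second URL finished in an earlier run
    entries = load_urls(fake_urls, completed)   # a generator; nothing is read yet
    print(list(entries))  # [(0, 'http://a.example\n'), (2, 'http://c.example\n')]

Note the URLs keep their trailing newline here; download() strips them later.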


@@ -127,12 +138,25 @@ def download(
url_entry,
scraper=args.scraper,
save_uncompressed=args.save_uncompressed,
memoize=args.scraper_memoize,
memoize=args.scraper_memoize,
arch_meta=not args.sqlite_meta
):

uid, url = url_entry
url = url.strip()
fid = "{:07d}-{}".format(uid, md5(url.encode()).hexdigest())

data_dir = mkdir(op.join(args.output_dir, "data"))
text_fp = op.join(data_dir, "{}.txt".format(fid))

if arch_meta:
meta_dir = mkdir(op.join(args.output_dir, "meta"))
meta_fp = op.join(meta_dir, "{}.json".format(fid))

# already downloaded!
if op.exists(text_fp):
return

# is_good_link, link_type = vet_link(url)
# if not is_good_link:
# return
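
For reference, the early-exit above keys on a deterministic per-URL id: fid combines the zero-padded position of the URL in the input file with the md5 of the URL itself. A tiny sketch with invented values:

    from hashlib import md5

    uid, url = 42, "https://example.com"
    fid = "{:07d}-{}".format(uid, md5(url.encode()).hexdigest())
    # -> "0000042-" followed by 32 hex characters
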
@@ -145,31 +169,41 @@ def download(
scrape = raw_scraper

text, meta = scrape(url, memoize)

# add domain column
ext = tldextract.extract(url)
domain = '.'.join([x for x in ext if x])
meta["domain"] = domain

if text is None or text.strip() == "":
return ("", "", fid, uid)
return ("", meta, fid, uid)

if save_uncompressed:
month = extract_month(args.url_file)
data_dir = mkdir(op.join(args.output_dir, "data", month))
meta_dir = mkdir(op.join(args.output_dir, "meta", month))
text_fp = op.join(data_dir, "{}.txt".format(fid))
meta_fp = op.join(meta_dir, "{}.json".format(fid))

with open(text_fp, "w") as out:
out.write(text)
with open(meta_fp, "w") as out:
json.dump(meta, out)
if arch_meta:
meta_dir = mkdir(op.join(args.output_dir, "meta", month))
meta_fp = op.join(meta_dir, "{}.json".format(fid))
with open(meta_fp, "w") as out:
json.dump(meta, out)

return (text, meta, fid, uid)
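
The new domain field recorded above comes from tldextract. A short illustration of the join-the-non-empty-parts pattern (the example URL is arbitrary, and this assumes a tldextract version whose ExtractResult iterates over its parts, as the code above does):

    import tldextract

    ext = tldextract.extract("https://blog.example.co.uk/post/123")
    # subdomain, domain and suffix; empty parts (e.g. no subdomain) are dropped
    domain = ".".join(part for part in ext if part)
    print(domain)  # expected: blog.example.co.uk
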
def archive_chunk(month, cid, cdata, out_dir, fmt):


def archive_chunk(month, cid, cdata, out_dir, fmt, arch_meta):
mkdir(out_dir)
texts, metas, fids, uids = zip(*cdata)

data_tar = op.join(out_dir, "{}-{}_data.{}".format(month, cid, fmt))
meta_tar = op.join(out_dir, "{}-{}_meta.{}".format(month, cid, fmt))
tar_fps, texts, exts = [data_tar, meta_tar], [texts, metas], ["txt", "json"]
if arch_meta:
meta_tar = op.join(out_dir, "{}-{}_meta.{}".format(month, cid, fmt))
tar_fps, texts, exts = [data_tar, meta_tar], [texts, metas], ["txt", "json"]
else:
tar_fps, texts, exts = [data_tar], [texts], ["txt"]

doc_count = 0
docs_counted = False
@@ -193,7 +227,7 @@ def archive_chunk(month, cid, cdata, out_dir, fmt):

return doc_count


#######################################################################
# Util functions #
#######################################################################
@@ -219,51 +253,114 @@ def set_state(state_fp, cdata):
handle.write("{}\n".format(uid))


def sqlite_conn():
conn = sqlite3.connect('metadata.db')
conn.execute('''
CREATE TABLE IF NOT EXISTS metadata (
fid char(64) not null primary key,
month char(32) null,
url varchar(2048) not null,
domain varchar(255) not null,
word_count int null,
elapsed int null,
scraper varchar(255) not null,
success boolean not null
);
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS ix_meta_url ON metadata(url);
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS ix_meta_domain ON metadata(domain);
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS ix_meta_month ON metadata(month);
''')

return conn
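
With --sqlite_meta, per-URL metadata goes into metadata.db rather than one JSON file per document. A hedged sketch of inspecting the database after a run, using only the columns created above (the query itself is not part of this change):

    import sqlite3

    conn = sqlite3.connect("metadata.db")
    cur = conn.cursor()
    # success counts per domain, busiest domains first
    cur.execute("""
        SELECT domain, COUNT(*) AS attempts, SUM(success) AS successes
        FROM metadata
        GROUP BY domain
        ORDER BY attempts DESC
        LIMIT 10;
    """)
    for domain, attempts, successes in cur.fetchall():
        print(domain, attempts, successes)
    conn.close()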


if __name__ == "__main__":
if args.sqlite_meta:
conn = sqlite_conn()
cur = conn.cursor()

month = extract_month(args.url_file)

# in case we are resuming from a previous run
completed_uids, state_fp, prev_cid = get_state(month, args.output_dir)

# URLs we haven't scraped yet (if first run, all URLs in file)
url_entries = load_urls(args.url_file, completed_uids, args.max_urls)

pool = mpl.Pool(args.n_procs)

# process one "chunk" of args.chunk_size URLs at a time
for i, chunk in enumerate(chunks(url_entries, args.chunk_size)):
cid = prev_cid + i + 1

print("Downloading chunk {}".format(cid))
t1 = time.time()

if args.timeout > 0:
# imap as iterator allows .next() w/ timeout.
# ordered version doesn't seem to work correctly.
# for some reason, you CANNOT track j or chunk[j] in the loop,
# so don't add anything else to the loop below!
# confusingly, chunksize below is unrelated to our chunk_size
chunk_iter = pool.imap_unordered(download, chunk, chunksize=1)
cdata = []
for j in range(len(chunk)):
try:
result = chunk_iter.next(timeout=args.timeout)
cdata.append(result)
except mpl.TimeoutError:
print(" --- Timeout Error --- ")
else:
cdata = list(pool.imap(download, chunk, chunksize=1))

set_state(state_fp, cdata)
print("{} / {} downloads timed out".format(len(chunk) - len(cdata), len(chunk)))
print("Chunk time: {} seconds".format(time.time() - t1))

# archive and save this chunk to file
if args.compress:
print("Compressing...")
t2 = time.time()
count = archive_chunk(month, cid, cdata, args.output_dir, args.compress_fmt)
print("Archive created in {} seconds".format(time.time() - t2))
print("{} out of {} URLs yielded content\n".format(count, len(chunk)))
with open(args.url_file) as fh:
# URLs we haven't scraped yet (if first run, all URLs in file)
url_entries = load_urls(fh, completed_uids, args.max_urls)

pool = mpl.Pool(args.n_procs)

# process one "chunk" of args.chunk_size URLs at a time
total = math.ceil(linecount(args.url_file) / args.chunk_size)
print('Total chunks: ', total)

for i, chunk in tqdm(enumerate(chunks(url_entries, args.chunk_size)),
total=total):
cid = prev_cid + i + 1

tqdm.write("Downloading chunk {}".format(cid))
t1 = time.time()

if args.timeout > 0:
# imap as iterator allows .next() w/ timeout.
# ordered version doesn't seem to work correctly.
# for some reason, you CANNOT track j or chunk[j] in the loop,
# so don't add anything else to the loop below!
# confusingly, chunksize below is unrelated to our chunk_size
chunk_iter = pool.imap_unordered(download, chunk, chunksize=1)
cdata = []
for j in range(len(chunk)):
try:
result = chunk_iter.next(timeout=args.timeout)
cdata.append(result)
except mpl.TimeoutError:
tqdm.write(" --- Timeout Error --- ")
else:
cdata = list(pool.imap(download, chunk, chunksize=1))

set_state(state_fp, cdata)
tqdm.write("{} / {} downloads timed out".format(len(chunk) - len(cdata), len(chunk)))
tqdm.write("Chunk time: {} seconds".format(time.time() - t1))

# write metadata to sqlite
if args.sqlite_meta:
for text, meta, fid, _ in filter(lambda x: x, cdata):
if text:
params = (
fid,
month,
meta["url"],
meta["domain"],
meta["elapsed"],
meta["word_count"],
meta["scraper"],
True
)
else:
params = (
fid,
month,
meta["url"],
meta["domain"],
None,
None,
meta["scraper"],
False
)
cur.execute("insert or ignore into metadata (fid, month, url, domain, elapsed, word_count, scraper, success) values (?, ?, ?, ?, ?, ?, ?, ?)", params)
conn.commit()
# archive and save this chunk to file
if args.compress:
tqdm.write("Compressing...")
t2 = time.time()
count = archive_chunk(month, cid, cdata, args.output_dir, args.compress_fmt, not args.sqlite_meta)
tqdm.write("Archive created in {} seconds".format(time.time() - t2))
tqdm.write("{} out of {} URLs yielded content\n".format(len(list(filter(lambda x: x and x[0], cdata))), len(chunk)))

print("Done!")
32 changes: 22 additions & 10 deletions scrapers.py
@@ -28,7 +28,7 @@ def find_and_filter_tag(tag, soup):


def raw_scraper(url, memoize):
t1 = time.time()
t1 = time.time()

try:
cleaner = Cleaner()
@@ -39,26 +39,35 @@ def raw_scraper(url, memoize):
html = minify(article.html)
html = cleaner.clean_html(html)
article.parse()
except:
return None, None
except:
return None, {
"url": url,
"scraper": "raw",
}
if article.text == "":
return None, None
return None, {
"url": url,
"scraper": "raw",
}

metadata = {"url": url, "elapsed": time.time() - t1, "scraper": "raw"}
return html, metadata


def newspaper_scraper(url, memoize):
t1 = time.time()
t1 = time.time()

try:
article = newspaper.Article(url, fetch_images=False, memoize_articles=memoize)
article.download()
article.parse()
text = article.text
count = len(text.split())
except:
return None, None
except:
return None, {
"url": url,
"scraper": "newspaper",
}

metadata = {
"url": url,
@@ -70,7 +79,7 @@ def newspaper_scraper(url, memoize):


def bs4_scraper(url, memoize):
t1 = time.time()
t1 = time.time()

try:
article = newspaper.Article(url, fetch_images=False, memoize_articles=memoize)
@@ -81,8 +90,11 @@ def bs4_scraper(url, memoize):
# DDB: keep text as a single string for consistency with
# newspaper_scraper
text = " ".join(text)
except:
return None, None
except:
return None, {
"url": url,
"scraper": "bs4",
}

metadata = {
"url": url,