Commit
Merge pull request #13 from villmow/faster_extraction
Faster extraction
jcpeterson authored Mar 18, 2020
2 parents 56a78f0 + f4f9aeb commit 26ee7bf
Showing 2 changed files with 58 additions and 42 deletions.
99 changes: 57 additions & 42 deletions extract_text.py
@@ -1,16 +1,19 @@
 from __future__ import print_function
 from __future__ import division

-from glob import glob
-import os.path as op
 import argparse, time, tarfile
-import multiprocessing as mpl
+from glob import glob
+from hashlib import md5
+import multiprocessing as mpl
+import os.path as op
+import pathlib as pl

 import newspaper
+from tqdm import tqdm

 from utils import mkdir, chunks, extract_month


 parser = argparse.ArgumentParser()
 parser.add_argument("--html_archive", type=str, default="scraped/RS_2017-04-4_data.xz")
 parser.add_argument("--chunk_size", type=int, default=100)
@@ -19,20 +22,20 @@
 args = parser.parse_args()


-def parse_file(file_entry):
-    file_name, html = file_entry
-    url_hash = md5(html).hexdigest()
-    article = newspaper.Article(url=url_hash, fetch_images=False)
-    article.set_html(html)
-    article.parse()
-    return (file_name, article.text)
+def parse_file(filename):
+    with open(filename, "rt") as f:
+        html = f.read()
+    url_hash = md5(html.encode("utf-8")).hexdigest()
+    article = newspaper.Article(url=url_hash, fetch_images=False)
+    article.set_html(html)
+    article.parse()
+    return filename, article.text


-def save_parsed_text(parsed_entries, out_dir):
-    for fn, txt in parsed_entries:
-        txt_fp = op.join(out_dir, fn)
-        with open(txt_fp, "w") as handle:
-            handle.write(txt)
+def save_parsed_file(filename, text, out_dir):
+    txt_fp = out_dir / filename.name
+    with open(txt_fp, "wt") as handle:
+        handle.write(text)


 def get_processed_files(out_dir):
@@ -41,33 +44,45 @@ def get_processed_files(out_dir):


 def parse_archive(archive_fp, out_dir, n_procs, chunk_size=100):
-    processed = get_processed_files(out_dir)
-    with tarfile.open(archive_fp, "r") as tf:
-        files = list(set(tf.getnames()) - set(processed))
-        if len(files) == 0:
-            return
-
-        if len(processed) > 0:
-            print("{} files already processed.".format(len(processed)))
-
-        pool = mpl.Pool(n_procs)
-        for ci, chunk in enumerate(chunks(files, chunk_size)):
-            file_entries = [(fn, tf.extractfile(fn).read()) for fn in chunk]
-
-            t1 = time.time()
-            parsed = list(pool.imap(parse_file, file_entries, chunksize=1))
-
-            # remove empty strings from output
-            parsed = [p for p in parsed if len(p[1]) != 0]
-
-            hit_rate = len(parsed) / len(chunk) * 100
-            print("Parsing chunk {} took {} seconds".format(ci + 1, time.time() - t1))
-            print(" -- {}% of chunk {}'s docs yielded text.".format(hit_rate, ci + 1))
-
-            t1 = time.time()
-            save_parsed_text(parsed, out_dir)
-            print("Saving chunk {} took {} seconds".format(ci + 1, time.time() - t1))
-
+    tmp_data_dir = pl.Path(archive_fp).with_suffix(".tmp")
+
+    # extract tar first
+    if tmp_data_dir.exists():
+        raise FileExistsError("Trying to extract archive to {}".format(tmp_data_dir))
+    else:
+        tar = tarfile.open(archive_fp)
+        tar.extractall(tmp_data_dir)
+        tar.close()
+
+    # get files to process
+    processed_files = set(get_processed_files(out_dir))
+    num_total_files = len([_ for _ in tmp_data_dir.iterdir()])
+    num_remaining_files = num_total_files - len(processed_files)
+    print("{}/{} files already processed.".format(len(processed_files), num_total_files))
+
+    def file_gen():
+        for filename in tmp_data_dir.iterdir():
+            if filename.name not in processed_files and filename.is_file():
+                yield filename
+
+    out_dir = pl.Path(out_dir)
+    unparsable = 0
+
+    with mpl.Pool(n_procs) as pool:
+        for filename, text in tqdm(pool.imap(parse_file, file_gen(), chunksize=chunk_size), total=num_remaining_files):
+            if not text:
+                unparsable += 1
+                continue
+
+            save_parsed_file(filename, text, out_dir)
+    print("Could not parse {} files".format(unparsable))
+
+    # remove the extracted files
+    for filename in tmp_data_dir.iterdir():
+        if filename.is_file():
+            filename.unlink()
+    # and then the now (hopefully) empty directory
+    tmp_data_dir.rmdir()

 if __name__ == "__main__":
     month = extract_month(args.html_archive)
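
Aside: the speedup in parse_archive comes from two things visible in the diff above. The archive is unpacked to a temporary directory once, so workers receive only file paths and read the HTML themselves instead of having full payloads pickled through the pool, and imap now batches chunk_size tasks per dispatch rather than chunksize=1. A minimal standalone sketch of that pattern (the directory name and worker are placeholders, not part of the commit):

import multiprocessing as mpl
import pathlib as pl

def parse_one(path):  # placeholder worker; each process does its own file I/O
    html = path.read_text()  # only the small Path object crossed the process pipe
    return path.name, len(html)

if __name__ == "__main__":
    tmp_dir = pl.Path("archive.tmp")  # assumed pre-extracted directory
    files = [p for p in tmp_dir.iterdir() if p.is_file()]
    with mpl.Pool(4) as pool:
        # chunksize > 1 amortizes per-task dispatch overhead across many tasks
        for name, size in pool.imap(parse_one, files, chunksize=100):
            print(name, size)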
1 change: 1 addition & 0 deletions requirements.txt
@@ -27,6 +27,7 @@ soupsieve==1.8
 spacy
 tinysegmenter==0.3
 tldextract==2.2.0
+tqdm==4.32.2
 urllib3==1.24.1
 urlparse2==1.1.1
 pycurl==7.21.5
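
tqdm is pinned here because parse_archive now wraps pool.imap in a progress bar. Since imap yields lazily and has no len(), passing total= is what lets tqdm show a percentage; a quick illustration (the numbers are arbitrary):

from tqdm import tqdm

items = range(1000)
# a generator has no len(), so total must be supplied for a percentage bar
for _ in tqdm((x * x for x in items), total=len(items)):
    pass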
