
Commit

polished comments
mahinth1 committed Nov 2, 2024
1 parent a8f2cf3 commit fac299f
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions stages/02_download.py
@@ -21,8 +21,7 @@ def download_pdf(url, file_output_dir, downloaded_hashes, session=None):
     if session is None:
         session = requests.Session()
 
-    # skip a particular url if a connection error happens
-    # RequestsException covers ConnectionError, HTTPError and JSONError subclasses
+    # skip an url if connection error occurs (RequestsException covers ConnectionError, HTTPError and JSONError)
     try:
         response = requests.get(f"http://api.scraperapi.com?api_key={scraperapi_key}&url={url}&render=true")
         response.raise_for_status()
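
Note: a minimal sketch of the skip-on-failure pattern described in the new comment above, assuming a plain requests.get call without the ScraperAPI wrapper and an illustrative helper name (fetch_or_skip) that is not in the script. requests.exceptions.RequestException is the common base class of ConnectionError, HTTPError, Timeout and (in current requests versions) JSONDecodeError, so a single except clause covers them all.

import requests

def fetch_or_skip(url, timeout=60):
    """Return the response body, or None so the caller can skip this URL."""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # raises HTTPError on 4xx/5xx status codes
        return response.content
    except requests.exceptions.RequestException as exc:
        print(f"skipping {url}: {exc}")
        return None
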
@@ -49,8 +48,7 @@ def download_pdf(url, file_output_dir, downloaded_hashes, session=None):
     with outfile_path.open('wb') as file:
         file.write(content)
 
-    # check validity of pdf
-    # Exception covers PdfReadError and other PyPDF errors
+    # check validity of pdf (Exception covers PdfReadError and other PyPDF errors)
    try:
         with outfile_path.open('rb') as file:
             reader = pypdf.PdfReader(file)
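
Note: a hedged sketch of the validity check described in the comment above, with an illustrative helper name (is_valid_pdf) not taken from the script; the broad except is deliberate because pypdf can raise PdfReadError as well as other exception types on corrupt or truncated downloads.

import pypdf

def is_valid_pdf(path):
    """Return True if pypdf can parse the file and it has at least one page."""
    try:
        with open(path, 'rb') as fh:
            reader = pypdf.PdfReader(fh)
            return len(reader.pages) > 0
    except Exception:
        return False
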
@@ -84,16 +82,15 @@ def download_pdf(url, file_output_dir, downloaded_hashes, session=None):
 
 # process each file
 for file in input_dir.glob('*.parquet'):
-    df = pd.read_parquet(file)[:60000]
+    df = pd.read_parquet(file)[:150000]
 
     file_stem = file.stem
 
     # create output directory (url_00, url_01, ...) for downloaded pdfs
     file_output_dir = output_dir / file_stem
     file_output_dir.mkdir(parents=True, exist_ok=True)
 
-    # Get the latest row where content_hash is assigned
-    # Start from where it is left off (after last downloaded pdf) instead of starting from row 0
+    # Get the latest row where content_hash is assigned (start from where it is left off instead of starting from row 0)
     metadata_file = metadata_output_dir / f"{file_stem}_pdfs.parquet"
     existing_metadata = pd.read_parquet(metadata_file) if metadata_file.exists() else pd.DataFrame(columns=['content_hash'])
 
@@ -109,7 +106,7 @@ def download_pdf(url, file_output_dir, downloaded_hashes, session=None):
 
     downloaded_hashes = set(existing_metadata['content_hash'].tolist())
 
-    # Execute download in parallel
+    # Execute download using multiple threads (I/O bound operation)
     results_list = []
     with ThreadPoolExecutor(max_workers=40) as executor:
         results = list(tqdm(executor.map(
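
Note: a sketch of the thread-pool pattern shown above, with illustrative names (download_all, download_one) rather than the script's own signature; threads are a good fit because, as the new comment says, the work is I/O bound, so workers mostly wait on the network rather than compete for the GIL.

from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

def download_all(urls, download_one, max_workers=40):
    """Run a blocking download function over many URLs in parallel threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order; tqdm shows progress as results arrive
        return list(tqdm(executor.map(download_one, urls), total=len(urls)))
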
@@ -135,8 +132,8 @@ def download_pdf(url, file_output_dir, downloaded_hashes, session=None):
     results_df = pd.DataFrame(results_list).drop_duplicates(subset=['doi','content_hash'])
 
 
-    # merge hash, pdf path, journal and publisher to original data
-    # handle issues when no new data is generated
+    # merge hash and pdf path to original data
+    # append or add updates or new data to existing file
     if not results_df.empty:
 
         # descriptors in the input parquet file
@@ -155,7 +152,6 @@ def download_pdf(url, file_output_dir, downloaded_hashes, session=None):
 
 
         output_file = metadata_output_dir / f"{file_stem}_pdfs.parquet"
-        # append or add updates or new data to existing file
         if output_file.exists():
             existing_df = pd.read_parquet(output_file)
             combined_df = pd.concat([existing_df, metadata_df], ignore_index=True).drop_duplicates(subset=['doi', 'content_hash'])
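
Note: a sketch of the append-or-create metadata step the last two hunks describe; the else branch and the to_parquet call are assumptions, since that part of the file is folded out of the diff, and append_metadata is an illustrative name.

import pandas as pd

def append_metadata(metadata_df, output_file):
    """Append new rows to the parquet file, keeping one row per (doi, content_hash)."""
    if output_file.exists():  # output_file is a pathlib.Path
        existing_df = pd.read_parquet(output_file)
        combined_df = pd.concat([existing_df, metadata_df], ignore_index=True)
        combined_df = combined_df.drop_duplicates(subset=['doi', 'content_hash'])
    else:
        combined_df = metadata_df
    combined_df.to_parquet(output_file, index=False)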
