chore: single process import (#265)
to ease debugging
alexgarel authored Dec 2, 2024
1 parent fda44e1 commit 7c809ca
Showing 1 changed file with 21 additions and 1 deletion.
app/_import.py (22 changes: 21 additions & 1 deletion)
@@ -1,10 +1,12 @@
 import abc
 import math
+import time
 from datetime import datetime
 from multiprocessing import Pool
 from pathlib import Path
 from typing import Iterator, cast

+import elasticsearch
 import tqdm
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk, parallel_bulk
@@ -523,6 +525,16 @@ def run_items_import(
         index = generate_index_object(next_index, config)
         # create the index
         index.save(using=es_client)
+        # it may take some time to create the index
+        for i in range(60):
+            try:
+                index.refresh()
+                break
+            except elasticsearch.NotFoundError:
+                logger.info("Index not ready, waiting 10 seconds")
+                time.sleep(10)
+        else:
+            raise RuntimeError("Index not ready after 600 seconds")
     else:
         # use current index
         next_index = config.index.name
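
The retry loop above polls index.refresh() until the freshly created index answers, giving up after 60 attempts of 10 seconds each (600 seconds total); the for/else raises only when the loop finishes without a break. The same wait-for-readiness pattern can be written as a standalone helper against the raw elasticsearch-py client. This is a minimal sketch, not code from this repository: the helper name, retry count, and delay are illustrative; only indices.refresh and NotFoundError come from the real client API.

import time

import elasticsearch
from elasticsearch import Elasticsearch


def wait_for_index(es_client: Elasticsearch, index_name: str,
                   retries: int = 60, delay: float = 10.0) -> None:
    # illustrative helper, not part of app/_import.py
    for _ in range(retries):
        try:
            # indices.refresh raises NotFoundError while the index
            # does not exist yet
            es_client.indices.refresh(index=index_name)
            return
        except elasticsearch.NotFoundError:
            time.sleep(delay)
    raise RuntimeError(
        f"Index {index_name} not ready after {retries * delay:.0f} seconds"
    )
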
@@ -543,7 +555,15 @@ def run_items_import(
     # run in parallel
     num_errors = 0
     with Pool(num_processes) as pool:
-        for i, success, errors in pool.starmap(import_parallel, args):
+        if num_processes > 1:
+            logger.info("Running in parallel with %d processes", num_processes)
+            result_iter = iter(pool.starmap(import_parallel, args))
+        else:
+            # run sequentially; it's easier to debug when we need to,
+            # and the pool is not used in this case
+            logger.info("Running in a single process")
+            result_iter = iter(map(lambda a: import_parallel(*a), args))
+        for i, success, errors in result_iter:
             # Note: we log here instead of in sub-process because
             # it's easier to avoid mixing logs, and it works better for pytest
             logger.info("[%d] Indexed %d documents", i, success)
