Skip to content

Commit 3595189

Browse files
committed
OpenConceptLab/ocl_issues#2035 Fix new bulk import queuing
1 parent f10e0f9 commit 3595189

File tree

4 files changed

+57
-41
lines changed

4 files changed

+57
-41
lines changed

core/importers/importer.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@
99
import tempfile
1010
import zipfile
1111
from zipfile import ZipFile
12-
1312
from celery.result import AsyncResult, result_from_tuple
14-
from celery import group, chain, chord
13+
from celery import group, chain
1514

1615
import ijson
1716
import requests
@@ -216,7 +215,7 @@ class Importer:
216215
owner_type: str
217216
owner: str
218217
import_type: str = 'default'
219-
BATCH_SIZE: int = 100
218+
MIN_BATCH_SIZE: int = 50
220219
IMPORT_CACHE: str = "import_cache/"
221220

222221
# pylint: disable=too-many-arguments
@@ -376,14 +375,24 @@ def traverse_dependencies(self, package_file, path, resource_types, dependencies
376375

377376
def prepare_tasks(self, resource_types, packages, resources):
378377
tasks = []
378+
# Count all items to determine batch size
379+
all_count = 0
380+
for resource, item in resources.items():
381+
for filepath, count in item.items():
382+
all_count += count
383+
if all_count > 50000:
384+
task_batch_size = (all_count / 1000)
385+
else:
386+
task_batch_size = self.MIN_BATCH_SIZE
387+
379388
# Import in groups in order. Resources within groups are imported in parallel.
380389
for package in packages:
381390
# Import dependencies in order.
382391
for resource_type in resource_types:
383392
# Import resource types in order.
384393
files = []
385394
groups = []
386-
batch_size = self.BATCH_SIZE
395+
batch_size = task_batch_size
387396
for filepath, count in resources.get(resource_type).items():
388397
if not filepath.startswith(package):
389398
continue
@@ -403,11 +412,16 @@ def prepare_tasks(self, resource_types, packages, resources):
403412
batch_size -= end_index - start_index
404413
start_index = end_index
405414

406-
if batch_size <= 0 or start_index == end_index:
415+
if batch_size <= 0:
407416
groups.append({"path": package, "username": self.username, "owner_type": self.owner_type,
408417
"owner": self.owner, "resource_type": resource_type, "files": files})
409418
files = []
410-
batch_size = self.BATCH_SIZE
419+
batch_size = task_batch_size
420+
421+
if files:
422+
# Append last task to the group
423+
groups.append({"path": package, "username": self.username, "owner_type": self.owner_type,
424+
"owner": self.owner, "resource_type": resource_type, "files": files})
411425

412426
if groups:
413427
tasks.append(groups)
@@ -426,7 +440,7 @@ def schedule_tasks(self, tasks):
426440
if len(group_tasks) == 1: # Prevent celery from converting group to a single task
427441
group_tasks.append(bulk_import_subtask_empty.si().set(queue='concurrent'))
428442

429-
chained_tasks |= chord(group(group_tasks), bulk_import_subtask_empty.si().set(queue='concurrent'))
443+
chained_tasks |= group(group_tasks)
430444
chained_tasks |= import_finisher.si(self.task_id).set(queue='concurrent')
431445

432446
final_task = chained_tasks.apply_async(queue='concurrent')

core/importers/tests.py

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2064,36 +2064,38 @@ def test_prepare_tasks(self):
20642064
'CodeSystem': {'/package1/path4:': 10},
20652065
'ConceptMap': {'/package2/path5': 250}
20662066
})
2067-
20682067
self.assertEqual(tasks, [
2069-
# Executed in sequence
2070-
[ # Executed in parallel
2071-
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2072-
'resource_type': 'ValueSet', 'files': [{'filepath': 'path3', 'start_index': 0, 'end_index': 50}]}
2073-
], [ # Executed in parallel
2074-
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2075-
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 0, 'end_index': 100}]},
2076-
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2077-
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 100,
2078-
'end_index': 200}]},
2079-
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2080-
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 200, 'end_index': 250}]}
2081-
], [ # Executed in parallel
2082-
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2083-
'resource_type': 'CodeSystem', 'files': [{'filepath': 'path4:', 'start_index': 0, 'end_index': 10}]}
2084-
], [ # Executed in parallel
2085-
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2086-
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 0, 'end_index': 100}]},
2087-
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2088-
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 100, 'end_index': 101}]},
2089-
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2090-
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 0, 'end_index': 100}]},
2091-
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2092-
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 100, 'end_index': 200}]},
2093-
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2094-
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 200,
2095-
'end_index': 299}]}
2096-
]])
2068+
[{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2069+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path3', 'start_index': 0, 'end_index': 50}]}],
2070+
[{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2071+
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 0, 'end_index': 50}]},
2072+
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2073+
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 50, 'end_index': 100}]},
2074+
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2075+
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 100, 'end_index': 150}]},
2076+
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2077+
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 150, 'end_index': 200}]},
2078+
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2079+
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 200, 'end_index': 250}]}],
2080+
[{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2081+
'resource_type': 'CodeSystem', 'files': [{'filepath': 'path4:', 'start_index': 0, 'end_index': 10}]}],
2082+
[{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2083+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 0, 'end_index': 50}]},
2084+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2085+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 50, 'end_index': 100}]},
2086+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2087+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 100, 'end_index': 101},
2088+
{'filepath': 'path2', 'start_index': 0, 'end_index': 49}]},
2089+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2090+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 49, 'end_index': 99}]},
2091+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2092+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 99, 'end_index': 149}]},
2093+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2094+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 149, 'end_index': 199}]},
2095+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2096+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 199, 'end_index': 249}]},
2097+
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
2098+
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 249, 'end_index': 299}]}]])
20972099

20982100
hl7_fhir_fr_core_resources = {
20992101
'CodeSystem': {'http://fetch/npm/package/package/CodeSystem-fr-core-cs-circonstances-sortie.json': 1,

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ services:
88
healthcheck:
99
test: "pg_isready -U postgres"
1010
volumes:
11-
- postgres-data:/usr/share/postgres/data
11+
- postgres-data:/var/lib/postgresql/data
1212
redis:
1313
image: bitnami/redis:7.0.12
1414
restart: "always"

requirements.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ python-dateutil==2.8.2
1414
requests==2.32.3
1515
factory_boy==3.3.0
1616
# Celery
17-
celery[redis]==5.3.1
17+
celery[redis]==5.4.0
1818
celery_once==3.0.1
1919
git+https://github.com/snyaggarwal/flower # use until https://github.com/mher/flower/issues/1231 issue is resolved
20-
redis==4.6.0
21-
django-redis==5.3.0 # needed for redis sentinel support
22-
kombu==5.3.1
20+
redis==5.2.1
21+
django-redis==5.4.0 # needed for redis sentinel support
22+
kombu==5.4.2
2323
django-elasticsearch-dsl==7.3
2424
drf-yasg==1.21.5
2525
git+https://github.com/snyaggarwal/django-queryset-csv

0 commit comments

Comments
 (0)