diff --git a/Dockerfile b/Dockerfile
index de662710b..fd3e3067b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -27,7 +27,7 @@ RUN chown -R ocl:ocl $APP_HOME /temp /staticfiles
 
 WORKDIR $APP_HOME
 
-RUN apk update && apk upgrade && apk add --no-cache libpq bash curl
+RUN apk update && apk upgrade && apk add --no-cache libpq bash curl busybox-extras
 
 RUN pip install --upgrade pip
 
diff --git a/core/code_systems/converter.py b/core/code_systems/converter.py
new file mode 100644
index 000000000..c3ca2b2e0
--- /dev/null
+++ b/core/code_systems/converter.py
@@ -0,0 +1,52 @@
+from fhir.resources.codesystem import CodeSystem
+
+from core.settings import DEFAULT_LOCALE
+from core.sources.models import Source
+
+
+class CodeSystemConverter:
+    """Converts between Source and FHIR CodeSystem. It is intended as a replacement for serializers."""
+
+    @staticmethod
+    def can_convert_from_fhir(obj):
+        return isinstance(obj, CodeSystem) or (isinstance(obj, dict) and obj.get('resourceType') == 'CodeSystem')
+
+    @staticmethod
+    def can_convert_to_fhir(obj):
+        return isinstance(obj, Source) or (isinstance(obj, dict) and obj.get('type') == 'Source')
+
+    @staticmethod
+    def to_fhir(source):
+        if isinstance(source, dict):
+            res = source
+            res_type = res.pop('type')
+            source = Source(**res)
+            res.update({'type': res_type})
+
+        code_system = {
+            'id': source.mnemonic,
+            'status': 'retired' if source.retired else 'active' if source.released else 'draft',
+            'content': source.content_type if source.content_type else 'complete',
+            'url': source.canonical_url,
+            'title': source.name,
+            'language': source.default_locale,
+            'count': source.active_concepts,
+        }
+        return CodeSystem(**code_system).dict()
+
+    @staticmethod
+    def from_fhir(code_system):
+        if isinstance(code_system, dict):
+            code_system = CodeSystem(**code_system)
+
+        source = {
+            'type': 'Source',
+            'mnemonic': code_system.id,
+            'canonical_url': code_system.url,
+            'name': code_system.title,
+            'default_locale': code_system.language if code_system.language else DEFAULT_LOCALE,
+            'content_type': code_system.content,
+            'retired': code_system.status == 'retired',
+            'released': code_system.status == 'active',
+        }
+        return source
diff --git a/core/code_systems/tests.py b/core/code_systems/tests.py
index d8296fb6c..17296ae7f 100644
--- a/core/code_systems/tests.py
+++ b/core/code_systems/tests.py
@@ -1,7 +1,10 @@
 import json
 
+import responses
+from fhir.resources.codesystem import CodeSystem
 from rest_framework.test import APIClient
 
+from core.code_systems.converter import CodeSystemConverter
 from core.code_systems.serializers import CodeSystemDetailSerializer
 from core.code_systems.views import CodeSystemLookupNotFoundError
 from core.common.tests import OCLTestCase
@@ -589,3 +592,18 @@ def test_unable_to_represent_as_fhir(self):
         self.assertDictEqual(serialized, {
             'resourceType': 'OperationOutcome',
             'issue': [{'severity': 'error', 'details': 'Failed to represent "/invalid/uri" as CodeSystem'}]})
+
+    def test_conversion_from_fhir(self):
+        code_system = CodeSystem(**{
+            'resourceType': 'CodeSystem',
+            'id': 'test',
+            'content': 'complete',
+            'status': 'draft'
+        })
+        source = CodeSystemConverter.from_fhir(code_system)
+        self.assertEqual(source['mnemonic'], 'test')
+
+    def test_conversion_to_fhir(self):
+        source = OrganizationSourceFactory.build(mnemonic='test')
+        code_system = CodeSystemConverter.to_fhir(source)
+        self.assertEqual(code_system['id'], 'test')
diff --git a/core/common/errbit.py b/core/common/errbit.py
index fdcb40f84..9fefebd5e 100644
--- a/core/common/errbit.py
+++ b/core/common/errbit.py
@@ -174,11 +174,11 @@ def xml_raw(self, etype, value, trace, limit=None, file=None):
 
 original_print_exception = traceback.print_exception
 
 
-def print_exception_with_errbit_logging(etype, value, tb, limit=None, file=None):
+def print_exception_with_errbit_logging(etype, value, tb, limit=None, file=None, chain=True):
     if not (etype == KeyError and str(value) == "'cid'"):
         message = ERRBIT_LOGGER.xml_raw(etype, value, tb)
         ERRBIT_LOGGER.send_message(message.encode('utf-8'))
-    original_print_exception(etype, value, tb, limit=None, file=None)
+    original_print_exception(etype, value, tb, limit=limit, file=file, chain=chain)
 
 traceback.print_exception = print_exception_with_errbit_logging
diff --git a/core/common/tasks.py b/core/common/tasks.py
index 13d6a3824..ec56a02d6 100644
--- a/core/common/tasks.py
+++ b/core/common/tasks.py
@@ -271,6 +271,23 @@ def bulk_import_inline(to_import, username, update_if_exists):
     return BulkImportInline(content=to_import, username=username, update_if_exists=update_if_exists).run()
 
 
+@app.task(base=QueueOnce, retry_kwargs={'max_retries': 0})
+def bulk_import_new(path, username, owner_type, owner, import_type='default'):
+    from core.importers.importer import Importer
+    return Importer(path, username, owner_type, owner, import_type).run()
+
+
+@app.task(retry_kwargs={'max_retries': 0})
+def bulk_import_subtask(path, username, owner_type, owner, resource_type, files):
+    from core.importers.importer import ImporterSubtask
+    return ImporterSubtask(path, username, owner_type, owner, resource_type, files).run()
+
+@app.task
+def chordfinisher(*args, **kwargs):
+    """Used for waiting for all results of a group of tasks. See Importer.run()"""
+    return "Done!"
+
+
 @app.task(bind=True, retry_kwargs={'max_retries': 0})
 def bulk_import_parts_inline(self, input_list, username, update_if_exists):
     from core.importers.models import BulkImportInline
diff --git a/core/importers/importer.py b/core/importers/importer.py
new file mode 100644
index 000000000..57e5f1f83
--- /dev/null
+++ b/core/importers/importer.py
@@ -0,0 +1,351 @@
+import logging
+from datetime import datetime
+import tarfile
+import tempfile
+import zipfile
+from typing import List
+from zipfile import ZipFile
+
+import ijson
+import requests
+from pydantic import BaseModel
+
+from core.code_systems.serializers import CodeSystemDetailSerializer
+from core.common.serializers import IdentifierSerializer
+from core.common.tasks import bulk_import_subtask
+from core.common.tasks import chordfinisher
+from celery import chord, group, chain
+
+from core.code_systems.converter import CodeSystemConverter
+from core.importers.models import SourceImporter, SourceVersionImporter, ConceptImporter, OrganizationImporter, \
+    CollectionImporter, CollectionVersionImporter, MappingImporter, ReferenceImporter, CREATED, UPDATED, FAILED, \
+    DELETED, NOT_FOUND, PERMISSION_DENIED, UNCHANGED
+from core.sources.models import Source
+from core.users.models import UserProfile
+
+logger = logging.getLogger('oclapi')
+
+
+class ImporterUtils:
+
+    @staticmethod
+    def fetch_to_temp_file(remote_file, temp):
+        for rf_block in remote_file.iter_content(1024):
+            temp.write(rf_block)
+        temp.flush()
+
+    @staticmethod
+    def is_zipped_or_tarred(temp):
+        temp.seek(0)
+        is_zipped = zipfile.is_zipfile(temp)
+        is_tarred = False
+        if not is_zipped:
+            temp.seek(0)
+            is_tarred = tarfile.is_tarfile(temp)
+        temp.seek(0)
+        return is_zipped, is_tarred
+
+
+class ImportResultSummaryResource(BaseModel):
+    type: str
+    total: int
+    imported: int = 0
+
+
+class ImportResultSummary(BaseModel):
+    resources: List[ImportResultSummaryResource] = []
+
+
+class ImportResult(BaseModel):
+    id: str
+    time_started: datetime = datetime.now()
+    time_finished: datetime = None
+    summary: ImportResultSummary = ImportResultSummary()
+    tasks: list = list()
+
+
+class Importer:
+    path: str
+    username: str
+    owner_type: str
+    owner: str
+    import_type: str = 'default'
+    BATCH_SIZE: int = 100
+
+    def __init__(self, path, username, owner_type, owner, import_type='default'):
+        super().__init__()
+        self.path = path
+        self.username = username
+        self.owner_type = owner_type
+        self.owner = owner
+        self.import_type = import_type
+
+    def is_npm_import(self) -> bool:
+        return self.import_type == 'npm'
+
+    def run(self):
+        resource_types = ['CodeSystem']  # , 'ValueSet', 'ConceptMap']
+        resource_types.extend(ResourceImporter.get_resource_types())
+
+        resources = {}
+        remote_file = requests.get(self.path, stream=True)
+        with tempfile.NamedTemporaryFile() as temp:
+            ImporterUtils.fetch_to_temp_file(remote_file, temp)
+
+            is_zipped, is_tarred = ImporterUtils.is_zipped_or_tarred(temp)
+
+            if is_zipped:
+                with ZipFile(temp) as package:
+                    files = package.namelist()
+                    for file_name in files:
+                        if self.is_importable_file(file_name):
+                            with package.open(file_name) as json_file:
+                                self.categorize_resources(json_file, file_name, resource_types, resources)
+            elif is_tarred:
+                with tarfile.open(fileobj=temp, mode='r') as package:
+                    for file_name in package.getnames():
+                        if self.is_importable_file(file_name):
+                            with package.extractfile(file_name) as json_file:
+                                self.categorize_resources(json_file, file_name, resource_types, resources)
+            else:
+                self.categorize_resources(temp, '', resource_types, resources)
+
+        tasks = self.prepare_tasks(resource_types, resources)
+
+        task = Importer.schedule_tasks(tasks)
+
+        # Return the task id of the chain to track the end of execution.
+        # We do not wait here for the tasks to finish, to free up the worker and memory.
+        # It also makes it possible to pick up running tasks after a restart without restarting the main task.
+        # In the future we will let the user approve the import before scheduling tasks, thus we save tasks
+        # in the result.
+        result = ImportResult(id=task.id, tasks=tasks)
+
+        for resource_type, files in resources.items():
+            resource_count = 0
+            for file_name, count in files.items():
+                resource_count += count
+            if resource_count > 0:
+                result.summary.resources.append(ImportResultSummaryResource(type=resource_type, total=resource_count))
+
+        return result.model_dump()
+
+    def prepare_tasks(self, resource_types, resources):
+        tasks = []
+        # Import in groups in order. Resources within groups are imported in parallel.
+        for resource_type in resource_types:
+            files = []
+            groups = []
+            batch_size = self.BATCH_SIZE
+            for file, count in resources.get(resource_type).items():
+                start_index = 0
+                while start_index < count:
+                    if (count - start_index) < batch_size:
+                        # If a file contains fewer than batch_size resources, include it in a single task.
+                        end_index = count
+                    else:
+                        # If a file contains more than batch_size resources, split it into multiple tasks in batches.
+                        end_index = start_index + batch_size
+
+                    files.append({"file": file, "start_index": start_index, "end_index": end_index})
+
+                    batch_size -= end_index - start_index
+                    start_index = end_index
+
+                    if batch_size <= 0:
+                        groups.append({"path": self.path, "username": self.username, "owner_type": self.owner_type,
+                                       "owner": self.owner, "resource_type": resource_type, "files": files})
+                        files = []
+                        batch_size = self.BATCH_SIZE
+
+            if files:
+                # Flush the last partial batch so that trailing resources are not dropped.
+                groups.append({"path": self.path, "username": self.username, "owner_type": self.owner_type,
+                               "owner": self.owner, "resource_type": resource_type, "files": files})
+
+            if groups:
+                tasks.append(groups)
+        return tasks
+
+    @staticmethod
+    def schedule_tasks(tasks):
+        chord_tasks = []
+        for task in tasks:
+            group_tasks = []
+            for group_task in task:
+                # Wrap groups in chords with chordfinisher to wait for group results before running another group.
+                # TODO: create 2 queues for new bulk import subtasks: bulk_import_subtask and bulk_import_subtask_root
+                group_tasks.append(bulk_import_subtask.s(group_task['path'], group_task['username'],
+                                                         group_task['owner_type'], group_task['owner'],
+                                                         group_task['resource_type'], group_task['files'])
+                                   .set(queue='concurrent'))
+            chord_tasks.append(chord(group(group_tasks), chordfinisher.si()))
+        task = chain(chord_tasks)()
+        return task
+
+    def is_importable_file(self, file_name):
+        return file_name.endswith('.json') and ((self.is_npm_import() and file_name.startswith('package/')
+                                                 and file_name.count('/') == 1)
+                                                or not self.is_npm_import())
+
+    def categorize_resources(self, json_file, file_name, resource_types, resources={}):
+        for resource_type in resource_types:
+            if resource_type not in resources:
+                resources.update({resource_type: {}})
+
+        parser = ijson.parse(json_file, multiple_values=True, allow_comments=True)
+        for prefix, event, value in parser:
+            # Expect {"resourceType": ""}{"resourceType": ""} or {"type": ""}{"type": ""}
+            if event == 'string' and (prefix == 'resourceType' or prefix == 'type'):
+                # Categorize files based on resourceType
+                if value in resource_types:
+                    resource_type = resources.get(value)
+                    # Remember the count of resources of the given type within a file
+                    resource_type.update({file_name: resource_type.get(file_name, 0) + 1})
+
+                if self.is_npm_import():  # Expect only one resource per file for npm
+                    break
+
+
+class ImportRequest:
+    path: str
+    user: UserProfile
+
+    def __init__(self, owner_type, owner, username, resource_type):
+        self.path = f'/{owner_type}/{owner}/{resource_type}/'
+        self.user = UserProfile.objects.filter(username=username).first()
+
+
+class ResourceImporter:
+    resource_importers = [OrganizationImporter, SourceImporter, SourceVersionImporter, ConceptImporter, MappingImporter,
+                          CollectionImporter, CollectionVersionImporter, ReferenceImporter]
+    converters = [CodeSystemConverter]
+    result_type = [CREATED, UPDATED, FAILED, DELETED, NOT_FOUND, PERMISSION_DENIED, UNCHANGED]
+
+    @staticmethod
+    def get_resource_types():
+        resource_types = []
+        for resource_importer in ResourceImporter.resource_importers:
+            resource_types.append(resource_importer.get_resource_type())
+        return resource_types
+
+    def import_resource(self, resource, username, owner_type, owner):
+        resource_type = resource.get('resourceType', None)
+        if resource_type:
+            # Handle FHIR resources
+            if resource_type == 'CodeSystem':
+                url = resource.get('url')
+                source = Source.objects.filter(canonical_url=url)
+                if not source:
+                    url = IdentifierSerializer.convert_fhir_url_to_ocl_uri(url, 'sources')
+                    source = Source.objects.filter(uri=url)
+
+                context = {
+                    'request': ImportRequest(owner_type, owner, username, resource_type)
+                }
+
+                if source:
+                    serializer = CodeSystemDetailSerializer(source.first(), data=resource, context=context)
+                    result = UPDATED
+                else:
+                    serializer = CodeSystemDetailSerializer(data=resource, context=context)
+                    result = CREATED
+                if serializer.is_valid():
+                    serializer.save()
+                return serializer.errors if serializer.errors else result
+        else:
+            # Handle other resources
+            for resource_importer in self.resource_importers:
+                if resource_importer.can_handle(resource):
+                    user_profile = UserProfile.objects.get(username=username)
+                    result = resource_importer(resource, user_profile, True).run()
+                    return result
+        return None
+
+
+class ImporterSubtask:
+    path: str
+    username: str
+    owner_type: str
+    owner: str
+    resource_type: str
+    files: list
+
+    def __init__(self, path, username, owner_type, owner, resource_type, files):
+        super().__init__()
+        self.path = path
+        self.username = username
+        self.owner_type = owner_type
+        self.owner = owner
+        self.resource_type = resource_type
+        self.files = files
+
+    def run(self):
+        results = []
+        remote_file = requests.get(self.path, stream=True)
+        with tempfile.NamedTemporaryFile() as temp:
+            ImporterUtils.fetch_to_temp_file(remote_file, temp)
+
+            is_zipped, is_tarred = ImporterUtils.is_zipped_or_tarred(temp)
+
+            if is_zipped:
+                with ZipFile(temp) as package:
+                    for file in self.files:
+                        with package.open(file.get("file")) as json_file:
+                            result = self.import_resource(file.get("file"), json_file, file.get("start_index"),
+                                                          file.get("end_index"))
+                            results.extend(result)
+            elif is_tarred:
+                with tarfile.open(fileobj=temp, mode='r') as package:
+                    for file in self.files:
+                        with package.extractfile(file.get("file")) as json_file:
+                            result = self.import_resource(file.get("file"), json_file, file.get("start_index"),
+                                                          file.get("end_index"))
+                            results.extend(result)
+            else:
+                for file in self.files:
+                    temp.seek(0)
+                    result = self.import_resource(file.get("file"), temp, file.get("start_index"),
+                                                  file.get("end_index"))
+                    results.extend(result)
+
+        return results
+
+    def import_resource(self, file, json_file, start_index, end_index):
+        parse = self.move_to_start_index(json_file, start_index)
+        count = end_index - start_index
+        results = []
+        for resource in ijson.items(parse, '', multiple_values=True, allow_comments=True):
+            if resource.get('__action', None) == 'DELETE':
+                continue
+
+            if resource.get('resourceType', None) == self.resource_type \
+                    or resource.get('type', None) == self.resource_type:
+                try:
+                    if self.resource_type.lower() in ['source', 'collection']:
+                        if 'owner_type' not in resource:
+                            resource['owner_type'] = self.owner_type
+                        if 'owner' not in resource:
+                            resource['owner'] = self.owner
+                    result = ResourceImporter().import_resource(resource, self.username, self.owner_type, self.owner)
+                    results.append(result)
+                except Exception as e:
+                    error = f'Failed to import resource with id {resource.get("id", None)} from {self.path}/{file} ' \
+                            f'to {self.owner_type}/{self.owner} by {self.username}'
+                    logger.exception(error)
+                    results.append([f'{error} due to: {str(e)}'])
+                count -= 1
+                if count <= 0:
+                    break
+        return results
+
+    def move_to_start_index(self, json_file, start_index):
+        if start_index != 0:
+            index = 0
+            parse_events = ijson.parse(json_file, multiple_values=True, allow_comments=True)
+            for prefix, event, value in parse_events:
+                if event == 'string' and (prefix == 'resourceType' or prefix == 'type'):
+                    if value == self.resource_type:
+                        index += 1
+                        if index == start_index:
+                            # Move the pointer to the next json top-level object
+                            for prefix_inner, event_inner, value_inner in parse_events:
+                                if event_inner == 'end_map' and prefix_inner == '':
+                                    return parse_events
+            # We should end up here only if there are no resources of the specified type
+            return parse_events
+        else:
+            return json_file
diff --git a/core/importers/models.py b/core/importers/models.py
index 44043240b..f7f6c13e9 100644
--- a/core/importers/models.py
+++ b/core/importers/models.py
@@ -113,6 +113,14 @@ def __init__(self, data, user, update_if_exists=False):
         self.update_if_exists = update_if_exists
         self.queryset = None
 
+    @classmethod
+    def can_handle(cls, obj):
+        return isinstance(obj, dict) and obj.get('type', '').lower() == cls.get_resource_type()
+
+    @staticmethod
+    def get_resource_type():
+        raise NotImplementedError()
+
     def get(self, attr, default_value=None):
         return self.data.get(attr, default_value)
 
@@ -176,6 +184,10 @@ class OrganizationImporter(BaseResourceImporter):
     mandatory_fields = {'id', 'name'}
     allowed_fields = ["id", "company", "extras", "location", "name", "public_access", "website"]
 
+    @staticmethod
+    def get_resource_type():
+        return 'organization'
+
     def exists(self):
         return self.get_queryset().exists()
 
@@ -215,6 +227,10 @@ class SourceImporter(BaseResourceImporter):
         'hierarchy_meaning', 'compositional', 'version_needed', 'meta',
     ]
 
+    @staticmethod
+    def get_resource_type():
+        return 'source'
+
     def exists(self):
         return self.get_queryset().exists()
 
@@ -264,6 +280,10 @@ class SourceVersionImporter(BaseResourceImporter):
     mandatory_fields = {"id"}
     allowed_fields = ["id", "external_id", "description", "released"]
 
+    @staticmethod
+    def get_resource_type():
+        return 'source version'
+
     def exists(self):
         return Source.objects.filter(
             **{self.get_owner_type_filter(): self.get('owner'),
@@ -298,6 +318,10 @@ class CollectionImporter(BaseResourceImporter):
         'revision_date', 'text', 'immutable', 'experimental', 'locked_date', 'meta',
     ]
 
+    @staticmethod
+    def get_resource_type():
+        return 'collection'
+
     def exists(self):
         return self.get_queryset().exists()
 
@@ -347,6 +371,10 @@ class CollectionVersionImporter(BaseResourceImporter):
     mandatory_fields = {"id"}
     allowed_fields = ["id", "external_id", "description", "released"]
 
+    @staticmethod
+    def get_resource_type():
+        return 'collection version'
+
     def exists(self):
         return Collection.objects.filter(
             **{self.get_owner_type_filter(): self.get('owner'),
@@ -379,6 +407,10 @@ class ConceptImporter(BaseResourceImporter):
         "parent_concept_urls", 'update_comment', 'comment'
     ]
 
+    @staticmethod
+    def get_resource_type():
+        return 'concept'
+
     def __init__(self, data, user, update_if_exists):
         super().__init__(data, user, update_if_exists)
         self.version = False
@@ -469,6 +501,10 @@ class MappingImporter(BaseResourceImporter):
         "to_concept_name", "extras", "external_id", "retired", 'update_comment', 'comment', 'sort_weight'
     ]
 
+    @staticmethod
+    def get_resource_type():
+        return 'mapping'
+
     def __init__(self, data, user, update_if_exists):
         super().__init__(data, user, update_if_exists)
         self.version = False
@@ -604,6 +640,10 @@ class ReferenceImporter(BaseResourceImporter):
     mandatory_fields = {"data"}
     allowed_fields = ["data", "collection", "owner", "owner_type", "__cascade", "collection_url"]
 
+    @staticmethod
+    def get_resource_type():
+        return 'reference'
+
     def exists(self):
         return False
 
diff --git a/core/importers/tests.py b/core/importers/tests.py
index 9801892a9..f5ebbfe1a 100644
--- a/core/importers/tests.py
+++ b/core/importers/tests.py
@@ -4,6 +4,7 @@
 from json import JSONDecodeError
 from zipfile import ZipFile
 
+import responses
 from celery_once import AlreadyQueued
 from django.core.files.uploadedfile import SimpleUploadedFile
 from django.core.management import call_command
@@ -18,6 +19,7 @@
 from core.common.tests import OCLAPITestCase, OCLTestCase
 from core.concepts.models import Concept
 from core.concepts.tests.factories import ConceptFactory
+from core.importers.importer import ImporterSubtask
 from core.importers.input_parsers import ImportContentParser
 from core.importers.models import BulkImport, BulkImportInline, BulkImportParallelRunner
 from core.importers.views import csv_file_data_to_input_list
@@ -1034,6 +1036,30 @@ def test_chunker_list(self):
             ]
         )
 
+    @responses.activate
+    def test_import_subtask_single_resource_per_file(self):
+        pass
+
+    @responses.activate
+    def test_import_subtask_multiple_resource_per_file(self):
+        with open(os.path.join(os.path.dirname(__file__), '..', 'samples/BI-FY19-baseline.json'), 'rb') \
+                as file:
+            responses.add(responses.GET, 'http://fetch.com/some/npm/package', body=file.read(), status=200,
+                          content_type='application/json', stream=True)
+
+        org_result = ImporterSubtask('http://fetch.com/some/npm/package', 'ocladmin', 'organization', 'OCL',
+                                     'Organization',
+                                     [{'file': 'export.json', 'start_index': 0, 'end_index': 1}]).run()
+        self.assertEqual(org_result, [False])
+
+        source_result = ImporterSubtask('http://fetch.com/some/npm/package', 'ocladmin', 'organization', 'OCL',
+                                        'Source',
+                                        [{'file': 'export.json', 'start_index': 0, 'end_index': 1}]).run()
+        self.assertEqual(source_result, '')
+
+        concept_result = ImporterSubtask('http://fetch.com/some/npm/package', 'ocladmin', 'organization', 'OCL',
+                                         'Concept',
+                                         [{'file': 'export.json', 'start_index': 5, 'end_index': 10}]).run()
+        self.assertEqual(concept_result, '')
 
 
 class BulkImportViewTest(OCLAPITestCase):
     def setUp(self):
diff --git a/core/importers/views.py b/core/importers/views.py
index 4537ee31f..0fb1ee59c 100644
--- a/core/importers/views.py
+++ b/core/importers/views.py
@@ -16,11 +16,12 @@
 
 from core.celery import app
 from core.common.constants import DEPRECATED_API_HEADER
+from core.common.tasks import bulk_import_new
 from core.services.storages.redis import RedisService
 from core.common.swagger_parameters import update_if_exists_param, task_param, result_param, username_param, \
     file_upload_param, file_url_param, parallel_threads_param, verbose_param
 from core.common.utils import parse_bulk_import_task_id, task_exists, flower_get, queue_bulk_import, \
-    get_bulk_import_celery_once_lock_key, is_csv_file, get_truthy_values
+    get_bulk_import_celery_once_lock_key, is_csv_file, get_truthy_values, get_queue_task_names
 from core.importers.constants import ALREADY_QUEUED, INVALID_UPDATE_IF_EXISTS, NO_CONTENT_TO_IMPORT
 from core.importers.input_parsers import ImportContentParser
@@ -256,6 +257,18 @@ class ImportView(BulkImportParallelInlineView, ImportRetrieveDestroyMixin):
         manual_parameters=[update_if_exists_param, file_url_param, file_upload_param, parallel_threads_param],
     )
     def post(self, request, import_queue=None):
+        if 'import_type' in request.data:
+            file_url = request.data.get('file_url')
+            queue_id, task_id = get_queue_task_names(import_queue, self.request.user.username)
+            task = bulk_import_new.apply_async(
+                (file_url, self.request.user.username,
+                 request.data.get('owner_type', 'user'), request.data.get('owner', self.request.user.username),
+                 request.data.get('import_type', 'npm')), task_id=task_id, queue=queue_id)
+            return Response({
+                'task': task.id,
+                'state': task.state
+            }, status=status.HTTP_202_ACCEPTED)
+
         return super().post(request, import_queue)
diff --git a/hl7.fhir.r4b.core-4.3.0.tgz b/hl7.fhir.r4b.core-4.3.0.tgz
new file mode 100644
index 000000000..65090042a
Binary files /dev/null and b/hl7.fhir.r4b.core-4.3.0.tgz differ
diff --git a/requirements.txt b/requirements.txt
index 7bfba75fc..2c453cf4f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -38,4 +38,8 @@ mozilla-django-oidc==3.0.0
 django-celery-beat==2.5.0
 jsondiff==2.0.0
 django-silk==5.0.4
-azure-storage-blob==12.19.0
\ No newline at end of file
+azure-storage-blob==12.19.0
+fhir.resources==7.1.0
+ijson==3.2.3
+responses==0.25.0
+pydantic==2.6.4
\ No newline at end of file