diff --git a/app/container.py b/app/container.py index e0635585..e917cb3e 100644 --- a/app/container.py +++ b/app/container.py @@ -1,5 +1,6 @@ from app.services.entity.supplier_ignored_directory_service import SupplierIgnoredDirectoryService from app.services.entity.supplier_info_service import SupplierInfoService +from app.services.fhir.fhir_service import FhirService from app.services.supplier_provider.factory import SupplierProviderFactory from app.services.supplier_provider.supplier_provider import SupplierProvider from app.services.update.mass_update_consumer_service import MassUpdateConsumerService @@ -9,6 +10,7 @@ from app.config import get_config from app.services.entity.resource_map_service import ResourceMapService from app.services.api.authenticators.factory import AuthenticatorFactory +from app.services.update.new_service import DataPreparationService from app.services.update.update_consumer_service import UpdateConsumerService from typing import cast @@ -31,7 +33,9 @@ def container_config(binder: inject.Binder) -> None: supplier_provider_factory = SupplierProviderFactory(config=config, database=db, auth=auth) supplier_provider = supplier_provider_factory.create() binder.bind(SupplierProvider, supplier_provider) - + + new_service = DataPreparationService(fhir_service=FhirService(config.mcsd.strict_validation)) + update_service = UpdateConsumerService( consumer_url=config.mcsd.consumer_url, strict_validation=config.mcsd.strict_validation, @@ -40,14 +44,15 @@ def container_config(binder: inject.Binder) -> None: request_count=config.mcsd.request_count, resource_map_service=resource_map_service, auth=auth, + new_service=new_service ) binder.bind(UpdateConsumerService, update_service) supplier_ignored_directory_service = SupplierIgnoredDirectoryService(db) binder.bind(SupplierIgnoredDirectoryService, supplier_ignored_directory_service) - + supplier_info_service = SupplierInfoService( - db, + db, supplier_stale_timeout_seconds=config.scheduler.supplier_stale_timeout_in_sec, # type: ignore ) binder.bind(SupplierInfoService, supplier_info_service) @@ -61,7 +66,7 @@ def container_config(binder: inject.Binder) -> None: cleanup_client_directory_after_success_timeout_seconds=config.scheduler.cleanup_client_directory_after_success_timeout_in_sec, # type: ignore stats=get_stats(), ) - + update_scheduler = Scheduler( function=update_all_service.update_all, diff --git a/app/cron.py b/app/cron.py deleted file mode 100644 index c821b297..00000000 --- a/app/cron.py +++ /dev/null @@ -1,54 +0,0 @@ -import argparse -import logging -from typing import Protocol, Any - -import inject - -import application - -from cron.cleanup_expired import CleanupExpired - -logger = logging.getLogger(__name__) - - -class CronCommand(Protocol): - def init_arguments(self, subparser: Any) -> None: - ... - - def run(self, args: argparse.Namespace) -> int: - ... - - -CRON_COMMANDS: dict[str, CronCommand] = { - "cleanup_expired": CleanupExpired, -} - - -def main() -> None: - application.application_init() - - parser = argparse.ArgumentParser(description="Cron command line interface") - subparser = parser.add_subparsers( - dest="command", title="cron commands", help="valid cron commands", required=True - ) - for name in CRON_COMMANDS.keys(): - command_get(name).init_arguments(subparser) - - args = parser.parse_args() - - # Run command - logger.info("Running command %s", args.command) - code = command_get(args.command).run(args) - exit(code) - - -def command_exists(name: str) -> bool: - return name in CRON_COMMANDS - - -def command_get(name: str) -> CronCommand: - return inject.instance(CRON_COMMANDS[name]) - - -if __name__ == "__main__": - main() diff --git a/app/models/adjacency/adjacency_map.py b/app/models/adjacency/adjacency_map.py index 349a0619..b5831e04 100644 --- a/app/models/adjacency/adjacency_map.py +++ b/app/models/adjacency/adjacency_map.py @@ -31,7 +31,7 @@ def get_group(self, node: Node) -> List[Node]: while queue: current = queue.popleft() - for ref in current.supplier_data.references: + for ref in current.references: sibling = self.data[ref.id] if sibling.visited is False: sibling.visited = True @@ -43,7 +43,7 @@ def get_group(self, node: Node) -> List[Node]: def get_missing_refs(self) -> List[NodeReference]: refs = list( chain.from_iterable( - [node.supplier_data.references for node in self.data.values()] + [node.references for node in self.data.values()] ) ) return list(filter(self._ref_in_ajd_map, refs)) diff --git a/app/models/adjacency/node.py b/app/models/adjacency/node.py index 93df1f4e..23b86f3f 100644 --- a/app/models/adjacency/node.py +++ b/app/models/adjacency/node.py @@ -1,14 +1,8 @@ -import copy -from typing import List, Literal -from fhir.resources.R4B.domainresource import DomainResource -from pydantic import BaseModel, computed_field -from fhir.resources.R4B.bundle import BundleEntry, BundleEntryRequest +from typing import List +from pydantic import BaseModel +from fhir.resources.R4B.bundle import BundleEntry -from app.models.fhir.types import HttpValidVerbs from app.models.resource_map.dto import ResourceMapDto, ResourceMapUpdateDto -from app.services.fhir.references.reference_namespacer import ( - namespace_resource_reference, -) class NodeReference(BaseModel): @@ -20,41 +14,6 @@ def namespace_id(self, namespace: str) -> None: self.id = new_id -class SupplierNodeData(BaseModel): - supplier_id: str - references: List[NodeReference] - method: HttpValidVerbs - entry: BundleEntry - - @computed_field # type: ignore - @property - def hash_value(self) -> int | None: - if self.entry.resource is None: - return None - - resource = copy.deepcopy(self.entry.resource) - namespace_resource_reference(resource, self.supplier_id) - resource.meta = None - resource.id = None - - return hash(resource.model_dump().__repr__()) - - -class ConsumerNodeData(BaseModel): - resource: DomainResource | None - - @computed_field # type: ignore - @property - def hash_value(self) -> int | None: - if self.resource is None: - return None - - res = copy.deepcopy(self.resource) - res.meta = None - res.id = None - return hash(res.model_dump().__repr__()) - - class NodeUpdateData(BaseModel): bundle_entry: BundleEntry | None = None resource_map_dto: ResourceMapDto | ResourceMapUpdateDto | None = None @@ -65,131 +24,8 @@ class Node(BaseModel): resource_type: str visited: bool = False updated: bool = False - supplier_data: SupplierNodeData - consumer_data: ConsumerNodeData - resource_map: ResourceMapDto | ResourceMapUpdateDto | None = None - - @computed_field # type: ignore - @property - def update_status( - self, - ) -> Literal["ignore", "equal", "delete", "update", "new"]: - - if ( - self.supplier_data.method == "DELETE" - and self.consumer_data.resource is None - ): - return "ignore" - - if ( - self.supplier_data.method != "DELETE" - and self.supplier_data.hash_value - and self.consumer_data.hash_value - and self.supplier_data.hash_value == self.consumer_data.hash_value - ): - return "equal" - - if ( - self.supplier_data.method == "DELETE" - and self.consumer_data.resource is not None - ): - return "delete" - - if ( - self.supplier_data.method != "DELETE" - and self.supplier_data.hash_value is not None - and self.consumer_data.hash_value is None - and self.resource_map is None - ): - return "new" - return "update" - - @computed_field # type: ignore - @property - def update_data(self) -> NodeUpdateData | None: - consumer_resource_id = f"{self.supplier_data.supplier_id}-{self.resource_id}" - url = f"{self.resource_type}/{consumer_resource_id}" - dto: ResourceMapDto | ResourceMapUpdateDto | None = None - match self.update_status: - case "ignore": - return None - - case "equal": - return None - - case "delete": - entry = BundleEntry.model_construct() - entry_request = BundleEntryRequest.model_construct( - method="DELETE", url=url - ) - entry.request = entry_request - - if self.resource_map is None: - raise Exception( - f"Resource map for {self.resource_id} {self.resource_type} cannot be None and marked as delete " - ) - - dto = ResourceMapUpdateDto( - history_size=self.resource_map.history_size + 1, - supplier_id=self.supplier_data.supplier_id, - resource_type=self.resource_type, - supplier_resource_id=self.resource_id, - ) - - return NodeUpdateData(bundle_entry=entry, resource_map_dto=dto) - - case "new": - entry = BundleEntry.model_construct() - entry_request = BundleEntryRequest.model_construct( - method="PUT", url=url - ) - resource = copy.deepcopy(self.supplier_data.entry.resource) - if resource is None: - raise Exception( - f"Resource {self.resource_id} {self.resource_type} cannot be None when a node is marked `new`" - ) - namespace_resource_reference(resource, self.supplier_data.supplier_id) - resource.id = consumer_resource_id - entry.resource = resource - entry.request = entry_request - - dto = ResourceMapDto( - supplier_id=self.supplier_data.supplier_id, - supplier_resource_id=self.resource_id, - consumer_resource_id=consumer_resource_id, - resource_type=self.resource_type, - history_size=1, - ) - - return NodeUpdateData(bundle_entry=entry, resource_map_dto=dto) - - case "update": - entry = BundleEntry.model_construct() - entry_request = BundleEntryRequest.model_construct( - method="PUT", url=url - ) - resource = copy.deepcopy(self.supplier_data.entry.resource) - if resource is None: - raise Exception( - f"Resource {self.resource_id} {self.resource_type} cannot be None and node marked as `update`" - ) - - namespace_resource_reference(resource, self.supplier_data.supplier_id) - resource.id = consumer_resource_id - entry.request = entry_request - entry.resource = resource - - if self.resource_map is None: - raise Exception( - f"Resource map for {self.resource_id} {self.resource_type} cannot be None and marked as `update`" - ) - - dto = ResourceMapUpdateDto( - supplier_id=self.supplier_data.supplier_id, - supplier_resource_id=self.resource_id, - resource_type=self.resource_type, - history_size=self.resource_map.history_size + 1, - ) + references: List[NodeReference] - return NodeUpdateData(bundle_entry=entry, resource_map_dto=dto) + existing_resource_map: ResourceMapDto | ResourceMapUpdateDto | None = None + supplier_bundle_entry: BundleEntry diff --git a/app/services/update/adjacency_map_service.py b/app/services/update/adjacency_map_service.py index d51a80ec..e3128de4 100644 --- a/app/services/update/adjacency_map_service.py +++ b/app/services/update/adjacency_map_service.py @@ -3,10 +3,8 @@ from app.models.adjacency.adjacency_map import AdjacencyMap from app.models.adjacency.node import ( - ConsumerNodeData, Node, NodeReference, - SupplierNodeData, ) from app.models.resource_map.dto import ResourceMapDto from app.services.entity.resource_map_service import ResourceMapService @@ -40,21 +38,21 @@ def build_adjacency_map( missing_nodes = [self.create_node(entry) for entry in missing_entries] adj_map.add_nodes(missing_nodes) missing_refs = adj_map.get_missing_refs() - - consumer_targets = [ - NodeReference( - id=f"{self.supplier_id}-{node.resource_id}", - resource_type=node.resource_type, - ) - for node in adj_map.data.values() - ] - consumer_entries = self.get_consumer_data(consumer_targets) - for entry in consumer_entries: - _, id = self.__fhir_service.get_resource_type_and_id_from_entry(entry) - supplier_id = id.replace(f"{self.supplier_id}-", "") - node = adj_map.data[supplier_id] - node.consumer_data = ConsumerNodeData(resource=entry.resource) return adj_map + # consumer_targets = [ + # NodeReference( + # id=f"{self.supplier_id}-{node.resource_id}", + # resource_type=node.resource_type, + # ) + # for node in adj_map.data.values() + # ] + # consumer_entries = self.get_consumer_data(consumer_targets) + # for entry in consumer_entries: + # _, id = self.__fhir_service.get_resource_type_and_id_from_entry(entry) + # supplier_id = id.replace(f"{self.supplier_id}-", "") + # node = adj_map.data[supplier_id] + # node.consumer_data = ConsumerNodeData(resource=entry.resource) + # return adj_map def get_entries( self, refs: List[NodeReference], fhir_api: FhirApi @@ -75,37 +73,28 @@ def get_consumer_data(self, refs: List[NodeReference]) -> List[BundleEntry]: def create_node(self, entry: BundleEntry) -> Node: res_type, id = self.__fhir_service.get_resource_type_and_id_from_entry(entry) - supplier_node_data = self.create_supplier_data(self.supplier_id, entry) + refs = [] + if entry.resource: + res_refs = self.__fhir_service.get_references(entry.resource) + for ref in res_refs: + _res_type, ref_id = self.__fhir_service.split_reference(ref) + refs.append(NodeReference(id=ref_id, resource_type=_res_type)) - node = Node( - resource_id=id, - resource_type=res_type, - supplier_data=supplier_node_data, - consumer_data=ConsumerNodeData(resource=None), - ) resource_map = self.__resource_map_service.get( supplier_id=self.supplier_id, resource_type=res_type, supplier_resource_id=id, ) - node.resource_map = ( + + node = Node( + resource_id=id, + resource_type=res_type, + supplier_bundle_entry=entry, + references=refs + ) + + node.existing_resource_map = ( ResourceMapDto(**resource_map.to_dict()) if resource_map else None ) return node - def create_supplier_data( - self, supplier_id: str, entry: BundleEntry - ) -> SupplierNodeData: - method = self.__fhir_service.get_request_method_from_entry(entry) - refs = [] - if entry.resource: - res_refs = self.__fhir_service.get_references(entry.resource) - for ref in res_refs: - res_type, ref_id = self.__fhir_service.split_reference(ref) - refs.append(NodeReference(id=ref_id, resource_type=res_type)) - return SupplierNodeData( - supplier_id=supplier_id, - method=method, - references=refs, - entry=entry, - ) diff --git a/app/services/update/new_service.py b/app/services/update/new_service.py new file mode 100644 index 00000000..fc565618 --- /dev/null +++ b/app/services/update/new_service.py @@ -0,0 +1,173 @@ +import copy +import logging +from typing import Literal + +from fhir.resources.R4B.bundle import BundleEntry, BundleEntryRequest +from fhir.resources.R4B.domainresource import DomainResource + +from app.models.adjacency.node import Node, NodeUpdateData +from app.models.fhir.types import HttpValidVerbs +from app.models.resource_map.dto import ResourceMapDto, ResourceMapUpdateDto +from app.services.fhir.references.reference_namespacer import namespace_resource_reference +from app.services.fhir.fhir_service import FhirService + +logger = logging.getLogger(__name__) + +class DataPreparationService(): + def __init__(self, fhir_service: FhirService): + self.__fhir_service = fhir_service + + def __update_status( + self, + supplier_bundle_method: HttpValidVerbs, + namespaced_supplier_data: DomainResource, + consumer_data: BundleEntry | None, + ) -> Literal["ignore", "equal", "delete", "update", "new"]: + + if ( + supplier_bundle_method == "DELETE" + and consumer_data is None + ): + return "ignore" + supplier_hash = self.hash_value(namespaced_supplier_data) + consumer_hash = self.hash_value(consumer_data.resource) if consumer_data else None + if ( + supplier_bundle_method != "DELETE" + and supplier_hash + and consumer_hash + and supplier_hash == consumer_hash + ): + return "equal" + + if ( + supplier_bundle_method == "DELETE" + and consumer_data is not None + ): + return "delete" + + if ( + supplier_bundle_method != "DELETE" + and supplier_hash is not None + and consumer_hash is None + ): + return "new" + + return "update" + + + def hash_value(self, resource: DomainResource | None) -> int | None: + if resource is None: + return None + + res = copy.deepcopy(resource) + res.meta = None + res.id = None + return hash(res.model_dump().__repr__()) + + def prepare_update_data(self, + node: Node, + supplier_id: str, + consumer_data: BundleEntry | None + ) -> NodeUpdateData | None: + resource = copy.deepcopy(node.supplier_bundle_entry.resource) + namespaced_supplier_data = self.__fhir_service.namespace_resource_references(resource, supplier_id) + update_status = self.__update_status( + node.supplier_bundle_entry.request.method, + namespaced_supplier_data, + consumer_data + ) + + consumer_resource_id = f"{supplier_id}-{node.resource_id}" + url = f"{node.resource_type}/{consumer_resource_id}" + + new_resource_map: ResourceMapDto | ResourceMapUpdateDto | None = None + match update_status: + case "ignore": + logger.info( + f"{node.resource_id} {node.resource_type} is not needed, ignoring..." + ) + return None + + case "equal": + logger.info( + f"{node.resource_id} {node.resource_type} has not changed, ignoring..." + ) + return None + + case "delete": + entry = BundleEntry.model_construct() + entry_request = BundleEntryRequest.model_construct( + method="DELETE", url=url + ) + entry.request = entry_request + + if node.existing_resource_map is None: + raise Exception( + f"Resource map for {node.resource_id} {node.resource_type} cannot be None and marked as delete " + ) + + new_resource_map = ResourceMapUpdateDto( + history_size=node.existing_resource_map.history_size + 1, + supplier_id=supplier_id, + resource_type=node.resource_type, + supplier_resource_id=node.resource_id, + ) + + return NodeUpdateData(bundle_entry=entry, resource_map_dto=new_resource_map) + + case "new": + entry = BundleEntry.model_construct() + entry_request = BundleEntryRequest.model_construct( + method="PUT", url=url + ) + resource = copy.deepcopy(node.supplier_bundle_entry.resource) + if resource is None: + raise Exception( + f"Resource {node.resource_id} {node.resource_type} cannot be None when a node is marked `new`" + ) + namespace_resource_reference(resource, supplier_id) + resource.id = consumer_resource_id + entry.resource = resource + entry.request = entry_request + + new_resource_map = ResourceMapDto( + supplier_id=supplier_id, + supplier_resource_id=node.resource_id, + consumer_resource_id=consumer_resource_id, + resource_type=node.resource_type, + history_size=1, + ) + + return NodeUpdateData(bundle_entry=entry, resource_map_dto=new_resource_map) + + case "update": + entry = BundleEntry.model_construct() + entry_request = BundleEntryRequest.model_construct( + method="PUT", url=url + ) + resource = copy.deepcopy(node.supplier_bundle_entry.resource) + if resource is None: + raise Exception( + f"Resource {node.resource_id} {node.resource_type} cannot be None and node marked as `update`" + ) + + namespace_resource_reference(resource, supplier_id) + resource.id = consumer_resource_id + entry.request = entry_request + entry.resource = resource + + if node.existing_resource_map is None: + raise Exception( + f"Resource map for {node.resource_id} {node.resource_type} cannot be None and marked as `update`" + ) + + new_resource_map = ResourceMapUpdateDto( + supplier_id=supplier_id, + supplier_resource_id=node.resource_id, + resource_type=node.resource_type, + history_size=node.existing_resource_map.history_size + 1, + ) + + return NodeUpdateData(bundle_entry=entry, resource_map_dto=new_resource_map) + case _: + raise Exception("Should match one of this blabla todo fixme") diff --git a/app/services/update/update_consumer_service.py b/app/services/update/update_consumer_service.py index 302aac02..d2b29786 100644 --- a/app/services/update/update_consumer_service.py +++ b/app/services/update/update_consumer_service.py @@ -3,15 +3,16 @@ import copy from enum import Enum import logging -from typing import List, Any +from typing import List, Any, Literal from uuid import uuid4 from fhir.resources.R4B.bundle import Bundle, BundleEntry, BundleEntryRequest from yarl import URL from app.models.resource_map.dto import ResourceMapDto, ResourceMapUpdateDto from app.models.adjacency.node import ( - Node, + Node, NodeReference, NodeUpdateData, ) from app.models.supplier.dto import SupplierDto +from app.services.fhir.references.reference_namespacer import namespace_resource_reference from app.services.update.adjacency_map_service import ( AdjacencyMapService, ) @@ -21,6 +22,7 @@ from app.services.api.authenticators.authenticator import Authenticator from app.services.fhir.fhir_service import FhirService from app.services.api.fhir_api import FhirApi +from app.services.update.new_service import DataPreparationService logger = logging.getLogger(__name__) @@ -45,6 +47,7 @@ def __init__( request_count: int, resource_map_service: ResourceMapService, auth: Authenticator, + new_service: DataPreparationService ) -> None: self.strict_validation = strict_validation self.timeout = timeout @@ -58,6 +61,7 @@ def __init__( ) self.__fhir_service = FhirService(strict_validation) self.__cache: List[str] = [] + self.__new_service = new_service def cleanup(self, supplier_id: str) -> None: for res_type in McsdResources: @@ -124,13 +128,14 @@ def update_resource( continue targets.append(e) - updated_nodes = self.update_page(targets, adjacency_map_service) + updated_nodes = self.update_page(targets, adjacency_map_service, supplier.id) for node in updated_nodes: if node.resource_id not in self.__cache: self.__cache.append(node.resource_id) def update_page( - self, entries: List[BundleEntry], adjacency_map_service: AdjacencyMapService + self, entries: List[BundleEntry], adjacency_map_service: AdjacencyMapService, + supplier_id: str ) -> List[Node]: updated = [] adj_map = adjacency_map_service.build_adjacency_map(entries, self.__cache) @@ -142,33 +147,44 @@ def update_page( continue group = adj_map.get_group(node) - results = self.update_with_bundle(group) + results = self.update_with_bundle(group,adjacency_map_service, supplier_id) updated.extend(results) return updated - def update_with_bundle(self, nodes: List[Node]) -> List[Node]: + def update_with_bundle(self, nodes: List[Node],adjacency_map_service: AdjacencyMapService, supplier_id: str) -> List[Node]: bundle = Bundle.model_construct(id=uuid4(), type="transaction") bundle.entry = [] dtos = [] - for node in nodes: - if node.update_status == "equal": - logger.info( - f"{node.resource_id} {node.resource_type} has not changed, ignoring..." - ) + consumer_targets = [ + NodeReference( + id=f"{supplier_id}-{node.resource_id}", + resource_type=node.resource_type, + ) + for node in nodes + ] - if node.update_status == "ignore": - logger.info( - f"{node.resource_id} {node.resource_type} is not needed, ignoring..." - ) + consumer_entries = adjacency_map_service.get_consumer_data(consumer_targets) + + consumer_data = {} + for entry in consumer_entries: + _, id = self.__fhir_service.get_resource_type_and_id_from_entry(entry) + supplier_id = id.replace(f"{supplier_id}-", "") + consumer_data[supplier_id] = entry + + for node in nodes: + update_data = self.__new_service.prepare_update_data( + node, + supplier_id, + consumer_data.get(node.resource_id)) - if node.update_data is not None: - if node.update_data.bundle_entry is not None: - bundle.entry.append(node.update_data.bundle_entry) + if update_data is not None: + if update_data.bundle_entry is not None: + bundle.entry.append(update_data.bundle_entry) - if node.update_data.resource_map_dto is not None: - dtos.append(node.update_data.resource_map_dto) + if update_data.resource_map_dto is not None: + dtos.append(update_data.resource_map_dto) self.__consumer_fhir_api.post_bundle(bundle)