From 8da41032d80535ed72e7f10934a3e9e1086dbff4 Mon Sep 17 00:00:00 2001 From: Volker Schmidt Date: Wed, 30 Jan 2019 17:25:40 +0100 Subject: [PATCH] Implemented subscription payload handling in order to support special cases of subscriptions (custom subscriptions). --- .../impl/AbstractQueuedDataProcessorImpl.java | 62 ++++-- .../TooManyPersistRetriesException.java | 46 ++++ .../dhis/sync/impl/DhisSyncProcessorImpl.java | 4 +- .../data/model/SubscriptionFhirResource.java | 166 ++++++++++++++ ...tomSubscriptionFhirResourceRepository.java | 52 +++++ .../SubscriptionFhirResourceRepository.java | 43 ++++ ...ubscriptionFhirResourceRepositoryImpl.java | 202 ++++++++++++++++++ .../adapter/fhir/repository/FhirResource.java | 33 ++- .../repository/impl/FhirRepositoryImpl.java | 64 +++++- .../fhir/script/ScriptExecutionContext.java | 6 +- .../server/FhirServerRestHookController.java | 88 +++++++- .../server/FhirServerRestHookProcessor.java | 6 +- .../impl/FhirServerRestHookProcessorImpl.java | 58 ++++- .../impl/AbstractDhisToFhirTransformer.java | 5 +- .../fhir/util/FhirParserException.java | 49 +++++ .../adapter/fhir/util/FhirParserUtils.java | 89 ++++++++ .../V1.1.0.6_0_0__Advanced_Subscription.sql | 44 ++++ 17 files changed, 960 insertions(+), 57 deletions(-) create mode 100644 common/src/main/java/org/dhis2/fhir/adapter/data/repository/TooManyPersistRetriesException.java create mode 100644 fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/SubscriptionFhirResource.java create mode 100644 fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomSubscriptionFhirResourceRepository.java create mode 100644 fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/SubscriptionFhirResourceRepository.java create mode 100644 fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomSubscriptionFhirResourceRepositoryImpl.java create mode 100644 fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserException.java create mode 100644 fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserUtils.java create mode 100644 fhir/src/main/resources/db/migration/production/V1.1.0.6_0_0__Advanced_Subscription.sql diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java index 8d1c2629..26eb0be4 100644 --- a/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java +++ b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.data.processor.impl; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -177,7 +177,7 @@ protected void process( @Nonnull G group, @Nullable Integer rateMillis ) } catch ( IgnoredQueuedItemException e ) { - // has already been logger with sufficient details + // has already been logged with sufficient details return; } @@ -260,27 +260,10 @@ protected void receiveAuthenticated( @Nonnull DataGroupQueueItem dataGroupQu { // persist processed item processedItemRepository.process( createProcessedItem( group, processedId, processedAt ), p -> { - final TransactionStatus transactionStatus = platformTransactionManager.getTransaction( - new DefaultTransactionDefinition( TransactionDefinition.PROPAGATION_NOT_SUPPORTED ) ); - try + if ( enqueueDataItem( group, item, false, true ) ) { - queuedItemRepository.enqueue( createQueuedItemId( group, item ) ); - itemQueueJmsTemplate.convertAndSend( createDataItemQueueItem( group, item ) ); - logger.debug( "Item {} of group {} has been enqueued.", item.getId(), group.getGroupId() ); count.incrementAndGet(); } - catch ( AlreadyQueuedException e ) - { - logger.debug( "Item {} of group {} is still queued.", item.getId(), group.getGroupId() ); - } - catch ( IgnoredQueuedItemException e ) - { - // has already been logged with sufficient details - } - finally - { - finalizeTransaction( transactionStatus ); - } } ); } } ) ); @@ -308,6 +291,43 @@ else if ( isPeriodicInfoLogging() ) } } + protected boolean enqueueDataItem( @Nonnull G group, @Nonnull ProcessedItemInfo item, boolean persistedDataItem, boolean autonomousTransaction ) + { + final TransactionStatus transactionStatus; + if ( autonomousTransaction ) + { + transactionStatus = platformTransactionManager.getTransaction( + new DefaultTransactionDefinition( TransactionDefinition.PROPAGATION_NOT_SUPPORTED ) ); + } + else + { + transactionStatus = null; + } + try + { + queuedItemRepository.enqueue( createQueuedItemId( group, item ) ); + itemQueueJmsTemplate.convertAndSend( createDataItemQueueItem( group, item, persistedDataItem ) ); + logger.debug( "Item {} of group {} has been enqueued.", item.getId(), group.getGroupId() ); + return true; + } + catch ( AlreadyQueuedException e ) + { + logger.debug( "Item {} of group {} is still queued.", item.getId(), group.getGroupId() ); + } + catch ( IgnoredQueuedItemException e ) + { + // has already been logged with sufficient details + } + finally + { + if ( transactionStatus != null ) + { + finalizeTransaction( transactionStatus ); + } + } + return false; + } + private void awaitTaskTermination( @Nonnull ForkJoinTask task ) { try @@ -384,7 +404,7 @@ protected Authentication createAuthentication() protected abstract QI createQueuedItemId( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo ); @Nonnull - protected abstract DataItemQueueItem createDataItemQueueItem( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo ); + protected abstract DataItemQueueItem createDataItemQueueItem( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo, boolean persistedDataItem ); @Nonnull protected abstract SG getStoredItemGroup( @Nonnull G group ); diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/TooManyPersistRetriesException.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/TooManyPersistRetriesException.java new file mode 100644 index 00000000..93d0f1d6 --- /dev/null +++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/TooManyPersistRetriesException.java @@ -0,0 +1,46 @@ +package org.dhis2.fhir.adapter.data.repository; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Thrown if it has been tried too many times to persist an entity that may be + * tried to insert multiple times at the same time and need to be updated in case + * it does exist. + * + * @author volsch + */ +public class TooManyPersistRetriesException extends RuntimeException +{ + private static final long serialVersionUID = -6868353655551006879L; + + public TooManyPersistRetriesException( String message ) + { + super( message ); + } +} diff --git a/dhis/src/main/java/org/dhis2/fhir/adapter/dhis/sync/impl/DhisSyncProcessorImpl.java b/dhis/src/main/java/org/dhis2/fhir/adapter/dhis/sync/impl/DhisSyncProcessorImpl.java index 15929f51..27ca8fc0 100644 --- a/dhis/src/main/java/org/dhis2/fhir/adapter/dhis/sync/impl/DhisSyncProcessorImpl.java +++ b/dhis/src/main/java/org/dhis2/fhir/adapter/dhis/sync/impl/DhisSyncProcessorImpl.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.dhis.sync.impl; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -191,7 +191,7 @@ protected QueuedDhisResourceId createQueuedItemId( @Nonnull DhisSyncGroup group, @Nonnull @Override - protected DhisResourceQueueItem createDataItemQueueItem( @Nonnull DhisSyncGroup group, @Nonnull ProcessedItemInfo processedItemInfo ) + protected DhisResourceQueueItem createDataItemQueueItem( @Nonnull DhisSyncGroup group, @Nonnull ProcessedItemInfo processedItemInfo, boolean persistedDataItem ) { return new DhisResourceQueueItem( group.getGroupId(), processedItemInfo ); } diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/SubscriptionFhirResource.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/SubscriptionFhirResource.java new file mode 100644 index 00000000..7cb1f66a --- /dev/null +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/SubscriptionFhirResource.java @@ -0,0 +1,166 @@ +package org.dhis2.fhir.adapter.fhir.data.model; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource; +import org.dhis2.fhir.adapter.fhir.model.FhirVersion; +import org.hibernate.annotations.GenericGenerator; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.Lob; +import javax.persistence.ManyToOne; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import java.io.Serializable; +import java.time.Instant; +import java.util.UUID; + +/** + * Contains a FHIR resource that has been delivered by a subscription notification. + * + * @author volsch + */ +@Entity +@Table( name = "fhir_subscription_resource" ) +@NamedQuery( name = SubscriptionFhirResource.RESOURCE_NAMED_QUERY, query = "SELECT sfr FROM SubscriptionFhirResource sfr WHERE sfr.fhirServerResource=:fhirServerResource AND sfr.fhirResourceId=:fhirResourceId" ) +public class SubscriptionFhirResource implements Serializable +{ + private static final long serialVersionUID = -7965763701550940008L; + + public static final String RESOURCE_NAMED_QUERY = "SubscriptionFhirResource.resource"; + + private UUID id; + + private Instant createdAt; + + private FhirServerResource fhirServerResource; + + private String fhirResourceId; + + private String contentType; + + private FhirVersion fhirVersion; + + private String fhirResource; + + @GeneratedValue( generator = "uuid2" ) + @GenericGenerator( name = "uuid2", strategy = "uuid2" ) + @Id + @Column( name = "id", nullable = false ) + public UUID getId() + { + return id; + } + + public void setId( UUID id ) + { + this.id = id; + } + + @Basic + @Column( name = "created_at", nullable = false ) + public Instant getCreatedAt() + { + return createdAt; + } + + public void setCreatedAt( Instant createdAt ) + { + this.createdAt = createdAt; + } + + @ManyToOne( optional = false ) + @JoinColumn( name = "fhir_server_resource_id", nullable = false ) + public FhirServerResource getFhirServerResource() + { + return fhirServerResource; + } + + public void setFhirServerResource( FhirServerResource fhirServerResource ) + { + this.fhirServerResource = fhirServerResource; + } + + @Basic + @Column( name = "fhir_resource_id", length = 200 ) + public String getFhirResourceId() + { + return fhirResourceId; + } + + public void setFhirResourceId( String fhirResourceId ) + { + this.fhirResourceId = fhirResourceId; + } + + @Basic + @Column( name = "content_type", length = 100, nullable = false ) + public String getContentType() + { + return contentType; + } + + public void setContentType( String contentType ) + { + this.contentType = contentType; + } + + @Basic + @Column( name = "fhir_version", nullable = false ) + @Enumerated( EnumType.STRING ) + public FhirVersion getFhirVersion() + { + return fhirVersion; + } + + public void setFhirVersion( FhirVersion fhirVersion ) + { + this.fhirVersion = fhirVersion; + } + + @Basic + @Column( name = "fhir_resource" ) + @Lob + public String getFhirResource() + { + return fhirResource; + } + + public void setFhirResource( String fhirResource ) + { + this.fhirResource = fhirResource; + } +} diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomSubscriptionFhirResourceRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomSubscriptionFhirResourceRepository.java new file mode 100644 index 00000000..ae933deb --- /dev/null +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomSubscriptionFhirResourceRepository.java @@ -0,0 +1,52 @@ +package org.dhis2.fhir.adapter.fhir.data.repository; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource; +import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource; +import org.dhis2.fhir.adapter.fhir.model.FhirVersion; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Optional; + +/** + * Custom repository that contains {@link org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource}s. + * + * @author volsch + */ +public interface CustomSubscriptionFhirResourceRepository +{ + void enqueue( @Nonnull FhirServerResource fhirServerResource, @Nullable String contentType, @Nonnull FhirVersion fhirVersion, @Nonnull String fhirResourceId, @Nonnull String fhirResource ); + + boolean deleteEnqueued( @Nonnull SubscriptionFhirResource subscriptionFhirResource ); + + @Nonnull + Optional findResource( @Nonnull FhirServerResource fhirServerResource, @Nonnull String fhirResourceId ); +} diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/SubscriptionFhirResourceRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/SubscriptionFhirResourceRepository.java new file mode 100644 index 00000000..6851075b --- /dev/null +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/SubscriptionFhirResourceRepository.java @@ -0,0 +1,43 @@ +package org.dhis2.fhir.adapter.fhir.data.repository; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource; +import org.springframework.data.jpa.repository.JpaRepository; + +import java.util.UUID; + +/** + * Repository that contains {@link SubscriptionFhirResource}s. + * + * @author volsch + */ +public interface SubscriptionFhirResourceRepository extends JpaRepository, CustomSubscriptionFhirResourceRepository +{ +} diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomSubscriptionFhirResourceRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomSubscriptionFhirResourceRepositoryImpl.java new file mode 100644 index 00000000..3d491487 --- /dev/null +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomSubscriptionFhirResourceRepositoryImpl.java @@ -0,0 +1,202 @@ +package org.dhis2.fhir.adapter.fhir.data.repository.impl; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import org.dhis2.fhir.adapter.data.repository.TooManyPersistRetriesException; +import org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource; +import org.dhis2.fhir.adapter.fhir.data.repository.CustomSubscriptionFhirResourceRepository; +import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource; +import org.dhis2.fhir.adapter.fhir.model.FhirVersion; +import org.dhis2.fhir.adapter.util.SqlExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.dao.support.DataAccessUtils; +import org.springframework.dao.support.PersistenceExceptionTranslator; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.DefaultTransactionDefinition; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.persistence.EntityManager; +import javax.persistence.LockModeType; +import javax.persistence.NoResultException; +import javax.persistence.PersistenceContext; +import javax.persistence.PersistenceException; +import java.time.Instant; +import java.util.Optional; + +/** + * Implementation of {@link CustomSubscriptionFhirResourceRepository}. + * + * @author volsch + */ +public class CustomSubscriptionFhirResourceRepositoryImpl implements CustomSubscriptionFhirResourceRepository +{ + private final Logger logger = LoggerFactory.getLogger( CustomSubscriptionFhirResourceRepositoryImpl.class ); + + protected static final int MAX_TRY_COUNT = 3; + + @PersistenceContext + private EntityManager entityManager; + + private PlatformTransactionManager platformTransactionManager; + + private PersistenceExceptionTranslator persistenceExceptionTranslator; + + public CustomSubscriptionFhirResourceRepositoryImpl( @Nonnull EntityManager entityManager, + @Nonnull PlatformTransactionManager platformTransactionManager, + @Nonnull @Qualifier( "&entityManagerFactory" ) PersistenceExceptionTranslator persistenceExceptionTranslator ) + { + this.entityManager = entityManager; + this.platformTransactionManager = platformTransactionManager; + this.persistenceExceptionTranslator = persistenceExceptionTranslator; + } + + @Override + @Transactional( propagation = Propagation.NOT_SUPPORTED ) + public void enqueue( @Nonnull FhirServerResource fhirServerResource, @Nullable String contentType, @Nonnull FhirVersion fhirVersion, @Nonnull String fhirResourceId, @Nonnull String fhirResource ) + { + enqueue( fhirServerResource, contentType, fhirVersion, fhirResourceId, fhirResource, 1 ); + } + + protected void enqueue( @Nonnull FhirServerResource fhirServerResource, @Nullable String contentType, @Nonnull FhirVersion fhirVersion, @Nonnull String fhirResourceId, @Nonnull String fhirResource, int tryCount ) + { + SubscriptionFhirResource subscriptionFhirResource = new SubscriptionFhirResource(); + subscriptionFhirResource.setCreatedAt( Instant.now() ); + subscriptionFhirResource.setFhirServerResource( fhirServerResource ); + subscriptionFhirResource.setContentType( contentType ); + subscriptionFhirResource.setFhirVersion( fhirVersion ); + subscriptionFhirResource.setFhirResourceId( fhirResourceId ); + subscriptionFhirResource.setFhirResource( fhirResource ); + + TransactionStatus transactionStatus = platformTransactionManager.getTransaction( new DefaultTransactionDefinition() ); + try + { + // FHIR server resource may be not a valid entity and must be reloaded (cached before) + subscriptionFhirResource.setFhirServerResource( entityManager.getReference( FhirServerResource.class, fhirServerResource.getId() ) ); + + entityManager.persist( subscriptionFhirResource ); + entityManager.flush(); + return; + } + catch ( PersistenceException e ) + { + // may have been inserted already and not been processed and needs to be updated + final RuntimeException runtimeException = DataAccessUtils.translateIfNecessary( e, persistenceExceptionTranslator ); + if ( !(runtimeException instanceof DataIntegrityViolationException) || + !SqlExceptionUtils.isUniqueKeyViolation( ((DataIntegrityViolationException) runtimeException).getMostSpecificCause() ) ) + { + throw runtimeException; + } + logger.debug( "FHIR Server Resource " + fhirServerResource.getId() + " contains already FHIR Resource " + fhirResourceId, e ); + } + finally + { + completeTransaction( transactionStatus ); + } + + transactionStatus = platformTransactionManager.getTransaction( new DefaultTransactionDefinition() ); + try + { + subscriptionFhirResource = entityManager.createNamedQuery( SubscriptionFhirResource.RESOURCE_NAMED_QUERY, SubscriptionFhirResource.class ) + .setLockMode( LockModeType.PESSIMISTIC_WRITE ) + .setParameter( "fhirServerResource", fhirServerResource ) + .setParameter( "fhirResourceId", fhirResourceId ).getSingleResult(); + subscriptionFhirResource.setCreatedAt( Instant.now() ); + subscriptionFhirResource.setContentType( contentType ); + subscriptionFhirResource.setFhirVersion( fhirVersion ); + subscriptionFhirResource.setFhirResource( fhirResource ); + } + catch ( NoResultException e ) + { + // may have been deleted again in the meantime (dequeued) + if ( tryCount + 1 > MAX_TRY_COUNT ) + { + throw new TooManyPersistRetriesException( "Storing subscription FHIR resource has been retried too many times." ); + } + logger.debug( "FHIR Server Resource " + fhirServerResource.getId() + " does no longer contain FHIR Resource " + fhirResourceId, e ); + enqueue( fhirServerResource, contentType, fhirVersion, fhirResourceId, fhirResource, tryCount + 1 ); + } + finally + { + completeTransaction( transactionStatus ); + } + } + + @Override + @Transactional + public boolean deleteEnqueued( @Nonnull SubscriptionFhirResource subscriptionFhirResource ) + { + final SubscriptionFhirResource sfr = entityManager.find( SubscriptionFhirResource.class, subscriptionFhirResource.getId(), LockModeType.PESSIMISTIC_WRITE ); + if ( sfr == null ) + { + return false; + } + if ( !sfr.getCreatedAt().equals( subscriptionFhirResource.getCreatedAt() ) ) + { + // has been updated in the meantime and will be reused for further processing + return false; + } + entityManager.remove( sfr ); + return true; + } + + @Nonnull + @Override + public Optional findResource( @Nonnull FhirServerResource fhirServerResource, @Nonnull String fhirResourceId ) + { + try + { + return Optional.of( entityManager.createNamedQuery( SubscriptionFhirResource.RESOURCE_NAMED_QUERY, SubscriptionFhirResource.class ) + .setParameter( "fhirServerResource", fhirServerResource ) + .setParameter( "fhirResourceId", fhirResourceId ).getSingleResult() ); + } + catch ( NoResultException e ) + { + return Optional.empty(); + } + } + + private void completeTransaction( @Nonnull TransactionStatus transactionStatus ) + { + if ( transactionStatus.isRollbackOnly() ) + { + platformTransactionManager.rollback( transactionStatus ); + } + else + { + platformTransactionManager.commit( transactionStatus ); + } + } +} diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/FhirResource.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/FhirResource.java index d1206c66..2691e93b 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/FhirResource.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/FhirResource.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.repository; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -50,14 +50,17 @@ public class FhirResource extends DataItemQueueItem implements { private static final long serialVersionUID = 1642564911249098319L; + private boolean persistedDataItem; + public FhirResource() { super(); } - public FhirResource( @Nonnull UuidDataGroupId dataGroupId, @Nonnull ProcessedItemInfo processedItemInfo ) + public FhirResource( @Nonnull UuidDataGroupId dataGroupId, @Nonnull ProcessedItemInfo processedItemInfo, boolean persistedDataItem ) { super( dataGroupId, processedItemInfo ); + this.persistedDataItem = persistedDataItem; } @JsonIgnore @@ -110,4 +113,30 @@ public void setVersion( String version ) { super.setVersion( version ); } + + public boolean isPersistedDataItem() + { + return persistedDataItem; + } + + public void setPersistedDataItem( boolean persistedDataItem ) + { + this.persistedDataItem = persistedDataItem; + } + + @JsonIgnore + public String getIdPart() + { + final String id = getId(); + if ( id == null ) + { + return null; + } + final int index = id.indexOf( '/' ); + if ( index < 0 ) + { + return id; + } + return id.substring( index + 1 ); + } } diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java index ee518a78..a9fd7ebe 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.repository.impl; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -28,6 +28,7 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +import ca.uhn.fhir.context.FhirContext; import com.google.common.collect.ArrayListMultimap; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.dhis2.fhir.adapter.auth.Authorization; @@ -40,8 +41,10 @@ import org.dhis2.fhir.adapter.dhis.model.DhisResource; import org.dhis2.fhir.adapter.dhis.sync.DhisResourceRepository; import org.dhis2.fhir.adapter.fhir.data.model.QueuedFhirResourceId; +import org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource; import org.dhis2.fhir.adapter.fhir.data.repository.FhirDhisAssignmentRepository; import org.dhis2.fhir.adapter.fhir.data.repository.QueuedFhirResourceRepository; +import org.dhis2.fhir.adapter.fhir.data.repository.SubscriptionFhirResourceRepository; import org.dhis2.fhir.adapter.fhir.metadata.model.AuthenticationMethod; import org.dhis2.fhir.adapter.fhir.metadata.model.FhirResourceType; import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServer; @@ -65,6 +68,7 @@ import org.dhis2.fhir.adapter.fhir.transform.fhir.model.FhirRequestMethod; import org.dhis2.fhir.adapter.fhir.transform.fhir.model.ResourceSystem; import org.dhis2.fhir.adapter.fhir.transform.fhir.model.WritableFhirRequest; +import org.dhis2.fhir.adapter.fhir.util.FhirParserUtils; import org.dhis2.fhir.adapter.lock.LockContext; import org.dhis2.fhir.adapter.lock.LockManager; import org.dhis2.fhir.adapter.queue.RetryQueueDeliveryException; @@ -116,6 +120,8 @@ public class FhirRepositoryImpl implements FhirRepository private final QueuedFhirResourceRepository queuedFhirResourceRepository; + private final SubscriptionFhirResourceRepository subscriptionFhirResourceRepository; + private final StoredFhirResourceService storedItemService; private final FhirResourceRepository fhirResourceRepository; @@ -133,9 +139,12 @@ public FhirRepositoryImpl( @Nonnull AuthorizationContext authorizationContext, @Nonnull FhirServerSystemRepository fhirServerSystemRepository, @Nonnull FhirServerResourceRepository fhirServerResourceRepository, @Nonnull QueuedFhirResourceRepository queuedFhirResourceRepository, + @Nonnull SubscriptionFhirResourceRepository subscriptionFhirResourceRepository, @Nonnull StoredFhirResourceService storedItemService, - @Nonnull FhirResourceRepository fhirResourceRepository, @Nonnull FhirToDhisTransformerService fhirToDhisTransformerService, - @Nonnull DhisResourceRepository dhisResourceRepository, @Nonnull FhirDhisAssignmentRepository fhirDhisAssignmentRepository ) + @Nonnull FhirResourceRepository fhirResourceRepository, + @Nonnull FhirToDhisTransformerService fhirToDhisTransformerService, + @Nonnull DhisResourceRepository dhisResourceRepository, + @Nonnull FhirDhisAssignmentRepository fhirDhisAssignmentRepository ) { this.authorizationContext = authorizationContext; this.lockManager = lockManager; @@ -143,6 +152,7 @@ public FhirRepositoryImpl( @Nonnull AuthorizationContext authorizationContext, this.fhirServerSystemRepository = fhirServerSystemRepository; this.fhirServerResourceRepository = fhirServerResourceRepository; this.queuedFhirResourceRepository = queuedFhirResourceRepository; + this.subscriptionFhirResourceRepository = subscriptionFhirResourceRepository; this.storedItemService = storedItemService; this.fhirResourceRepository = fhirResourceRepository; this.fhirToDhisTransformerService = fhirToDhisTransformerService; @@ -216,9 +226,19 @@ protected void receiveAuthenticated( @Nonnull FhirResource fhirResource ) } final FhirServer fhirServer = fhirServerResource.getFhirServer(); - final Optional resource = fhirResourceRepository.findRefreshed( - fhirServer.getId(), fhirServer.getFhirVersion(), fhirServer.getFhirEndpoint(), - fhirServerResource.getFhirResourceType().getResourceTypeName(), fhirResource.getId() ); + final SubscriptionFhirResource subscriptionFhirResource = subscriptionFhirResourceRepository.findResource( fhirServerResource, fhirResource.getIdPart() ).orElse( null ); + final Optional resource; + if ( fhirResource.isPersistedDataItem() ) + { + resource = getParsedFhirResource( fhirResource, fhirServerResource, subscriptionFhirResource ); + } + else + { + resource = fhirResourceRepository.findRefreshed( + fhirServer.getId(), fhirServer.getFhirVersion(), fhirServer.getFhirEndpoint(), + fhirServerResource.getFhirResourceType().getResourceTypeName(), fhirResource.getId() ); + } + if ( resource.isPresent() ) { final ProcessedItemInfo processedItemInfo = ProcessedFhirItemInfoUtils.create( resource.get() ); @@ -231,8 +251,8 @@ protected void receiveAuthenticated( @Nonnull FhirResource fhirResource ) { try ( final MDC.MDCCloseable c = MDC.putCloseable( "fhirId", fhirServerResource.getId() + ":" + resource.get().getIdElement().toUnqualifiedVersionless() ) ) { - logger.info( "Processing FHIR resource {} of FHIR server resource {}.", - resource.get().getIdElement().toUnqualified(), fhirServerResource.getId() ); + logger.info( "Processing FHIR resource {} of FHIR server resource {} (persisted={}).", + resource.get().getIdElement().toUnqualified(), fhirServerResource.getId(), fhirResource.isPersistedDataItem() ); try { save( fhirServerResource, resource.get() ); @@ -254,9 +274,33 @@ protected void receiveAuthenticated( @Nonnull FhirResource fhirResource ) } else { - logger.info( "FHIR resource {}/{} for FHIR server resource {} is no longer available. Skipping processing of updated FHIR resource.", - fhirServerResource.getFhirResourceType().getResourceTypeName(), fhirResource.getId(), fhirServerResource.getId() ); + logger.info( "FHIR resource {}/{} for FHIR server resource {} is no longer available (persisted={}). Skipping processing of updated FHIR resource.", + fhirServerResource.getFhirResourceType().getResourceTypeName(), fhirResource.getId(), fhirServerResource.getId(), fhirResource.isPersistedDataItem() ); + } + + // must not be deleted before since it is still required when a retry must be performed + if ( subscriptionFhirResource != null ) + { + subscriptionFhirResourceRepository.deleteEnqueued( subscriptionFhirResource ); + } + } + + @Nonnull + private Optional getParsedFhirResource( @Nonnull FhirResource fhirResource, @Nonnull FhirServerResource fhirServerResource, @Nullable SubscriptionFhirResource subscriptionFhirResource ) + { + final Optional resource; + if ( subscriptionFhirResource == null ) + { + resource = Optional.empty(); + } + else + { + final FhirContext fhirContext = fhirResourceRepository.findFhirContext( subscriptionFhirResource.getFhirVersion() ) + .orElseThrow( () -> new FatalTransformerException( "FHIR context for FHIR version " + subscriptionFhirResource.getFhirVersion() + " has not been configured." ) ); + resource = Optional.of( FhirParserUtils.parse( fhirContext, + subscriptionFhirResource.getFhirResource(), subscriptionFhirResource.getContentType() ) ); } + return resource; } protected boolean saveRetriedWithoutTrackedEntityInstance( @Nonnull FhirServerResource fhirServerResource, @Nonnull IBaseResource resource ) diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/script/ScriptExecutionContext.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/script/ScriptExecutionContext.java index 694f4909..7de25e1d 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/script/ScriptExecutionContext.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/script/ScriptExecutionContext.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.script; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,8 +53,8 @@ public interface ScriptExecutionContext * * @param scriptExecution the script execution that should be used. * @return the script execution context that has been replaced by the specified - * script execution context or null if there was no - * script execution context in the current scope. + * script execution context or null if there was no + * script execution context in the current scope. */ @Nullable ScriptExecution setScriptExecution( @Nonnull ScriptExecution scriptExecution ); diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookController.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookController.java index f8368ac1..c9a7bd52 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookController.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookController.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.server; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,14 +35,22 @@ import org.dhis2.fhir.adapter.rest.RestUnauthorizedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.UUID; /** @@ -70,27 +78,58 @@ public FhirServerRestHookController( @Nonnull FhirServerResourceRepository resou this.processor = processor; } - @PutMapping( path = "/{subscriptionId}/{fhirServerResourceId}/**" ) - public void receiveWithPayload( @PathVariable UUID subscriptionId, @PathVariable UUID fhirServerResourceId, - @RequestHeader( value = "Authorization", required = false ) String authorization ) + @RequestMapping( path = "/{fhirServerId}/{fhirServerResourceId}/{resourceType}/{resourceId}/**", method = { RequestMethod.POST, RequestMethod.PUT } ) + public ResponseEntity receiveWithPayloadAndHistory( + @PathVariable( "fhirServerId" ) UUID fhirServerId, @PathVariable( "fhirServerResourceId" ) UUID fhirServerResourceId, + @PathVariable( "resourceType" ) String resourceType, @PathVariable( "resourceId" ) String resourceId, + @RequestHeader( value = "Authorization", required = false ) String authorization, + @Nonnull HttpEntity requestEntity ) { - receive( subscriptionId, fhirServerResourceId, authorization ); + return receiveWithPayload( fhirServerId, fhirServerResourceId, resourceType, resourceId, authorization, requestEntity ); } - @PostMapping( path = "/{subscriptionId}/{fhirServerResourceId}" ) - public void receive( @PathVariable UUID subscriptionId, @PathVariable UUID fhirServerResourceId, + @RequestMapping( path = "/{fhirServerId}/{fhirServerResourceId}/{resourceType}/{resourceId}", method = { RequestMethod.POST, RequestMethod.PUT } ) + public ResponseEntity receiveWithPayload( + @PathVariable( "fhirServerId" ) UUID fhirServerId, @PathVariable( "fhirServerResourceId" ) UUID fhirServerResourceId, + @PathVariable( "resourceType" ) String resourceType, @PathVariable( "resourceId" ) String resourceId, + @RequestHeader( value = "Authorization", required = false ) String authorization, + @Nonnull HttpEntity requestEntity ) + { + if ( (requestEntity.getBody() == null) || (requestEntity.getBody().length == 0) ) + { + return createBadRequestResponse( "Payload expected." ); + } + + final FhirServerResource fhirServerResource = lookupFhirServerResource( fhirServerId, fhirServerResourceId, authorization ); + final MediaType mediaType = requestEntity.getHeaders().getContentType(); + final String fhirResource = new String( requestEntity.getBody(), getCharset( mediaType ) ); + processor.process( fhirServerResource, (mediaType == null) ? null : mediaType.toString(), + resourceType, resourceId, fhirResource ); + + return new ResponseEntity<>( HttpStatus.OK ); + } + + @PostMapping( path = "/{fhirServerId}/{fhirServerResourceId}" ) + public void receive( @PathVariable UUID fhirServerId, @PathVariable UUID fhirServerResourceId, @RequestHeader( value = "Authorization", required = false ) String authorization ) + { + final FhirServerResource fhirServerResource = lookupFhirServerResource( fhirServerId, fhirServerResourceId, authorization ); + processor.process( fhirServerResource ); + } + + @Nonnull + protected FhirServerResource lookupFhirServerResource( @Nonnull UUID fhirServerId, @Nonnull UUID fhirServerResourceId, String authorization ) { final FhirServerResource fhirServerResource = resourceRepository.findOneByIdCached( fhirServerResourceId ) .orElseThrow( () -> new RestResourceNotFoundException( "FHIR server data for resource cannot be found: " + fhirServerResourceId ) ); - if ( !fhirServerResource.getFhirServer().getId().equals( subscriptionId ) ) + if ( !fhirServerResource.getFhirServer().getId().equals( fhirServerId ) ) { // do not give detail if the resource or the subscription cannot be found throw new RestResourceNotFoundException( "FHIR server data for resource cannot be found: " + fhirServerResourceId ); } if ( fhirServerResource.isExpOnly() ) { - throw new RestResourceNotFoundException( "Subscription resource is intended for export only: " + fhirServerResourceId ); + throw new RestResourceNotFoundException( "FHIR server resource is intended for export only: " + fhirServerResourceId ); } if ( StringUtils.isNotBlank( fhirServerResource.getFhirServer().getAdapterEndpoint().getAuthorizationHeader() ) && @@ -98,7 +137,34 @@ public void receive( @PathVariable UUID subscriptionId, @PathVariable UUID fhirS { throw new RestUnauthorizedException( "Authentication has failed." ); } + return fhirServerResource; + } - processor.process( fhirServerResource ); + @Nonnull + private Charset getCharset( @Nullable MediaType contentType ) + { + Charset charset; + if ( contentType == null ) + { + charset = StandardCharsets.UTF_8; + } + else + { + charset = contentType.getCharset(); + if ( charset == null ) + { + charset = StandardCharsets.UTF_8; + } + } + return charset; + } + + @Nonnull + private ResponseEntity createBadRequestResponse( @Nonnull String message ) + { + final HttpHeaders headers = new HttpHeaders(); + headers.add( HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8" ); + return new ResponseEntity<>( + message.getBytes( StandardCharsets.UTF_8 ), headers, HttpStatus.BAD_REQUEST ); } } diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookProcessor.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookProcessor.java index a2824a0b..66c6caf8 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookProcessor.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/FhirServerRestHookProcessor.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.server; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,6 +31,9 @@ import org.dhis2.fhir.adapter.data.processor.QueuedDataProcessor; import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** * Processes incoming web hook requests in two steps. First the web hook request * is added to a queue (if there are not yet any web hook requests). Then the web @@ -41,4 +44,5 @@ */ public interface FhirServerRestHookProcessor extends QueuedDataProcessor { + void process( @Nonnull FhirServerResource fhirServerResource, @Nullable String contentType, @Nonnull String fhirResourceType, @Nonnull String fhirResourceId, @Nonnull String fhirResource ); } diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/impl/FhirServerRestHookProcessorImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/impl/FhirServerRestHookProcessorImpl.java index 8df1e679..f82c2591 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/impl/FhirServerRestHookProcessorImpl.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/server/impl/FhirServerRestHookProcessorImpl.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.server.impl; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -28,6 +28,7 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +import ca.uhn.fhir.context.FhirContext; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.dhis2.fhir.adapter.data.model.ProcessedItemInfo; import org.dhis2.fhir.adapter.data.model.UuidDataGroupId; @@ -45,6 +46,8 @@ import org.dhis2.fhir.adapter.fhir.data.repository.ProcessedFhirResourceRepository; import org.dhis2.fhir.adapter.fhir.data.repository.QueuedFhirResourceRepository; import org.dhis2.fhir.adapter.fhir.data.repository.QueuedFhirServerRequestRepository; +import org.dhis2.fhir.adapter.fhir.data.repository.SubscriptionFhirResourceRepository; +import org.dhis2.fhir.adapter.fhir.metadata.model.FhirResourceType; import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServer; import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource; import org.dhis2.fhir.adapter.fhir.metadata.repository.FhirServerResourceRepository; @@ -52,8 +55,12 @@ import org.dhis2.fhir.adapter.fhir.model.FhirVersion; import org.dhis2.fhir.adapter.fhir.repository.FhirResource; import org.dhis2.fhir.adapter.fhir.server.FhirServerRestHookProcessor; +import org.dhis2.fhir.adapter.fhir.server.ProcessedFhirItemInfoUtils; import org.dhis2.fhir.adapter.fhir.server.StoredFhirResourceService; +import org.dhis2.fhir.adapter.fhir.util.FhirParserException; +import org.dhis2.fhir.adapter.fhir.util.FhirParserUtils; import org.dhis2.fhir.adapter.security.SystemAuthenticationToken; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.ObjectProvider; @@ -73,7 +80,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; /** * Implementation of {@link FhirServerRestHookProcessor}. @@ -88,10 +97,14 @@ public class FhirServerRestHookProcessorImpl extends { private final Logger logger = LoggerFactory.getLogger( getClass() ); + private final Map fhirContexts; + private final FhirServerProcessorConfig processorConfig; private final FhirServerResourceRepository fhirServerResourceRepository; + private final SubscriptionFhirResourceRepository subscriptionFhirResourceRepository; + private final Map itemRetrievers = new HashMap<>(); public FhirServerRestHookProcessorImpl( @@ -106,13 +119,18 @@ public FhirServerRestHookProcessorImpl( @Nonnull FhirServerProcessorConfig processorConfig, @Nonnull SystemAuthenticationToken systemAuthenticationToken, @Nonnull FhirServerResourceRepository fhirServerResourceRepository, - @Nonnull ObjectProvider> itemRetrievers ) + @Nonnull SubscriptionFhirResourceRepository subscriptionFhirResourceRepository, + @Nonnull ObjectProvider> itemRetrievers, + @Nonnull Set fhirContexts ) { super( queuedGroupRepository, groupQueueJmsTemplate, dataGroupUpdateRepository, storedItemService, processedItemRepository, queuedItemRepository, itemQueueJmsTemplate, platformTransactionManager, systemAuthenticationToken, new ForkJoinPool( processorConfig.getParallelCount() ) ); this.processorConfig = processorConfig; this.fhirServerResourceRepository = fhirServerResourceRepository; + this.subscriptionFhirResourceRepository = subscriptionFhirResourceRepository; + this.fhirContexts = fhirContexts.stream().filter( fc -> (FhirVersion.get( fc.getVersion().getVersion() ) != null) ) + .collect( Collectors.toMap( fc -> FhirVersion.get( fc.getVersion().getVersion() ), fc -> fc ) ); itemRetrievers.getIfAvailable( Collections::emptyList ).forEach( br -> { for ( final FhirVersion version : br.getFhirVersions() ) { @@ -121,6 +139,38 @@ public FhirServerRestHookProcessorImpl( } ); } + @HystrixCommand + @Transactional( propagation = Propagation.REQUIRES_NEW ) + @Override + public void process( @Nonnull FhirServerResource fhirServerResource, @Nullable String contentType, @Nonnull String fhirResourceType, @Nonnull String fhirResourceId, @Nonnull String fhirResource ) + { + final FhirVersion fhirVersion = fhirServerResource.getFhirServer().getFhirVersion(); + final FhirContext fhirContext = fhirContexts.get( fhirVersion ); + if ( fhirContext == null ) + { + throw new IllegalStateException( "No FHIR Context for FHIR version " + fhirVersion + " has been configured." ); + } + + final IBaseResource parsedFhirResource = FhirParserUtils.parse( fhirContext, fhirResource, contentType ); + final FhirResourceType parsedFhirResourceType = FhirResourceType.getByResource( parsedFhirResource ); + if ( !fhirServerResource.getFhirResourceType().equals( parsedFhirResourceType ) ) + { + throw new FhirParserException( "Received FHIR resource " + parsedFhirResourceType + " does not match FHIR resource type " + fhirServerResource.getFhirResourceType() + " of FHIR server resource." ); + } + if ( !fhirResourceType.equals( parsedFhirResourceType.getResourceTypeName() ) ) + { + throw new FhirParserException( "Received FHIR resource type " + parsedFhirResourceType + " does not match FHIR resource ID " + fhirResourceType + " of FHIR subscription notification." ); + } + if ( !fhirResourceId.equals( parsedFhirResource.getIdElement().getIdPart() ) ) + { + throw new FhirParserException( "Received FHIR resource type " + parsedFhirResource.getIdElement().getIdPart() + " does not match FHIR resource ID " + fhirResourceId + " of FHIR subscription notification." ); + } + + final ProcessedItemInfo processedItemInfo = ProcessedFhirItemInfoUtils.create( parsedFhirResource ); + subscriptionFhirResourceRepository.enqueue( fhirServerResource, contentType, fhirVersion, fhirResourceId, fhirResource ); + super.enqueueDataItem( fhirServerResource, processedItemInfo, true, false ); + } + @HystrixCommand @Transactional( propagation = Propagation.NOT_SUPPORTED ) @JmsListener( destination = "#{@fhirServerConfig.restHookRequestQueue.queueName}", @@ -199,8 +249,8 @@ protected QueuedFhirResourceId createQueuedItemId( @Nonnull FhirServerResource g @Nonnull @Override - protected DataItemQueueItem createDataItemQueueItem( @Nonnull FhirServerResource group, @Nonnull ProcessedItemInfo processedItemInfo ) + protected DataItemQueueItem createDataItemQueueItem( @Nonnull FhirServerResource group, @Nonnull ProcessedItemInfo processedItemInfo, boolean persistedDataItem ) { - return new FhirResource( group.getGroupId(), processedItemInfo ); + return new FhirResource( group.getGroupId(), processedItemInfo, persistedDataItem ); } } diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/transform/dhis/impl/AbstractDhisToFhirTransformer.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/transform/dhis/impl/AbstractDhisToFhirTransformer.java index c85177f6..f92224c6 100644 --- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/transform/dhis/impl/AbstractDhisToFhirTransformer.java +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/transform/dhis/impl/AbstractDhisToFhirTransformer.java @@ -1,7 +1,7 @@ package org.dhis2.fhir.adapter.fhir.transform.dhis.impl; /* - * Copyright (c) 2004-2018, University of Oslo + * Copyright (c) 2004-2019, University of Oslo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -474,8 +474,7 @@ private class TrackedEntityIdentifierValueProvider implements IdentifierValuePro @Nullable @Override public String getIdentifierValue( @Nonnull DhisToFhirTransformerContext context, @Nonnull RuleInfo ruleInfo, @Nullable ExecutableScript identifierLookupScript, @Nonnull ScriptedTrackedEntityInstance scriptedDhisResource, - @Nonnull Map scriptVariables ) + @Nonnull Map scriptVariables ) { if ( identifierLookupScript != null ) { diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserException.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserException.java new file mode 100644 index 00000000..86ad6752 --- /dev/null +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserException.java @@ -0,0 +1,49 @@ +package org.dhis2.fhir.adapter.fhir.util; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Thrown if the FHIR resource cannot be parsed. + * + * @author volsch + */ +public class FhirParserException extends RuntimeException +{ + private static final long serialVersionUID = 2169213509672577359L; + + public FhirParserException( String message ) + { + super( message ); + } + + public FhirParserException( String message, Throwable cause ) + { + super( message, cause ); + } +} diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserUtils.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserUtils.java new file mode 100644 index 00000000..82f80e81 --- /dev/null +++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/util/FhirParserUtils.java @@ -0,0 +1,89 @@ +package org.dhis2.fhir.adapter.fhir.util; + +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.parser.DataFormatException; +import org.hl7.fhir.instance.model.api.IBaseResource; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * FHIR parser utilities for parsing JSON and XML. + * + * @author volsch + */ +public abstract class FhirParserUtils +{ + @Nonnull + public static IBaseResource parse( @Nonnull FhirContext fhirContext, @Nonnull String resource, @Nullable String contentType ) throws FhirParserException + { + boolean xml; + if ( contentType == null ) + { + int xmlIndex = resource.indexOf( '<' ); + if ( xmlIndex < 0 ) + { + xmlIndex = Integer.MAX_VALUE; + } + int jsonIndex = resource.indexOf( '{' ); + if ( jsonIndex < 0 ) + { + jsonIndex = Integer.MAX_VALUE; + } + xml = xmlIndex < jsonIndex; + } + else + { + xml = contentType.toLowerCase().contains( "xml" ); + } + + try + { + if ( xml ) + { + return fhirContext.newXmlParser().parseResource( resource ); + } + else + { + return fhirContext.newJsonParser().parseResource( resource ); + } + } + catch ( DataFormatException e ) + { + throw new FhirParserException( "Could not parse FHIR resource: " + e.getMessage(), e ); + } + } + + private FhirParserUtils() + { + super(); + } +} diff --git a/fhir/src/main/resources/db/migration/production/V1.1.0.6_0_0__Advanced_Subscription.sql b/fhir/src/main/resources/db/migration/production/V1.1.0.6_0_0__Advanced_Subscription.sql new file mode 100644 index 00000000..6c2c1c8f --- /dev/null +++ b/fhir/src/main/resources/db/migration/production/V1.1.0.6_0_0__Advanced_Subscription.sql @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2004-2019, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO PROGRAM_STAGE_EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +-- @formatter:off + +CREATE TABLE fhir_subscription_resource( + id UUID NOT NULL, + created_at TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP NOT NULL, + fhir_server_resource_id UUID NOT NULL, + fhir_resource_id VARCHAR(200) NOT NULL, + content_type VARCHAR(100), + fhir_version VARCHAR(30) NOT NULL, + fhir_resource TEXT, + CONSTRAINT fhir_subscription_resource_pk PRIMARY KEY(id), + CONSTRAINT fhir_subscription_resource_fk1 FOREIGN KEY(fhir_server_resource_id) + REFERENCES fhir_server_resource(id) ON DELETE CASCADE, + CONSTRAINT fhir_subscription_resource_uk1 UNIQUE(fhir_server_resource_id, fhir_resource_id) +); +CREATE INDEX fhir_subscription_resource_i1 ON fhir_subscription_resource(fhir_server_resource_id, created_at);