+{
+ void process( @Nonnull G group );
+}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessorException.java b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/QueuedDataProcessorException.java
similarity index 84%
rename from fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessorException.java
rename to common/src/main/java/org/dhis2/fhir/adapter/data/processor/QueuedDataProcessorException.java
index 6e2d3f6b..da442c81 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessorException.java
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/QueuedDataProcessorException.java
@@ -1,4 +1,4 @@
-package org.dhis2.fhir.adapter.fhir.remote;
+package org.dhis2.fhir.adapter.data.processor;
/*
* Copyright (c) 2004-2018, University of Oslo
@@ -29,20 +29,20 @@
*/
/**
- * Thrown if a critical error occurred while processing the web hook.
+ * Thrown if a critical error occurred while processing the data.
*
* @author volsch
*/
-public class RemoteRestHookProcessorException extends RuntimeException
+public class QueuedDataProcessorException extends RuntimeException
{
private static final long serialVersionUID = -4539433728243920804L;
- public RemoteRestHookProcessorException( String message )
+ public QueuedDataProcessorException( String message )
{
super( message );
}
- public RemoteRestHookProcessorException( String message, Throwable cause )
+ public QueuedDataProcessorException( String message, Throwable cause )
{
super( message, cause );
}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java
new file mode 100644
index 00000000..cbe63cf3
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/AbstractQueuedDataProcessorImpl.java
@@ -0,0 +1,278 @@
+package org.dhis2.fhir.adapter.data.processor.impl;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.DataGroupId;
+import org.dhis2.fhir.adapter.data.model.DataGroupUpdate;
+import org.dhis2.fhir.adapter.data.model.ProcessedItem;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemId;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemInfo;
+import org.dhis2.fhir.adapter.data.model.QueuedItemId;
+import org.dhis2.fhir.adapter.data.processor.DataItemQueueItem;
+import org.dhis2.fhir.adapter.data.processor.DataProcessorItemRetriever;
+import org.dhis2.fhir.adapter.data.processor.QueuedDataProcessor;
+import org.dhis2.fhir.adapter.data.repository.AlreadyQueuedException;
+import org.dhis2.fhir.adapter.data.repository.DataGroupUpdateRepository;
+import org.dhis2.fhir.adapter.data.repository.IgnoredQueuedItemException;
+import org.dhis2.fhir.adapter.data.repository.ProcessedItemRepository;
+import org.dhis2.fhir.adapter.data.repository.QueuedItemRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.DefaultTransactionDefinition;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract implementation of {@link QueuedDataProcessor}.
+ *
+ * @param the concrete type of the processed item.
+ * @param the concrete type of the ID of the processed item.
+ * @param the concrete type of the queued item that is used for queuing the data group.
+ * @param the concrete type of the queued item that is used for queuing the item.
+ * @param the concrete type of the group of the ID that is constant for a specific use case.
+ * @param the concrete type of the group ID of the group G
.
+ * @author volsch
+ */
+public abstract class AbstractQueuedDataProcessorImpl, PI extends ProcessedItemId, QG extends QueuedItemId, QI extends QueuedItemId, G extends DataGroup, GI extends DataGroupId> implements QueuedDataProcessor
+{
+ private final Logger logger = LoggerFactory.getLogger( getClass() );
+
+ private final QueuedItemRepository queuedGroupRepository;
+
+ private final JmsTemplate groupQueueJmsTemplate;
+
+ private final DataGroupUpdateRepository, G> dataGroupUpdateRepository;
+
+ private final ProcessedItemRepository processedItemRepository;
+
+ private final QueuedItemRepository queuedItemRepository;
+
+ private final JmsTemplate itemQueueJmsTemplate;
+
+ private final PlatformTransactionManager platformTransactionManager;
+
+ public AbstractQueuedDataProcessorImpl(
+ @Nonnull QueuedItemRepository queuedGroupRepository,
+ @Nonnull JmsTemplate groupQueueJmsTemplate,
+ @Nonnull DataGroupUpdateRepository, G> dataGroupUpdateRepository,
+ @Nonnull ProcessedItemRepository processedItemRepository,
+ @Nonnull QueuedItemRepository queuedItemRepository,
+ @Nonnull JmsTemplate itemQueueJmsTemplate,
+ @Nonnull PlatformTransactionManager platformTransactionManager )
+ {
+ this.queuedGroupRepository = queuedGroupRepository;
+ this.groupQueueJmsTemplate = groupQueueJmsTemplate;
+ this.dataGroupUpdateRepository = dataGroupUpdateRepository;
+ this.processedItemRepository = processedItemRepository;
+ this.queuedItemRepository = queuedItemRepository;
+ this.itemQueueJmsTemplate = itemQueueJmsTemplate;
+ this.platformTransactionManager = platformTransactionManager;
+ }
+
+ @HystrixCommand
+ @Override
+ public void process( @Nonnull G group )
+ {
+ final TransactionStatus transactionStatus = platformTransactionManager.getTransaction( new DefaultTransactionDefinition() );
+ try
+ {
+ logger.debug( "Checking for a queued entry of group {}.", group.getGroupId() );
+ try
+ {
+ queuedGroupRepository.enqueue( createQueuedGroupId( group ) );
+ }
+ catch ( AlreadyQueuedException e )
+ {
+ logger.debug( "There is already a queued entry for group {}.", group.getGroupId() );
+ return;
+ }
+ catch ( IgnoredQueuedItemException e )
+ {
+ // has already been logger with sufficient details
+ return;
+ }
+
+ logger.debug( "Enqueuing entry for group {}.", group.getGroupId() );
+ groupQueueJmsTemplate.convertAndSend( createDataGroupQueueItem( group ), message -> {
+ // only one message for a single group must be processed at a specific time (grouping)
+ message.setStringProperty( "JMSXGroupID", group.getGroupId().toString() );
+ return message;
+ } );
+ logger.info( "Enqueued entry for group {}.", group.getGroupId() );
+ }
+ finally
+ {
+ finalizeTransaction( transactionStatus );
+ }
+ }
+
+ protected void receive( @Nonnull DataGroupQueueItem dataGroupQueueItem )
+ {
+ SecurityContextHolder.getContext().setAuthentication( createAuthentication() );
+ try
+ {
+ receiveAuthenticated( dataGroupQueueItem );
+ }
+ finally
+ {
+ SecurityContextHolder.clearContext();
+ }
+ }
+
+ protected void receiveAuthenticated( @Nonnull DataGroupQueueItem dataGroupQueueItem )
+ {
+ logger.info( "Processing queued group {}.", dataGroupQueueItem.getDataGroupId() );
+ final G group = findGroupByGroupId( dataGroupQueueItem.getDataGroupId() );
+ if ( group == null )
+ {
+ logger.warn( "Group {} is no longer available. Skipping processing of updated group.",
+ dataGroupQueueItem.getDataGroupId() );
+ return;
+ }
+
+ try
+ {
+ queuedGroupRepository.dequeued( createQueuedGroupId( group ) );
+ }
+ catch ( IgnoredQueuedItemException e )
+ {
+ // has already been logger with sufficient details
+ return;
+ }
+
+ final DataProcessorItemRetriever itemRetriever = getDataProcessorItemRetriever( group );
+ final Instant origLastUpdated = dataGroupUpdateRepository.getLastUpdated( group );
+ final AtomicLong count = new AtomicLong();
+ final Instant lastUpdated = itemRetriever.poll( group, origLastUpdated, getMaxSearchCount(), items -> {
+ final Instant processedAt = Instant.now();
+ final Set processedIds = processedItemRepository.find( group,
+ items.stream().map( sr -> sr.toIdString( processedAt ) ).collect( Collectors.toList() ) );
+
+ items.forEach( item -> {
+ final String processedId = item.toIdString( processedAt );
+ if ( !processedIds.contains( processedId ) )
+ {
+ // persist processed item
+ processedItemRepository.process( createProcessedItem( group, processedId, processedAt ), p -> {
+ final TransactionStatus transactionStatus = platformTransactionManager.getTransaction(
+ new DefaultTransactionDefinition( TransactionDefinition.PROPAGATION_NOT_SUPPORTED ) );
+ try
+ {
+ queuedItemRepository.enqueue( createQueuedItemId( group, item ) );
+ itemQueueJmsTemplate.convertAndSend( createDataItemQueueItem( group, item ) );
+ logger.debug( "Item {} of group {} has been enqueued.", item.getId(), group.getGroupId() );
+ count.incrementAndGet();
+ }
+ catch ( AlreadyQueuedException e )
+ {
+ logger.debug( "Item {} of group {} is still queued.", item.getId(), group.getGroupId() );
+ }
+ catch ( IgnoredQueuedItemException e )
+ {
+ // has already been logger with sufficient details
+ }
+ finally
+ {
+ finalizeTransaction( transactionStatus );
+ }
+ } );
+ }
+ } );
+ } );
+ dataGroupUpdateRepository.updateLastUpdated( group, lastUpdated );
+
+ // Purging old data must not be done before and also must not be done asynchronously. The ast updated
+ // timestamp may be older than the purged data. And before purging the old data, the last updated
+ // timestamp of the group must be updated by processing the complete items that belong to the group.
+ purgeOldestProcessed( group );
+ logger.info( "Processed queued group {} with {} enqueued items.",
+ dataGroupQueueItem.getDataGroupId(), count.longValue() );
+ }
+
+ protected void purgeOldestProcessed( @Nonnull G group )
+ {
+ final Instant from = Instant.now().minus( getMaxProcessedAgeMinutes(), ChronoUnit.MINUTES );
+ logger.debug( "Purging oldest processed items before {} for group {}.", from, group.getGroupId() );
+ final int count = processedItemRepository.deleteOldest( group, from );
+ logger.debug( "Purged {} oldest processed items before {} for group {}.", count, from, group.getGroupId() );
+ }
+
+ protected abstract QG createQueuedGroupId( @Nonnull G group );
+
+ @Nonnull
+ protected abstract DataGroupQueueItem createDataGroupQueueItem( @Nonnull G group );
+
+ @Nullable
+ protected abstract G findGroupByGroupId( @Nonnull GI groupId );
+
+ protected abstract int getMaxProcessedAgeMinutes();
+
+ protected abstract int getMaxSearchCount();
+
+ @Nonnull
+ protected abstract DataProcessorItemRetriever getDataProcessorItemRetriever( @Nonnull G group );
+
+ @Nonnull
+ protected abstract P createProcessedItem( @Nonnull G group, @Nonnull String id, @Nonnull Instant processedAt );
+
+ @Nonnull
+ protected abstract QI createQueuedItemId( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo );
+
+ @Nonnull
+ protected abstract DataItemQueueItem createDataItemQueueItem( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo );
+
+ @Nonnull
+ protected abstract Authentication createAuthentication();
+
+ private void finalizeTransaction( @Nonnull TransactionStatus transactionStatus )
+ {
+ if ( transactionStatus.isRollbackOnly() )
+ {
+ platformTransactionManager.rollback( transactionStatus );
+ }
+ else
+ {
+ platformTransactionManager.commit( transactionStatus );
+ }
+ }
+}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/DataGroupQueueItem.java b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/DataGroupQueueItem.java
new file mode 100644
index 00000000..94d1542c
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/processor/impl/DataGroupQueueItem.java
@@ -0,0 +1,85 @@
+package org.dhis2.fhir.adapter.data.processor.impl;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.dhis2.fhir.adapter.data.model.DataGroupId;
+
+import javax.annotation.Nonnull;
+import java.io.Serializable;
+import java.time.ZonedDateTime;
+
+/**
+ * A data group that has been queued for processing. The instance
+ * will be serialized and de-serialized to and from JSON.
+ *
+ * @param the concrete type of the data group.
+ * @author volsch
+ */
+public class DataGroupQueueItem implements Serializable
+{
+ private static final long serialVersionUID = -7911324825049826913L;
+
+ private I dataGroupId;
+
+ private ZonedDateTime receivedAt;
+
+ public DataGroupQueueItem()
+ {
+ super();
+ }
+
+ public DataGroupQueueItem( @Nonnull I dataGroupId, @Nonnull ZonedDateTime receivedAt )
+ {
+ this.dataGroupId = dataGroupId;
+ this.receivedAt = receivedAt;
+ }
+
+ @JsonProperty
+ public I getDataGroupId()
+ {
+ return dataGroupId;
+ }
+
+ public void setDataGroupId( I dataGroupId )
+ {
+ this.dataGroupId = dataGroupId;
+ }
+
+ @JsonProperty
+ public ZonedDateTime getReceivedAt()
+ {
+ return receivedAt;
+ }
+
+ public void setReceivedAt( ZonedDateTime receivedAt )
+ {
+ this.receivedAt = receivedAt;
+ }
+}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/AlreadyQueuedException.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/AlreadyQueuedException.java
similarity index 97%
rename from fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/AlreadyQueuedException.java
rename to common/src/main/java/org/dhis2/fhir/adapter/data/repository/AlreadyQueuedException.java
index e0630476..1a6f7359 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/AlreadyQueuedException.java
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/AlreadyQueuedException.java
@@ -1,4 +1,4 @@
-package org.dhis2.fhir.adapter.fhir.data.repository;
+package org.dhis2.fhir.adapter.data.repository;
/*
* Copyright (c) 2004-2018, University of Oslo
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/DataGroupUpdateRepository.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/DataGroupUpdateRepository.java
new file mode 100644
index 00000000..ec43415f
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/DataGroupUpdateRepository.java
@@ -0,0 +1,53 @@
+package org.dhis2.fhir.adapter.data.repository;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.DataGroupUpdate;
+import org.springframework.data.rest.core.annotation.RestResource;
+
+import javax.annotation.Nonnull;
+import java.time.Instant;
+
+/**
+ * Custom repository for {@link DataGroupUpdate}.
+ *
+ * @param the concrete type of the update data.
+ * @param the group to which the update data belongs to.
+ * @author volsch
+ */
+public interface DataGroupUpdateRepository, G extends DataGroup>
+{
+ @RestResource( exported = false )
+ @Nonnull
+ Instant getLastUpdated( @Nonnull G group );
+
+ @RestResource( exported = false )
+ boolean updateLastUpdated( @Nonnull G group, @Nonnull Instant lastUpdated );
+}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/IgnoredSubscriptionResourceException.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/IgnoredQueuedItemException.java
similarity index 84%
rename from fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/IgnoredSubscriptionResourceException.java
rename to common/src/main/java/org/dhis2/fhir/adapter/data/repository/IgnoredQueuedItemException.java
index 0e58c08d..6d77cf78 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/IgnoredSubscriptionResourceException.java
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/IgnoredQueuedItemException.java
@@ -1,4 +1,4 @@
-package org.dhis2.fhir.adapter.fhir.data.repository;
+package org.dhis2.fhir.adapter.data.repository;
/*
* Copyright (c) 2004-2018, University of Oslo
@@ -29,16 +29,15 @@
*/
/**
- * Thrown if the processed subscription resource can be ignored since it does
- * no longer exist.
+ * Thrown if the processed queue can be ignored since it does no longer exist.
*
* @author volsch
*/
-public class IgnoredSubscriptionResourceException extends RuntimeException
+public class IgnoredQueuedItemException extends RuntimeException
{
private static final long serialVersionUID = 4787054440737823557L;
- public IgnoredSubscriptionResourceException( String message, Throwable cause )
+ public IgnoredQueuedItemException( String message, Throwable cause )
{
super( message, cause );
}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/ProcessedItemRepository.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/ProcessedItemRepository.java
new file mode 100644
index 00000000..91384458
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/ProcessedItemRepository.java
@@ -0,0 +1,57 @@
+package org.dhis2.fhir.adapter.data.repository;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.ProcessedItem;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemId;
+
+import javax.annotation.Nonnull;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * Custom repository for {@linkplain ProcessedItem processed items}.
+ *
+ * @param the concrete type of the processed item.
+ * @param the concrete type of the ID of the processed item.
+ * @param the group of the ID that is constant for a specific use case.
+ * @author volsch
+ */
+public interface ProcessedItemRepository, I extends ProcessedItemId, G extends DataGroup>
+{
+ @Nonnull
+ Set find( @Nonnull G prefix, @Nonnull Collection processedIds );
+
+ void process( @Nonnull T processedItem, @Nonnull Consumer consumer );
+
+ int deleteOldest( @Nonnull G prefix, @Nonnull Instant timestamp );
+}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/QueuedItemRepository.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/QueuedItemRepository.java
new file mode 100644
index 00000000..6bc0b2f8
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/QueuedItemRepository.java
@@ -0,0 +1,65 @@
+package org.dhis2.fhir.adapter.data.repository;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.QueuedItem;
+import org.dhis2.fhir.adapter.data.model.QueuedItemId;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Custom repository for {@link QueuedItem} entities.
+ *
+ * @param the ID class of the queued items.
+ * @param the concrete class of the data group.
+ * @author volsch
+ */
+public interface QueuedItemRepository, G extends DataGroup>
+{
+ /**
+ * Tries to insert a new entry as queued item. If the item already exists
+ * (there are still messages inside the queue for the specified ID),
+ * this method returns false
. Otherwise this method returns
+ * true
.
+ *
+ * @param id the ID of the queued item.
+ * @throws AlreadyQueuedException thrown if there are still messages inside the queue.
+ */
+ void enqueue( @Nonnull I id ) throws AlreadyQueuedException;
+
+ /**
+ * Tries to dequeue the queued item with the specified ID. If the entity does not
+ * exist, false
is returned.
+ *
+ * @param id the ID of the queued item for which a message should be dequeued.
+ * @return true
if the entity has been deleted, false
otherwise.
+ */
+ boolean dequeued( @Nonnull I id );
+}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractDataGroupUpdateRepositoryImpl.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractDataGroupUpdateRepositoryImpl.java
new file mode 100644
index 00000000..77f18154
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractDataGroupUpdateRepositoryImpl.java
@@ -0,0 +1,110 @@
+package org.dhis2.fhir.adapter.data.repository.impl;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.DataGroupUpdate;
+import org.dhis2.fhir.adapter.data.repository.DataGroupUpdateRepository;
+import org.springframework.data.rest.core.annotation.RestResource;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.persistence.EntityManager;
+import javax.persistence.LockModeType;
+import javax.persistence.PersistenceContext;
+import javax.persistence.criteria.CriteriaBuilder;
+import javax.persistence.criteria.CriteriaQuery;
+import javax.persistence.criteria.Root;
+import java.time.Instant;
+
+/**
+ * Implementation of {@link DataGroupUpdateRepository}.
+ *
+ * @param the concrete type of the update data.
+ * @param the group to which the update data belongs to.
+ * @author volsch
+ */
+public abstract class AbstractDataGroupUpdateRepositoryImpl, G extends DataGroup> implements DataGroupUpdateRepository
+{
+ @PersistenceContext
+ private EntityManager entityManager;
+
+ public AbstractDataGroupUpdateRepositoryImpl( @Nonnull EntityManager entityManager )
+ {
+ this.entityManager = entityManager;
+ }
+
+ @RestResource( exported = false )
+ @Nonnull
+ @Override
+ public Instant getLastUpdated( @Nonnull G group )
+ {
+ T update = find( group, false );
+ if ( update == null )
+ {
+ update = createUpdate();
+ update.setGroup( group );
+ update.setLastUpdated( Instant.now() );
+ entityManager.persist( update );
+ }
+ return update.getLastUpdated();
+ }
+
+ @RestResource( exported = false )
+ @Transactional
+ @Override
+ public boolean updateLastUpdated( @Nonnull G group, @Nonnull Instant lastUpdated )
+ {
+ final T update = find( group, true );
+ if ( update == null )
+ {
+ return false;
+ }
+ update.setLastUpdated( lastUpdated );
+ return true;
+ }
+
+ @Nullable
+ protected T find( @Nonnull G group, boolean locked )
+ {
+ final CriteriaBuilder cb = entityManager.getCriteriaBuilder();
+ final CriteriaQuery extends T> criteria = cb.createQuery( getUpdateClass() );
+ final Root extends T> root = criteria.from( getUpdateClass() );
+ return entityManager.createQuery( criteria.where( cb.equal( root.get( "group" ), group ) ) )
+ .setLockMode( locked ? LockModeType.PESSIMISTIC_WRITE : LockModeType.NONE ).getResultList()
+ .stream().findFirst().orElse( null );
+ }
+
+ @Nonnull
+ protected abstract Class extends T> getUpdateClass();
+
+ @Nonnull
+ protected abstract T createUpdate();
+}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractProcessedItemRepositoryImpl.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractProcessedItemRepositoryImpl.java
new file mode 100644
index 00000000..b85fd4bb
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractProcessedItemRepositoryImpl.java
@@ -0,0 +1,101 @@
+package org.dhis2.fhir.adapter.data.repository.impl;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.ProcessedItem;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemId;
+import org.dhis2.fhir.adapter.data.repository.ProcessedItemRepository;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Nonnull;
+import javax.persistence.EntityManager;
+import javax.persistence.PersistenceContext;
+import javax.persistence.criteria.CriteriaBuilder;
+import javax.persistence.criteria.CriteriaDelete;
+import javax.persistence.criteria.CriteriaQuery;
+import javax.persistence.criteria.Root;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of a repository that stores already processed items.
+ *
+ * @param the concrete type of the processed item.
+ * @param the concrete type of the ID of the processed item.
+ * @param the group of the ID that is constant for a specific use case.
+ * @author volsch
+ */
+public abstract class AbstractProcessedItemRepositoryImpl, I extends ProcessedItemId, G extends DataGroup> implements ProcessedItemRepository
+{
+ @PersistenceContext
+ private EntityManager entityManager;
+
+ protected AbstractProcessedItemRepositoryImpl( @Nonnull EntityManager entityManager )
+ {
+ this.entityManager = entityManager;
+ }
+
+ @Nonnull
+ public Set find( @Nonnull G prefix, @Nonnull Collection suffixes )
+ {
+ final CriteriaBuilder cb = entityManager.getCriteriaBuilder();
+ final CriteriaQuery criteria = cb.createQuery( String.class );
+ final Root root = criteria.from( getProcessedItemClass() );
+ return new HashSet<>( entityManager.createQuery( criteria.select( root.get( "id" ).get( "processedId" ) )
+ .where( cb.equal( root.get( "id" ).get( "group" ), prefix ),
+ root.get( "id" ).get( "processedId" ).in( suffixes ) ) )
+ .setHint( "org.hibernate.fetchSize", 1000 ).getResultList() );
+ }
+
+ @Transactional
+ public void process( @Nonnull T processedItem, @Nonnull Consumer consumer )
+ {
+ entityManager.persist( processedItem );
+ entityManager.flush();
+ consumer.accept( processedItem );
+ }
+
+ @Transactional
+ public int deleteOldest( @Nonnull G prefix, @Nonnull Instant timestamp )
+ {
+ final CriteriaBuilder cb = entityManager.getCriteriaBuilder();
+ final CriteriaDelete criteriaDelete = cb.createCriteriaDelete( getProcessedItemClass() );
+ final Root root = criteriaDelete.from( getProcessedItemClass() );
+ return entityManager.createQuery( criteriaDelete.where(
+ cb.equal( root.get( "id" ).get( "group" ), prefix ),
+ cb.lessThan( root.get( "processedAt" ), timestamp ) ) ).executeUpdate();
+ }
+
+ @Nonnull
+ protected abstract Class getProcessedItemClass();
+}
diff --git a/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractQueuedItemRepositoryImpl.java b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractQueuedItemRepositoryImpl.java
new file mode 100644
index 00000000..f5332c5b
--- /dev/null
+++ b/common/src/main/java/org/dhis2/fhir/adapter/data/repository/impl/AbstractQueuedItemRepositoryImpl.java
@@ -0,0 +1,181 @@
+package org.dhis2.fhir.adapter.data.repository.impl;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.QueuedItem;
+import org.dhis2.fhir.adapter.data.model.QueuedItemId;
+import org.dhis2.fhir.adapter.data.repository.AlreadyQueuedException;
+import org.dhis2.fhir.adapter.data.repository.IgnoredQueuedItemException;
+import org.dhis2.fhir.adapter.data.repository.QueuedItemRepository;
+import org.dhis2.fhir.adapter.util.SqlExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.dao.support.DataAccessUtils;
+import org.springframework.dao.support.PersistenceExceptionTranslator;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.DefaultTransactionDefinition;
+
+import javax.annotation.Nonnull;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityNotFoundException;
+import javax.persistence.PersistenceContext;
+import javax.persistence.PersistenceException;
+
+import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW;
+
+/**
+ * Abstract implementation that enqueues and dequeues items.
+ *
+ * @param the concrete type of the queued item.
+ * @param the concrete type of the data group.
+ * @param the ID class of the queued items.
+ * @author volsch
+ */
+public abstract class AbstractQueuedItemRepositoryImpl, I extends QueuedItemId, G extends DataGroup> implements QueuedItemRepository
+{
+ private final Logger logger = LoggerFactory.getLogger( getClass() );
+
+ @PersistenceContext
+ private EntityManager entityManager;
+
+ private PlatformTransactionManager platformTransactionManager;
+
+ private PersistenceExceptionTranslator persistenceExceptionTranslator;
+
+ protected AbstractQueuedItemRepositoryImpl( @Nonnull EntityManager entityManager,
+ @Nonnull PlatformTransactionManager platformTransactionManager,
+ @Nonnull @Qualifier( "&entityManagerFactory" ) PersistenceExceptionTranslator persistenceExceptionTranslator )
+ {
+ this.entityManager = entityManager;
+ this.platformTransactionManager = platformTransactionManager;
+ this.persistenceExceptionTranslator = persistenceExceptionTranslator;
+ }
+
+ @Transactional( rollbackFor = AlreadyQueuedException.class )
+ public void enqueue( @Nonnull I id ) throws AlreadyQueuedException
+ {
+ try
+ {
+ entityManager.persist( createQueuedItem( id ) );
+ entityManager.flush();
+ }
+ catch ( EntityNotFoundException e )
+ {
+ logger.error( "Could not process enqueue request for {} due to constraint violation: {}", id, e.getCause().getMessage() );
+ throw new IgnoredQueuedItemException( "Queued item " + id + " does no longer exist.", e );
+ }
+ catch ( PersistenceException e )
+ {
+ final RuntimeException runtimeException = DataAccessUtils.translateIfNecessary(
+ e, persistenceExceptionTranslator );
+ if ( runtimeException instanceof DataIntegrityViolationException )
+ {
+ final DataIntegrityViolationException dataIntegrityViolationException =
+ (DataIntegrityViolationException) runtimeException;
+ if ( SqlExceptionUtils.isUniqueKeyViolation( dataIntegrityViolationException.getMostSpecificCause() ) )
+ {
+ throw new AlreadyQueuedException();
+ }
+ if ( SqlExceptionUtils.isForeignKeyViolation( dataIntegrityViolationException.getMostSpecificCause() ) )
+ {
+ logger.error( "Could not process enqueue request for {} due to constraint violation: {}",
+ id, e.getCause().getMessage() );
+ throw new IgnoredQueuedItemException( "Queued item " + id + " does no longer exist.", e );
+ }
+ }
+ throw runtimeException;
+ }
+ }
+
+ public boolean dequeued( @Nonnull I id )
+ {
+ // First an enqueue must be tried. There may still be a pending not committed enqueue.
+ // This must be deleted. The pending enqueue will block this enqueue until it has been committed.
+ TransactionStatus transactionStatus = platformTransactionManager
+ .getTransaction( new DefaultTransactionDefinition( PROPAGATION_REQUIRES_NEW ) );
+ try
+ {
+ enqueue( id );
+ }
+ catch ( AlreadyQueuedException e )
+ {
+ // can be ignored
+ }
+ finally
+ {
+ finalizeTransaction( transactionStatus );
+ }
+
+ transactionStatus = platformTransactionManager.getTransaction( new DefaultTransactionDefinition() );
+ try
+ {
+ final T reference = entityManager.getReference( getQueuedItemClass(), id );
+ entityManager.remove( reference );
+ entityManager.flush();
+ }
+ catch ( EntityNotFoundException e )
+ {
+ return false;
+ }
+ finally
+ {
+ finalizeTransaction( transactionStatus );
+ }
+ return true;
+ }
+
+ private void finalizeTransaction( @Nonnull TransactionStatus transactionStatus )
+ {
+ if ( transactionStatus.isRollbackOnly() )
+ {
+ platformTransactionManager.rollback( transactionStatus );
+ }
+ else
+ {
+ platformTransactionManager.commit( transactionStatus );
+ }
+ }
+
+ @Nonnull
+ protected EntityManager getEntityManager()
+ {
+ return entityManager;
+ }
+
+ @Nonnull
+ protected abstract Class getQueuedItemClass();
+
+ @Nonnull
+ protected abstract T createQueuedItem( @Nonnull I id );
+}
diff --git a/fhir-dstu3/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/dstu3/Dstu3SubscriptionResourceBundleRetrieverImpl.java b/fhir-dstu3/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/dstu3/Dstu3SubscriptionResourceItemRetrieverImpl.java
similarity index 92%
rename from fhir-dstu3/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/dstu3/Dstu3SubscriptionResourceBundleRetrieverImpl.java
rename to fhir-dstu3/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/dstu3/Dstu3SubscriptionResourceItemRetrieverImpl.java
index ce481c4a..5cb16c49 100644
--- a/fhir-dstu3/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/dstu3/Dstu3SubscriptionResourceBundleRetrieverImpl.java
+++ b/fhir-dstu3/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/dstu3/Dstu3SubscriptionResourceItemRetrieverImpl.java
@@ -31,7 +31,7 @@
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.dhis2.fhir.adapter.fhir.model.FhirVersion;
-import org.dhis2.fhir.adapter.fhir.remote.impl.AbstractSubscriptionResourceBundleRetriever;
+import org.dhis2.fhir.adapter.fhir.remote.impl.AbstractSubscriptionResourceItemRetriever;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseBundle;
@@ -45,14 +45,14 @@
import java.util.stream.Collectors;
/**
- * Implementation of {@link AbstractSubscriptionResourceBundleRetriever} for DSTU3.
+ * Implementation of {@link AbstractSubscriptionResourceItemRetriever} for DSTU3.
*
* @author volsch
*/
@Component
-public class Dstu3SubscriptionResourceBundleRetrieverImpl extends AbstractSubscriptionResourceBundleRetriever
+public class Dstu3SubscriptionResourceItemRetrieverImpl extends AbstractSubscriptionResourceItemRetriever
{
- public Dstu3SubscriptionResourceBundleRetrieverImpl( @Nonnull @Qualifier( "fhirContextDstu3" ) FhirContext fhirContext )
+ public Dstu3SubscriptionResourceItemRetrieverImpl( @Nonnull @Qualifier( "fhirContextDstu3" ) FhirContext fhirContext )
{
super( fhirContext );
}
diff --git a/fhir/pom.xml b/fhir/pom.xml
index 8ebcb222..a6cbeeb6 100644
--- a/fhir/pom.xml
+++ b/fhir/pom.xml
@@ -114,18 +114,6 @@
com.mysema.maven
apt-maven-plugin
- 1.1.3
-
-
-
- process
-
-
- target/generated-sources/java
- com.querydsl.apt.jpa.JPAAnnotationProcessor
-
-
-
org.flywaydb
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResource.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResource.java
index 657920b9..e44c22e6 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResource.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResource.java
@@ -28,9 +28,11 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.ProcessedItem;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import javax.annotation.Nonnull;
+import javax.persistence.AttributeOverride;
import javax.persistence.Column;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
@@ -46,44 +48,34 @@
*/
@Entity
@Table( name = "fhir_processed_remote_resource" )
-public class ProcessedRemoteFhirResource implements Serializable
+public class ProcessedRemoteFhirResource extends ProcessedItem implements Serializable
{
private static final long serialVersionUID = -6484140859863504862L;
private ProcessedRemoteFhirResourceId id;
- private Instant processedAt;
-
public ProcessedRemoteFhirResource()
{
super();
}
- public ProcessedRemoteFhirResource( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull String versionedFhirResourceId, @Nonnull Instant processedAt )
+ public ProcessedRemoteFhirResource( @Nonnull ProcessedRemoteFhirResourceId id, @Nonnull Instant processedAt )
{
- this.id = new ProcessedRemoteFhirResourceId( remoteSubscriptionResource, versionedFhirResourceId );
- this.processedAt = processedAt;
+ super( processedAt );
+ this.id = id;
}
@EmbeddedId
+ @AttributeOverride( name = "processedId", column = @Column( name = "versioned_fhir_resource_id", nullable = false ) )
+ @Override
public ProcessedRemoteFhirResourceId getId()
{
return id;
}
+ @Override
public void setId( ProcessedRemoteFhirResourceId id )
{
this.id = id;
}
-
- @Column( name = "processed_at", nullable = false )
- public Instant getProcessedAt()
- {
- return processedAt;
- }
-
- public void setProcessedAt( Instant processedAt )
- {
- this.processedAt = processedAt;
- }
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResourceId.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResourceId.java
index 3651039c..c074475b 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResourceId.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/ProcessedRemoteFhirResourceId.java
@@ -28,10 +28,10 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.ProcessedItemId;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import javax.annotation.Nonnull;
-import javax.persistence.Column;
import javax.persistence.Embeddable;
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
@@ -45,46 +45,35 @@
* @author volsch
*/
@Embeddable
-public class ProcessedRemoteFhirResourceId implements Serializable
+public class ProcessedRemoteFhirResourceId extends ProcessedItemId implements Serializable
{
private static final long serialVersionUID = 143055103713986347L;
- private RemoteSubscriptionResource remoteSubscriptionResource;
-
- private String versionedFhirResourceId;
+ private RemoteSubscriptionResource group;
public ProcessedRemoteFhirResourceId()
{
super();
}
- public ProcessedRemoteFhirResourceId( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull String versionedFhirResourceId )
+ public ProcessedRemoteFhirResourceId( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull String processedId )
{
- this.remoteSubscriptionResource = remoteSubscriptionResource;
- this.versionedFhirResourceId = versionedFhirResourceId;
+ super( processedId );
+ this.group = remoteSubscriptionResource;
}
+ @Override
@ManyToOne( optional = false, fetch = FetchType.LAZY )
@JoinColumn( name = "remote_subscription_resource_id" )
- public RemoteSubscriptionResource getRemoteSubscriptionResource()
+ public RemoteSubscriptionResource getGroup()
{
- return remoteSubscriptionResource;
+ return group;
}
- public void setRemoteSubscriptionResource( RemoteSubscriptionResource remoteSubscriptionResource )
- {
- this.remoteSubscriptionResource = remoteSubscriptionResource;
- }
-
- @Column( name = "versioned_fhir_resource_id", nullable = false )
- public String getVersionedFhirResourceId()
- {
- return versionedFhirResourceId;
- }
-
- public void setVersionedFhirResourceId( String versionedFhirResourceId )
+ @Override
+ public void setGroup( RemoteSubscriptionResource group )
{
- this.versionedFhirResourceId = versionedFhirResourceId;
+ this.group = group;
}
@Override
@@ -92,15 +81,21 @@ public boolean equals( Object o )
{
if ( this == o ) return true;
if ( o == null || getClass() != o.getClass() ) return false;
+ if ( !super.equals( o ) ) return false;
ProcessedRemoteFhirResourceId that = (ProcessedRemoteFhirResourceId) o;
- return Objects.equals( (remoteSubscriptionResource == null) ? null : remoteSubscriptionResource.getId(),
- (that.remoteSubscriptionResource == null) ? null : that.remoteSubscriptionResource.getId() ) &&
- Objects.equals( versionedFhirResourceId, that.versionedFhirResourceId );
+ return Objects.equals( ((group == null) ? 0 : group.getId()),
+ ((that.group == null) ? 0 : that.group.getId()) );
}
@Override
public int hashCode()
{
- return Objects.hash( (remoteSubscriptionResource == null) ? null : remoteSubscriptionResource.getId(), versionedFhirResourceId );
+ return Objects.hash( super.hashCode(), (group == null ? 0 : group.getId()) );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[Remote Subscription Resource ID " + ((group == null) ? "?" : group.getId()) + ", Processed ID " + getProcessedId() + "]";
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResource.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResource.java
index db0a14cd..53d10db4 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResource.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResource.java
@@ -28,10 +28,10 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.QueuedItem;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
-import javax.persistence.Basic;
-import javax.persistence.Column;
+import javax.annotation.Nonnull;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.Table;
@@ -46,47 +46,33 @@
*/
@Entity
@Table( name = "fhir_queued_remote_resource" )
-public class QueuedRemoteFhirResource implements Serializable
+public class QueuedRemoteFhirResource extends QueuedItem implements Serializable
{
private static final long serialVersionUID = 3146612484665379623L;
private QueuedRemoteFhirResourceId id;
- private String requestId;
-
- private Instant queuedAt;
-
- @EmbeddedId
- public QueuedRemoteFhirResourceId getId()
+ public QueuedRemoteFhirResource()
{
- return id;
+ super();
}
- public void setId( QueuedRemoteFhirResourceId id )
+ public QueuedRemoteFhirResource( @Nonnull QueuedRemoteFhirResourceId id, @Nonnull Instant queuedAt )
{
+ super( queuedAt );
this.id = id;
}
- @Basic
- @Column( name = "request_id", nullable = false )
- public String getRequestId()
- {
- return requestId;
- }
-
- public void setRequestId( String requestId )
- {
- this.requestId = requestId;
- }
-
- @Column( name = "queued_at", nullable = false )
- public Instant getQueuedAt()
+ @EmbeddedId
+ @Override
+ public QueuedRemoteFhirResourceId getId()
{
- return queuedAt;
+ return id;
}
- public void setQueuedAt( Instant queuedAt )
+ @Override
+ public void setId( QueuedRemoteFhirResourceId id )
{
- this.queuedAt = queuedAt;
+ this.id = id;
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResourceId.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResourceId.java
index c5ae08dd..0c241af2 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResourceId.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteFhirResourceId.java
@@ -28,6 +28,7 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.QueuedItemId;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import javax.annotation.Nonnull;
@@ -45,11 +46,11 @@
* @author volsch
*/
@Embeddable
-public class QueuedRemoteFhirResourceId implements Serializable
+public class QueuedRemoteFhirResourceId extends QueuedItemId implements Serializable
{
private static final long serialVersionUID = -4642534319215405587L;
- private RemoteSubscriptionResource remoteSubscriptionResource;
+ private RemoteSubscriptionResource group;
private String fhirResourceId;
@@ -58,22 +59,24 @@ public QueuedRemoteFhirResourceId()
super();
}
- public QueuedRemoteFhirResourceId( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull String fhirResourceId )
+ public QueuedRemoteFhirResourceId( @Nonnull RemoteSubscriptionResource group, @Nonnull String fhirResourceId )
{
- this.remoteSubscriptionResource = remoteSubscriptionResource;
+ this.group = group;
this.fhirResourceId = fhirResourceId;
}
@ManyToOne( optional = false, fetch = FetchType.LAZY )
@JoinColumn( name = "remote_subscription_resource_id" )
- public RemoteSubscriptionResource getRemoteSubscriptionResource()
+ @Override
+ public RemoteSubscriptionResource getGroup()
{
- return remoteSubscriptionResource;
+ return group;
}
- public void setRemoteSubscriptionResource( RemoteSubscriptionResource remoteSubscriptionResource )
+ @Override
+ public void setGroup( RemoteSubscriptionResource group )
{
- this.remoteSubscriptionResource = remoteSubscriptionResource;
+ this.group = group;
}
@Column( name = "fhir_resource_id", nullable = false )
@@ -93,14 +96,20 @@ public boolean equals( Object o )
if ( this == o ) return true;
if ( o == null || getClass() != o.getClass() ) return false;
QueuedRemoteFhirResourceId that = (QueuedRemoteFhirResourceId) o;
- return Objects.equals( (remoteSubscriptionResource == null) ? null : remoteSubscriptionResource.getId(),
- (that.remoteSubscriptionResource == null) ? null : that.remoteSubscriptionResource.getId() ) &&
+ return Objects.equals( (group == null) ? null : group.getId(),
+ (that.group == null) ? null : that.group.getId() ) &&
Objects.equals( fhirResourceId, that.fhirResourceId );
}
@Override
public int hashCode()
{
- return Objects.hash( (remoteSubscriptionResource == null) ? null : remoteSubscriptionResource.getId(), fhirResourceId );
+ return Objects.hash( (group == null) ? null : group.getId(), fhirResourceId );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[Remote Subscription Resource ID " + ((group == null) ? "?" : group.getId()) + ", FHIR Resource ID " + getFhirResourceId() + "]";
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequest.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequest.java
index 2d1d74df..5e115f5a 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequest.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequest.java
@@ -28,19 +28,15 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.QueuedItem;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
-import javax.persistence.Basic;
-import javax.persistence.Column;
+import javax.annotation.Nonnull;
+import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.JoinColumn;
-import javax.persistence.MapsId;
-import javax.persistence.OneToOne;
import javax.persistence.Table;
import java.io.Serializable;
import java.time.Instant;
-import java.util.UUID;
/**
* Entity that contains if currently for {@linkplain RemoteSubscriptionResource remote subscription resource}
@@ -50,63 +46,33 @@
*/
@Entity
@Table( name = "fhir_queued_remote_subscription_request" )
-public class QueuedRemoteSubscriptionRequest implements Serializable
+public class QueuedRemoteSubscriptionRequest extends QueuedItem implements Serializable
{
private static final long serialVersionUID = 4304414115903803395L;
- private UUID id;
+ private QueuedRemoteSubscriptionRequestId id;
- private RemoteSubscriptionResource subscriptionResource;
-
- private String requestId;
-
- private Instant queuedAt;
-
- @Id
- @Column( name = "id", nullable = false )
- public UUID getId()
+ public QueuedRemoteSubscriptionRequest()
{
- return id;
+ super();
}
- public void setId( UUID id )
+ public QueuedRemoteSubscriptionRequest( @Nonnull QueuedRemoteSubscriptionRequestId id, @Nonnull Instant queuedAt )
{
+ super( queuedAt );
this.id = id;
}
- @OneToOne( optional = false )
- @JoinColumn( name = "id", nullable = false )
- @MapsId
- public RemoteSubscriptionResource getSubscriptionResource()
- {
- return subscriptionResource;
- }
-
- public void setSubscriptionResource( RemoteSubscriptionResource subscriptionResource )
- {
- this.subscriptionResource = subscriptionResource;
- }
-
- @Basic
- @Column( name = "request_id", nullable = false )
- public String getRequestId()
+ @EmbeddedId
+ @Override
+ public QueuedRemoteSubscriptionRequestId getId()
{
- return requestId;
- }
-
- public void setRequestId( String requestId )
- {
- this.requestId = requestId;
- }
-
- @Column( name = "queued_at", nullable = false )
- public Instant getQueuedAt()
- {
- return queuedAt;
+ return id;
}
- public void setQueuedAt( Instant queuedAt )
+ @Override
+ public void setId( QueuedRemoteSubscriptionRequestId id )
{
- this.queuedAt = queuedAt;
+ this.id = id;
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequestId.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequestId.java
new file mode 100644
index 00000000..6e76de2b
--- /dev/null
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/model/QueuedRemoteSubscriptionRequestId.java
@@ -0,0 +1,99 @@
+package org.dhis2.fhir.adapter.fhir.data.model;
+
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.dhis2.fhir.adapter.data.model.QueuedItemId;
+import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
+
+import javax.annotation.Nonnull;
+import javax.persistence.Embeddable;
+import javax.persistence.FetchType;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The unique ID of a a pending request for remote subscription resource processing.
+ *
+ * @author volsch
+ */
+@Embeddable
+public class QueuedRemoteSubscriptionRequestId extends QueuedItemId implements Serializable
+{
+ private static final long serialVersionUID = -4642534319215405587L;
+
+ private RemoteSubscriptionResource group;
+
+ public QueuedRemoteSubscriptionRequestId()
+ {
+ super();
+ }
+
+ public QueuedRemoteSubscriptionRequestId( @Nonnull RemoteSubscriptionResource group )
+ {
+ this.group = group;
+ }
+
+ @ManyToOne( optional = false, fetch = FetchType.LAZY )
+ @JoinColumn( name = "remote_subscription_resource_id" )
+ @Override
+ public RemoteSubscriptionResource getGroup()
+ {
+ return group;
+ }
+
+ @Override
+ public void setGroup( RemoteSubscriptionResource group )
+ {
+ this.group = group;
+ }
+
+ @Override
+ public boolean equals( Object o )
+ {
+ if ( this == o ) return true;
+ if ( o == null || getClass() != o.getClass() ) return false;
+ QueuedRemoteSubscriptionRequestId that = (QueuedRemoteSubscriptionRequestId) o;
+ return Objects.equals( (group == null) ? null : group.getId(),
+ (that.group == null) ? null : that.group.getId() );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash( (group == null) ? null : group.getId() );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[Remote Subscription Resource ID " + ((group == null) ? "?" : group.getId()) + "]";
+ }
+}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomProcessedRemoteFhirResourceRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomProcessedRemoteFhirResourceRepository.java
index 03f9695e..3ebb40aa 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomProcessedRemoteFhirResourceRepository.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomProcessedRemoteFhirResourceRepository.java
@@ -28,19 +28,14 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.repository.ProcessedItemRepository;
import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResource;
+import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResourceId;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
-import javax.annotation.Nonnull;
-import java.time.Instant;
-import java.util.function.Consumer;
-
/**
* Custom repository for processed remote FHIR resources {@link ProcessedRemoteFhirResource}.
*/
-public interface CustomProcessedRemoteFhirResourceRepository
+public interface CustomProcessedRemoteFhirResourceRepository extends ProcessedItemRepository
{
- void process( @Nonnull ProcessedRemoteFhirResource processedRemoteFhirResource, @Nonnull Consumer consumer );
-
- int deleteOldest( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull Instant timestamp );
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteFhirResourceRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteFhirResourceRepository.java
index faec31e9..fd766115 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteFhirResourceRepository.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteFhirResourceRepository.java
@@ -28,42 +28,16 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.repository.QueuedItemRepository;
import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteFhirResource;
-
-import javax.annotation.Nonnull;
-import java.util.UUID;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteFhirResourceId;
+import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
/**
* Custom repository for {@link QueuedRemoteFhirResource} entities.
*
* @author volsch
*/
-public interface CustomQueuedRemoteFhirResourceRepository
+public interface CustomQueuedRemoteFhirResourceRepository extends QueuedItemRepository
{
- /**
- * Tries to insert a new entry into queued remote FHIR resource. If the entry already
- * exists (there are still messages inside the queue for the specified FHIR resource ID
- * of the specified subscription resource), this method returns false
.
- * Otherwise this method returns true
.
- *
- * @param subscriptionResourceId the ID of the subscription resource for which a
- * message should be enqueued.
- * @param fhirResourceId the ID of the FHIR resource for which a message should
- * be enqueued.
- * @param requestId the unique ID of the request to enqueue this resource.
- * @throws AlreadyQueuedException thrown if there are still messages inside the queue.
- */
- void enqueue( @Nonnull UUID subscriptionResourceId, @Nonnull String fhirResourceId, @Nonnull String requestId ) throws AlreadyQueuedException;
-
- /**
- * Tries to dequeue the entity with the specified IDs. If the entity does not
- * exist, false
is returned.
- *
- * @param subscriptionResourceId the ID of the subscribtion resource to which the FHIR
- * resource belongs to.
- * @param fhirResourceId the ID of the FHIR resource for which a message has been
- * dequeued.
- * @return true
if the entity has been deleted, false
otherwise.
- */
- boolean dequeued( @Nonnull UUID subscriptionResourceId, @Nonnull String fhirResourceId );
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteSubscriptionRequestRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteSubscriptionRequestRepository.java
index b79bb08d..e97cb4f6 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteSubscriptionRequestRepository.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/CustomQueuedRemoteSubscriptionRequestRepository.java
@@ -28,38 +28,16 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.repository.QueuedItemRepository;
import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteSubscriptionRequest;
-
-import javax.annotation.Nonnull;
-import java.util.UUID;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteSubscriptionRequestId;
+import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
/**
* Custom repository for {@link QueuedRemoteSubscriptionRequest} entities.
*
* @author volsch
*/
-public interface CustomQueuedRemoteSubscriptionRequestRepository
+public interface CustomQueuedRemoteSubscriptionRequestRepository extends QueuedItemRepository
{
- /**
- * Tries to insert a new entry into subscription resource. If the entry already
- * exists (there are still messages inside the queue for the specified subscription
- * resource), this method returns false
. Otherwise this method
- * returns true
.
- *
- * @param subscriptionResourceId the ID of the subscription resource for which a
- * message should be enqueued.
- * @param requestId the ID of the current request.
- * @throws AlreadyQueuedException thrown if there are still messages inside the queue.
- */
- void enqueue( @Nonnull UUID subscriptionResourceId, @Nonnull String requestId ) throws AlreadyQueuedException;
-
- /**
- * Tries to dequeue the entity with the specified ID. If the entity does not
- * exist, false
is returned.
- *
- * @param subscriptionResourceId the ID of the entity to be deleted.
- * @return true
if the entity has been deleted,
- * false
otherwise.
- */
- boolean dequeued( @Nonnull UUID subscriptionResourceId );
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/ProcessedRemoteFhirResourceRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/ProcessedRemoteFhirResourceRepository.java
index fa84ae84..b3b0c1fd 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/ProcessedRemoteFhirResourceRepository.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/ProcessedRemoteFhirResourceRepository.java
@@ -29,16 +29,8 @@
*/
import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResource;
-import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.data.jpa.repository.Query;
-import org.springframework.data.jpa.repository.QueryHints;
-import org.springframework.data.repository.query.Param;
-import javax.annotation.Nonnull;
-import javax.persistence.QueryHint;
-import java.util.Collection;
-import java.util.Set;
import java.util.UUID;
/**
@@ -48,9 +40,4 @@
*/
public interface ProcessedRemoteFhirResourceRepository extends JpaRepository, CustomProcessedRemoteFhirResourceRepository
{
- @Nonnull
- @Query( value = "SELECT p.id.versionedFhirResourceId FROM #{#entityName} p WHERE p.id.remoteSubscriptionResource=:remoteSubscriptionResource AND p.id.versionedFhirResourceId IN (:versionedFhirResourceIds)" )
- @QueryHints( @QueryHint( name = "org.hibernate.fetchSize", value = "1000" ) )
- Set findByVersionedIds( @Nonnull @Param( "remoteSubscriptionResource" ) RemoteSubscriptionResource remoteSubscriptionResource,
- @Nonnull @Param( "versionedFhirResourceIds" ) Collection versionedFhirResourceIds );
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomProcessedRemoteFhirResourceRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomProcessedRemoteFhirResourceRepositoryImpl.java
index 84315c47..40d65510 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomProcessedRemoteFhirResourceRepositoryImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomProcessedRemoteFhirResourceRepositoryImpl.java
@@ -28,41 +28,31 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.repository.impl.AbstractProcessedItemRepositoryImpl;
import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResource;
+import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResourceId;
import org.dhis2.fhir.adapter.fhir.data.repository.CustomProcessedRemoteFhirResourceRepository;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
-import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
-import javax.persistence.PersistenceContext;
-import java.time.Instant;
-import java.util.function.Consumer;
/**
* Implementation of {@link CustomProcessedRemoteFhirResourceRepository}.
*
* @author volsch
*/
-public class CustomProcessedRemoteFhirResourceRepositoryImpl implements CustomProcessedRemoteFhirResourceRepository
+public class CustomProcessedRemoteFhirResourceRepositoryImpl extends AbstractProcessedItemRepositoryImpl implements CustomProcessedRemoteFhirResourceRepository
{
- @PersistenceContext
- EntityManager entityManager;
-
- @Transactional
- @Override
- public void process( @Nonnull ProcessedRemoteFhirResource processedRemoteFhirResource, @Nonnull Consumer consumer )
+ public CustomProcessedRemoteFhirResourceRepositoryImpl( @Nonnull EntityManager entityManager )
{
- entityManager.persist( processedRemoteFhirResource );
- entityManager.flush();
- consumer.accept( processedRemoteFhirResource );
+ super( entityManager );
}
- @Transactional
+ @Nonnull
@Override
- public int deleteOldest( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull Instant timestamp )
+ protected Class getProcessedItemClass()
{
- return entityManager.createQuery( "DELETE FROM ProcessedRemoteFhirResource p WHERE p.id.remoteSubscriptionResource=:remoteSubscriptionResource AND p.processedAt<:timestamp" )
- .setParameter( "remoteSubscriptionResource", remoteSubscriptionResource ).setParameter( "timestamp", timestamp ).executeUpdate();
+ return ProcessedRemoteFhirResource.class;
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteFhirResourceRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteFhirResourceRepositoryImpl.java
index d8db0e7c..0e643320 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteFhirResourceRepositoryImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteFhirResourceRepositoryImpl.java
@@ -28,140 +28,44 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteSubscriptionRequest;
-import org.dhis2.fhir.adapter.fhir.data.repository.AlreadyQueuedException;
+import org.dhis2.fhir.adapter.data.repository.impl.AbstractQueuedItemRepositoryImpl;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteFhirResource;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteFhirResourceId;
import org.dhis2.fhir.adapter.fhir.data.repository.CustomQueuedRemoteFhirResourceRepository;
-import org.dhis2.fhir.adapter.fhir.data.repository.IgnoredSubscriptionResourceException;
-import org.dhis2.fhir.adapter.util.SqlExceptionUtils;
-import org.hibernate.query.NativeQuery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.dao.support.DataAccessUtils;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
-import javax.persistence.PersistenceContext;
-import javax.persistence.PersistenceException;
-import javax.persistence.Query;
import java.time.Instant;
-import java.util.UUID;
-
-import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW;
/**
* Implementation of {@link CustomQueuedRemoteFhirResourceRepository}.
*
* @author volsch
*/
-public class CustomQueuedRemoteFhirResourceRepositoryImpl implements CustomQueuedRemoteFhirResourceRepository
+public class CustomQueuedRemoteFhirResourceRepositoryImpl extends AbstractQueuedItemRepositoryImpl implements CustomQueuedRemoteFhirResourceRepository
{
- private final Logger logger = LoggerFactory.getLogger( getClass() );
-
- @PersistenceContext
- private EntityManager entityManager;
-
- private PlatformTransactionManager platformTransactionManager;
-
- private PersistenceExceptionTranslator persistenceExceptionTranslator;
-
public CustomQueuedRemoteFhirResourceRepositoryImpl( @Nonnull EntityManager entityManager,
- @Nonnull PlatformTransactionManager platformTransactionManager, @Nonnull @Qualifier( "&entityManagerFactory" ) PersistenceExceptionTranslator persistenceExceptionTranslator )
+ @Nonnull PlatformTransactionManager platformTransactionManager,
+ @Nonnull @Qualifier( "&entityManagerFactory" ) PersistenceExceptionTranslator persistenceExceptionTranslator )
{
- this.entityManager = entityManager;
- this.platformTransactionManager = platformTransactionManager;
- this.persistenceExceptionTranslator = persistenceExceptionTranslator;
+ super( entityManager, platformTransactionManager, persistenceExceptionTranslator );
}
- @Transactional( rollbackFor = AlreadyQueuedException.class )
+ @Nonnull
@Override
- public void enqueue( @Nonnull UUID subscriptionResourceId, @Nonnull String fhirResourceId, @Nonnull String requestId ) throws AlreadyQueuedException
+ protected Class getQueuedItemClass()
{
- final Query query = entityManager.createNativeQuery( "INSERT INTO fhir_queued_remote_resource(remote_subscription_resource_id,fhir_resource_id,request_id,queued_at) " +
- "VALUES (:subscriptionResourceId,:fhirResourceId,:requestId,:queuedAt)" )
- .setParameter( "subscriptionResourceId", subscriptionResourceId ).setParameter( "fhirResourceId", fhirResourceId )
- .setParameter( "requestId", requestId ).setParameter( "queuedAt", Instant.now() );
- // avoid invalidation of complete 2nd level cache
- query.unwrap( NativeQuery.class ).addSynchronizedEntityClass( QueuedRemoteSubscriptionRequest.class );
-
- try
- {
- query.executeUpdate();
- }
- catch ( PersistenceException e )
- {
- final RuntimeException runtimeException = DataAccessUtils.translateIfNecessary( e, persistenceExceptionTranslator );
- if ( runtimeException instanceof DataIntegrityViolationException )
- {
- final DataIntegrityViolationException dataIntegrityViolationException =
- (DataIntegrityViolationException) runtimeException;
- if ( SqlExceptionUtils.isUniqueKeyViolation( dataIntegrityViolationException.getMostSpecificCause() ) )
- {
- throw new AlreadyQueuedException();
- }
- if ( SqlExceptionUtils.isForeignKeyViolation( dataIntegrityViolationException.getMostSpecificCause() ) )
- {
- logger.error( "Could not process enqueue request for subscription resource {} and FHIR resource {} due to constraint violation: {}",
- subscriptionResourceId, fhirResourceId, e.getCause().getMessage() );
- throw new IgnoredSubscriptionResourceException( "Subscription resource " + subscriptionResourceId + " does no longer exist.", e );
- }
- }
- throw runtimeException;
- }
+ return QueuedRemoteFhirResource.class;
}
+ @Nonnull
@Override
- public boolean dequeued( @Nonnull UUID subscriptionResourceId, @Nonnull String fhirResourceId )
+ protected QueuedRemoteFhirResource createQueuedItem( @Nonnull QueuedRemoteFhirResourceId id )
{
- // First an enqueue must be tried. There may still be a pending not committed enqueue.
- // This must be deleted. The pending enqueue will block this enqueue until it has been committed.
- TransactionStatus transactionStatus = platformTransactionManager
- .getTransaction( new DefaultTransactionDefinition( PROPAGATION_REQUIRES_NEW ) );
- try
- {
- enqueue( subscriptionResourceId, fhirResourceId, "?" );
- }
- catch ( AlreadyQueuedException e )
- {
- // can be ignored
- }
- finally
- {
- if ( transactionStatus.isRollbackOnly() )
- {
- platformTransactionManager.rollback( transactionStatus );
- }
- else
- {
- platformTransactionManager.commit( transactionStatus );
- }
- }
-
- transactionStatus = platformTransactionManager
- .getTransaction( new DefaultTransactionDefinition() );
- try
- {
- final Query query = entityManager.createQuery( "DELETE FROM QueuedRemoteFhirResource " +
- "WHERE id.remoteSubscriptionResource.id=:subscriptionResourceId AND id.fhirResourceId=:fhirResourceId" )
- .setParameter( "subscriptionResourceId", subscriptionResourceId ).setParameter( "fhirResourceId", fhirResourceId );
- return (query.executeUpdate() > 0);
- }
- finally
- {
- if ( transactionStatus.isRollbackOnly() )
- {
- platformTransactionManager.rollback( transactionStatus );
- }
- else
- {
- platformTransactionManager.commit( transactionStatus );
- }
- }
+ return new QueuedRemoteFhirResource( id, Instant.now() );
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteSubscriptionRequestRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteSubscriptionRequestRepositoryImpl.java
index f77f18ff..f819edcd 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteSubscriptionRequestRepositoryImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/data/repository/impl/CustomQueuedRemoteSubscriptionRequestRepositoryImpl.java
@@ -28,138 +28,51 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.repository.impl.AbstractQueuedItemRepositoryImpl;
import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteSubscriptionRequest;
-import org.dhis2.fhir.adapter.fhir.data.repository.AlreadyQueuedException;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteSubscriptionRequestId;
import org.dhis2.fhir.adapter.fhir.data.repository.CustomQueuedRemoteSubscriptionRequestRepository;
-import org.dhis2.fhir.adapter.fhir.data.repository.IgnoredSubscriptionResourceException;
-import org.dhis2.fhir.adapter.util.SqlExceptionUtils;
-import org.hibernate.query.NativeQuery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.dao.support.DataAccessUtils;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
-import javax.persistence.PersistenceContext;
-import javax.persistence.PersistenceException;
-import javax.persistence.Query;
import java.time.Instant;
-import java.util.UUID;
-
-import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW;
/**
* Implementation of {@link CustomQueuedRemoteSubscriptionRequestRepository}.
*
* @author volsch
*/
-public class CustomQueuedRemoteSubscriptionRequestRepositoryImpl implements CustomQueuedRemoteSubscriptionRequestRepository
+public class CustomQueuedRemoteSubscriptionRequestRepositoryImpl extends AbstractQueuedItemRepositoryImpl implements CustomQueuedRemoteSubscriptionRequestRepository
{
- private final Logger logger = LoggerFactory.getLogger( getClass() );
-
- @PersistenceContext
- private EntityManager entityManager;
-
- private PlatformTransactionManager platformTransactionManager;
-
- private PersistenceExceptionTranslator persistenceExceptionTranslator;
-
- public CustomQueuedRemoteSubscriptionRequestRepositoryImpl( @Nonnull EntityManager entityManager,
- @Nonnull PlatformTransactionManager platformTransactionManager, @Nonnull @Qualifier( "&entityManagerFactory" ) PersistenceExceptionTranslator persistenceExceptionTranslator )
+ public CustomQueuedRemoteSubscriptionRequestRepositoryImpl(
+ @Nonnull EntityManager entityManager,
+ @Nonnull PlatformTransactionManager platformTransactionManager,
+ @Nonnull @Qualifier( "&entityManagerFactory" ) PersistenceExceptionTranslator persistenceExceptionTranslator )
{
- this.entityManager = entityManager;
- this.platformTransactionManager = platformTransactionManager;
- this.persistenceExceptionTranslator = persistenceExceptionTranslator;
+ super( entityManager, platformTransactionManager, persistenceExceptionTranslator );
}
- @Transactional( rollbackFor = AlreadyQueuedException.class )
+ @Nonnull
@Override
- public void enqueue( @Nonnull UUID subscriptionResourceId, @Nonnull String requestId ) throws AlreadyQueuedException
+ protected Class getQueuedItemClass()
{
- final Query query = entityManager.createNativeQuery( "INSERT INTO fhir_queued_remote_subscription_request" +
- "(id,request_id,queued_at) VALUES (:id,:requestId,:queuedAt)" )
- .setParameter( "id", subscriptionResourceId ).setParameter( "requestId", requestId ).setParameter( "queuedAt", Instant.now() );
- // avoid invalidation of complete 2nd level cache
- query.unwrap( NativeQuery.class ).addSynchronizedEntityClass( QueuedRemoteSubscriptionRequest.class );
-
- try
- {
- query.executeUpdate();
- }
- catch ( PersistenceException e )
- {
- final RuntimeException runtimeException = DataAccessUtils.translateIfNecessary( e, persistenceExceptionTranslator );
- if ( runtimeException instanceof DataIntegrityViolationException )
- {
- final DataIntegrityViolationException dataIntegrityViolationException =
- (DataIntegrityViolationException) runtimeException;
- if ( SqlExceptionUtils.isUniqueKeyViolation( dataIntegrityViolationException.getMostSpecificCause() ) )
- {
- throw new AlreadyQueuedException();
- }
- if ( SqlExceptionUtils.isForeignKeyViolation( dataIntegrityViolationException.getMostSpecificCause() ) )
- {
- logger.error( "Could not process enqueue request for subscription resource {} due to constraint violation: {}",
- subscriptionResourceId, e.getCause().getMessage() );
- throw new IgnoredSubscriptionResourceException( "Subscription resource " + subscriptionResourceId + " does no longer exist.", e );
- }
- }
- throw runtimeException;
- }
+ return QueuedRemoteSubscriptionRequest.class;
}
+
+ @Nonnull
@Override
- public boolean dequeued( @Nonnull UUID subscriptionResourceId )
+ protected QueuedRemoteSubscriptionRequest createQueuedItem( @Nonnull QueuedRemoteSubscriptionRequestId id )
{
- // First an enqueue must be tried. There may still be a pending not committed enqueue.
- // This must be deleted. The pending enqueue will block this enqueue until it has been committed.
- TransactionStatus transactionStatus = platformTransactionManager
- .getTransaction( new DefaultTransactionDefinition( PROPAGATION_REQUIRES_NEW ) );
- try
- {
- enqueue( subscriptionResourceId, "?" );
- }
- catch ( AlreadyQueuedException e )
- {
- // can be ignored
- }
- finally
- {
- if ( transactionStatus.isRollbackOnly() )
- {
- platformTransactionManager.rollback( transactionStatus );
- }
- else
- {
- platformTransactionManager.commit( transactionStatus );
- }
- }
-
- transactionStatus = platformTransactionManager
- .getTransaction( new DefaultTransactionDefinition() );
- try
- {
- final Query query = entityManager.createQuery( "DELETE FROM QueuedRemoteSubscriptionRequest WHERE id=:id" )
- .setParameter( "id", subscriptionResourceId );
- return (query.executeUpdate() > 0);
- }
- finally
+ RemoteSubscriptionResource remoteSubscriptionResource = id.getGroup();
+ if ( !getEntityManager().contains( remoteSubscriptionResource ) )
{
- if ( transactionStatus.isRollbackOnly() )
- {
- platformTransactionManager.rollback( transactionStatus );
- }
- else
- {
- platformTransactionManager.commit( transactionStatus );
- }
+ remoteSubscriptionResource = getEntityManager().getReference( RemoteSubscriptionResource.class, remoteSubscriptionResource.getId() );
}
+ return new QueuedRemoteSubscriptionRequest( new QueuedRemoteSubscriptionRequestId( remoteSubscriptionResource ), Instant.now() );
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResource.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResource.java
index 7604298f..4cf58325 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResource.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResource.java
@@ -30,6 +30,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
+import org.dhis2.fhir.adapter.data.model.DataGroup;
+import org.dhis2.fhir.adapter.data.model.UuidDataGroupId;
import org.dhis2.fhir.adapter.validator.EnumValue;
import org.springframework.data.rest.core.annotation.RestResource;
@@ -43,6 +45,7 @@
import javax.persistence.ManyToOne;
import javax.persistence.OneToOne;
import javax.persistence.Table;
+import javax.persistence.Transient;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.io.Serializable;
@@ -54,7 +57,7 @@
*/
@Entity
@Table( name = "fhir_remote_subscription_resource" )
-public class RemoteSubscriptionResource extends VersionedBaseMetadata implements Serializable
+public class RemoteSubscriptionResource extends VersionedBaseMetadata implements DataGroup, Serializable
{
private static final long serialVersionUID = -6797001318266984453L;
@@ -128,7 +131,7 @@ public void setRemoteSubscription( RemoteSubscription remoteSubscription )
}
@RestResource( exported = false )
- @OneToOne( mappedBy = "remoteSubscriptionResource", cascade = { CascadeType.REMOVE, CascadeType.PERSIST } )
+ @OneToOne( mappedBy = "group", cascade = { CascadeType.REMOVE, CascadeType.PERSIST } )
@JsonIgnore
public RemoteSubscriptionResourceUpdate getResourceUpdate()
{
@@ -164,4 +167,12 @@ public void setFhirSubscriptionId( String fhirSubscriptionId )
{
this.fhirSubscriptionId = fhirSubscriptionId;
}
+
+ @JsonIgnore
+ @Transient
+ @Override
+ public UuidDataGroupId getGroupId()
+ {
+ return (getId() == null) ? null : new UuidDataGroupId( getId() );
+ }
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResourceUpdate.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResourceUpdate.java
index 91548852..dec70c3f 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResourceUpdate.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/model/RemoteSubscriptionResourceUpdate.java
@@ -29,8 +29,9 @@
*/
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.dhis2.fhir.adapter.data.model.DataGroupUpdate;
-import javax.persistence.Basic;
+import javax.persistence.AttributeOverride;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
@@ -38,8 +39,6 @@
import javax.persistence.MapsId;
import javax.persistence.OneToOne;
import javax.persistence.Table;
-import java.io.Serializable;
-import java.time.Instant;
import java.util.UUID;
/**
@@ -49,13 +48,14 @@
*/
@Entity
@Table( name = "fhir_remote_subscription_resource_update" )
-public class RemoteSubscriptionResourceUpdate implements Serializable
+@AttributeOverride( name = "lastUpdated", column = @Column( name = "remote_last_updated", nullable = false ) )
+public class RemoteSubscriptionResourceUpdate extends DataGroupUpdate
{
private static final long serialVersionUID = -2051276256396499975L;
private UUID id;
- private RemoteSubscriptionResource remoteSubscriptionResource;
- private Instant remoteLastUpdated;
+
+ private RemoteSubscriptionResource group;
@Id
public UUID getId()
@@ -72,25 +72,14 @@ public void setId( UUID id )
@OneToOne( optional = false )
@MapsId
@JsonIgnore
- public RemoteSubscriptionResource getRemoteSubscriptionResource()
- {
- return remoteSubscriptionResource;
- }
-
- public void setRemoteSubscriptionResource( RemoteSubscriptionResource remoteSubscriptionResource )
- {
- this.remoteSubscriptionResource = remoteSubscriptionResource;
- }
-
- @Basic
- @Column( name = "remote_last_updated", nullable = false )
- public Instant getRemoteLastUpdated()
+ public RemoteSubscriptionResource getGroup()
{
- return remoteLastUpdated;
+ return group;
}
- public void setRemoteLastUpdated( Instant remoteLastUpdate )
+ public void setGroup( RemoteSubscriptionResource group )
{
- this.remoteLastUpdated = remoteLastUpdate;
+ this.group = group;
+ setId( (group == null) ? null : group.getId() );
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/CustomRemoteSubscriptionResourceUpdateRepository.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/CustomRemoteSubscriptionResourceUpdateRepository.java
index 258f8d4a..d0b90123 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/CustomRemoteSubscriptionResourceUpdateRepository.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/CustomRemoteSubscriptionResourceUpdateRepository.java
@@ -28,24 +28,16 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.DataGroupUpdate;
+import org.dhis2.fhir.adapter.data.repository.DataGroupUpdateRepository;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResourceUpdate;
-import org.springframework.data.rest.core.annotation.RestResource;
-
-import javax.annotation.Nonnull;
-import java.time.Instant;
/**
* Custom repository for {@link RemoteSubscriptionResourceUpdate}.
*
* @author volsch
*/
-public interface CustomRemoteSubscriptionResourceUpdateRepository
+public interface CustomRemoteSubscriptionResourceUpdateRepository extends DataGroupUpdateRepository, RemoteSubscriptionResource>
{
- @RestResource( exported = false )
- @Nonnull
- Instant getRemoteLastUpdated( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource );
-
- @RestResource( exported = false )
- boolean updateRemoteLastUpdated( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull Instant lastUpdated );
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionRepositoryImpl.java
index a13a5218..07e8d11c 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionRepositoryImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionRepositoryImpl.java
@@ -133,8 +133,8 @@ protected List createAutoCreatedSubscriptionResource
remoteSubscription.getResources().add( rsr );
final RemoteSubscriptionResourceUpdate resourceUpdate = new RemoteSubscriptionResourceUpdate();
- resourceUpdate.setRemoteSubscriptionResource( rsr );
- resourceUpdate.setRemoteLastUpdated( Instant.now() );
+ resourceUpdate.setGroup( rsr );
+ resourceUpdate.setLastUpdated( Instant.now() );
rsr.setResourceUpdate( resourceUpdate );
autoCreatedRemoteSubscriptionResources.add( rsr );
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionResourceUpdateRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionResourceUpdateRepositoryImpl.java
index f9cb2f7c..334b525f 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionResourceUpdateRepositoryImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/impl/CustomRemoteSubscriptionResourceUpdateRepositoryImpl.java
@@ -28,60 +28,36 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import org.dhis2.fhir.adapter.data.model.DataGroupUpdate;
+import org.dhis2.fhir.adapter.data.repository.impl.AbstractDataGroupUpdateRepositoryImpl;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResourceUpdate;
import org.dhis2.fhir.adapter.fhir.metadata.repository.CustomRemoteSubscriptionResourceUpdateRepository;
-import org.springframework.data.rest.core.annotation.RestResource;
-import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
-import javax.persistence.LockModeType;
-import javax.persistence.PersistenceContext;
-import java.time.Instant;
/**
* Implementation of {@link CustomRemoteSubscriptionResourceUpdateRepository}.
*/
-public class CustomRemoteSubscriptionResourceUpdateRepositoryImpl implements CustomRemoteSubscriptionResourceUpdateRepository
+public class CustomRemoteSubscriptionResourceUpdateRepositoryImpl extends AbstractDataGroupUpdateRepositoryImpl, RemoteSubscriptionResource>
+ implements CustomRemoteSubscriptionResourceUpdateRepository
{
- @PersistenceContext
- private EntityManager entityManager;
-
public CustomRemoteSubscriptionResourceUpdateRepositoryImpl( @Nonnull EntityManager entityManager )
{
- this.entityManager = entityManager;
+ super( entityManager );
}
- @RestResource( exported = false )
- @Nonnull
- @Override
- public Instant getRemoteLastUpdated( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource )
+
+ @Nonnull @Override protected Class getUpdateClass()
{
- RemoteSubscriptionResourceUpdate rsr = entityManager.find( RemoteSubscriptionResourceUpdate.class, remoteSubscriptionResource.getId() );
- if ( rsr == null )
- {
- rsr = new RemoteSubscriptionResourceUpdate();
- rsr.setId( remoteSubscriptionResource.getId() );
- rsr.setRemoteSubscriptionResource( remoteSubscriptionResource );
- rsr.setRemoteLastUpdated( Instant.now() );
- entityManager.persist( rsr );
- }
- return rsr.getRemoteLastUpdated();
+ return RemoteSubscriptionResourceUpdate.class;
}
- @RestResource( exported = false )
- @Transactional
+ @Nonnull
@Override
- public boolean updateRemoteLastUpdated( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource, @Nonnull Instant lastUpdated )
+ protected RemoteSubscriptionResourceUpdate createUpdate()
{
- final RemoteSubscriptionResourceUpdate rsr = entityManager.find( RemoteSubscriptionResourceUpdate.class, remoteSubscriptionResource.getId() );
- if ( rsr == null )
- {
- return false;
- }
- entityManager.lock( rsr, LockModeType.PESSIMISTIC_WRITE );
- rsr.setRemoteLastUpdated( lastUpdated );
- return true;
+ return new RemoteSubscriptionResourceUpdate();
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/listener/RemoteSubscriptionResourceEventListener.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/listener/RemoteSubscriptionResourceEventListener.java
index 159f2fea..8390bd04 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/listener/RemoteSubscriptionResourceEventListener.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/metadata/repository/listener/RemoteSubscriptionResourceEventListener.java
@@ -62,8 +62,8 @@ public RemoteSubscriptionResourceEventListener( @Nonnull EntityManager entityMan
protected void onBeforeCreate( RemoteSubscriptionResource entity )
{
final RemoteSubscriptionResourceUpdate resourceUpdate = new RemoteSubscriptionResourceUpdate();
- resourceUpdate.setRemoteSubscriptionResource( entity );
- resourceUpdate.setRemoteLastUpdated( Instant.now() );
+ resourceUpdate.setGroup( entity );
+ resourceUpdate.setLastUpdated( Instant.now() );
entity.setResourceUpdate( resourceUpdate );
// must not be set externally
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/queue/FhirQueueConfig.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/queue/FhirQueueConfig.java
index 232299dc..a0399ad4 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/queue/FhirQueueConfig.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/queue/FhirQueueConfig.java
@@ -136,7 +136,7 @@ protected DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Nonnu
@Bean
@Nonnull
- protected JmsTemplate webHookRequestQueueJmsTemplate( @Nonnull ConnectionFactory connectionFactory, @Nonnull MessageConverter jmsMessageConverter )
+ protected JmsTemplate fhirRestHookRequestQueueJmsTemplate( @Nonnull ConnectionFactory connectionFactory, @Nonnull MessageConverter jmsMessageConverter )
{
final JmsTemplate jmsTemplate = new JmsTemplate( connectionFactory );
jmsTemplate.setDefaultDestinationName( remoteConfig.getWebHookRequestQueue().getQueueName() );
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookController.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookController.java
index 7e35465e..fa320590 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookController.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookController.java
@@ -44,7 +44,6 @@
import javax.annotation.Nonnull;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Accepts the web hook request from the remote FHIR service and queues the request
@@ -61,10 +60,6 @@ public class RemoteRestHookController
{
private final Logger logger = LoggerFactory.getLogger( getClass() );
- private final String requestIdBase = UUID.randomUUID().toString();
-
- private final AtomicLong requestId = new AtomicLong();
-
private final RemoteSubscriptionResourceRepository resourceRepository;
private final RemoteRestHookProcessor processor;
@@ -100,12 +95,6 @@ public void receive( @PathVariable UUID subscriptionId, @PathVariable UUID subsc
throw new RestUnauthorizedException( "Authentication has failed." );
}
- processor.received( subscriptionResource.getId(), getCurrentRequestId() );
- }
-
- @Nonnull
- protected String getCurrentRequestId()
- {
- return requestIdBase + "#" + Long.toString( requestId.getAndIncrement(), 36 );
+ processor.process( subscriptionResource );
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessor.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessor.java
index abb7868d..bbf2bc18 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessor.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/RemoteRestHookProcessor.java
@@ -28,8 +28,8 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-import javax.annotation.Nonnull;
-import java.util.UUID;
+import org.dhis2.fhir.adapter.data.processor.QueuedDataProcessor;
+import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
/**
* Processes incoming web hook requests in two steps. First the web hook request
@@ -39,7 +39,6 @@
*
* @author volsch
*/
-public interface RemoteRestHookProcessor
+public interface RemoteRestHookProcessor extends QueuedDataProcessor
{
- void received( @Nonnull UUID remoteSubscriptionResourceId, @Nonnull String requestId );
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/AbstractSubscriptionResourceBundleRetriever.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/AbstractSubscriptionResourceItemRetriever.java
similarity index 78%
rename from fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/AbstractSubscriptionResourceBundleRetriever.java
rename to fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/AbstractSubscriptionResourceItemRetriever.java
index 49e9896a..3d7d4d17 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/AbstractSubscriptionResourceBundleRetriever.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/AbstractSubscriptionResourceItemRetriever.java
@@ -37,9 +37,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemInfo;
+import org.dhis2.fhir.adapter.data.processor.DataProcessorItemRetriever;
+import org.dhis2.fhir.adapter.data.processor.QueuedDataProcessorException;
import org.dhis2.fhir.adapter.fhir.metadata.model.RemoteSubscriptionResource;
import org.dhis2.fhir.adapter.fhir.model.FhirVersionRestricted;
-import org.dhis2.fhir.adapter.fhir.remote.RemoteRestHookProcessorException;
import org.dhis2.fhir.adapter.fhir.repository.FhirClientUtils;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseBundle;
@@ -62,6 +64,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
@@ -73,49 +76,50 @@
*
* @author volsch
*/
-public abstract class AbstractSubscriptionResourceBundleRetriever implements FhirVersionRestricted
+public abstract class AbstractSubscriptionResourceItemRetriever implements DataProcessorItemRetriever, FhirVersionRestricted
{
private final Logger logger = LoggerFactory.getLogger( getClass() );
private final FhirContext fhirContext;
- protected AbstractSubscriptionResourceBundleRetriever( @Nonnull FhirContext fhirContext )
+ protected AbstractSubscriptionResourceItemRetriever( @Nonnull FhirContext fhirContext )
{
this.fhirContext = fhirContext;
}
+ @Override
@Nonnull
- public Instant poll( @Nonnull RemoteSubscriptionResource subscriptionResource, @Nonnull Instant remoteLastUpdated, int maxSearchCount, @Nonnull Consumer> consumer )
+ public Instant poll( @Nonnull RemoteSubscriptionResource group, @Nonnull Instant lastUpdated, int maxSearchCount, @Nonnull Consumer> consumer )
{
- final IGenericClient client = FhirClientUtils.createClient( fhirContext, subscriptionResource.getRemoteSubscription().getFhirEndpoint() );
- Instant lastUpdated = null;
- Instant fromLastUpdated = remoteLastUpdated;
+ final IGenericClient client = FhirClientUtils.createClient( fhirContext, group.getRemoteSubscription().getFhirEndpoint() );
+ Instant processedLastUpdated = null;
+ Instant fromLastUpdated = lastUpdated;
- final Set allResources = new HashSet<>();
- final List orderedAllResources = new ArrayList<>();
- Set previousResources = null;
+ final Set allResources = new HashSet<>();
+ final List orderedAllResources = new ArrayList<>();
+ Set previousResources = null;
boolean paging = false;
boolean backwardPaging = false;
boolean moreAvailable;
- final String resourceName = subscriptionResource.getFhirResourceType().getResourceTypeName();
+ final String resourceName = group.getFhirResourceType().getResourceTypeName();
do
{
- logger.debug( "Loading next since {} for remote subscription resource with maximum count {}.", fromLastUpdated, subscriptionResource.getId(), maxSearchCount );
- if ( lastUpdated == null )
+ logger.debug( "Loading next since {} for remote subscription resource with maximum count {}.", fromLastUpdated, group.getId(), maxSearchCount );
+ if ( processedLastUpdated == null )
{
// last updated must only bet set on the first search invocation
- lastUpdated = Instant.now();
+ processedLastUpdated = Instant.now();
}
fromLastUpdated = fromLastUpdated
- .minus( subscriptionResource.getRemoteSubscription().getToleranceMillis(), ChronoUnit.MILLIS );
- IBaseBundle bundle = createBaseQuery( client, resourceName, subscriptionResource, fromLastUpdated ).count( maxSearchCount )
+ .minus( group.getRemoteSubscription().getToleranceMillis(), ChronoUnit.MILLIS );
+ IBaseBundle bundle = createBaseQuery( client, resourceName, group, fromLastUpdated ).count( maxSearchCount )
.elementsSubset( "meta", "id" ).returnBundle( getBundleClass() ).sort().ascending( "_lastUpdated" ).execute();
do
{
- final List resources = new ArrayList<>();
+ final List resources = new ArrayList<>();
for ( final IAnyResource resource : getResourceEntries( bundle ) )
{
- resources.add( new SubscriptionResourceInfo( resource.getIdElement().getIdPart(),
+ resources.add( new ProcessedItemInfo( resource.getIdElement().getIdPart(),
(resource.getMeta().getLastUpdated() == null) ? null : resource.getMeta().getLastUpdated().toInstant(),
resource.getIdElement().getVersionIdPart() ) );
}
@@ -153,30 +157,30 @@ public Instant poll( @Nonnull RemoteSubscriptionResource subscriptionResource, @
}
}
}
- else if ( resources.size() < (totalCount = getTotalCount( client, resourceName, subscriptionResource, fromLastUpdated, currentBundle )) )
+ else if ( resources.size() < (totalCount = getTotalCount( client, resourceName, group, fromLastUpdated, currentBundle )) )
{
logger.debug( "Returned {} of {} for remote subscription resource {} with maximum requested {}.",
- resources.size(), totalCount, subscriptionResource.getId(), maxSearchCount );
- final Instant minLastUpdated = resources.stream().map( SubscriptionResourceInfo::getLastUpdated )
- .filter( lu -> (lu != null) ).min( Comparator.naturalOrder() ).orElse( null );
+ resources.size(), totalCount, group.getId(), maxSearchCount );
+ final Instant minLastUpdated = resources.stream().map( ProcessedItemInfo::getLastUpdated )
+ .filter( Objects::nonNull ).min( Comparator.naturalOrder() ).orElse( null );
if ( (minLastUpdated != null) && minLastUpdated.isBefore( fromLastUpdated ) )
{
logger.warn( "Remote subscription resource {} returned minimum last updated {} for lower bound {}.",
- subscriptionResource.getId(), minLastUpdated, fromLastUpdated );
+ group.getId(), minLastUpdated, fromLastUpdated );
}
if ( (previousResources != null) && previousResources.containsAll( resources ) && (previousResources.size() >= resources.size()) )
{
- throw new RemoteRestHookProcessorException( "Remote subscription resource " + subscriptionResource.getId() + " returned same result for last updated " +
+ throw new QueuedDataProcessorException( "Remote subscription resource " + group.getId() + " returned same result for last updated " +
fromLastUpdated + " (count " + resources.size() + " of maximum " + maxSearchCount + ")." );
}
previousResources = new HashSet<>( resources );
- final Instant maxLastUpdated = resources.stream().map( SubscriptionResourceInfo::getLastUpdated )
- .filter( lu -> (lu != null) ).max( Comparator.naturalOrder() ).orElse( null );
+ final Instant maxLastUpdated = resources.stream().map( ProcessedItemInfo::getLastUpdated )
+ .filter( Objects::nonNull ).max( Comparator.naturalOrder() ).orElse( null );
if ( maxLastUpdated == null )
{
- logger.warn( "Remote subscription resource {} does not support last updated timestamps.", subscriptionResource.getId() );
+ logger.warn( "Remote subscription resource {} does not support last updated timestamps.", group.getId() );
}
else
{
@@ -187,7 +191,7 @@ else if ( resources.size() < (totalCount = getTotalCount( client, resourceName,
}
else
{
- throw new RemoteRestHookProcessorException( "Remote subscription resource " + subscriptionResource.getId() + " last updated timestamp " +
+ throw new QueuedDataProcessorException( "Remote subscription resource " + group.getId() + " last updated timestamp " +
fromLastUpdated + " has not been changed after processing " + resources.size() + " resources (total " + totalCount + ")." );
}
}
@@ -208,7 +212,7 @@ else if ( resources.size() < (totalCount = getTotalCount( client, resourceName,
{
consumer.accept( orderedAllResources );
}
- return lastUpdated;
+ return processedLastUpdated;
}
protected long getTotalCount( @Nonnull IGenericClient client, @Nonnull String resourceName, @Nonnull RemoteSubscriptionResource subscriptionResource, @Nonnull Instant fromLastUpdated, @Nonnull IBaseBundle bundle )
@@ -224,7 +228,7 @@ protected long getTotalCount( @Nonnull IGenericClient client, @Nonnull String re
totalCount = getBundleTotalCount( newBundle );
if ( totalCount == null )
{
- throw new RemoteRestHookProcessorException( "Remote subscription resource " + subscriptionResource.getId() + " did not return requested total count." );
+ throw new QueuedDataProcessorException( "Remote subscription resource " + subscriptionResource.getId() + " did not return requested total count." );
}
return totalCount;
}
@@ -277,7 +281,7 @@ protected Map> getQuery( @Nonnull RemoteSubscriptionResourc
}
catch ( URISyntaxException e )
{
- throw new RemoteRestHookProcessorException( "FHIR criteria parameters of remote subscription resource " + subscriptionResource.getId() + " are no valid query string.", e );
+ throw new QueuedDataProcessorException( "FHIR criteria parameters of remote subscription resource " + subscriptionResource.getId() + " are no valid query string.", e );
}
final Map> result = new LinkedHashMap<>();
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookProcessorImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookProcessorImpl.java
index afe4c4ba..1552fba8 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookProcessorImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookProcessorImpl.java
@@ -29,9 +29,17 @@
*/
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemInfo;
+import org.dhis2.fhir.adapter.data.model.UuidDataGroupId;
+import org.dhis2.fhir.adapter.data.processor.DataItemQueueItem;
+import org.dhis2.fhir.adapter.data.processor.DataProcessorItemRetriever;
+import org.dhis2.fhir.adapter.data.processor.QueuedDataProcessorException;
+import org.dhis2.fhir.adapter.data.processor.impl.AbstractQueuedDataProcessorImpl;
+import org.dhis2.fhir.adapter.data.processor.impl.DataGroupQueueItem;
import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResource;
-import org.dhis2.fhir.adapter.fhir.data.repository.AlreadyQueuedException;
-import org.dhis2.fhir.adapter.fhir.data.repository.IgnoredSubscriptionResourceException;
+import org.dhis2.fhir.adapter.fhir.data.model.ProcessedRemoteFhirResourceId;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteFhirResourceId;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteSubscriptionRequestId;
import org.dhis2.fhir.adapter.fhir.data.repository.ProcessedRemoteFhirResourceRepository;
import org.dhis2.fhir.adapter.fhir.data.repository.QueuedRemoteFhirResourceRepository;
import org.dhis2.fhir.adapter.fhir.data.repository.QueuedRemoteSubscriptionRequestRepository;
@@ -40,7 +48,6 @@
import org.dhis2.fhir.adapter.fhir.metadata.repository.RemoteSubscriptionResourceUpdateRepository;
import org.dhis2.fhir.adapter.fhir.model.FhirVersion;
import org.dhis2.fhir.adapter.fhir.remote.RemoteRestHookProcessor;
-import org.dhis2.fhir.adapter.fhir.remote.RemoteRestHookProcessorException;
import org.dhis2.fhir.adapter.fhir.repository.RemoteFhirResource;
import org.dhis2.fhir.adapter.fhir.security.SystemAuthenticationToken;
import org.slf4j.Logger;
@@ -49,27 +56,20 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
-import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.time.Instant;
import java.time.ZonedDateTime;
-import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
/**
* Implementation of {@link RemoteRestHookProcessor}.
@@ -77,226 +77,122 @@
* @author volsch
*/
@Service
-public class RemoteRestHookProcessorImpl implements RemoteRestHookProcessor
+public class RemoteRestHookProcessorImpl extends
+ AbstractQueuedDataProcessorImpl
+ implements RemoteRestHookProcessor
{
private final Logger logger = LoggerFactory.getLogger( getClass() );
private final RemoteProcessorConfig processorConfig;
- private final QueuedRemoteSubscriptionRequestRepository queuedRemoteSubscriptionRequestRepository;
-
private final RemoteSubscriptionResourceRepository remoteSubscriptionResourceRepository;
- private final RemoteSubscriptionResourceUpdateRepository remoteSubscriptionResourceUpdateRepository;
-
- private final ProcessedRemoteFhirResourceRepository processedRemoteFhirResourceRepository;
-
- private final QueuedRemoteFhirResourceRepository queuedRemoteFhirResourceRepository;
-
- private final Map bundleRetrievers = new HashMap<>();
-
- private final PlatformTransactionManager platformTransactionManager;
+ private final Map itemRetrievers = new HashMap<>();
- private final JmsTemplate webHookRequestQueueJmsTemplate;
-
- private final JmsTemplate fhirResourceQueueJmsTemplate;
-
- private final String requestIdBase = UUID.randomUUID().toString();
-
- private final AtomicLong requestId = new AtomicLong();
-
- public RemoteRestHookProcessorImpl( @Nonnull RemoteProcessorConfig processorConfig,
- @Nonnull QueuedRemoteSubscriptionRequestRepository queuedRemoteSubscriptionRequestRepository,
- @Nonnull RemoteSubscriptionResourceRepository remoteSubscriptionResourceRepository,
- @Nonnull RemoteSubscriptionResourceUpdateRepository remoteSubscriptionResourceUpdateRepository,
- @Nonnull ProcessedRemoteFhirResourceRepository processedRemoteFhirResourceRepository,
- @Nonnull QueuedRemoteFhirResourceRepository queuedRemoteFhirResourceRepository,
- @Nonnull ObjectProvider> bundleRetrievers,
+ public RemoteRestHookProcessorImpl(
+ @Nonnull QueuedRemoteSubscriptionRequestRepository queuedGroupRepository,
+ @Nonnull @Qualifier( "fhirRestHookRequestQueueJmsTemplate" ) JmsTemplate groupQueueJmsTemplate,
+ @Nonnull RemoteSubscriptionResourceUpdateRepository dataGroupUpdateRepository,
+ @Nonnull ProcessedRemoteFhirResourceRepository processedItemRepository,
+ @Nonnull QueuedRemoteFhirResourceRepository queuedItemRepository,
+ @Nonnull @Qualifier( "fhirResourceQueueJmsTemplate" ) JmsTemplate itemQueueJmsTemplate,
@Nonnull PlatformTransactionManager platformTransactionManager,
- @Nonnull @Qualifier( "webHookRequestQueueJmsTemplate" ) JmsTemplate webHookRequestQueueJmsTemplate,
- @Nonnull @Qualifier( "fhirResourceQueueJmsTemplate" ) JmsTemplate fhirResourceQueueJmsTemplate )
+ @Nonnull RemoteProcessorConfig processorConfig,
+ @Nonnull RemoteSubscriptionResourceRepository remoteSubscriptionResourceRepository,
+ @Nonnull ObjectProvider> itemRetrievers )
{
+ super( queuedGroupRepository, groupQueueJmsTemplate, dataGroupUpdateRepository, processedItemRepository, queuedItemRepository, itemQueueJmsTemplate, platformTransactionManager );
this.processorConfig = processorConfig;
- this.queuedRemoteSubscriptionRequestRepository = queuedRemoteSubscriptionRequestRepository;
this.remoteSubscriptionResourceRepository = remoteSubscriptionResourceRepository;
- this.remoteSubscriptionResourceUpdateRepository = remoteSubscriptionResourceUpdateRepository;
- this.processedRemoteFhirResourceRepository = processedRemoteFhirResourceRepository;
- this.queuedRemoteFhirResourceRepository = queuedRemoteFhirResourceRepository;
- this.platformTransactionManager = platformTransactionManager;
- this.webHookRequestQueueJmsTemplate = webHookRequestQueueJmsTemplate;
- this.fhirResourceQueueJmsTemplate = fhirResourceQueueJmsTemplate;
- bundleRetrievers.getIfAvailable( Collections::emptyList ).forEach( br -> {
+ itemRetrievers.getIfAvailable( Collections::emptyList ).forEach( br -> {
for ( final FhirVersion version : br.getFhirVersions() )
{
- RemoteRestHookProcessorImpl.this.bundleRetrievers.put( version, br );
+ RemoteRestHookProcessorImpl.this.itemRetrievers.put( version, br );
}
} );
}
@HystrixCommand
+ @Transactional( propagation = Propagation.NOT_SUPPORTED )
+ @JmsListener( destination = "#{@fhirRemoteConfig.webHookRequestQueue.queueName}",
+ concurrency = "#{@fhirRemoteConfig.webHookRequestQueue.listener.concurrency}" )
+ public void receive( @Nonnull RemoteRestHookRequest remoteRestHookRequest )
+ {
+ super.receive( remoteRestHookRequest );
+ }
+
@Override
- public void received( @Nonnull UUID remoteSubscriptionResourceId, @Nonnull String requestId )
+ protected QueuedRemoteSubscriptionRequestId createQueuedGroupId( @Nonnull RemoteSubscriptionResource group )
{
- final TransactionStatus transactionStatus = platformTransactionManager.getTransaction( new DefaultTransactionDefinition() );
- try
- {
- logger.debug( "Checking for a queued entry of remote subscription resource {}.", remoteSubscriptionResourceId );
- try
- {
- queuedRemoteSubscriptionRequestRepository.enqueue( remoteSubscriptionResourceId, requestId );
- }
- catch ( AlreadyQueuedException e )
- {
- logger.debug( "There is already a queued entry for remote subscription resource {}.", remoteSubscriptionResourceId );
- return;
- }
- catch ( IgnoredSubscriptionResourceException e )
- {
- // has already been logger with sufficient details
- return;
- }
+ return new QueuedRemoteSubscriptionRequestId( group );
+ }
- logger.debug( "Enqueuing entry for remote subscription resource {}.", remoteSubscriptionResourceId );
- webHookRequestQueueJmsTemplate.convertAndSend( new RemoteRestHookRequest( remoteSubscriptionResourceId, ZonedDateTime.now() ), message -> {
- // only one message for a remote subscription resource must be processed at a specific time (grouping)
- message.setStringProperty( "JMSXGroupID", remoteSubscriptionResourceId.toString() );
- return message;
- } );
- logger.info( "Enqueued entry for remote subscription resource {}.", remoteSubscriptionResourceId );
- }
- finally
- {
- if ( transactionStatus.isRollbackOnly() )
- {
- platformTransactionManager.rollback( transactionStatus );
- }
- else
- {
- platformTransactionManager.commit( transactionStatus );
- }
- }
+ @Nonnull
+ @Override
+ protected DataGroupQueueItem createDataGroupQueueItem( @Nonnull RemoteSubscriptionResource group )
+ {
+ return new RemoteRestHookRequest( group.getGroupId(), ZonedDateTime.now() );
}
- @HystrixCommand
- @Transactional( propagation = Propagation.NOT_SUPPORTED )
- @JmsListener( destination = "#{@fhirRemoteConfig.webHookRequestQueue.queueName}",
- concurrency = "#{@fhirRemoteConfig.webHookRequestQueue.listener.concurrency}" )
- public void receive( @Nonnull RemoteRestHookRequest remoteRestHookRequest )
+ @Nullable
+ @Override
+ protected RemoteSubscriptionResource findGroupByGroupId( @Nonnull UuidDataGroupId groupId )
{
- SecurityContextHolder.getContext().setAuthentication( new SystemAuthenticationToken() );
- try
- {
- receiveAuthenticated( remoteRestHookRequest );
- }
- finally
- {
- SecurityContextHolder.clearContext();
- }
+ return remoteSubscriptionResourceRepository.findByIdCached( groupId.getId() ).orElse( null );
}
- protected void receiveAuthenticated( @Nonnull RemoteRestHookRequest remoteRestHookRequest )
+ @Override
+ protected int getMaxProcessedAgeMinutes()
{
- logger.info( "Processing queued web hook request {}.", remoteRestHookRequest.getRemoteSubscriptionResourceId() );
- try
- {
- queuedRemoteSubscriptionRequestRepository.dequeued( remoteRestHookRequest.getRemoteSubscriptionResourceId() );
- }
- catch ( IgnoredSubscriptionResourceException e )
- {
- // has already been logger with sufficient details
- return;
- }
+ return processorConfig.getMaxProcessedAgeMinutes();
+ }
- final RemoteSubscriptionResource remoteSubscriptionResource =
- remoteSubscriptionResourceRepository.findByIdCached( remoteRestHookRequest.getRemoteSubscriptionResourceId() ).orElse( null );
- if ( remoteSubscriptionResource == null )
- {
- logger.warn( "Remote subscription resource {} is no longer available. Skipping processing of updated resources.",
- remoteRestHookRequest.getRemoteSubscriptionResourceId() );
- return;
- }
+ @Override
+ protected int getMaxSearchCount()
+ {
+ return processorConfig.getMaxSearchCount();
+ }
- final FhirVersion fhirVersion = remoteSubscriptionResource.getRemoteSubscription().getFhirVersion();
- final AbstractSubscriptionResourceBundleRetriever bundleRetriever = bundleRetrievers.get( fhirVersion );
- if ( bundleRetriever == null )
+ @Nonnull
+ @Override
+ protected DataProcessorItemRetriever getDataProcessorItemRetriever( @Nonnull RemoteSubscriptionResource group )
+ {
+ final FhirVersion fhirVersion = group.getRemoteSubscription().getFhirVersion();
+ final AbstractSubscriptionResourceItemRetriever itemRetriever = itemRetrievers.get( fhirVersion );
+ if ( itemRetriever == null )
{
- throw new RemoteRestHookProcessorException( "Remote subscription resource requires FHIR version " + fhirVersion +
- ", but no bundle retriever is available for that version." );
+ throw new QueuedDataProcessorException( "Remote subscription resource requires FHIR version " + fhirVersion +
+ ", but no item retriever is available for that version." );
}
+ return itemRetriever;
+ }
- final Instant remoteLastUpdated = remoteSubscriptionResourceUpdateRepository.getRemoteLastUpdated( remoteSubscriptionResource );
- final AtomicLong count = new AtomicLong();
- final Instant lastUpdated = bundleRetriever.poll( remoteSubscriptionResource, remoteLastUpdated, processorConfig.getMaxSearchCount(), resources -> {
- final String requestId = getCurrentRequestId();
- final Instant processedAt = Instant.now();
- final Set processedVersionedIds = processedRemoteFhirResourceRepository.findByVersionedIds( remoteSubscriptionResource,
- resources.stream().map( sr -> sr.toVersionString( processedAt ) ).collect( Collectors.toList() ) );
-
- resources.forEach( sr -> {
- final String versionedId = sr.toVersionString( processedAt );
- if ( !processedVersionedIds.contains( versionedId ) )
- {
- // persist processed remote FHIR resource and
- processedRemoteFhirResourceRepository.process( new ProcessedRemoteFhirResource( remoteSubscriptionResource, versionedId, processedAt ), p -> {
- final TransactionStatus transactionStatus = platformTransactionManager.getTransaction( new DefaultTransactionDefinition( TransactionDefinition.PROPAGATION_NOT_SUPPORTED ) );
- try
- {
- queuedRemoteFhirResourceRepository.enqueue( remoteSubscriptionResource.getId(), sr.getId(), requestId );
- fhirResourceQueueJmsTemplate.convertAndSend(
- new RemoteFhirResource( remoteSubscriptionResource.getId(), sr.getId(), sr.getVersion(), sr.getLastUpdated() ) );
- logger.debug( "FHIR Resource {} of remote subscription resource {} has been enqueued.",
- sr.getId(), remoteSubscriptionResource.getId() );
- count.incrementAndGet();
- }
- catch ( AlreadyQueuedException e )
- {
- logger.debug( "FHIR Resource {} of remote subscription resource {} is still queued.",
- sr.getId(), remoteSubscriptionResource.getId() );
- }
- catch ( IgnoredSubscriptionResourceException e )
- {
- // has already been logger with sufficient details
- }
- finally
- {
- if ( transactionStatus.isRollbackOnly() )
- {
- platformTransactionManager.rollback( transactionStatus );
- }
- else
- {
- platformTransactionManager.commit( transactionStatus );
- }
- }
- } );
- }
- } );
- } );
- remoteSubscriptionResourceUpdateRepository.updateRemoteLastUpdated( remoteSubscriptionResource, lastUpdated );
+ @Nonnull
+ @Override
+ protected ProcessedRemoteFhirResource createProcessedItem( @Nonnull RemoteSubscriptionResource group, @Nonnull String id, @Nonnull Instant processedAt )
+ {
+ return new ProcessedRemoteFhirResource( new ProcessedRemoteFhirResourceId( group, id ), processedAt );
+ }
- // Purging old data must not be done before and also must not be done asynchronously. The remote last updated
- // timestamp may be older than the purged data. And before purging the old data, the remote last updated
- // timestamp of the remote subscription resource must be updated by processing the complete FHIR resources that
- // belong to the remote subscription resource.
- purgeOldestProcessed( remoteSubscriptionResource );
- logger.info( "Processed queued web hook request {} with {} enqueued FHIR resources.",
- remoteRestHookRequest.getRemoteSubscriptionResourceId(), count.longValue() );
+ @Nonnull
+ @Override
+ protected QueuedRemoteFhirResourceId createQueuedItemId( @Nonnull RemoteSubscriptionResource group, @Nonnull ProcessedItemInfo processedItemInfo )
+ {
+ return new QueuedRemoteFhirResourceId( group, processedItemInfo.getId() );
}
@Nonnull
- protected String getCurrentRequestId()
+ @Override
+ protected DataItemQueueItem createDataItemQueueItem( @Nonnull RemoteSubscriptionResource group, @Nonnull ProcessedItemInfo processedItemInfo )
{
- return requestIdBase + "#" + Long.toString( requestId.getAndIncrement(), 36 );
+ return new RemoteFhirResource( group.getGroupId(), processedItemInfo );
}
- protected void purgeOldestProcessed( @Nonnull RemoteSubscriptionResource remoteSubscriptionResource )
+ @Nonnull
+ @Override
+ protected Authentication createAuthentication()
{
- final Instant from = Instant.now().minus( processorConfig.getMaxProcessedAgeMinutes(), ChronoUnit.MINUTES );
- logger.debug( "Purging oldest processed remote subscription FHIR resources before {} for remote subscription resource {}.",
- from, remoteSubscriptionResource.getId() );
- final int count = processedRemoteFhirResourceRepository.deleteOldest( remoteSubscriptionResource, from );
- logger.debug( "Purged {} oldest processed remote subscription FHIR resources before {} for remote subscription resource {}.",
- count, from, remoteSubscriptionResource.getId() );
+ return new SystemAuthenticationToken();
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookRequest.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookRequest.java
index cec029c6..6feba58c 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookRequest.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/remote/impl/RemoteRestHookRequest.java
@@ -28,6 +28,11 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.dhis2.fhir.adapter.data.model.UuidDataGroupId;
+import org.dhis2.fhir.adapter.data.processor.impl.DataGroupQueueItem;
+
import javax.annotation.Nonnull;
import java.io.Serializable;
import java.time.ZonedDateTime;
@@ -35,15 +40,15 @@
/**
* Remote web hook request that will be enqueued and dequeued in a message queue as JSON.
+ * The class must support the initial legacy serialized format.
+ * The class must not be moved to a different package since the full qualified class name is used in JMS messages.
*
* @author volsch
*/
-public class RemoteRestHookRequest implements Serializable
+public class RemoteRestHookRequest extends DataGroupQueueItem implements Serializable
{
private static final long serialVersionUID = -7911324825049826913L;
- private UUID remoteSubscriptionResourceId;
-
private ZonedDateTime receivedAt;
public RemoteRestHookRequest()
@@ -51,29 +56,27 @@ public RemoteRestHookRequest()
super();
}
- public RemoteRestHookRequest( @Nonnull UUID remoteSubscriptionResourceId, @Nonnull ZonedDateTime receivedAt )
+ public RemoteRestHookRequest( @Nonnull UuidDataGroupId dataGroupId, @Nonnull ZonedDateTime receivedAt )
{
- this.remoteSubscriptionResourceId = remoteSubscriptionResourceId;
- this.receivedAt = receivedAt;
+ super( dataGroupId, receivedAt );
}
- public UUID getRemoteSubscriptionResourceId()
+ @JsonIgnore
+ @Override
+ public UuidDataGroupId getDataGroupId()
{
- return remoteSubscriptionResourceId;
+ return super.getDataGroupId();
}
- public void setRemoteSubscriptionResourceId( UUID remoteSubscriptionResourceId )
- {
- this.remoteSubscriptionResourceId = remoteSubscriptionResourceId;
- }
-
- public ZonedDateTime getReceivedAt()
+ @JsonProperty
+ public UUID getRemoteSubscriptionResourceId()
{
- return receivedAt;
+ return (getDataGroupId() == null) ? null : getDataGroupId().getId();
}
- public void setReceivedAt( ZonedDateTime receivedAt )
+ public void setRemoteSubscriptionResourceId( UUID remoteSubscriptionResourceId )
{
- this.receivedAt = receivedAt;
+ setDataGroupId( (remoteSubscriptionResourceId == null) ? null :
+ new UuidDataGroupId( remoteSubscriptionResourceId ) );
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/RemoteFhirResource.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/RemoteFhirResource.java
index 126ad1b7..5204f13a 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/RemoteFhirResource.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/RemoteFhirResource.java
@@ -28,83 +28,86 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.dhis2.fhir.adapter.data.model.ProcessedItemInfo;
+import org.dhis2.fhir.adapter.data.model.UuidDataGroupId;
+import org.dhis2.fhir.adapter.data.processor.DataItemQueueItem;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.Serializable;
-import java.time.Instant;
import java.util.UUID;
/**
* Processing request for a remote FHIR resource that is enqueued and dequeued.
+ * The class must support the initial legacy serialized format.
+ * The class must not be moved to a different package since the full qualified class name is used in JMS messages.
*
* @author volsch
*/
-public class RemoteFhirResource implements Serializable
+public class RemoteFhirResource extends DataItemQueueItem implements Serializable
{
private static final long serialVersionUID = 1642564911249098319L;
- private UUID remoteSubscriptionResourceId;
-
- private String fhirResourceId;
-
- @JsonInclude( JsonInclude.Include.NON_NULL )
- private String fhirResourceVersion;
-
- @JsonInclude( JsonInclude.Include.NON_NULL )
- private Instant lastUpdated;
-
public RemoteFhirResource()
{
super();
}
- public RemoteFhirResource( @Nonnull UUID remoteSubscriptionResourceId, @Nonnull String fhirResourceId, @Nullable String fhirResourceVersion, @Nullable Instant lastUpdated )
+ public RemoteFhirResource( @Nonnull UuidDataGroupId dataGroupId, @Nonnull ProcessedItemInfo processedItemInfo )
{
- this.remoteSubscriptionResourceId = remoteSubscriptionResourceId;
- this.fhirResourceId = fhirResourceId;
- this.fhirResourceVersion = fhirResourceVersion;
- this.lastUpdated = lastUpdated;
+ super( dataGroupId, processedItemInfo );
}
- public UUID getRemoteSubscriptionResourceId()
+ @JsonIgnore
+ @Override
+ public UuidDataGroupId getDataGroupId()
{
- return remoteSubscriptionResourceId;
+ return super.getDataGroupId();
}
- public void setRemoteSubscriptionResourceId( UUID remoteSubscriptionResourceId )
+ @Override
+ public void setDataGroupId( UuidDataGroupId dataGroupId )
{
- this.remoteSubscriptionResourceId = remoteSubscriptionResourceId;
+ super.setDataGroupId( dataGroupId );
}
- public String getFhirResourceId()
+ @JsonProperty
+ public UUID getRemoteSubscriptionResourceId()
{
- return fhirResourceId;
+ return (getDataGroupId() == null) ? null : getDataGroupId().getId();
}
- public void setFhirResourceId( String fhirResourceId )
+ public void setRemoteSubscriptionResourceId( UUID remoteSubscriptionResourceId )
{
- this.fhirResourceId = fhirResourceId;
+ super.setDataGroupId( (remoteSubscriptionResourceId == null) ? null : new UuidDataGroupId( remoteSubscriptionResourceId ) );
}
- public String getFhirResourceVersion()
+ @JsonProperty( "fhirResourceId" )
+ @Override
+ public String getId()
{
- return fhirResourceVersion;
+ return super.getId();
}
- public void setFhirResourceVersion( String fhirResourceVersion )
+ @Override
+ public void setId( String id )
{
- this.fhirResourceVersion = fhirResourceVersion;
+ super.setId( id );
}
- public Instant getLastUpdated()
+ @JsonProperty( "fhirResourceVersion" )
+ @JsonInclude( JsonInclude.Include.NON_NULL )
+ @Override
+ public String getVersion()
{
- return lastUpdated;
+ return super.getVersion();
}
- public void setLastUpdated( Instant lastUpdated )
+ @Override
+ public void setVersion( String version )
{
- this.lastUpdated = lastUpdated;
+ super.setVersion( version );
}
}
diff --git a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java
index ecb1d88b..d59b1bd8 100644
--- a/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java
+++ b/fhir/src/main/java/org/dhis2/fhir/adapter/fhir/repository/impl/FhirRepositoryImpl.java
@@ -33,6 +33,7 @@
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.dhis2.fhir.adapter.auth.Authorization;
import org.dhis2.fhir.adapter.auth.AuthorizationContext;
+import org.dhis2.fhir.adapter.data.repository.IgnoredQueuedItemException;
import org.dhis2.fhir.adapter.dhis.DhisConflictException;
import org.dhis2.fhir.adapter.dhis.model.DhisResource;
import org.dhis2.fhir.adapter.dhis.tracker.program.EnrollmentService;
@@ -40,7 +41,7 @@
import org.dhis2.fhir.adapter.dhis.tracker.program.EventService;
import org.dhis2.fhir.adapter.dhis.tracker.trackedentity.TrackedEntityInstance;
import org.dhis2.fhir.adapter.dhis.tracker.trackedentity.TrackedEntityService;
-import org.dhis2.fhir.adapter.fhir.data.repository.IgnoredSubscriptionResourceException;
+import org.dhis2.fhir.adapter.fhir.data.model.QueuedRemoteFhirResourceId;
import org.dhis2.fhir.adapter.fhir.data.repository.QueuedRemoteFhirResourceRepository;
import org.dhis2.fhir.adapter.fhir.metadata.model.AuthenticationMethod;
import org.dhis2.fhir.adapter.fhir.metadata.model.FhirResourceType;
@@ -188,31 +189,30 @@ public void receive( @Nonnull RemoteFhirResource remoteFhirResource )
protected void receiveAuthenticated( @Nonnull RemoteFhirResource remoteFhirResource )
{
logger.info( "Processing FHIR resource {} for remote subscription resource {}.",
- remoteFhirResource.getFhirResourceId(), remoteFhirResource.getRemoteSubscriptionResourceId() );
- try
- {
- queuedRemoteFhirResourceRepository.dequeued(
- remoteFhirResource.getRemoteSubscriptionResourceId(), remoteFhirResource.getFhirResourceId() );
- }
- catch ( IgnoredSubscriptionResourceException e )
- {
- // has already been logger with sufficient details
- return;
- }
-
+ remoteFhirResource.getId(), remoteFhirResource.getRemoteSubscriptionResourceId() );
final RemoteSubscriptionResource remoteSubscriptionResource =
remoteSubscriptionResourceRepository.findByIdCached( remoteFhirResource.getRemoteSubscriptionResourceId() ).orElse( null );
if ( remoteSubscriptionResource == null )
{
logger.warn( "Remote subscription resource {} is no longer available. Skipping processing of updated FHIR resource {}.",
- remoteFhirResource.getRemoteSubscriptionResourceId(), remoteFhirResource.getFhirResourceId() );
+ remoteFhirResource.getRemoteSubscriptionResourceId(), remoteFhirResource.getId() );
+ return;
+ }
+
+ try
+ {
+ queuedRemoteFhirResourceRepository.dequeued( new QueuedRemoteFhirResourceId( remoteSubscriptionResource, remoteFhirResource.getId() ) );
+ }
+ catch ( IgnoredQueuedItemException e )
+ {
+ // has already been logger with sufficient details
return;
}
final RemoteSubscription remoteSubscription = remoteSubscriptionResource.getRemoteSubscription();
final Optional resource = remoteFhirRepository.findRefreshed(
remoteSubscription.getId(), remoteSubscription.getFhirVersion(), remoteSubscription.getFhirEndpoint(),
- remoteSubscriptionResource.getFhirResourceType().getResourceTypeName(), remoteFhirResource.getFhirResourceId() );
+ remoteSubscriptionResource.getFhirResourceType().getResourceTypeName(), remoteFhirResource.getId() );
if ( resource.isPresent() )
{
try ( final MDC.MDCCloseable c = MDC.putCloseable( "fhirId", remoteSubscriptionResource.getId() + ":" + resource.get().getIdElement().toUnqualifiedVersionless() ) )
@@ -239,7 +239,7 @@ protected void receiveAuthenticated( @Nonnull RemoteFhirResource remoteFhirResou
else
{
logger.info( "FHIR resource {}/{} for remote subscription resource {} is no longer available. Skipping processing of updated FHIR resource.",
- remoteSubscriptionResource.getFhirResourceType().getResourceTypeName(), remoteFhirResource.getFhirResourceId(), remoteSubscriptionResource.getId() );
+ remoteSubscriptionResource.getFhirResourceType().getResourceTypeName(), remoteFhirResource.getId(), remoteSubscriptionResource.getId() );
}
}
diff --git a/fhir/src/main/resources/db/migration/production/V1.1.0.0_0_0__TEI_Export.sql b/fhir/src/main/resources/db/migration/production/V1.1.0.0_0_0__TEI_Export.sql
new file mode 100644
index 00000000..388924f5
--- /dev/null
+++ b/fhir/src/main/resources/db/migration/production/V1.1.0.0_0_0__TEI_Export.sql
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2004-2018, University of Oslo
+ * All rights reserved.
+ *t
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO PROGRAM_STAGE_EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+ALTER TABLE fhir_queued_remote_subscription_request
+ DROP COLUMN IF EXISTS request_id;
+ALTER TABLE fhir_queued_remote_resource
+ DROP COLUMN IF EXISTS request_id;
diff --git a/pom.xml b/pom.xml
index 279b87df..ba20df9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,22 @@
+
+ com.mysema.maven
+ apt-maven-plugin
+ 1.1.3
+
+
+
+ process
+
+
+ target/generated-sources/java
+ com.querydsl.apt.jpa.JPAAnnotationProcessor
+
+
+
+
org.apache.maven.plugins
maven-compiler-plugin