Skip to content

Commit

Permalink
Implemented subscription payload handling in order to support
Browse files Browse the repository at this point in the history
special cases of subscriptions (custom subscriptions).
  • Loading branch information
volsch committed Jan 30, 2019
1 parent 4464605 commit 8da4103
Show file tree
Hide file tree
Showing 17 changed files with 960 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.dhis2.fhir.adapter.data.processor.impl;

/*
* Copyright (c) 2004-2018, University of Oslo
* Copyright (c) 2004-2019, University of Oslo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -177,7 +177,7 @@ protected void process( @Nonnull G group, @Nullable Integer rateMillis )
}
catch ( IgnoredQueuedItemException e )
{
// has already been logger with sufficient details
// has already been logged with sufficient details
return;
}

Expand Down Expand Up @@ -260,27 +260,10 @@ protected void receiveAuthenticated( @Nonnull DataGroupQueueItem<GI> dataGroupQu
{
// persist processed item
processedItemRepository.process( createProcessedItem( group, processedId, processedAt ), p -> {
final TransactionStatus transactionStatus = platformTransactionManager.getTransaction(
new DefaultTransactionDefinition( TransactionDefinition.PROPAGATION_NOT_SUPPORTED ) );
try
if ( enqueueDataItem( group, item, false, true ) )
{
queuedItemRepository.enqueue( createQueuedItemId( group, item ) );
itemQueueJmsTemplate.convertAndSend( createDataItemQueueItem( group, item ) );
logger.debug( "Item {} of group {} has been enqueued.", item.getId(), group.getGroupId() );
count.incrementAndGet();
}
catch ( AlreadyQueuedException e )
{
logger.debug( "Item {} of group {} is still queued.", item.getId(), group.getGroupId() );
}
catch ( IgnoredQueuedItemException e )
{
// has already been logged with sufficient details
}
finally
{
finalizeTransaction( transactionStatus );
}
} );
}
} ) );
Expand Down Expand Up @@ -308,6 +291,43 @@ else if ( isPeriodicInfoLogging() )
}
}

protected boolean enqueueDataItem( @Nonnull G group, @Nonnull ProcessedItemInfo item, boolean persistedDataItem, boolean autonomousTransaction )
{
final TransactionStatus transactionStatus;
if ( autonomousTransaction )
{
transactionStatus = platformTransactionManager.getTransaction(
new DefaultTransactionDefinition( TransactionDefinition.PROPAGATION_NOT_SUPPORTED ) );
}
else
{
transactionStatus = null;
}
try
{
queuedItemRepository.enqueue( createQueuedItemId( group, item ) );
itemQueueJmsTemplate.convertAndSend( createDataItemQueueItem( group, item, persistedDataItem ) );
logger.debug( "Item {} of group {} has been enqueued.", item.getId(), group.getGroupId() );
return true;
}
catch ( AlreadyQueuedException e )
{
logger.debug( "Item {} of group {} is still queued.", item.getId(), group.getGroupId() );
}
catch ( IgnoredQueuedItemException e )
{
// has already been logged with sufficient details
}
finally
{
if ( transactionStatus != null )
{
finalizeTransaction( transactionStatus );
}
}
return false;
}

private void awaitTaskTermination( @Nonnull ForkJoinTask<?> task )
{
try
Expand Down Expand Up @@ -384,7 +404,7 @@ protected Authentication createAuthentication()
protected abstract QI createQueuedItemId( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo );

@Nonnull
protected abstract DataItemQueueItem<GI> createDataItemQueueItem( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo );
protected abstract DataItemQueueItem<GI> createDataItemQueueItem( @Nonnull G group, @Nonnull ProcessedItemInfo processedItemInfo, boolean persistedDataItem );

@Nonnull
protected abstract SG getStoredItemGroup( @Nonnull G group );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.dhis2.fhir.adapter.data.repository;

/*
* Copyright (c) 2004-2019, University of Oslo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of the HISP project nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

/**
* Thrown if it has been tried too many times to persist an entity that may be
* tried to insert multiple times at the same time and need to be updated in case
* it does exist.
*
* @author volsch
*/
public class TooManyPersistRetriesException extends RuntimeException
{
private static final long serialVersionUID = -6868353655551006879L;

public TooManyPersistRetriesException( String message )
{
super( message );
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.dhis2.fhir.adapter.dhis.sync.impl;

/*
* Copyright (c) 2004-2018, University of Oslo
* Copyright (c) 2004-2019, University of Oslo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -191,7 +191,7 @@ protected QueuedDhisResourceId createQueuedItemId( @Nonnull DhisSyncGroup group,

@Nonnull
@Override
protected DhisResourceQueueItem createDataItemQueueItem( @Nonnull DhisSyncGroup group, @Nonnull ProcessedItemInfo processedItemInfo )
protected DhisResourceQueueItem createDataItemQueueItem( @Nonnull DhisSyncGroup group, @Nonnull ProcessedItemInfo processedItemInfo, boolean persistedDataItem )
{
return new DhisResourceQueueItem( group.getGroupId(), processedItemInfo );
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.dhis2.fhir.adapter.fhir.data.model;

/*
* Copyright (c) 2004-2019, University of Oslo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of the HISP project nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource;
import org.dhis2.fhir.adapter.fhir.model.FhirVersion;
import org.hibernate.annotations.GenericGenerator;

import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import java.io.Serializable;
import java.time.Instant;
import java.util.UUID;

/**
* Contains a FHIR resource that has been delivered by a subscription notification.
*
* @author volsch
*/
@Entity
@Table( name = "fhir_subscription_resource" )
@NamedQuery( name = SubscriptionFhirResource.RESOURCE_NAMED_QUERY, query = "SELECT sfr FROM SubscriptionFhirResource sfr WHERE sfr.fhirServerResource=:fhirServerResource AND sfr.fhirResourceId=:fhirResourceId" )
public class SubscriptionFhirResource implements Serializable
{
private static final long serialVersionUID = -7965763701550940008L;

public static final String RESOURCE_NAMED_QUERY = "SubscriptionFhirResource.resource";

private UUID id;

private Instant createdAt;

private FhirServerResource fhirServerResource;

private String fhirResourceId;

private String contentType;

private FhirVersion fhirVersion;

private String fhirResource;

@GeneratedValue( generator = "uuid2" )
@GenericGenerator( name = "uuid2", strategy = "uuid2" )
@Id
@Column( name = "id", nullable = false )
public UUID getId()
{
return id;
}

public void setId( UUID id )
{
this.id = id;
}

@Basic
@Column( name = "created_at", nullable = false )
public Instant getCreatedAt()
{
return createdAt;
}

public void setCreatedAt( Instant createdAt )
{
this.createdAt = createdAt;
}

@ManyToOne( optional = false )
@JoinColumn( name = "fhir_server_resource_id", nullable = false )
public FhirServerResource getFhirServerResource()
{
return fhirServerResource;
}

public void setFhirServerResource( FhirServerResource fhirServerResource )
{
this.fhirServerResource = fhirServerResource;
}

@Basic
@Column( name = "fhir_resource_id", length = 200 )
public String getFhirResourceId()
{
return fhirResourceId;
}

public void setFhirResourceId( String fhirResourceId )
{
this.fhirResourceId = fhirResourceId;
}

@Basic
@Column( name = "content_type", length = 100, nullable = false )
public String getContentType()
{
return contentType;
}

public void setContentType( String contentType )
{
this.contentType = contentType;
}

@Basic
@Column( name = "fhir_version", nullable = false )
@Enumerated( EnumType.STRING )
public FhirVersion getFhirVersion()
{
return fhirVersion;
}

public void setFhirVersion( FhirVersion fhirVersion )
{
this.fhirVersion = fhirVersion;
}

@Basic
@Column( name = "fhir_resource" )
@Lob
public String getFhirResource()
{
return fhirResource;
}

public void setFhirResource( String fhirResource )
{
this.fhirResource = fhirResource;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.dhis2.fhir.adapter.fhir.data.repository;

/*
* Copyright (c) 2004-2019, University of Oslo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of the HISP project nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

import org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource;
import org.dhis2.fhir.adapter.fhir.metadata.model.FhirServerResource;
import org.dhis2.fhir.adapter.fhir.model.FhirVersion;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Optional;

/**
* Custom repository that contains {@link org.dhis2.fhir.adapter.fhir.data.model.SubscriptionFhirResource}s.
*
* @author volsch
*/
public interface CustomSubscriptionFhirResourceRepository
{
void enqueue( @Nonnull FhirServerResource fhirServerResource, @Nullable String contentType, @Nonnull FhirVersion fhirVersion, @Nonnull String fhirResourceId, @Nonnull String fhirResource );

boolean deleteEnqueued( @Nonnull SubscriptionFhirResource subscriptionFhirResource );

@Nonnull
Optional<SubscriptionFhirResource> findResource( @Nonnull FhirServerResource fhirServerResource, @Nonnull String fhirResourceId );
}
Loading

0 comments on commit 8da4103

Please sign in to comment.