Skip to content

Commit

Permalink
feat: enqueue processed events (#178)
Browse files Browse the repository at this point in the history
* feat: Fetch RapidPro contact uuid

Extracting RapidPro Contact uuid during contact creation to be used for later processing during RapidPro flow trigger.

* refactor: adding enrollment ID to mock event

For test consistency.

* test: testFetchAndProcessEvents

* feat: queue events route builder
  • Loading branch information
JohanGHole authored Jan 24, 2024
1 parent ed4bc3c commit 0ce3bfd
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2004-2022, University of Oslo
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of the HISP project nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.hisp.dhis.integration.rapidpro.processor;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class EventStatusUpdateProcessor implements Processor
{
@Autowired
private ObjectMapper objectMapper;

@Override
public void process( Exchange exchange )
throws
Exception
{
Map<String, Object> eventPayload = exchange.getProperty( "eventPayload", Map.class );
eventPayload.put( "status", "ACTIVE" );
eventPayload.put( "occurredAt", LocalDateTime.now().format( DateTimeFormatter.ofPattern( "yyyy-MM-dd" ) ) );
exchange.getMessage().setBody( Map.of( "events", List.of( eventPayload ) ) );
Map<String, String> queryParams = new HashMap<>();
queryParams.put( "async", "false" );
queryParams.put( "importStrategy", "UPDATE" );
exchange.getMessage().setHeader( "CamelDhis2.queryParams", queryParams );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,27 @@

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.hisp.dhis.api.model.Page;
import org.hisp.dhis.api.model.Pager;
import org.hisp.dhis.integration.rapidpro.aggregationStrategy.AttributesAggrStrategy;
import org.hisp.dhis.integration.rapidpro.aggregationStrategy.ProgramStageEventsAggrStrategy;
import org.hisp.dhis.integration.rapidpro.aggregationStrategy.RapidProContactEnricherAggrStrategy;
import org.hisp.dhis.integration.rapidpro.aggregationStrategy.TrackedEntityIdAggrStrategy;
import org.hisp.dhis.integration.rapidpro.processor.EventStatusUpdateProcessor;
import org.hisp.dhis.integration.rapidpro.processor.FetchDueEventsQueryParamSetter;
import org.hisp.dhis.integration.rapidpro.processor.SetAttributesEndpointProcessor;
import org.hisp.dhis.integration.rapidpro.processor.SetProgramStagesPropertyProcessor;
import org.hisp.dhis.integration.sdk.internal.operation.page.PageIterable;
import org.jgroups.logging.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Component
public class FetchScheduledTrackerEventsRouteBuilder extends AbstractRouteBuilder
public class QueueProgramStageEventsRouteBuilder extends AbstractRouteBuilder
{
@Autowired
private SetProgramStagesPropertyProcessor setProgramStagesPropertyProcessor;

@Autowired
private ProgramStageEventsAggrStrategy programStageEventsAggrStrategy;

@Autowired
private RapidProContactEnricherAggrStrategy rapidProContactEnricherAggrStrategy;

@Autowired
private FetchDueEventsQueryParamSetter fetchDueEventsQueryParamSetter;

Expand All @@ -72,6 +63,9 @@ public class FetchScheduledTrackerEventsRouteBuilder extends AbstractRouteBuilde
@Autowired
private SetAttributesEndpointProcessor setAttributesEndpointProcessor;

@Autowired
private EventStatusUpdateProcessor eventStatusUpdateProcessor;

@Override
protected void doConfigure()
throws
Expand All @@ -80,24 +74,26 @@ protected void doConfigure()
from( "servlet:tasks/syncEvents?muteException=true" )
.precondition( "{{sync.dhis2.events.to.rapidpro.flows}}" )
.removeHeaders( "*" )
.to( "direct:fetchAndProcessEvents" )
.to( "direct:queueEvents" )
.setHeader( Exchange.CONTENT_TYPE, constant( "application/json" ) )
.setBody( constant( Map.of( "status", "success", "data", "Fetched and enqueued due program stage events" ) ) )
.marshal().json();

from( "quartz://fetchDueEvents?cron={{sync.events.schedule.expression:0 0/30 * * * ?}}&stateful=true" )
.precondition( "{{sync.dhis2.events.to.rapidpro.flows}}" )
.to( "direct:fetchAndProcessEvents" );
.to( "direct:queueEvents" );

from( "direct:fetchAndProcessEvents" )
.routeId( "Fetch And Process Tracker Events" )
from("direct:queueEvents")
.routeId( "Queue Program Stage Events" )
.to( "direct:fetchDueEvents" )
.split( simple( "${exchangeProperty.dueEvents}" ) )
.marshal().json().transform().body( String.class )
.setProperty( "eventPayload", simple( "${body}" ) )
.unmarshal().json()
.to( "direct:fetchAttributes" )
.to("direct:createRapidProContact");
.transform( datasonnet( "resource:classpath:event.ds", String.class, "application/x-java-object", "application/json" ) )
.to( "jms:queue:events?exchangePattern=InOnly" )
.unmarshal().json()
.log( LoggingLevel.DEBUG, LOGGER, "Enqueued event [eventId => ${body[event]}, programStage => ${body[programStage]}]" )
.to( "direct:updateDhisProgramStageEventStatus" );

from( "direct:fetchDueEvents" )
.routeId( "Fetch Due Events" )
Expand All @@ -124,29 +120,16 @@ protected void doConfigure()
.log( LoggingLevel.ERROR, LOGGER, "Error while fetching phone number attribute from DHIS2 enrollment ${body[enrollment]}. Hint: Be sure to set the 'dhis2.phone.number.attribute.code' config property." )
.stop();

from( "direct:createRapidProContact" )
.routeId( "Create RapidPro Contact" )
.log( LoggingLevel.INFO, LOGGER, "Creating RapidPro contact for DHIS2 enrollment ${body[enrollment]}" )
.setHeader( "Authorization", constant( "Token {{rapidpro.api.token}}" ) )
.enrich().simple( "{{rapidpro.api.url}}/contacts.json?urn=${body[contactUrn]}&httpMethod=GET" )
.aggregationStrategy( rapidProContactEnricherAggrStrategy )
.setProperty( "originalPayload", simple( "${body}") )
.choice()
.when( simple ("${body[results].size()} > 0" ) )
.log( LoggingLevel.DEBUG, LOGGER, "RapidPro Contact already exists for DHIS2 enrollment ${exchangeProperty.originalPayload[enrollment]}. No action needed." )
.otherwise()
.log( LoggingLevel.DEBUG, LOGGER, "RapidPro Contact does not exist for DHIS2 enrollment ${exchangeProperty.originalPayload[enrollment]}. Creating new contact...")
.transform(
datasonnet( "resource:classpath:trackedEntityContact.ds", Map.class, "application/x-java-object",
"application/x-java-object" ) )
.setHeader( "Authorization", constant( "Token {{rapidpro.api.token}}" ) )
.marshal().json().convertBodyTo( String.class )
.toD( "{{rapidpro.api.url}}/contacts.json?httpMethod=POST&okStatusCodeRange=200-499" )
.choice().when( header( Exchange.HTTP_RESPONSE_CODE ).isNotEqualTo( "201" ) )
.log( LoggingLevel.WARN, LOGGER, "Unexpected status code when creating RapidPro contact for DHIS2 enrollment ${exchangeProperty.originalPayload[enrollment]} => HTTP ${header.CamelHttpResponseCode}. HTTP response body => ${body}" )
.end()
.end()
.setBody( simple( "${exchangeProperty.originalPayload}" ) )
from("direct:updateDhisProgramStageEventStatus")
.routeId( "Update DHIS Program Stage Event Status" )
.process( eventStatusUpdateProcessor )
.marshal().json().convertBodyTo( String.class )
.toD( "dhis2://post/resource?path=tracker&inBody=resource&client=#dhis2Client" )
.unmarshal().json()
.choice().when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) )
.log( LoggingLevel.DEBUG, LOGGER, "Successfully updated DHIS program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}" )
.otherwise()
.log( LoggingLevel.ERROR, LOGGER, "Unexpected status code when updating the dhis program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}. HTTP ${header.CamelHttpResponseCode}. HTTP response body => ${body}" )
.end();
}
}
7 changes: 7 additions & 0 deletions src/main/resources/event.ds
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
event: body.event,
enrollment: body.enrollment,
programStage: body.programStage,
contactUrn: body.contactUrn,
givenName: body.givenName
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.AdviceWith;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.CamelLogger;
Expand All @@ -51,12 +50,10 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
import static org.hisp.dhis.integration.rapidpro.Environment.DHIS2_CLIENT;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class FetchScheduledTrackerEventsRouteBuilderFunctionalTestCase extends AbstractFunctionalTestCase
public class QueueProgramStageEventsRouteBuilderFunctionalTestCase extends AbstractFunctionalTestCase
{
@Autowired
protected ProgramStageToFlowMap programStageToFlowMap;
Expand Down Expand Up @@ -234,8 +231,8 @@ public void testAttributesEndpointWhenTypeAttributes()
camelContext.start();

Map<String, Object> body = new HashMap<>();
body.put( "enrollment", enrollmentId );
body.put( "contactUrn", "1234" );
body.put( "enrollment", enrollmentId );
producerTemplate.sendBody( "direct:fetchAttributes", ExchangePattern.InOut, body );
spyEndpoint.assertIsSatisfied();
Exchange exchange = spyEndpoint.getExchanges().get( 0 );
Expand Down Expand Up @@ -359,74 +356,88 @@ public void testFetchAttributesLogsErrorWhenInvalidPhoneNumberCode()
}

@Test
public void testCreateRapidProContactGivenValidUrn()
public void testEventTransformationReturnsExpectedValues()
throws
IOException
Exception
{
assertRapidProPreCondition();
String phoneNumber = "12345678";
String givenName = "John";
String programStage = "ZP5HZ87wzc0";
String enrollmentId = Environment.createDhis2TrackedEntityWithEnrollment( Environment.ORG_UNIT_ID, phoneNumber,
"ID-1", givenName, List.of( programStage ) );
AdviceWith.adviceWith( camelContext, "Queue Program Stage Events",
r -> r.interceptSendToEndpoint( "jms:queue:events?exchangePattern=InOnly" ).to( "mock:spy" ).stop() );
MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class );
spyEndpoint.setExpectedCount( 1 );

camelContext.start();
Map<String, Object> body = new HashMap<>();
body.put( "contactUrn", "whatsapp:12345678" );
producerTemplate.sendBody( "direct:createRapidProContact", body );
given( RAPIDPRO_API_REQUEST_SPEC ).get( "contacts.json" ).then()
.body( "results.size()", equalTo( 1 ) )
.body( "results[0].urns[0]", equalTo( "whatsapp:12345678" ) );
producerTemplate.sendBody( "direct:queueEvents", ExchangePattern.InOut, null );
spyEndpoint.assertIsSatisfied( 10000 );
Exchange exchange = spyEndpoint.getExchanges().get( 0 );
Map<String, Object> body = objectMapper.readValue( exchange.getMessage().getBody( String.class ), Map.class );
String expectedContactUrn = "whatsapp:" + phoneNumber;
assertEquals( expectedContactUrn, body.get( "contactUrn" ) );
assertEquals( givenName, body.get( "givenName" ) );
assertEquals( enrollmentId, body.get( "enrollment" ) );
assertEquals( programStage, body.get( "programStage" ) );
}

@Test
public void testCreateRapidProContactFailsGivenInvalidUrn()
public void testUpdateDhisProgramStageEventStatusWithValidEventId()
throws
Exception
{
assertRapidProPreCondition();
Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 1, List.of( "ZP5HZ87wzc0" ) );
Thread.sleep( 1000 );
Map<String, Object> payload = (Map<String, Object>) DHIS2_CLIENT.get( "tracker/events" )
.transfer()
.returnAs( Map.class );
List<Map<String, Object>> instances = (List<Map<String, Object>>) payload.get( "instances" );
Map<String, Object> event = instances.get( 0 );
CountDownLatch expectedLogMessage = new CountDownLatch( 1 );
((SpringBootCamelContext) camelContext)
.addLogListener( ( Exchange exchange, CamelLogger camelLogger, String message ) -> {
if ( camelLogger.getLevel().name().equals( "WARN" ) && message.startsWith(
"Unexpected status code when creating RapidPro contact for " ) )
if ( camelLogger.getLevel().name().equals( "DEBUG" ) && message.startsWith(
String.format( "Successfully updated DHIS program stage event status for event with ID => %s",
event.get( "event" ) ) ) )
{
expectedLogMessage.countDown();
}
return message;
} );

camelContext.start();
Map<String, Object> body = new HashMap<>();
body.put( "contactUrn", "whatsapp:invalid" );
producerTemplate.sendBody( "direct:createRapidProContact", body );
producerTemplate.sendBodyAndProperty( "direct:updateDhisProgramStageEventStatus", null, "eventPayload", event );
Thread.sleep( 1000 );
assertEquals( 0, expectedLogMessage.getCount() );
}

@Test
public void testCreateRapidProContactWhenContactAlreadyExists()
public void testQueueEvents()
throws
Exception
{
assertRapidProPreCondition();
CountDownLatch expectedLogMessage = new CountDownLatch( 2 );
Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 10, List.of( "ZP5HZ87wzc0" ) );
CountDownLatch expectedLogMessage = new CountDownLatch( 10 );
((SpringBootCamelContext) camelContext)
.addLogListener( ( Exchange exchange, CamelLogger camelLogger, String message ) -> {
if ( camelLogger.getLevel().name().equals( "DEBUG" ) && message.startsWith(
"RapidPro Contact already exists for DHIS2" ) )
"Successfully updated DHIS program stage event status for event with ID =>" ) )
{
expectedLogMessage.countDown();
}
return message;
} );
camelContext.start();
Map<String, Object> body = new HashMap<>();
body.put( "contactUrn", "whatsapp:12345678" );
body.put( "enrollment", "enrollment-id" );
producerTemplate.sendBody( "direct:createRapidProContact", body );
assertEquals( 2, expectedLogMessage.getCount() );
producerTemplate.sendBody( "direct:createRapidProContact", body );
assertEquals( 1, expectedLogMessage.getCount() );
given( RAPIDPRO_API_REQUEST_SPEC ).get( "contacts.json" ).then()
.body( "results.size()", equalTo( 1 ) );
}

private void assertRapidProPreCondition()
{
given( RAPIDPRO_API_REQUEST_SPEC ).get( "contacts.json" ).then()
.body( "results.size()", equalTo( 0 ) );
}
AdviceWith.adviceWith( camelContext, "Queue Program Stage Events",
r -> r.interceptSendToEndpoint( "jms:queue:events?exchangePattern=InOnly" ).to( "mock:spy" ) );
MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class );
spyEndpoint.setExpectedCount( 10 );

camelContext.start();
producerTemplate.sendBody( "direct:queueEvents", ExchangePattern.InOut, null );
spyEndpoint.assertIsSatisfied( 10000 );
assertEquals( 0, expectedLogMessage.getCount() );
}

}

0 comments on commit 0ce3bfd

Please sign in to comment.