From 0ce3bfdf10945c0f6d483f96ebda07ea39a14134 Mon Sep 17 00:00:00 2001
From: Johan Hole <36231049+JohanGHole@users.noreply.github.com>
Date: Wed, 24 Jan 2024 17:41:44 +0100
Subject: [PATCH] feat: enqueue processed events (#178)

* feat: Fetch RapidPro contact uuid

Extracting RapidPro Contact uuid during contact creation to be used for later processing during RapidPro flow trigger.

* refactor: adding enrollment ID to mock event

For test consistency.

* test: testFetchAndProcessEvents

* feat: queue events route builder
---
 .../processor/EventStatusUpdateProcessor.java | 62 +++++++++++++
 ... QueueProgramStageEventsRouteBuilder.java} | 65 +++++--------
 src/main/resources/event.ds                   |  7 ++
 ...EventsRouteBuilderFunctionalTestCase.java} | 93 +++++++++++--------
 4 files changed, 145 insertions(+), 82 deletions(-)
 create mode 100644 src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java
 rename src/main/java/org/hisp/dhis/integration/rapidpro/route/{FetchScheduledTrackerEventsRouteBuilder.java => QueueProgramStageEventsRouteBuilder.java} (66%)
 create mode 100644 src/main/resources/event.ds
 rename src/test/java/org/hisp/dhis/integration/rapidpro/route/{FetchScheduledTrackerEventsRouteBuilderFunctionalTestCase.java => QueueProgramStageEventsRouteBuilderFunctionalTestCase.java} (85%)

diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java b/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java
new file mode 100644
index 00000000..f8d4be80
--- /dev/null
+++ b/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2004-2022, University of Oslo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * Neither the name of the HISP project nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.hisp.dhis.integration.rapidpro.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class EventStatusUpdateProcessor implements Processor
+{
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public void process( Exchange exchange )
+        throws
+        Exception
+    {
+        Map<String, Object> eventPayload = exchange.getProperty( "eventPayload", Map.class );
+        eventPayload.put( "status", "ACTIVE" );
+        eventPayload.put( "occurredAt", LocalDateTime.now().format( DateTimeFormatter.ofPattern( "yyyy-MM-dd" ) ) );
+        exchange.getMessage().setBody( Map.of( "events", List.of( eventPayload ) ) );
+        Map<String, String> queryParams = new HashMap<>();
+        queryParams.put( "async", "false" );
+        queryParams.put( "importStrategy", "UPDATE" );
+        exchange.getMessage().setHeader( "CamelDhis2.queryParams", queryParams );
+    }
+}
diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/FetchScheduledTrackerEventsRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilder.java
similarity index 66%
rename from src/main/java/org/hisp/dhis/integration/rapidpro/route/FetchScheduledTrackerEventsRouteBuilder.java
rename to src/main/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilder.java
index 7c11c567..a4aa7149 100644
--- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/FetchScheduledTrackerEventsRouteBuilder.java
+++ b/src/main/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilder.java
@@ -30,26 +30,20 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
-import org.hisp.dhis.api.model.Page;
-import org.hisp.dhis.api.model.Pager;
 import org.hisp.dhis.integration.rapidpro.aggregationStrategy.AttributesAggrStrategy;
 import org.hisp.dhis.integration.rapidpro.aggregationStrategy.ProgramStageEventsAggrStrategy;
-import org.hisp.dhis.integration.rapidpro.aggregationStrategy.RapidProContactEnricherAggrStrategy;
 import org.hisp.dhis.integration.rapidpro.aggregationStrategy.TrackedEntityIdAggrStrategy;
+import org.hisp.dhis.integration.rapidpro.processor.EventStatusUpdateProcessor;
 import org.hisp.dhis.integration.rapidpro.processor.FetchDueEventsQueryParamSetter;
 import org.hisp.dhis.integration.rapidpro.processor.SetAttributesEndpointProcessor;
 import org.hisp.dhis.integration.rapidpro.processor.SetProgramStagesPropertyProcessor;
-import org.hisp.dhis.integration.sdk.internal.operation.page.PageIterable;
-import org.jgroups.logging.Log;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 @Component
-public class FetchScheduledTrackerEventsRouteBuilder extends AbstractRouteBuilder
+public class QueueProgramStageEventsRouteBuilder extends AbstractRouteBuilder
 {
     @Autowired
     private SetProgramStagesPropertyProcessor setProgramStagesPropertyProcessor;
@@ -57,9 +51,6 @@ public class FetchScheduledTrackerEventsRouteBuilder extends AbstractRouteBuilde
     @Autowired
     private ProgramStageEventsAggrStrategy programStageEventsAggrStrategy;
 
-    @Autowired
-    private RapidProContactEnricherAggrStrategy rapidProContactEnricherAggrStrategy;
-
     @Autowired
     private FetchDueEventsQueryParamSetter fetchDueEventsQueryParamSetter;
 
@@ -72,6 +63,9 @@ public class FetchScheduledTrackerEventsRouteBuilder extends AbstractRouteBuilde
     @Autowired
     private SetAttributesEndpointProcessor setAttributesEndpointProcessor;
 
+    @Autowired
+    private EventStatusUpdateProcessor eventStatusUpdateProcessor;
+
     @Override
     protected void doConfigure()
         throws
@@ -80,24 +74,26 @@ protected void doConfigure()
         from( "servlet:tasks/syncEvents?muteException=true" )
             .precondition( "{{sync.dhis2.events.to.rapidpro.flows}}" )
             .removeHeaders( "*" )
-            .to( "direct:fetchAndProcessEvents" )
+            .to( "direct:queueEvents" )
             .setHeader( Exchange.CONTENT_TYPE, constant( "application/json" ) )
             .setBody( constant( Map.of( "status", "success", "data", "Fetched and enqueued due program stage events" ) ) )
             .marshal().json();
 
         from( "quartz://fetchDueEvents?cron={{sync.events.schedule.expression:0 0/30 * * * ?}}&stateful=true" )
             .precondition( "{{sync.dhis2.events.to.rapidpro.flows}}" )
-            .to( "direct:fetchAndProcessEvents" );
+            .to( "direct:queueEvents" );
 
-        from( "direct:fetchAndProcessEvents" )
-            .routeId( "Fetch And Process Tracker Events" )
+        from("direct:queueEvents")
+            .routeId( "Queue Program Stage Events" )
             .to( "direct:fetchDueEvents" )
             .split( simple( "${exchangeProperty.dueEvents}" ) )
-                .marshal().json().transform().body( String.class )
                 .setProperty( "eventPayload", simple( "${body}" ) )
-                .unmarshal().json()
                 .to( "direct:fetchAttributes" )
-                .to("direct:createRapidProContact");
+                .transform( datasonnet( "resource:classpath:event.ds", String.class, "application/x-java-object", "application/json" ) )
+                .to( "jms:queue:events?exchangePattern=InOnly" )
+                .unmarshal().json()
+                .log( LoggingLevel.DEBUG, LOGGER, "Enqueued event [eventId => ${body[event]}, programStage => ${body[programStage]}]" )
+                .to( "direct:updateDhisProgramStageEventStatus" );
 
         from( "direct:fetchDueEvents" )
             .routeId( "Fetch Due Events" )
@@ -124,29 +120,16 @@ protected void doConfigure()
                 .log( LoggingLevel.ERROR, LOGGER, "Error while fetching phone number attribute from DHIS2 enrollment ${body[enrollment]}. Hint: Be sure to set the 'dhis2.phone.number.attribute.code' config property." )
                 .stop();
 
-        from( "direct:createRapidProContact" )
-            .routeId( "Create RapidPro Contact" )
-            .log( LoggingLevel.INFO, LOGGER, "Creating RapidPro contact for DHIS2 enrollment ${body[enrollment]}" )
-            .setHeader( "Authorization", constant( "Token {{rapidpro.api.token}}" ) )
-            .enrich().simple( "{{rapidpro.api.url}}/contacts.json?urn=${body[contactUrn]}&httpMethod=GET" )
-            .aggregationStrategy( rapidProContactEnricherAggrStrategy )
-            .setProperty( "originalPayload", simple( "${body}") )
-            .choice()
-                .when( simple ("${body[results].size()} > 0" ) )
-                    .log( LoggingLevel.DEBUG, LOGGER, "RapidPro Contact already exists for DHIS2 enrollment ${exchangeProperty.originalPayload[enrollment]}. No action needed." )
-                .otherwise()
-                    .log( LoggingLevel.DEBUG, LOGGER, "RapidPro Contact does not exist for DHIS2 enrollment ${exchangeProperty.originalPayload[enrollment]}. Creating new contact...")
-                    .transform(
-                        datasonnet( "resource:classpath:trackedEntityContact.ds", Map.class, "application/x-java-object",
-                            "application/x-java-object" ) )
-                    .setHeader( "Authorization", constant( "Token {{rapidpro.api.token}}" ) )
-                    .marshal().json().convertBodyTo( String.class )
-                    .toD( "{{rapidpro.api.url}}/contacts.json?httpMethod=POST&okStatusCodeRange=200-499" )
-                    .choice().when( header( Exchange.HTTP_RESPONSE_CODE ).isNotEqualTo( "201" ) )
-                        .log( LoggingLevel.WARN, LOGGER, "Unexpected status code when creating RapidPro contact for DHIS2 enrollment ${exchangeProperty.originalPayload[enrollment]} => HTTP ${header.CamelHttpResponseCode}. HTTP response body => ${body}" )
-                    .end()
-                .end()
-            .setBody( simple( "${exchangeProperty.originalPayload}" ) )
+        from("direct:updateDhisProgramStageEventStatus")
+            .routeId( "Update DHIS Program Stage Event Status" )
+            .process( eventStatusUpdateProcessor )
+            .marshal().json().convertBodyTo( String.class )
+            .toD( "dhis2://post/resource?path=tracker&inBody=resource&client=#dhis2Client" )
+            .unmarshal().json()
+            .choice().when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) )
+                .log( LoggingLevel.DEBUG, LOGGER, "Successfully updated DHIS program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}" )
+            .otherwise()
+                .log( LoggingLevel.ERROR, LOGGER, "Unexpected status code when updating the dhis program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}. HTTP ${header.CamelHttpResponseCode}. HTTP response body => ${body}" )
             .end();
     }
 }
diff --git a/src/main/resources/event.ds b/src/main/resources/event.ds
new file mode 100644
index 00000000..319e52ca
--- /dev/null
+++ b/src/main/resources/event.ds
@@ -0,0 +1,7 @@
+{
+  event: body.event,
+  enrollment: body.enrollment,
+  programStage: body.programStage,
+  contactUrn: body.contactUrn,
+  givenName: body.givenName
+}
diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/FetchScheduledTrackerEventsRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java
similarity index 85%
rename from src/test/java/org/hisp/dhis/integration/rapidpro/route/FetchScheduledTrackerEventsRouteBuilderFunctionalTestCase.java
rename to src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java
index c9783814..d5d8f144 100644
--- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/FetchScheduledTrackerEventsRouteBuilderFunctionalTestCase.java
+++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java
@@ -31,7 +31,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.AdviceWith;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.CamelLogger;
@@ -51,12 +50,10 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-import static io.restassured.RestAssured.given;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hisp.dhis.integration.rapidpro.Environment.DHIS2_CLIENT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class FetchScheduledTrackerEventsRouteBuilderFunctionalTestCase extends AbstractFunctionalTestCase
+public class QueueProgramStageEventsRouteBuilderFunctionalTestCase extends AbstractFunctionalTestCase
 {
     @Autowired
     protected ProgramStageToFlowMap programStageToFlowMap;
@@ -234,8 +231,8 @@ public void testAttributesEndpointWhenTypeAttributes()
         camelContext.start();
 
         Map<String, Object> body = new HashMap<>();
-        body.put( "enrollment", enrollmentId );
         body.put( "contactUrn", "1234" );
+        body.put( "enrollment", enrollmentId );
         producerTemplate.sendBody( "direct:fetchAttributes", ExchangePattern.InOut, body );
         spyEndpoint.assertIsSatisfied();
         Exchange exchange = spyEndpoint.getExchanges().get( 0 );
@@ -359,74 +356,88 @@ public void testFetchAttributesLogsErrorWhenInvalidPhoneNumberCode()
     }
 
     @Test
-    public void testCreateRapidProContactGivenValidUrn()
+    public void testEventTransformationReturnsExpectedValues()
         throws
-        IOException
+        Exception
     {
-        assertRapidProPreCondition();
+        String phoneNumber = "12345678";
+        String givenName = "John";
+        String programStage = "ZP5HZ87wzc0";
+        String enrollmentId = Environment.createDhis2TrackedEntityWithEnrollment( Environment.ORG_UNIT_ID, phoneNumber,
+            "ID-1", givenName, List.of( programStage ) );
+        AdviceWith.adviceWith( camelContext, "Queue Program Stage Events",
+            r -> r.interceptSendToEndpoint( "jms:queue:events?exchangePattern=InOnly" ).to( "mock:spy" ).stop() );
+        MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class );
+        spyEndpoint.setExpectedCount( 1 );
+
         camelContext.start();
-        Map<String, Object> body = new HashMap<>();
-        body.put( "contactUrn", "whatsapp:12345678" );
-        producerTemplate.sendBody( "direct:createRapidProContact", body );
-        given( RAPIDPRO_API_REQUEST_SPEC ).get( "contacts.json" ).then()
-            .body( "results.size()", equalTo( 1 ) )
-            .body( "results[0].urns[0]", equalTo( "whatsapp:12345678" ) );
+        producerTemplate.sendBody( "direct:queueEvents", ExchangePattern.InOut, null );
+        spyEndpoint.assertIsSatisfied( 10000 );
+        Exchange exchange = spyEndpoint.getExchanges().get( 0 );
+        Map<String, Object> body = objectMapper.readValue( exchange.getMessage().getBody( String.class ), Map.class );
+        String expectedContactUrn = "whatsapp:" + phoneNumber;
+        assertEquals( expectedContactUrn, body.get( "contactUrn" ) );
+        assertEquals( givenName, body.get( "givenName" ) );
+        assertEquals( enrollmentId, body.get( "enrollment" ) );
+        assertEquals( programStage, body.get( "programStage" ) );
     }
 
     @Test
-    public void testCreateRapidProContactFailsGivenInvalidUrn()
+    public void testUpdateDhisProgramStageEventStatusWithValidEventId()
         throws
         Exception
     {
-        assertRapidProPreCondition();
+        Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 1, List.of( "ZP5HZ87wzc0" ) );
+        Thread.sleep( 1000 );
+        Map<String, Object> payload = (Map<String, Object>) DHIS2_CLIENT.get( "tracker/events" )
+            .transfer()
+            .returnAs( Map.class );
+        List<Map<String, Object>> instances = (List<Map<String, Object>>) payload.get( "instances" );
+        Map<String, Object> event = instances.get( 0 );
         CountDownLatch expectedLogMessage = new CountDownLatch( 1 );
         ((SpringBootCamelContext) camelContext)
             .addLogListener( ( Exchange exchange, CamelLogger camelLogger, String message ) -> {
-                if ( camelLogger.getLevel().name().equals( "WARN" ) && message.startsWith(
-                    "Unexpected status code when creating RapidPro contact for " ) )
+                if ( camelLogger.getLevel().name().equals( "DEBUG" ) && message.startsWith(
+                    String.format( "Successfully updated DHIS program stage event status for event with ID => %s",
+                        event.get( "event" ) ) ) )
                 {
                     expectedLogMessage.countDown();
                 }
                 return message;
             } );
+
         camelContext.start();
-        Map<String, Object> body = new HashMap<>();
-        body.put( "contactUrn", "whatsapp:invalid" );
-        producerTemplate.sendBody( "direct:createRapidProContact", body );
+        producerTemplate.sendBodyAndProperty( "direct:updateDhisProgramStageEventStatus", null, "eventPayload", event );
+        Thread.sleep( 1000 );
         assertEquals( 0, expectedLogMessage.getCount() );
     }
 
     @Test
-    public void testCreateRapidProContactWhenContactAlreadyExists()
+    public void testQueueEvents()
+        throws
+        Exception
     {
-        assertRapidProPreCondition();
-        CountDownLatch expectedLogMessage = new CountDownLatch( 2 );
+        Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 10, List.of( "ZP5HZ87wzc0" ) );
+        CountDownLatch expectedLogMessage = new CountDownLatch( 10 );
         ((SpringBootCamelContext) camelContext)
             .addLogListener( ( Exchange exchange, CamelLogger camelLogger, String message ) -> {
                 if ( camelLogger.getLevel().name().equals( "DEBUG" ) && message.startsWith(
-                    "RapidPro Contact already exists for DHIS2" ) )
+                    "Successfully updated DHIS program stage event status for event with ID =>" ) )
                 {
                     expectedLogMessage.countDown();
                 }
                 return message;
             } );
-        camelContext.start();
-        Map<String, Object> body = new HashMap<>();
-        body.put( "contactUrn", "whatsapp:12345678" );
-        body.put( "enrollment", "enrollment-id" );
-        producerTemplate.sendBody( "direct:createRapidProContact", body );
-        assertEquals( 2, expectedLogMessage.getCount() );
-        producerTemplate.sendBody( "direct:createRapidProContact", body );
-        assertEquals( 1, expectedLogMessage.getCount() );
-        given( RAPIDPRO_API_REQUEST_SPEC ).get( "contacts.json" ).then()
-            .body( "results.size()", equalTo( 1 ) );
-    }
 
-    private void assertRapidProPreCondition()
-    {
-        given( RAPIDPRO_API_REQUEST_SPEC ).get( "contacts.json" ).then()
-            .body( "results.size()", equalTo( 0 ) );
-    }
+        AdviceWith.adviceWith( camelContext, "Queue Program Stage Events",
+            r -> r.interceptSendToEndpoint( "jms:queue:events?exchangePattern=InOnly" ).to( "mock:spy" ) );
+        MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class );
+        spyEndpoint.setExpectedCount( 10 );
 
+        camelContext.start();
+        producerTemplate.sendBody( "direct:queueEvents", ExchangePattern.InOut, null );
+        spyEndpoint.assertIsSatisfied( 10000 );
+        assertEquals( 0, expectedLogMessage.getCount() );
+    }
 
 }