Skip to content

Commit

Permalink
fix: add check in Telemetry agent for timestamp during publish (#1270)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikkhilmuthye authored Jul 25, 2022
1 parent c73fca8 commit 06026ee
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
36 changes: 26 additions & 10 deletions src/main/java/com/aws/greengrass/telemetry/TelemetryAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,35 @@ void aggregatePeriodicMetrics() {
/**
* Helper for metrics uploader. Also used in tests.
*/
@SuppressWarnings("PMD.AvoidCatchingThrowable")
void publishPeriodicMetrics() {
if (!isConnected.get()) {
logger.atDebug().log("Cannot publish the metrics. MQTT connection interrupted.");
return;
}
Map<Long, List<AggregatedNamespaceData>> metricsToPublishMap = null;
long timestamp = Instant.now().toEpochMilli();
long lastPublish = Coerce.toLong(getPeriodicPublishTimeTopic());
Map<Long, List<AggregatedNamespaceData>> metricsToPublishMap =
metricsAggregator.getMetricsToPublish(lastPublish, timestamp);
getPeriodicPublishTimeTopic().withValue(timestamp);
// TODO: [P41214679] Do not publish if the metrics are empty.
publisher.publish(MetricsPayload.builder().build(), metricsToPublishMap.get(timestamp));
logger.atInfo().event("telemetry-metrics-published").log("Telemetry metrics update published.");
try {
if (!isConnected.get()) {
logger.atDebug().log("Cannot publish the metrics. MQTT connection interrupted");
return;
}
metricsToPublishMap = metricsAggregator.getMetricsToPublish(lastPublish, timestamp);
getPeriodicPublishTimeTopic().withValue(timestamp);
if (metricsToPublishMap != null && metricsToPublishMap.containsKey(timestamp)) {
publisher.publish(MetricsPayload.builder().build(), metricsToPublishMap.get(timestamp));
logger.atInfo().event("telemetry-metrics-published").log("Telemetry metrics update published");
}
} catch (Throwable t) {
logger.atWarn().log("Error collecting telemetry. Will retry", t);
return;
}
try {
getPeriodicPublishTimeTopic().withValue(timestamp);
if (metricsToPublishMap != null && metricsToPublishMap.containsKey(timestamp)) {
publisher.publish(MetricsPayload.builder().build(), metricsToPublishMap.get(timestamp));
logger.atInfo().event("telemetry-metrics-published").log("Telemetry metrics update published");
}
} catch (Throwable t) {
logger.atWarn().log("Error publishing telemetry. Will retry", t);
}
}

private Topic getPeriodicPublishTimeTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -54,6 +55,7 @@
import static com.aws.greengrass.telemetry.TelemetryAgent.TELEMETRY_TEST_PERIODIC_AGGREGATE_INTERVAL_SEC;
import static com.aws.greengrass.telemetry.TelemetryAgent.TELEMETRY_TEST_PERIODIC_PUBLISH_INTERVAL_SEC;
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionUltimateCauseOfType;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -248,4 +250,10 @@ void GIVEN_Telemetry_Agent_WHEN_mqtt_is_interrupted_THEN_aggregation_continues_b
// aggregation is continued irrespective of the mqtt connection
verify(ma, timeout(timeoutMs).atLeastOnce()).aggregateMetrics(anyLong(), anyLong());
}

@Test
void GIVEN_no_metrics_to_publish_WHEN_publish_THEN_finishes_without_exception() {
when(ma.getMetricsToPublish(anyLong(), anyLong())).thenReturn(Collections.emptyMap());
assertDoesNotThrow(() -> telemetryAgent.publishPeriodicMetrics());
}
}

0 comments on commit 06026ee

Please sign in to comment.