Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit mqtt metrics #106

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter;
import com.aws.greengrass.mqtt.moquette.metrics.MqttMetrics;
import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.models.TelemetryAggregation;
import com.aws.greengrass.telemetry.models.TelemetryUnit;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.subscriptions.Topic;
Expand All @@ -19,6 +24,7 @@
import io.moquette.interception.messages.InterceptConnectionLostMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -31,15 +37,19 @@ public class ClientDeviceAuthorizer implements IAuthenticator, IAuthorizatorPoli
private static final String MQTT_CREDENTIAL = "mqtt";

private final ClientDevicesAuthServiceApi clientDevicesAuthService;
private final MoquetteMqttMetricsEmmitter metricsEmitter;
private final Map<String, UserSessionPair> clientToSessionMap;

/**
* Constructor.
*
* @param clientDevicesAuthService Client devices auth service handle
* @param metricsEmmitter MQTT metrics emitter
*/
public ClientDeviceAuthorizer(ClientDevicesAuthServiceApi clientDevicesAuthService) {
public ClientDeviceAuthorizer(ClientDevicesAuthServiceApi clientDevicesAuthService,
MoquetteMqttMetricsEmmitter metricsEmmitter) {
this.clientDevicesAuthService = clientDevicesAuthService;
this.metricsEmitter = metricsEmmitter;
this.clientToSessionMap = new ConcurrentHashMap<>();
}

Expand Down Expand Up @@ -80,6 +90,7 @@ public boolean checkValid(String clientId, String username, byte[] password) {
} else {
LOG.atWarn().kv(CLIENT_ID, clientId).kv(SESSION_ID, sessionId).log("Device isn't authorized to connect");
clientDevicesAuthService.closeClientDeviceAuthSession(sessionId);
emitAuthErrorMetric("mqtt:connect");
}

return canConnect;
Expand All @@ -89,27 +100,39 @@ public boolean checkValid(String clientId, String username, byte[] password) {
public boolean canWrite(Topic topic, String user, String client) {
String resource = "mqtt:topic:" + topic;
boolean canPerform = false;
String publishOp = "mqtt:publish";
try {
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), "mqtt:publish", resource);
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), publishOp, resource);
} catch (AuthenticationException e) {
LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client.");
emitAuthErrorMetric(publishOp);
}
LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client)
.log("MQTT publish request");

if (!canPerform) {
emitAuthErrorMetric(publishOp);
}
return canPerform;
}

@Override
public boolean canRead(Topic topic, String user, String client) {
String resource = "mqtt:topicfilter:" + topic;
boolean canPerform = false;
String subscribeOp = "mqtt:subscribe";
try {
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), "mqtt:subscribe", resource);
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), subscribeOp, resource);
} catch (AuthenticationException e) {
LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client.");
emitAuthErrorMetric(subscribeOp);
vaibhavmurkute marked this conversation as resolved.
Show resolved Hide resolved
}
LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client)
.log("MQTT subscribe request");

if (!canPerform) {
emitAuthErrorMetric(subscribeOp);
}
return canPerform;
}

Expand Down Expand Up @@ -201,4 +224,37 @@ private void closeAuthSession(String clientId, String username) {
}
}
}

/**
* Emits MQTT AuthError metrics for requested MQTT operation.
* Ideally these metrics should be emitted from the Moquette request handlers.
* Emitting metrics from Authorizer integration to avoid broker code change.
*
* @param operation Requested MQTT operation
*/
private void emitAuthErrorMetric(String operation) {
String authErrorMetric;
switch (operation) {
case "mqtt:connect":
vaibhavmurkute marked this conversation as resolved.
Show resolved Hide resolved
authErrorMetric = MqttMetrics.CONNECT_AUTH_ERROR;
break;
case "mqtt:publish" :
authErrorMetric = MqttMetrics.PUBLISH_IN_AUTH_ERROR;
break;
case "mqtt:subscribe" :
authErrorMetric = MqttMetrics.SUBSCRIBE_AUTH_ERROR;
break;
default:
authErrorMetric = MqttMetrics.UNKNOWN_AUTH_ERROR;
break;
}
metricsEmitter.emitMetric(Metric.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably do not want to immediately emit a metric, this can be expensive. We may want to instead batch up a number of them and then emit the sum ourselves on a timer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to schedule batched metric emit

.namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's talk to cloud team about the namespaces. We may want to share a namespace for any MQTT broker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IoT Core Message broker metrics are under "MQTT" Protocol dimension in "AWS/IoT" namespace. Does it make sense to keep generic "LocalMQTT" namespace housing metrics for our local-mqtt broker?

.name(authErrorMetric)
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(1)
.timestamp(Instant.now().toEpochMilli())
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.lifecyclemanager.PluginService;
import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter;
import com.aws.greengrass.util.Coerce;
import io.moquette.BrokerConstants;
import io.moquette.broker.ISslContextCreator;
Expand All @@ -27,7 +28,7 @@

import java.io.IOException;
import java.security.KeyStoreException;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import javax.inject.Inject;
Expand All @@ -43,6 +44,7 @@ public class MQTTService extends PluginService {
private final Kernel kernel;
private final ClientDeviceTrustManager clientDeviceTrustManager;
private final ClientDeviceAuthorizer clientDeviceAuthorizer;
private final MoquetteMqttMetricsEmmitter mqttMetricsEmmitter;
private final List<InterceptHandler> interceptHandlers;
private final ClientDevicesAuthServiceApi clientDevicesAuthServiceApi;
private final GetCertificateRequest serverCertificateRequest;
Expand All @@ -62,8 +64,10 @@ public MQTTService(Topics topics, Kernel kernel, ClientDevicesAuthServiceApi cli
super(topics);
this.kernel = kernel;
this.clientDeviceTrustManager = new ClientDeviceTrustManager(clientDevicesAuthService);
this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService);
this.interceptHandlers = Collections.singletonList(clientDeviceAuthorizer.new ConnectionTerminationListener());
this.mqttMetricsEmmitter = new MoquetteMqttMetricsEmmitter();
this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService, mqttMetricsEmmitter);
this.interceptHandlers = Arrays.asList(clientDeviceAuthorizer.new ConnectionTerminationListener(),
mqttMetricsEmmitter. new MqttMetricsCaptor());
this.clientDevicesAuthServiceApi = clientDevicesAuthService;

GetCertificateRequestOptions options = new GetCertificateRequestOptions();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqtt.moquette.metrics;

import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.impl.MetricFactory;
import com.aws.greengrass.telemetry.models.TelemetryAggregation;
import com.aws.greengrass.telemetry.models.TelemetryUnit;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.moquette.interception.messages.InterceptSubscribeMessage;
import io.moquette.interception.messages.InterceptUnsubscribeMessage;

import java.time.Instant;

public class MoquetteMqttMetricsEmmitter {

private final MetricFactory metricFactory = new MetricFactory(MqttMetrics.MOQUETTE_MQTT_NAMESPACE);

/**
* Emits Moquette MQTT metrics.
*
* @param metric {@link Metric} to be emitted in Moquette MQTT namespace
*/
public void emitMetric(Metric metric) {
metricFactory.putMetricData(metric);
}

public class MqttMetricsCaptor extends AbstractInterceptHandler implements InterceptHandler {

@Override
public String getID() {
return "MoquetteMqttMetricsCaptor";
}

@Override
public void onConnect(InterceptConnectMessage msg) {
emitMetric(Metric.builder()
.namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE)
.name(MqttMetrics.CONNECT_SUCCESS)
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(1)
.timestamp(Instant.now().toEpochMilli())
.build());
}

@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
emitMetric(Metric.builder()
.namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE)
.name(MqttMetrics.DISCONNECT)
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(1)
.timestamp(Instant.now().toEpochMilli())
.build());
}

@Override
public void onPublish(InterceptPublishMessage msg) {
emitMetric(Metric.builder()
.namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE)
.name(MqttMetrics.PUBLISH_OUT_SUCCESS)
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(1)
.timestamp(Instant.now().toEpochMilli())
.build());
}

@Override
public void onSubscribe(InterceptSubscribeMessage msg) {
emitMetric(Metric.builder()
.namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE)
.name(MqttMetrics.SUBSCRIBE_SUCCESS)
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(1)
.timestamp(Instant.now().toEpochMilli())
.build());
}

@Override
public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
emitMetric(Metric.builder()
.namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE)
.name(MqttMetrics.UNSUBSCRIBE)
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(1)
.timestamp(Instant.now().toEpochMilli())
.build());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqtt.moquette.metrics;


public final class MqttMetrics {
public static final String MOQUETTE_MQTT_NAMESPACE = "MoquetteMqtt";
public static final String CONNECT_SUCCESS = "Connect.Success";
public static final String CONNECT_AUTH_ERROR = "Connect.AuthError";
public static final String SUBSCRIBE_SUCCESS = "Subscribe.Success";
public static final String SUBSCRIBE_AUTH_ERROR = "Subscribe.AuthError";
public static final String PUBLISH_OUT_SUCCESS = "PublishOut.Success";
public static final String PUBLISH_IN_AUTH_ERROR = "PublishIn.AuthError";
public static final String DISCONNECT = "Disconnect";
public static final String UNSUBSCRIBE = "Unsubscribe";
public static final String UNKNOWN_AUTH_ERROR = "UnknownAuthError";
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.aws.greengrass.clientdevices.auth.api.ClientDevicesAuthServiceApi;
import com.aws.greengrass.clientdevices.auth.exception.AuthenticationException;
import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException;
import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil;
import io.moquette.broker.subscriptions.Topic;
Expand All @@ -31,6 +32,8 @@
public class ClientDeviceAuthorizerTest extends GGServiceTestUtil {
@Mock
ClientDevicesAuthServiceApi mockClientDevicesAuthService;
@Mock
MoquetteMqttMetricsEmmitter mockMqttMetricsEmmitter;

private static final String DEFAULT_SESSION = "SESSION_ID";
private static final String DEFAULT_CLIENT = "clientId";
Expand Down Expand Up @@ -72,13 +75,13 @@ void configureSubscribeResponse(boolean doAllow) throws AuthorizationException {

@Test
void GIVEN_clientDataWithoutCertificate_WHEN_checkValid_THEN_returnsFalse() {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);
assertThat(authorizer.checkValid(DEFAULT_CLIENT, EMPTY_PEER_CERT, DEFAULT_PASSWORD), is(false));
}

@Test
void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalseAndClosesSession() throws Exception {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION);
configureConnectResponse(false);
Expand All @@ -92,7 +95,7 @@ void GIVEN_duplicateClientIds_WHEN_checkValid_THEN_firstSessionClosed() throws A
AuthorizationException {
final String USERNAME1 = "PeerCert1";
final String USERNAME2 = "PeerCert2";
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn("SESSION1");
configureConnectResponse("SESSION1", DEFAULT_CLIENT, true);
Expand All @@ -115,7 +118,7 @@ void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalse(ExtensionContext
throws AuthenticationException {
ignoreExceptionOfType(context, AuthenticationException.class);

ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenThrow(
new AuthenticationException("Invalid client"));
Expand All @@ -125,7 +128,7 @@ void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalse(ExtensionContext

@Test
void GIVEN_authorizedClient_WHEN_checkValid_THEN_returnsTrue() throws Exception {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION);
configureConnectResponse(true);
Expand All @@ -138,7 +141,7 @@ void GIVEN_unknownClient_WHEN_canReadCanWrite_THEN_returnsFalse(ExtensionContext
throws AuthenticationException {
ignoreExceptionOfType(context, AuthenticationException.class);

ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenThrow(
new AuthenticationException("Invalid client"));
Expand All @@ -149,7 +152,7 @@ void GIVEN_unknownClient_WHEN_canReadCanWrite_THEN_returnsFalse(ExtensionContext

@Test
void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsFalse() throws AuthenticationException, AuthorizationException {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION);
configureConnectResponse(true);
Expand All @@ -163,7 +166,7 @@ void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsFalse() throws Au

@Test
void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsTrue() throws AuthenticationException, AuthorizationException {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION);
configureConnectResponse(true);
Expand All @@ -178,7 +181,7 @@ void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsTrue() throws Aut
@Test
void GIVEN_twoClientsWithDifferingPermissions_WHEN_canReadCanWrite_THEN_correctSessionIsUsed()
throws AuthenticationException, AuthorizationException {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);
String session1 = "SESSION_ID1";
String session2 = "SESSION_ID2";
String client1 = "clientId1";
Expand Down Expand Up @@ -220,7 +223,7 @@ void GIVEN_twoClientsWithDifferingPermissions_WHEN_canReadCanWrite_THEN_correctS
@Test
void GIVEN_authorizedClient_WHEN_onDisconnect_THEN_closeAuthSession() throws AuthenticationException,
AuthorizationException {
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService);
ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter);

when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION);
configureConnectResponse(true);
Expand Down