Skip to content

Commit fdb7b0f

Browse files
committed
chore: reapply GG changes to moquette 0.17
1 parent 657f725 commit fdb7b0f

32 files changed

+420
-313
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
greengrass-build/
22
integration/target/**
3+
moquette_messages.log
4+
target/
5+
.gradle/
36
*.iml

integration/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
<dependency>
8989
<groupId>io.moquette</groupId>
9090
<artifactId>moquette-broker</artifactId>
91-
<version>0.16-gg</version>
91+
<version>0.17-gg</version>
9292
</dependency>
9393
</dependencies>
9494
<build>

integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ public void onConnectionLost(InterceptConnectionLostMessage msg) {
218218
closeAuthSession(msg.getClientID(), msg.getUsername());
219219
}
220220

221+
@Override
222+
public void onSessionLoopError(Throwable error) {
223+
LOG.atWarn().log("Moquette session error", error);
224+
}
225+
221226
private void closeAuthSession(String clientId, String username) {
222227
UserSessionPair sessionPair = getSessionForClient(clientId, username);
223228
if (sessionPair != null) {

integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,14 @@ private synchronized void startWithProperties(Properties properties, boolean for
143143
IConfig config = new MemoryConfig(properties);
144144
ISslContextCreator sslContextCreator =
145145
new GreengrassMoquetteSslContextCreator(config, clientDeviceTrustManager);
146-
mqttBroker.startServer(config, interceptHandlers, sslContextCreator, clientDeviceAuthorizer,
147-
clientDeviceAuthorizer);
146+
try {
147+
mqttBroker.startServer(config, interceptHandlers, sslContextCreator, clientDeviceAuthorizer,
148+
clientDeviceAuthorizer);
149+
} catch (IOException e) {
150+
// IO Exception can only be thrown from H2 right now and we do not configure moquette to use h2.
151+
serviceErrored(e);
152+
return;
153+
}
148154
serverRunning = true;
149155
runningProperties = properties;
150156
}
@@ -182,6 +188,9 @@ private Properties getProperties() {
182188
//Disable plain TCP port
183189
p.setProperty(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
184190

191+
// Telemetry is actually deleted from the code base, but just set the flag here to be sure.
192+
p.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
193+
185194
return p;
186195
}
187196
}

moquette-0.17/broker/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<relativePath>../pom.xml</relativePath>
66
<artifactId>moquette-parent</artifactId>
77
<groupId>io.moquette</groupId>
8-
<version>0.17</version>
8+
<version>0.17-gg</version>
99
</parent>
1010

1111
<artifactId>moquette-broker</artifactId>
@@ -14,12 +14,12 @@
1414

1515
<properties>
1616
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
17-
<netty.version>4.1.93.Final</netty.version>
17+
<netty.version>4.1.94.Final</netty.version>
1818
<!-- Check Netty pom.xm to know the right version of tcnative, for example
1919
https://github.com/netty/netty/blob/netty-4.1.93.Final/pom.xml#L625 -->
2020
<netty.tcnative.version>2.0.61.Final</netty.tcnative.version>
2121
<paho.version>1.2.5</paho.version>
22-
<h2.version>2.1.212</h2.version>
22+
<h2.version>2.2.220</h2.version>
2323
</properties>
2424

2525
<dependencies>

moquette-0.17/broker/src/main/java/io/moquette/BrokerConstants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public final class BrokerConstants {
7575
public static final String KEY_MANAGER_PASSWORD_PROPERTY_NAME = IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME;
7676
@Deprecated
7777
public static final String ALLOW_ANONYMOUS_PROPERTY_NAME = IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME;
78+
public static final String PEER_CERTIFICATE_AS_USERNAME = "peer_certificate_as_username";
7879
public static final String REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT = "reauthorize_subscriptions_on_connect";
7980
public static final String ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME = "allow_zero_byte_client_id";
8081
@Deprecated
@@ -103,6 +104,7 @@ public final class BrokerConstants {
103104
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = IConfig.NETTY_MAX_BYTES_PROPERTY_NAME;
104105
@Deprecated
105106
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE;
107+
public static final String NETTY_ENABLED_TLS_PROTOCOLS_PROPERTY_NAME = "netty.enabled.tls.protocols";
106108
/**
107109
* @deprecated use the BUFFER_FLUSH_MS_PROPERTY_NAME
108110
* */
@@ -126,6 +128,11 @@ public final class BrokerConstants {
126128

127129
public static final String STORAGE_CLASS_NAME = "storage_class";
128130

131+
public static final String NETTY_CHANNEL_WRITE_LIMIT_PROPERTY_NAME = "netty.channel.write.limit";
132+
public static final int DEFAULT_NETTY_CHANNEL_WRITE_LIMIT_BYTES = 512 * 1024;
133+
public static final String NETTY_CHANNEL_READ_LIMIT_PROPERTY_NAME = "netty.channel.read.limit";
134+
public static final int DEFAULT_NETTY_CHANNEL_READ_LIMIT_BYTES = 512 * 1024;
135+
129136
@Deprecated
130137
public static final String PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME = IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME;
131138

moquette-0.17/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ class BrokerConfiguration {
2626
private final boolean allowZeroByteClientId;
2727
private final boolean reauthorizeSubscriptionsOnConnect;
2828
private final int bufferFlushMillis;
29+
private final boolean peerCertificateAsUsername;
2930

3031
BrokerConfiguration(IConfig props) {
3132
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
3233
allowZeroByteClientId = props.boolProp(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, false);
3334
reauthorizeSubscriptionsOnConnect = props.boolProp(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, false);
35+
peerCertificateAsUsername = props.boolProp(BrokerConstants.PEER_CERTIFICATE_AS_USERNAME, false);
3436

3537
// BUFFER_FLUSH_MS_PROPERTY_NAME has precedence over the deprecated IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME
3638
final String bufferFlushMillisProp = props.getProperty(BrokerConstants.BUFFER_FLUSH_MS_PROPERTY_NAME);
@@ -65,10 +67,17 @@ class BrokerConfiguration {
6567

6668
public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
6769
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
70+
this(allowAnonymous, allowZeroByteClientId, reauthorizeSubscriptionsOnConnect, bufferFlushMillis, false);
71+
}
72+
73+
public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
74+
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis,
75+
boolean peerCertificateAsUsername) {
6876
this.allowAnonymous = allowAnonymous;
6977
this.allowZeroByteClientId = allowZeroByteClientId;
7078
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
7179
this.bufferFlushMillis = bufferFlushMillis;
80+
this.peerCertificateAsUsername = peerCertificateAsUsername;
7281
}
7382

7483
public boolean isAllowAnonymous() {
@@ -86,4 +95,8 @@ public boolean isReauthorizeSubscriptionsOnConnect() {
8695
public int getBufferFlushMillis() {
8796
return bufferFlushMillis;
8897
}
98+
99+
public boolean isPeerCertificateAsUsername() {
100+
return peerCertificateAsUsername;
101+
}
89102
}

moquette-0.17/broker/src/main/java/io/moquette/broker/DefaultMoquetteSslContextCreator.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616

1717
package io.moquette.broker;
1818

19+
import io.moquette.BrokerConstants;
20+
import io.moquette.broker.config.IConfig;
21+
import io.netty.handler.ssl.ClientAuth;
22+
import io.netty.handler.ssl.SslContext;
23+
import io.netty.handler.ssl.SslContextBuilder;
24+
import io.netty.handler.ssl.SslProvider;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
1928
import java.io.File;
2029
import java.io.FileInputStream;
2130
import java.io.FileNotFoundException;
@@ -32,17 +41,8 @@
3241
import java.security.cert.X509Certificate;
3342
import java.util.Collections;
3443
import java.util.Objects;
35-
36-
import io.moquette.BrokerConstants;
37-
import io.moquette.broker.config.IConfig;
38-
import io.netty.handler.ssl.ClientAuth;
39-
import io.netty.handler.ssl.SslContext;
40-
import io.netty.handler.ssl.SslContextBuilder;
41-
import io.netty.handler.ssl.SslProvider;
4244
import javax.net.ssl.KeyManagerFactory;
4345
import javax.net.ssl.TrustManagerFactory;
44-
import org.slf4j.Logger;
45-
import org.slf4j.LoggerFactory;
4646

4747
/**
4848
* Moquette integration implementation to load SSL certificate from local filesystem path configured in
@@ -89,6 +89,14 @@ public SslContext initSSLContext() {
8989
if (Boolean.valueOf(sNeedsClientAuth)) {
9090
addClientAuthentication(ks, contextBuilder);
9191
}
92+
93+
// if enabled tls protocols are not provided, we use the default
94+
String enabledTLSProtocols = props.getProperty(BrokerConstants.NETTY_ENABLED_TLS_PROTOCOLS_PROPERTY_NAME);
95+
if (enabledTLSProtocols != null) {
96+
LOG.info(String.format("Enabled TLS Protocols: {%s}", enabledTLSProtocols));
97+
contextBuilder.protocols(enabledTLSProtocols.split(";"));
98+
}
99+
92100
contextBuilder.sslProvider(sslProvider);
93101
SslContext sslContext = contextBuilder.build();
94102
LOG.info("The SSL context has been initialized successfully.");

moquette-0.17/broker/src/main/java/io/moquette/broker/MQTTConnection.java

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,59 @@
1616
package io.moquette.broker;
1717

1818
import io.moquette.BrokerConstants;
19-
import io.moquette.broker.subscriptions.Topic;
2019
import io.moquette.broker.security.IAuthenticator;
20+
import io.moquette.broker.security.PemUtils;
21+
import io.moquette.broker.subscriptions.Topic;
2122
import io.netty.buffer.ByteBuf;
2223
import io.netty.buffer.ByteBufHolder;
2324
import io.netty.channel.Channel;
2425
import io.netty.channel.ChannelFuture;
2526
import io.netty.channel.ChannelFutureListener;
2627
import io.netty.channel.ChannelPipeline;
27-
import io.netty.handler.codec.mqtt.*;
28+
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
29+
import io.netty.handler.codec.mqtt.MqttConnectMessage;
30+
import io.netty.handler.codec.mqtt.MqttConnectPayload;
31+
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
32+
import io.netty.handler.codec.mqtt.MqttFixedHeader;
33+
import io.netty.handler.codec.mqtt.MqttMessage;
34+
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
35+
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
36+
import io.netty.handler.codec.mqtt.MqttMessageType;
37+
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
38+
import io.netty.handler.codec.mqtt.MqttPublishMessage;
39+
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
40+
import io.netty.handler.codec.mqtt.MqttQoS;
41+
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
42+
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
43+
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
44+
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
45+
import io.netty.handler.codec.mqtt.MqttVersion;
46+
import io.netty.handler.ssl.SslHandler;
2847
import io.netty.handler.timeout.IdleStateHandler;
2948
import org.slf4j.Logger;
3049
import org.slf4j.LoggerFactory;
3150

51+
import java.io.IOException;
3252
import java.net.InetSocketAddress;
33-
import java.util.*;
53+
import java.security.cert.Certificate;
54+
import java.security.cert.CertificateEncodingException;
55+
import java.util.List;
56+
import java.util.UUID;
3457
import java.util.concurrent.CompletableFuture;
3558
import java.util.concurrent.TimeUnit;
3659
import java.util.concurrent.atomic.AtomicInteger;
60+
import javax.net.ssl.SSLPeerUnverifiedException;
3761

3862
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
3963
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
40-
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
64+
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
65+
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
66+
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
67+
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
68+
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
4169
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
42-
import static io.netty.handler.codec.mqtt.MqttQoS.*;
70+
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
71+
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
4372

4473
final class MQTTConnection {
4574

@@ -297,25 +326,46 @@ private void abortConnection(MqttConnectReturnCode returnCode) {
297326
}
298327

299328
private boolean login(MqttConnectMessage msg, final String clientId) {
300-
// handle user authentication
329+
String userName = null;
330+
byte[] pwd = null;
331+
301332
if (msg.variableHeader().hasUserName()) {
302-
byte[] pwd = null;
333+
userName = msg.payload().userName();
334+
// MQTT 3.1.2.9 does not mandate that there is a password - let the authenticator determine if it's needed
303335
if (msg.variableHeader().hasPassword()) {
304336
pwd = msg.payload().passwordInBytes();
305-
} else if (!brokerConfig.isAllowAnonymous()) {
306-
LOG.info("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
307-
return false;
308337
}
309-
final String login = msg.payload().userName();
310-
if (!authenticator.checkValid(clientId, login, pwd)) {
311-
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
338+
}
339+
340+
if (brokerConfig.isPeerCertificateAsUsername()) {
341+
try {
342+
// Use peer cert as username
343+
SslHandler sslhandler = (SslHandler) channel.pipeline().get("ssl");
344+
if (sslhandler != null) {
345+
Certificate[] certificateChain = sslhandler.engine().getSession().getPeerCertificates();
346+
userName = PemUtils.certificatesToPem(certificateChain);
347+
}
348+
} catch (SSLPeerUnverifiedException e) {
349+
LOG.debug("No peer cert provided. CId={}", clientId);
350+
} catch (CertificateEncodingException | IOException e) {
351+
LOG.warn("Unable to decode client certificate. CId={}", clientId);
352+
}
353+
}
354+
355+
if (userName == null || userName.isEmpty()) {
356+
if (brokerConfig.isAllowAnonymous()) {
357+
return true;
358+
} else {
359+
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
312360
return false;
313361
}
314-
NettyUtils.userName(channel, login);
315-
} else if (!brokerConfig.isAllowAnonymous()) {
316-
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
362+
}
363+
364+
if (!authenticator.checkValid(clientId, userName, pwd)) {
365+
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, userName);
317366
return false;
318367
}
368+
NettyUtils.userName(channel, userName);
319369
return true;
320370
}
321371

0 commit comments

Comments
 (0)