Skip to content

Commit

Permalink
Merge pull request #4066 from dseurotech/fix-avoid_multiple_MessageLi…
Browse files Browse the repository at this point in the history
…steners

🐛 Avoid multiple GuiceLocator instances, and handle potential multiple KapuaMessageListeners
  • Loading branch information
Coduz authored Jul 8, 2024
2 parents 4af4e52 + fae4a63 commit 2d110c4
Show file tree
Hide file tree
Showing 78 changed files with 291 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
*******************************************************************************/
package org.eclipse.kapua.broker.artemis.plugin.security;

import com.google.inject.Provides;
import java.util.UUID;

import javax.inject.Named;
import javax.inject.Singleton;
import javax.jms.JMSException;

import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaRuntimeException;
import org.eclipse.kapua.broker.artemis.plugin.security.context.SecurityContext;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.LoginMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSetting;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSettingKey;
import org.eclipse.kapua.client.security.MessageListener;
import org.eclipse.kapua.client.security.KapuaMessageListener;
import org.eclipse.kapua.client.security.ServiceClient;
import org.eclipse.kapua.client.security.ServiceClientMessagingImpl;
import org.eclipse.kapua.client.security.amqpclient.Client;
Expand All @@ -28,12 +33,10 @@
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;

import javax.inject.Named;
import javax.inject.Singleton;
import javax.jms.JMSException;
import java.util.UUID;
import com.google.inject.Provides;

public class ArtemisSecurityModule extends AbstractKapuaModule {

@Override
protected void configureModule() {
bind(ServerContext.class).in(Singleton.class);
Expand All @@ -46,9 +49,9 @@ protected void configureModule() {
@Provides
@Singleton
SecurityContext securityContext(LoginMetric loginMetric,
BrokerSetting brokerSettings,
MetricsSecurityPlugin metricsSecurityPlugin,
RunWithLock runWithLock) {
BrokerSetting brokerSettings,
MetricsSecurityPlugin metricsSecurityPlugin,
RunWithLock runWithLock) {
return new SecurityContext(loginMetric,
brokerSettings.getBoolean(BrokerSettingKey.PRINT_SECURITY_CONTEXT_REPORT, false),
new LocalCache<>(
Expand All @@ -74,14 +77,14 @@ SecurityContext securityContext(LoginMetric loginMetric,
@Singleton
@Provides
ServiceClient authServiceClient(
MessageListener messageListener,
KapuaMessageListener messageListener,
@Named("clusterName") String clusterName,
@Named("brokerHost") String brokerHost,
SystemSetting systemSetting) {
return new ServiceClientMessagingImpl(messageListener, buildClient(systemSetting, clusterName, brokerHost, messageListener));
}

public Client buildClient(SystemSetting systemSetting, String clusterName, String brokerHost, MessageListener messageListener) {
public Client buildClient(SystemSetting systemSetting, String clusterName, String brokerHost, KapuaMessageListener messageListener) {
//TODO change configuration (use service event broker for now)
String clientId = "svc-ath-" + UUID.randomUUID().toString();
String host = systemSetting.getString(SystemSettingKey.SERVICE_BUS_HOST, "events-broker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import javax.inject.Singleton;

import org.eclipse.kapua.client.security.metric.AuthLoginMetricFactory;
import org.eclipse.kapua.client.security.metric.AuthMetric;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;

import javax.inject.Singleton;
import com.google.inject.Provides;

public class ClientSecurityModule extends AbstractKapuaModule {

@Override
protected void configureModule() {
bind(MetricsClientSecurity.class).in(Singleton.class);
bind(MessageListener.class).in(Singleton.class);
bind(AuthMetric.class).in(Singleton.class);
bind(AuthLoginMetricFactory.class).in(Singleton.class);
}

@Provides
@Singleton
KapuaMessageListener messageListener(MetricsClientSecurity metricsClientSecurity) {
return new KapuaMessageListener(metricsClientSecurity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import javax.inject.Singleton;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.qpid.jms.message.JmsTextMessage;
import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaRuntimeException;
Expand All @@ -27,48 +35,54 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

/**
* This class is responsible to correlate request/response messages. Only one instance of this must be present at any given time!
*/
@Singleton
public class MessageListener extends ClientMessageListener {
public class KapuaMessageListener extends ClientMessageListener implements Closeable {

protected static Logger logger = LoggerFactory.getLogger(MessageListener.class);

private final Map<String, ResponseContainer<?>> callbacks;//is not needed the synchronization
protected static Logger logger = LoggerFactory.getLogger(KapuaMessageListener.class);
//Should only be one
private static final AtomicInteger INSTANCES = new AtomicInteger();
private final int currentInstanceNumber;
//Hate to use a static here, but at least in case of multiple listeners they will be able to correlate messages
private static final Map<String, ResponseContainer<?>> CALLBACKS = new ConcurrentHashMap<>();
//is not needed the synchronization
private static ObjectMapper mapper = new ObjectMapper();
private static ObjectReader reader = mapper.reader();//check if it's thread safe

private MetricsClientSecurity metrics;

@Inject
public MessageListener(MetricsClientSecurity metricsClientSecurity) {
logger.debug("Starting MessageListener");
KapuaMessageListener(MetricsClientSecurity metricsClientSecurity) {
currentInstanceNumber = INSTANCES.incrementAndGet();
if (currentInstanceNumber != 1) {
logger.warn("Starting KapuaMessageListener, instance number {}! Is this right?!?!?", currentInstanceNumber);
} else {
logger.debug("Starting KapuaMessageListener, instance {}", currentInstanceNumber);
}
this.metrics = metricsClientSecurity;
callbacks = new ConcurrentHashMap<>();
}

@Override
public void onMessage(Message message) {
logger.debug("KapuaMessageListener processing message, instance {} responding", currentInstanceNumber);
try {
SecurityAction securityAction = SecurityAction.valueOf(message.getStringProperty(MessageConstants.HEADER_ACTION));
switch (securityAction) {
case brokerConnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;
case brokerDisconnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;
case getEntity:
updateResponseContainer(buildAccountResponseFromMessage((JmsTextMessage) message));
break;
default:
throw new KapuaRuntimeException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "action");
case brokerConnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;
case brokerDisconnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;
case getEntity:
updateResponseContainer(buildAccountResponseFromMessage((JmsTextMessage) message));
break;
default:
throw new KapuaRuntimeException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "action");
}
} catch (JMSException | IOException e) {
metrics.getLoginCallbackError().inc();
Expand All @@ -77,8 +91,8 @@ public void onMessage(Message message) {
}

private <R extends Response> void updateResponseContainer(R response) throws JMSException, IOException {
logger.debug("update callback {} on instance {}, map size: {}", response.getRequestId(), this, callbacks.size());
ResponseContainer<R> responseContainer = (ResponseContainer<R>) callbacks.get(response.getRequestId());
logger.debug("update callback {} on instance {}, map size: {}", response.getRequestId(), this, CALLBACKS.size());
ResponseContainer<R> responseContainer = (ResponseContainer<R>) CALLBACKS.get(response.getRequestId());
if (responseContainer == null) {
//internal error
logger.error("Cannot find request container for requestId {}", response.getRequestId());
Expand All @@ -102,13 +116,17 @@ private EntityResponse buildAccountResponseFromMessage(JmsTextMessage message) t
}

public void registerCallback(String requestId, ResponseContainer<?> responseContainer) {
callbacks.put(requestId, responseContainer);
logger.debug("registered callback {} on instance {}, map size: {}", requestId, this, callbacks.size());
CALLBACKS.put(requestId, responseContainer);
logger.debug("registered callback {} on instance {}, map size: {}", requestId, this, CALLBACKS.size());
}

public void removeCallback(String requestId) {
callbacks.remove(requestId);
logger.debug("removed callback {} from instance {}, map size: {}", requestId, this, callbacks.size());
CALLBACKS.remove(requestId);
logger.debug("removed callback {} from instance {}, map size: {}", requestId, this, CALLBACKS.size());
}

@Override
public void close() throws IOException {
INSTANCES.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.core.JsonProcessingException;
import javax.jms.JMSException;

import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.bean.AuthRequest;
Expand All @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import com.fasterxml.jackson.core.JsonProcessingException;

/**
* Security service. Implementation through AMQP messaging layer.
Expand All @@ -34,17 +34,18 @@ public class ServiceClientMessagingImpl implements ServiceClient {
private static final Logger logger = LoggerFactory.getLogger(ServiceClientMessagingImpl.class);

private static final int TIMEOUT = 5000;
private final MessageListener messageListener;
private final KapuaMessageListener messageListener;

private Client client;

public ServiceClientMessagingImpl(MessageListener messageListener, Client client) {
public ServiceClientMessagingImpl(KapuaMessageListener messageListener, Client client) {
this.messageListener = messageListener;
this.client = client;
}

@Override
public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
public AuthResponse brokerConnect(AuthRequest authRequest)
throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerConnect.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*******************************************************************************/
package org.eclipse.kapua.client.security.bean;

import org.eclipse.kapua.client.security.MessageListener;
import org.eclipse.kapua.client.security.KapuaMessageListener;

public class ResponseContainer<O extends Response> {

Expand All @@ -35,7 +35,7 @@ public String getRequestId() {
return requestId;
}

public static <O extends Response> ResponseContainer<O> createAnRegisterNewMessageContainer(MessageListener messageListener, Request request) {
public static <O extends Response> ResponseContainer<O> createAnRegisterNewMessageContainer(KapuaMessageListener messageListener, Request request) {
ResponseContainer<O> messageContainer = new ResponseContainer<>(request.getRequestId());
messageListener.registerCallback(request.getRequestId(), messageContainer);
return messageContainer;
Expand Down
4 changes: 0 additions & 4 deletions commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
*******************************************************************************/
package org.eclipse.kapua.commons.jpa;

import com.zaxxer.hikari.HikariDataSource;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;

import com.zaxxer.hikari.HikariDataSource;

public final class DataSource {

private static HikariDataSource hikariDataSource;
Expand Down
Loading

0 comments on commit 2d110c4

Please sign in to comment.