Skip to content

Commit

Permalink
Merge pull request #24 from protegeproject/improve-error-handling
Browse files Browse the repository at this point in the history
Improvements to error handling
  • Loading branch information
alexsilaghi authored Jan 11, 2025
2 parents 9a7fbb9 + 4ed5dcb commit 89c82bc
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 125 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-gwt-api-gateway</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<name>webprotege-gwt-api-gateway</name>
<description>The API Gateway for the WebProtégé GWT User Interface</description>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import edu.stanford.protege.webprotege.common.UserId;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
Expand All @@ -17,7 +18,9 @@
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -31,17 +34,21 @@
@Lazy
public class GatewayController {


@Value("${webprotege.apigateway.forceUserName:}")
private String forceUserName;

private final long timeoutInMs;

private final Logger logger = LoggerFactory.getLogger(GatewayController.class);

private final RpcRequestProcessor rpcRequestProcessor;

private final LogoutHandler logoutHandler;


public GatewayController(RpcRequestProcessor rpcRequestProcessor, LogoutHandler logoutHandler) {
public GatewayController(@Value("${webprotege.gateway.timeout:600000}") long timeoutInMs, RpcRequestProcessor rpcRequestProcessor, LogoutHandler logoutHandler) {
this.timeoutInMs = timeoutInMs;
this.rpcRequestProcessor = rpcRequestProcessor;
this.logoutHandler = logoutHandler;
}
Expand All @@ -63,13 +70,29 @@ public RpcResponse execute(@RequestBody RpcRequest request,
}
var result = rpcRequestProcessor.processRequest(request, accessToken, new UserId(userId));
try {
return result.get(10, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException e) {
logger.error("Error while waiting for response to request", e);
return RpcResponse.forError(request.methodName(), HttpStatus.INTERNAL_SERVER_ERROR);

return result.get(timeoutInMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
if(e.getCause() instanceof CommandExecutionException ex) {
logger.info("Error with cause that is a CommandExecutionException. Mapping to a ResponseStatusException: {}", ex.getStatus());
throw new ResponseStatusException(ex.getStatus(), e.getMessage());
}
else if(e.getCause() instanceof ResponseStatusException ex) {
logger.info("Error with cause that is a ResponseStatusException. Rethrowing.");
throw ex;
}
else {
logger.error("Error while handling request. UserId: {} Request: {}", userId, request, e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "An error occurred while handling the request.");
}
} catch (TimeoutException e) {
return RpcResponse.forError(request.methodName(), HttpStatus.GATEWAY_TIMEOUT);
logger.error("Time out in GatewayController while waiting for response to request. Timeout set to {} ms. Request: {}", timeoutInMs, request, e);
throw new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, "A time out occurred while waiting for the response.");
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for response. UserId: {} Request: {}", userId, request, e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "An interrupt occurred while handling the request.");
} catch (CancellationException e) {
logger.error("Cancellation while waiting for response. UserId: {} Request: {}", userId, request, e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "A cancellation occurred while waiting for the request.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,13 @@
*/
public interface Messenger {

/**
* Sends a message that is associated with a request. The request is used to determined where to send the message to.
* @param request The request that is used to determine the channel where the request message will be sent to.
* @param accessToken The access token.
* @param payload The message payload.
* @param userId The user associated with the message request
* @return A future that contains the response message.
*/
CompletableFuture<Msg> sendAndReceive(RpcRequest request, String accessToken, byte[] payload, UserId userId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import edu.stanford.protege.webprotege.common.UserId;
import edu.stanford.protege.webprotege.ipc.Headers;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;

Expand All @@ -16,41 +17,41 @@
*/
public class MessengerImpl implements Messenger {


private final AsyncRabbitTemplate asyncRabbitTemplate;



public MessengerImpl(AsyncRabbitTemplate asyncRabbitTemplate) {
this.asyncRabbitTemplate = asyncRabbitTemplate;
}

/**
* Sends the payload to an exchange and uses the topicName as a routing key and waits for a maximum of the duration specified by the timeout
*
* @return The reply message.
*/
@Override
public CompletableFuture<Msg> sendAndReceive(RpcRequest rpcRequest,String accessToken, byte[] payload, UserId userId) {
String topicName = rpcRequest.methodName();
org.springframework.amqp.core.Message rabbitRequest = MessageBuilder.withBody(payload).build();
rabbitRequest.getMessageProperties().getHeaders().put(Headers.ACCESS_TOKEN, accessToken.toString());
rabbitRequest.getMessageProperties().getHeaders().put("webprotege_methodName", topicName);
rabbitRequest.getMessageProperties().getHeaders().put(Headers.USER_ID, userId.value());
if(rpcRequest.params().has("projectId")) {
public CompletableFuture<Msg> sendAndReceive(RpcRequest rpcRequest, String accessToken, byte[] payload, UserId userId) {
var method = rpcRequest.methodName();
var rabbitRequest = MessageBuilder.withBody(payload).build();
var headers = rabbitRequest.getMessageProperties().getHeaders();
headers.put(Headers.ACCESS_TOKEN, accessToken);
headers.put(Headers.METHOD, method);
headers.put(Headers.USER_ID, userId.value());
if (rpcRequest.params().has("projectId")) {
var projectId = rpcRequest.params().get("projectId").asText();
rabbitRequest.getMessageProperties().getHeaders().put(Headers.PROJECT_ID, projectId);
headers.put(Headers.PROJECT_ID, projectId);
}
return asyncRabbitTemplate.sendAndReceive("webprotege-exchange", topicName, rabbitRequest).thenApply(response -> {
Map<String, String> responseHeaders = new HashMap<>();
if(response != null) {
response.getMessageProperties().getHeaders().forEach((key, value) -> responseHeaders.put(key, String.valueOf(value)));
return new Msg(response.getBody(), responseHeaders);
} else {
throw new RuntimeException("Null message received from Rabbit ");
}
});

return asyncRabbitTemplate.sendAndReceive("webprotege-exchange", method, rabbitRequest)
.thenCompose(replyMsg -> {
// Transform the replyMsg to a Msg
var responseHeaders = new HashMap<String, String>();
if (replyMsg != null) {
replyMsg.getMessageProperties().getHeaders().forEach((key, value) -> responseHeaders.put(key, String.valueOf(value)));
var msg = new Msg(replyMsg.getBody(), responseHeaders);
return CompletableFuture.completedFuture(msg);
} else {
return CompletableFuture.failedFuture(new RuntimeException("Null replyMsg from Rabbit"));
}
});
}


}
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package edu.stanford.protege.webprotege.gateway;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.UserId;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import edu.stanford.protege.webprotege.ipc.Headers;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.web.server.ResponseStatusException;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/**
* Matthew Horridge
Expand All @@ -25,104 +29,110 @@ public class RpcRequestProcessor {

private static final Logger logger = LoggerFactory.getLogger(RpcRequestProcessor.class);

private final String applicationName;

private final Messenger messenger;

private final ObjectMapper objectMapper;


public RpcRequestProcessor(Messenger messenger,
public RpcRequestProcessor(@Value("${spring.application.name}") String applicationName,
Messenger messenger,
ObjectMapper objectMapper) {
this.applicationName = applicationName;
this.messenger = messenger;
this.objectMapper = objectMapper;
}

/**
* Process the {@link RpcRequest}. This is a blocking method, intended for use
* by a Rest Controller.
* @param request The {@link RpcRequest} to handle
* Process the {@link RpcRequest}. Note that application level errors are returns as RPC error responses while
* all other errors (for example transport level errors, messaging errors etc. are returned as HTTP errors).
*
* @param request The {@link RpcRequest} to handle
* @param accessToken A JWT access token that identifies the principle
* @param userId The userId that corresponds to the principal
* @param userId The userId that corresponds to the principal
* @return The {@link RpcResponse} that corresponds to the message that was received in response to the
* message that was sent
* message that was sent. A failed future will be returned if the request could not be processed, for whatever
* reason. Failed futures will have a {@link ResponseStatusException} as the error with a status code of
* 500 Internal Server Error.
*/
public CompletableFuture<RpcResponse> processRequest(RpcRequest request,
String accessToken,
UserId userId) {
try {
var payload = writePayloadForRequest(request);
// Write out the payload and send it
var payload = objectMapper.writer().writeValueAsBytes(request.params());
return sendMessage(request, accessToken, userId, payload);
} catch (JsonProcessingException e) {
logger.error("Error serializing RPC request payload: {}. Returning failed future with ResponseStatusException HTTP 500 Internal Server Error.", e.getMessage(), e);
return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e));
}
}

private CompletableFuture<RpcResponse> sendMessage(RpcRequest request, String accessToken, UserId userId, byte[] payload) {
try {
var reply = messenger.sendAndReceive(request,
accessToken,
payload, userId);

return reply.handleAsync((replyMsg, error) -> {
if(error != null) {
return createErrorResponse(request.methodName(), error);
}
var errorHeader = replyMsg.headers().get(Headers.ERROR);
if (errorHeader != null) {
try {
var executionException = objectMapper.readValue(errorHeader, CommandExecutionException.class);
return createRpcResponse(request.methodName(), executionException.getStatus());
} catch (JsonProcessingException e) {
logger.error("Error parsing error response into ", e);
return createRpcResponse(request.methodName(), HttpStatus.INTERNAL_SERVER_ERROR);
}
}

var result = parseResultFromResponseMessagePayload(replyMsg.payload());
return RpcResponse.forResult(request.methodName(), result);
});
accessToken,
payload, userId);
return reply
.exceptionally(e -> {
// Convert all exceptions to a ResponseStatusException
logger.error("Error during send and receive: {}. Returning failed future with ResponseStatusException HTTP 500 Internal Server Error", e.getMessage(), e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e);
})
.thenCompose(msg -> {
try {

var rpcResponse = convertReplyMessageToRpcResponse(request, msg);
return CompletableFuture.completedFuture(rpcResponse);
} catch (IOException e) {
logger.error("Error parsing RPC response message: {}. Returning failed future with HTTP 500 Internal Server Error.", e.getMessage(), e);
return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e));
}
});
} catch (Exception e) {
// Note: Catches InterruptedException
return createRpcResponseFuture(request.methodName(), HttpStatus.INTERNAL_SERVER_ERROR);
return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e));
}
}

private static CompletableFuture<RpcResponse> createRpcResponseFuture(String method, HttpStatus httpStatus) {
var responseFuture = new CompletableFuture<RpcResponse>();
RpcResponse response = createRpcResponse(method, httpStatus);
responseFuture.complete(response);
return responseFuture;
}

private static RpcResponse createRpcResponse(String method, HttpStatus httpStatus) {
return new RpcResponse(method,
new RpcError(httpStatus.value(),
httpStatus.getReasonPhrase(), Collections.emptyMap()),
null);
private @NotNull RpcResponse convertReplyMessageToRpcResponse(RpcRequest request, Msg replyMsg) throws IOException {
var errorHeader = replyMsg.headers().get(Headers.ERROR);
if (errorHeader != null) {
var serviceName = replyMsg.headers().get("webprotege_serviceName");
return createRpcErrorResponseFromMessageErrorHeader(request, errorHeader, serviceName);
} else {
var result = parseResultFromResponseMessagePayload(replyMsg.payload());
return RpcResponse.forResult(request.methodName(), result);
}
}

@SuppressWarnings("unchecked")
private Map<String, Object> parseResultFromResponseMessagePayload(byte [] replyPayload) {
private @NotNull RpcResponse createRpcErrorResponseFromMessageErrorHeader(RpcRequest request, String errorHeader, @Nullable String serviceName) throws JsonProcessingException {
try {
if(replyPayload.length == 0) {
return Map.of();
}
return (Map<String, Object>) objectMapper.readValue(replyPayload, Map.class);
} catch (IOException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error deserializing payload from response message", e);
var executionException = objectMapper.readValue(errorHeader, CommandExecutionException.class);
var httpStatus = executionException.getStatus();
var errorMessage = httpStatus.getReasonPhrase() + " in " + applicationName;
return RpcResponse.forError(request.methodName(),
new RpcError(httpStatus.value(),
errorMessage, Collections.emptyMap()));
} catch (JsonProcessingException e) {
logger.error("Error parsing error header {} into CommandExecutionException: {}", errorHeader, e.getMessage(), e);
throw e;
}
}

private RpcResponse createErrorResponse(String method, Throwable e) {
if (e instanceof CommandExecutionException) {
var status = ((CommandExecutionException) e).getStatus();
return RpcResponse.forError(method, new RpcError(status.value(), status.getReasonPhrase(), Collections.emptyMap()));
}
else if(e instanceof TimeoutException) {
return createRpcResponse(method, HttpStatus.GATEWAY_TIMEOUT);
}
else {
return createRpcResponse(method, HttpStatus.INTERNAL_SERVER_ERROR);
@SuppressWarnings("unchecked")
private Map<String, Object> parseResultFromResponseMessagePayload(byte[] replyPayload) throws IOException {
if (replyPayload.length == 0) {
return Map.of();
}
}

private byte [] writePayloadForRequest(RpcRequest request) {
try {
return objectMapper.writer().writeValueAsBytes(request.params());
return (Map<String, Object>) objectMapper.readValue(replyPayload, Map.class);
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST);
logger.error("Error parsing payload into Map: {}", e.getMessage(), e);
throw e;
} catch (IOException e) {
logger.error("Error processing payload: {}", e.getMessage(), e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ MinioClient minioClient(MinioProperties properties) {

@Bean
@Lazy
RpcRequestProcessor rpcRequestProcessor(ObjectMapper objectMapper,
RpcRequestProcessor rpcRequestProcessor(@Value("${spring.application.name}") String applicationName,
ObjectMapper objectMapper,
Messenger messenger) {
return new RpcRequestProcessor(messenger, objectMapper);
return new RpcRequestProcessor(applicationName, messenger, objectMapper);
}

@Bean
Expand Down
Loading

0 comments on commit 89c82bc

Please sign in to comment.