+ * Fetches from API and compares against database value (if any), then inserts into database and updates cache value. + * If value received from the API is different from what is saved in the database, a [RuntimeException] is thrown. + *
+ *+ * This fallback domain is necessary for LegalHold to work with Federation (as it needs id@domain) and not just the ID anymore. + * In case there is a mismatch we are throwing a RuntimeException so it stops the execution of this app, so in an event + * of already having a defined default domain saved in the database and this app restarts with a different domain + * we don't get mismatching domains. + *
+ * @param loginClient [{@link LoginClient}] as API to get backend configuration containing default domain. + * @param metadataDAO [{@link MetadataDAO}] as DAO to get/insert default domain to database. + * + * @throws RuntimeException if received domain from API is different from the one saved in the database. + */ + FallbackDomainFetcher(LoginClient loginClient, MetadataDAO metadataDAO) { + this.loginClient = loginClient; + this.metadataDAO = metadataDAO; + } + + @Override + public void run() { + if (Cache.getFallbackDomain() != null) { return; } + + Metadata metadata = metadataDAO.get(MetadataDAO.FALLBACK_DOMAIN_KEY); + try { + BackendConfiguration apiVersionResponse = loginClient.getBackendConfiguration(); + + if (metadata == null) { + metadataDAO.insert(MetadataDAO.FALLBACK_DOMAIN_KEY, apiVersionResponse.domain); + Cache.setFallbackDomain(apiVersionResponse.domain); + } else { + if (metadata.value.equals(apiVersionResponse.domain)) { + Cache.setFallbackDomain(apiVersionResponse.domain); + } else { + String formattedExceptionMessage = String.format( + "Database already has a default domain as %s and instead we got %s from the Backend API.", + metadata.value, + apiVersionResponse.domain + ); + throw new RuntimeException(formattedExceptionMessage); + } + } + } catch (HttpException exception) { + Logger.exception(exception, "FallbackDomainFetcher.run, exception: %s", exception.getMessage()); + } catch (ProcessingException pexception) { + Logger.info("FallbackDomainFetcher.run, ignoring test exceptions"); + } + } +} diff --git a/src/main/java/com/wire/bots/hold/NotificationProcessor.java b/src/main/java/com/wire/bots/hold/NotificationProcessor.java index 6a8bc86..b28534f 100644 --- a/src/main/java/com/wire/bots/hold/NotificationProcessor.java +++ b/src/main/java/com/wire/bots/hold/NotificationProcessor.java @@ -3,38 +3,34 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.wire.bots.hold.DAO.AccessDAO; -import com.wire.bots.hold.model.Notification; -import com.wire.bots.hold.model.NotificationList; import com.wire.bots.hold.model.database.LHAccess; +import com.wire.helium.API; import com.wire.helium.LoginClient; import com.wire.helium.models.Access; +import com.wire.helium.models.Event; +import com.wire.helium.models.NotificationList; import com.wire.xenon.backend.models.Payload; import com.wire.xenon.exceptions.AuthException; import com.wire.xenon.exceptions.HttpException; import com.wire.xenon.tools.Logger; import javax.ws.rs.client.Client; -import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Cookie; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import java.util.List; import java.util.UUID; import java.util.logging.Level; public class NotificationProcessor implements Runnable { + private static final int DEFAULT_NOTIFICATION_SIZE = 100; + private final Client client; private final AccessDAO accessDAO; private final HoldMessageResource messageResource; - private final WebTarget api; - NotificationProcessor(Client client, AccessDAO accessDAO, Config config, HoldMessageResource messageResource) { + NotificationProcessor(Client client, AccessDAO accessDAO, HoldMessageResource messageResource) { this.client = client; this.accessDAO = accessDAO; this.messageResource = messageResource; - - api = client.target(config.apiHost); } @Override @@ -58,7 +54,10 @@ private Access getAccess(Cookie cookie) throws HttpException { private void process(LHAccess device) { UUID userId = device.userId; + try { + final API api = new API(client, null, device.token); + Logger.debug("`GET /notifications`: user: %s, last: %s", userId, device.last); String cookieValue = device.cookie; @@ -76,39 +75,38 @@ private void process(LHAccess device) { device.token = access.getAccessToken(); - NotificationList notificationList = retrieveNotifications(device); + NotificationList notificationList = api.retrieveNotifications( + device.clientId, + device.last, + DEFAULT_NOTIFICATION_SIZE + ); process(userId, notificationList); - } catch (AuthException e) { accessDAO.disable(userId); Logger.info("Disabled LH device for user: %s, error: %s", userId, e.getMessage()); + } catch (HttpException e) { + Logger.exception(e, "NotificationProcessor: Couldn't retrieve notifications, error: %s", e.getMessage()); } catch (Exception e) { Logger.exception(e, "NotificationProcessor: user: %s, last: %s, error: %s", userId, device.last, e.getMessage()); } } - private static String bearer(String token) { - return token == null ? null : String.format("Bearer %s", token); - } - private void process(UUID userId, NotificationList notificationList) { - - for (Notification notif : notificationList.notifications) { - for (Payload payload : notif.payload) { - if (!process(userId, payload, notif.id)) { - Logger.error("Failed to process: user: %s, notif: %s", userId, notif.id); - //return; + for (Event event : notificationList.notifications) { + for (Payload payload : event.payload) { + if (!process(userId, payload, event.id)) { + Logger.error("Failed to process: user: %s, event: %s", userId, event.id); } else { - Logger.debug("Processed: `%s` conv: %s, user: %s, notifId: %s", + Logger.debug("Processed: `%s` conv: %s, user: %s, eventId: %s", payload.type, payload.conversation, userId, - notif.id); + event.id); } } - accessDAO.updateLast(userId, notif.id); + accessDAO.updateLast(userId, event.id); } } @@ -122,13 +120,13 @@ private boolean process(UUID userId, Payload payload, UUID id) { if (payload.from == null || payload.data == null) return true; - final boolean b = messageResource.onNewMessage(userId, id, payload); + final boolean wasMessageSent = messageResource.onNewMessage(userId, id, payload); - if (!b) { + if (!wasMessageSent) { Logger.error("process: `%s` user: %s, from: %s:%s, error: %s", payload.type, userId, payload.from, payload.data.sender); } - return b; + return wasMessageSent; } private void trace(Payload payload) { @@ -141,30 +139,4 @@ private void trace(Payload payload) { } } } - - //TODO remove this and use retrieveNotifications provided by Helium - private NotificationList retrieveNotifications(LHAccess access) throws HttpException { - Response response = api.path("notifications").queryParam("client", access.clientId).queryParam("since", access.last).queryParam("size", 100).request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).header(HttpHeaders.AUTHORIZATION, bearer(access.token)).get(); - - int status = response.getStatus(); - - if (status == 200) { - return response.readEntity(NotificationList.class); - } - - if (status == 404) { //todo what??? - return response.readEntity(NotificationList.class); - } - - if (status == 401) { //todo nginx returns text/html for 401. Cannot deserialize as json - response.readEntity(String.class); - throw new AuthException(status); - } - - if (status == 403) { - throw response.readEntity(AuthException.class); - } - - throw response.readEntity(HttpException.class); - } } diff --git a/src/main/java/com/wire/bots/hold/Service.java b/src/main/java/com/wire/bots/hold/Service.java index 67d3b3a..aa470cd 100644 --- a/src/main/java/com/wire/bots/hold/Service.java +++ b/src/main/java/com/wire/bots/hold/Service.java @@ -22,6 +22,7 @@ import com.wire.bots.cryptobox.CryptoException; import com.wire.bots.hold.DAO.AccessDAO; import com.wire.bots.hold.DAO.EventsDAO; +import com.wire.bots.hold.DAO.MetadataDAO; import com.wire.bots.hold.filters.ServiceAuthenticationFilter; import com.wire.bots.hold.healthchecks.SanityCheck; import com.wire.bots.hold.monitoring.RequestMdcFactoryFilter; @@ -63,12 +64,13 @@ import javax.ws.rs.client.Client; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Service extends Application