diff --git a/docker-compose.yml b/docker-compose.yml index 67356c1..afc7384 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,9 +20,6 @@ services: - DB_PASSWORD=hold depends_on: - db - - elasticsearch - - kibana - - filebeat db: image: postgres:13 @@ -36,33 +33,5 @@ services: volumes: - hold-db:/var/lib/postgresql/data/ - elasticsearch: - container_name: lh-elasticsearch - image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0 - environment: - - "ES_JAVA_OPTS=-Xms1g -Xmx1g" - - "discovery.type=single-node" - ports: - - 9200:9200 - - kibana: - container_name: lh-kibana - image: docker.elastic.co/kibana/kibana:7.11.0 - environment: - - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 - - ELASTICSEARCH_URL=http://elasticsearch:9200 - depends_on: - - elasticsearch - ports: - - 5601:5601 - - filebeat: - image: "docker.elastic.co/beats/filebeat:7.2.0" - user: root - volumes: - - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro - - /var/lib/docker:/var/lib/docker:ro - - /var/run/docker.sock:/var/run/docker.sock - volumes: hold-db: diff --git a/filebeat.yml b/filebeat.yml deleted file mode 100644 index 82cb212..0000000 --- a/filebeat.yml +++ /dev/null @@ -1,21 +0,0 @@ -filebeat.inputs: - - type: container - paths: - - '/var/lib/docker/containers/*/*.log' - -processors: - - add_docker_metadata: - host: "unix:///var/run/docker.sock" - - - decode_json_fields: - fields: ["message"] - target: "json" - overwrite_keys: true - -output.elasticsearch: - hosts: ["elasticsearch:9200"] - indices: - - index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}" - -logging.json: true -logging.metrics.enabled: false \ No newline at end of file diff --git a/hold.yaml b/hold.yaml index aed2043..7267f8f 100644 --- a/hold.yaml +++ b/hold.yaml @@ -56,6 +56,6 @@ apiHost: ${WIRE_API_HOST:-https://prod-nginz-https.wire.com} database: driverClass: ${DB_DRIVER:-org.postgresql.Driver} - url: ${DB_URL:-jdbc:postgresql://localhost/legalhold} - user: ${DB_USER:-} - password: ${DB_PASSWORD:-} + url: ${DB_URL:-jdbc:postgresql://localhost/hold} + user: ${DB_USER:-hold} + password: ${DB_PASSWORD:-hold} diff --git a/pom.xml b/pom.xml index 55baa81..7eba379 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,6 @@ 1.0.10 0.16.0 1.3.0 - true @@ -38,6 +37,11 @@ jitpack.io https://jitpack.io + + maven_central + Maven Central + https://repo.maven.apache.org/maven2/ + @@ -74,7 +78,7 @@ org.postgresql postgresql - 42.2.23.jre7 + 42.7.3 org.flywaydb @@ -90,7 +94,7 @@ com.github.spullara.mustache.java compiler - 0.9.10 + 0.9.11 com.openhtmltopdf diff --git a/src/main/java/com/wire/bots/hold/DAO/EventsDAO.java b/src/main/java/com/wire/bots/hold/DAO/EventsDAO.java index c2a3377..8da0260 100644 --- a/src/main/java/com/wire/bots/hold/DAO/EventsDAO.java +++ b/src/main/java/com/wire/bots/hold/DAO/EventsDAO.java @@ -43,21 +43,6 @@ int insert(@Bind("eventId") UUID eventId, @RegisterColumnMapper(_EventsResultSetMapper.class) List listConversations(); - @SqlQuery("SELECT DISTINCT conversationId, time FROM Events WHERE exported = 'FALSE'") - @RegisterColumnMapper(_EventsResultSetMapper.class) - List getUnexportedConvs(); - - @SqlQuery("SELECT * FROM Events WHERE conversationId = :conversationId AND exported = 'FALSE' ORDER BY time ASC") - @RegisterColumnMapper(EventsResultSetMapper.class) - List listAllUnxported(@Bind("conversationId") UUID conversationId); - - @SqlQuery("SELECT * FROM Events WHERE exported = 'FALSE' LIMIT 1000") - @RegisterColumnMapper(EventsResultSetMapper.class) - List listAllUnxported(); - - @SqlUpdate("UPDATE Events SET exported = 'TRUE' WHERE eventId = :eventId") - int markExported(@Bind("eventId") UUID eventId); - @SqlUpdate("DELETE FROM Events WHERE eventId = :eventId") int delete(@Bind("eventId") UUID eventId); diff --git a/src/main/java/com/wire/bots/hold/KibanaExporter.java b/src/main/java/com/wire/bots/hold/KibanaExporter.java deleted file mode 100644 index a4224bc..0000000 --- a/src/main/java/com/wire/bots/hold/KibanaExporter.java +++ /dev/null @@ -1,186 +0,0 @@ -package com.wire.bots.hold; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.wire.bots.hold.DAO.AccessDAO; -import com.wire.bots.hold.DAO.EventsDAO; -import com.wire.bots.hold.model.Event; -import com.wire.bots.hold.model.LHAccess; -import com.wire.bots.hold.model.Log; -import com.wire.bots.hold.utils.Cache; -import com.wire.bots.hold.utils.LegalHoldAPI; -import com.wire.xenon.backend.models.Conversation; -import com.wire.xenon.backend.models.Member; -import com.wire.xenon.backend.models.SystemMessage; -import com.wire.xenon.backend.models.User; -import com.wire.xenon.models.RemoteMessage; -import com.wire.xenon.models.TextMessage; -import com.wire.xenon.tools.Logger; -import org.jdbi.v3.core.Jdbi; - -import javax.annotation.Nullable; -import javax.ws.rs.client.Client; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -import static com.wire.bots.hold.Const.*; - -public class KibanaExporter implements Runnable { - private final Client httpClient; - private final EventsDAO eventsDAO; - private final AccessDAO accessDAO; - private final ObjectMapper mapper = new ObjectMapper(); - - public KibanaExporter(Jdbi jdbi, Client httpClient) { - this.httpClient = httpClient; - - eventsDAO = jdbi.onDemand(EventsDAO.class); - accessDAO = jdbi.onDemand(AccessDAO.class); - } - - public void run() { - int count = 0; - - Set uniques = new HashSet<>(); - - List events = eventsDAO.listAllUnxported(); - Logger.info("KibanaExporter: Starting export of %d events", events.size()); - for (Event event : events) { - try { - final LHAccess access = accessDAO.get(event.userId); - - if (!access.enabled) { - Logger.warning("KibanaExporter: skipping event: %s, userId: %s", event.eventId, event.userId); - eventsDAO.markExported(event.eventId); - continue; - } - - Log.Kibana kibana = processEvent(event); - - if (kibana == null) { - Logger.info("KibanaExporter: skipping %s evt: %s", event.type, event.eventId); - eventsDAO.markExported(event.eventId); - continue; - } - - assert event.userId.equals(kibana.from); - - if (!uniques.add(kibana.messageID)) { - Logger.info("KibanaExporter: de-dup. msg: %s", kibana.messageID); - eventsDAO.markExported(event.eventId); - continue; - } - - _Conversation conversation = fetchConversation(event, access); - kibana.conversationName = conversation.name; - kibana.participants = conversation.participants; - kibana.sender = conversation.user; - - Log log = new Log(); - log.securehold = kibana; - System.out.println(mapper.writeValueAsString(log)); - - if (eventsDAO.markExported(event.eventId) > 0) - count++; - - } catch (Exception ex) { - Logger.exception(ex, "Export exception. cnv: %s, evt: %s", event.conversationId, event.eventId); - } - } - Logger.info("KibanaExporter: Finished exporting %d events", count); - } - - @Nullable - private Log.Kibana processEvent(Event event) throws JsonProcessingException, ParseException { - UUID userId; - UUID messageId; - String time; - String text = null; - - switch (event.type) { - case Const.CONVERSATION_CREATE: - case Const.CONVERSATION_RENAME: - case Const.CONVERSATION_MEMBER_LEAVE: - case Const.CONVERSATION_MEMBER_JOIN: { - SystemMessage msg = mapper.readValue(event.payload, SystemMessage.class); - userId = msg.from; - messageId = msg.id; - time = msg.time; - } - break; - case Const.CONVERSATION_OTR_MESSAGE_ADD_EDIT_TEXT: - case Const.CONVERSATION_OTR_MESSAGE_ADD_NEW_TEXT: { - TextMessage msg = mapper.readValue(event.payload, TextMessage.class); - userId = msg.getUserId(); - messageId = msg.getMessageId(); - time = msg.getTime(); - text = msg.getText(); - } - break; - case Const.CONVERSATION_OTR_MESSAGE_ADD_ASSET_DATA: { - RemoteMessage msg = mapper.readValue(event.payload, RemoteMessage.class); - userId = msg.getUserId(); - messageId = msg.getMessageId(); - time = msg.getTime(); - text = msg.getAssetId(); - } - break; - case CONVERSATION_OTR_MESSAGE_ADD_CALL: - case CONVERSATION_OTR_MESSAGE_ADD_REACTION: - case CONVERSATION_OTR_MESSAGE_ADD_VIDEO_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_AUDIO_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_FILE_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_IMAGE_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_DELETE_TEXT: - return null; - default: - Logger.warning("Kibana exporter: Unknown type: %s", event.type); - return null; - } - - Log.Kibana ret = new Log.Kibana(); - ret.id = event.eventId; - ret.type = event.type; - ret.messageID = messageId; - ret.conversationID = event.conversationId; - ret.from = userId; - ret.sent = date(time); - ret.text = text; - - return ret; - } - - private _Conversation fetchConversation(Event event, LHAccess access) { - _Conversation ret = new _Conversation(); - - final String token = access.token != null ? access.token : access.cookie; - final LegalHoldAPI api = new LegalHoldAPI(httpClient, event.conversationId, token); - Cache cache = new Cache(api, null); - - Conversation conversation = api.getConversation(); - ret.name = conversation.name; - ret.user = name(cache.getUser(event.userId)); - for (Member m : conversation.members) { - User user = cache.getUser(m.id); - ret.participants.add(user.handle != null ? user.handle : user.id.toString()); - } - return ret; - } - - private String name(User user) { - return user.handle != null ? user.handle : user.id.toString(); - } - - public static Long date(@Nullable String date) throws ParseException { - SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); - Date ret = parser.parse(date); - return ret.getTime(); - } - - static class _Conversation { - ArrayList participants = new ArrayList<>(); - String user; - String name; - } -} diff --git a/src/main/java/com/wire/bots/hold/MessageHandler.java b/src/main/java/com/wire/bots/hold/MessageHandler.java index d0dc4eb..1d8b854 100644 --- a/src/main/java/com/wire/bots/hold/MessageHandler.java +++ b/src/main/java/com/wire/bots/hold/MessageHandler.java @@ -3,11 +3,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.wire.bots.hold.DAO.AssetsDAO; import com.wire.bots.hold.DAO.EventsDAO; -import com.wire.bots.hold.model.Log; import com.wire.xenon.MessageHandlerBase; import com.wire.xenon.WireClient; -import com.wire.xenon.backend.models.Conversation; -import com.wire.xenon.backend.models.Member; import com.wire.xenon.backend.models.SystemMessage; import com.wire.xenon.models.*; import com.wire.xenon.tools.Logger; @@ -15,8 +12,6 @@ import java.util.UUID; -import static com.wire.bots.hold.KibanaExporter.date; - public class MessageHandler extends MessageHandlerBase { private final EventsDAO eventsDAO; private final AssetsDAO assetsDAO; @@ -36,7 +31,6 @@ public void onNewConversation(WireClient client, SystemMessage msg) { String type = msg.type; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -47,7 +41,6 @@ public void onConversationRename(WireClient client, SystemMessage msg) { String type = Const.CONVERSATION_RENAME; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -58,7 +51,6 @@ public void onMemberJoin(WireClient client, SystemMessage msg) { String type = msg.type; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -69,7 +61,6 @@ public void onMemberLeave(WireClient client, SystemMessage msg) { String type = msg.type; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -80,7 +71,6 @@ public void onText(WireClient client, TextMessage msg) { String type = Const.CONVERSATION_OTR_MESSAGE_ADD_NEW_TEXT; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -91,7 +81,6 @@ public void onText(WireClient client, EphemeralTextMessage msg) { String type = Const.CONVERSATION_OTR_MESSAGE_ADD_NEW_TEXT; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -102,7 +91,6 @@ public void onEditText(WireClient client, EditedTextMessage msg) { String type = Const.CONVERSATION_OTR_MESSAGE_ADD_EDIT_TEXT; persist(eventId, convId, userId, type, msg); - kibana(type, msg, client); } @Override @@ -221,59 +209,4 @@ private void persist(UUID eventId, UUID convId, UUID userId, String type, Object eventId); } } - - void kibana(String type, TextMessage msg, WireClient client) { - try { - Log.Kibana kibana = new Log.Kibana(); - kibana.id = msg.getEventId(); - kibana.type = type; - kibana.messageID = msg.getMessageId(); - kibana.conversationID = msg.getConversationId(); - kibana.from = msg.getUserId(); - kibana.sent = date(msg.getTime()); - kibana.text = msg.getText(); - - kibana.sender = client.getUser(msg.getUserId()).handle; - - Conversation conversation = client.getConversation(); - kibana.conversationName = conversation.name; - - for (Member m : conversation.members) { - kibana.participants.add(client.getUser(m.id).handle); - } - - Log log = new Log(); - log.securehold = kibana; - System.out.println(mapper.writeValueAsString(log)); - } catch (Exception e) { - Logger.exception(e, "MessageHandler:kibana: evt: %s", msg.getEventId()); - } - } - - void kibana(String type, SystemMessage msg, WireClient client) { - try { - Log.Kibana kibana = new Log.Kibana(); - kibana.id = msg.id; - kibana.type = type; - kibana.messageID = msg.id; //todo fix xenon to extract the messageId from payload.data.id - kibana.conversationID = msg.convId; - kibana.from = msg.from; - kibana.sent = date(msg.time); - - kibana.sender = client.getUser(msg.from).handle; - - Conversation conversation = client.getConversation(); - kibana.conversationName = conversation.name; - - for (Member m : conversation.members) { - kibana.participants.add(client.getUser(m.id).handle); - } - - Log log = new Log(); - log.securehold = kibana; - System.out.println(mapper.writeValueAsString(log)); - } catch (Exception e) { - Logger.exception(e, "MessageHandler:kibana: evt: %s", msg.id); - } - } } diff --git a/src/main/java/com/wire/bots/hold/NotificationProcessor.java b/src/main/java/com/wire/bots/hold/NotificationProcessor.java index b64fa41..2303919 100644 --- a/src/main/java/com/wire/bots/hold/NotificationProcessor.java +++ b/src/main/java/com/wire/bots/hold/NotificationProcessor.java @@ -79,7 +79,7 @@ private void process(LHAccess device) { NotificationList notificationList = retrieveNotifications(device); - process(userId, device.clientId, notificationList); + process(userId, notificationList); } catch (AuthException e) { accessDAO.disable(userId); @@ -93,15 +93,19 @@ private static String bearer(String token) { return token == null ? null : String.format("Bearer %s", token); } - private void process(UUID userId, String clientId, NotificationList notificationList) { + private void process(UUID userId, NotificationList notificationList) { for (Notification notif : notificationList.notifications) { for (Payload payload : notif.payload) { - if (!process(userId, clientId, payload, notif.id)) { + if (!process(userId, payload, notif.id)) { Logger.error("Failed to process: user: %s, notif: %s", userId, notif.id); //return; } else { - Logger.debug("Processed: `%s` conv: %s, user: %s:%s, notifId: %s", payload.type, payload.convId, userId, clientId, notif.id); + Logger.debug("Processed: `%s` conv: %s, user: %s, notifId: %s", + payload.type, + payload.convId, + userId, + notif.id); } } @@ -109,10 +113,13 @@ private void process(UUID userId, String clientId, NotificationList notification } } - private boolean process(UUID userId, String clientId, Payload payload, UUID id) { + private boolean process(UUID userId, Payload payload, UUID id) { trace(payload); - Logger.debug("Payload: %s %s:%s, from: %s", payload.type, userId, clientId, payload.from); + Logger.debug("Payload: %s %s, from: %s", + payload.type, + userId, + payload.from); if (payload.from == null || payload.data == null) return true; @@ -136,6 +143,7 @@ private void trace(Payload payload) { } } + //TODO remove this and use retrieveNotifications provided by Helium private NotificationList retrieveNotifications(LHAccess access) throws HttpException { Response response = api.path("notifications").queryParam("client", access.clientId).queryParam("since", access.last).queryParam("size", 100).request(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).header(HttpHeaders.AUTHORIZATION, bearer(access.token)).get(); diff --git a/src/main/java/com/wire/bots/hold/Service.java b/src/main/java/com/wire/bots/hold/Service.java index 9217bf8..2563e8a 100644 --- a/src/main/java/com/wire/bots/hold/Service.java +++ b/src/main/java/com/wire/bots/hold/Service.java @@ -27,7 +27,6 @@ import com.wire.bots.hold.monitoring.RequestMdcFactoryFilter; import com.wire.bots.hold.monitoring.StatusResource; import com.wire.bots.hold.resource.*; -import com.wire.bots.hold.tasks.ExportTask; import com.wire.bots.hold.utils.HoldClientRepo; import com.wire.bots.hold.utils.ImagesBundle; import com.wire.xenon.Const; @@ -131,7 +130,6 @@ public void run(Config config, Environment environment) { final HoldClientRepo repo = new HoldClientRepo(jdbi, cf, httpClient); final HoldMessageResource holdMessageResource = new HoldMessageResource(new MessageHandler(jdbi), repo); final NotificationProcessor notificationProcessor = new NotificationProcessor(httpClient, accessDAO, config, holdMessageResource); - // final KibanaExporter kibanaExporter = new KibanaExporter(jdbi, httpClient); environment.lifecycle() .scheduledExecutorService("notifications") @@ -141,8 +139,6 @@ public void run(Config config, Environment environment) { CollectorRegistry.defaultRegistry.register(new DropwizardExports(metrics)); environment.getApplicationContext().addServlet(MetricsServlet.class, "/metrics"); - - environment.admin().addTask(new ExportTask(jdbi, httpClient, environment.lifecycle())); } public Config getConfig() { diff --git a/src/main/java/com/wire/bots/hold/model/Log.java b/src/main/java/com/wire/bots/hold/model/Log.java deleted file mode 100644 index 809575f..0000000 --- a/src/main/java/com/wire/bots/hold/model/Log.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.wire.bots.hold.model; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - - -public class Log { - public Kibana securehold; - - public static class Kibana { - public UUID id; - public String type; - public UUID conversationID; - public String conversationName; - public List participants = new ArrayList<>(); - public Long sent; - public String sender; - public UUID messageID; - public String text; - public UUID from; - } -} \ No newline at end of file diff --git a/src/main/java/com/wire/bots/hold/tasks/ExportTask.java b/src/main/java/com/wire/bots/hold/tasks/ExportTask.java deleted file mode 100644 index ace9d7f..0000000 --- a/src/main/java/com/wire/bots/hold/tasks/ExportTask.java +++ /dev/null @@ -1,257 +0,0 @@ -package com.wire.bots.hold.tasks; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.wire.bots.hold.Const; -import com.wire.bots.hold.DAO.AccessDAO; -import com.wire.bots.hold.DAO.EventsDAO; -import com.wire.bots.hold.model.Event; -import com.wire.bots.hold.model.LHAccess; -import com.wire.bots.hold.utils.Cache; -import com.wire.helium.API; -import com.wire.xenon.backend.models.Conversation; -import com.wire.xenon.backend.models.Member; -import com.wire.xenon.backend.models.SystemMessage; -import com.wire.xenon.backend.models.User; -import com.wire.xenon.models.RemoteMessage; -import com.wire.xenon.models.TextMessage; -import com.wire.xenon.tools.Logger; -import io.dropwizard.lifecycle.setup.LifecycleEnvironment; -import io.dropwizard.servlets.tasks.Task; -import org.jdbi.v3.core.Jdbi; - -import javax.ws.rs.client.Client; -import java.io.PrintWriter; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static com.wire.bots.hold.Const.*; - -public class ExportTask extends Task implements Runnable { - private final Client httpClient; - private final LifecycleEnvironment lifecycleEnvironment; - private final EventsDAO eventsDAO; - private final AccessDAO accessDAO; - private final ObjectMapper mapper = new ObjectMapper(); - private Cache cache; - - public ExportTask(Jdbi jdbi, Client httpClient, LifecycleEnvironment lifecycle) { - super("kibana"); - this.httpClient = httpClient; - - lifecycleEnvironment = lifecycle; - eventsDAO = jdbi.onDemand(EventsDAO.class); - accessDAO = jdbi.onDemand(AccessDAO.class); - } - - @Override - public void execute(Map> parameters, PrintWriter output) { - lifecycleEnvironment.scheduledExecutorService("Kibana") - .build() - .schedule(this, 1, TimeUnit.SECONDS); - - output.println("Kibana task has been queued"); - } - - public void run() { - int count = 0; - - final LHAccess access = accessDAO.getSingle(); - final API api = new API(httpClient, null, access.token); - - cache = new Cache(api, null); - - List events = eventsDAO.getUnexportedConvs(); - Logger.info("Exporting %d conversations to Kibana", events.size()); - - for (Event e : events) { - String name = null; - List participants = new ArrayList<>(); - Set uniques = new HashSet<>(); - - List messages = eventsDAO.listAllUnxported(e.conversationId); - - for (Event event : messages) { - try { - switch (event.type) { - case CONVERSATION_RENAME: { - SystemMessage msg = mapper.readValue(event.payload, SystemMessage.class); - if (!uniques.add(msg.id)) continue; - - name = msg.conversation.name; - - for (Member m : msg.conversation.members) { - User user = cache.getUser(m.id); - participants.add(user); - } - - String text = format(msg.conversation); - - log(event, name, participants, msg, text); - - if (eventsDAO.markExported(event.eventId) > 0) count++; - } - break; - case CONVERSATION_MEMBER_JOIN: { - SystemMessage msg = mapper.readValue(event.payload, SystemMessage.class); - if (!uniques.add(msg.id)) continue; - - StringBuilder sb = new StringBuilder(); - sb.append(String.format("%s added these participants: ", name(msg.from))); - for (UUID userId : msg.users) { - User user = cache.getUser(userId); - participants.add(user); - - sb.append(String.format("%s,", name(userId))); - } - - log(event, name, participants, msg, sb.toString()); - - if (eventsDAO.markExported(event.eventId) > 0) count++; - } - case CONVERSATION_MEMBER_LEAVE: { - SystemMessage msg = mapper.readValue(event.payload, SystemMessage.class); - if (!uniques.add(msg.id)) continue; - - StringBuilder sb = new StringBuilder(); - sb.append(String.format("%s removed these participants: ", name(msg.from))); - for (UUID userId : msg.users) { - participants.removeIf(x -> x.id.equals(userId)); - - sb.append(String.format("%s,", name(userId))); - } - - log(event, name, participants, msg, sb.toString()); - - if (eventsDAO.markExported(event.eventId) > 0) count++; - } - break; - case CONVERSATION_OTR_MESSAGE_ADD_NEW_TEXT: - case CONVERSATION_OTR_MESSAGE_ADD_EDIT_TEXT: { - TextMessage msg = mapper.readValue(event.payload, TextMessage.class); - if (!uniques.add(msg.getMessageId())) continue; - - log(event, name, participants, msg); - - if (eventsDAO.markExported(event.eventId) > 0) - count++; - } - break; - case CONVERSATION_OTR_MESSAGE_ADD_CALL: - case CONVERSATION_OTR_MESSAGE_ADD_REACTION: - case CONVERSATION_OTR_MESSAGE_ADD_VIDEO_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_AUDIO_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_FILE_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_IMAGE_PREVIEW: - case CONVERSATION_OTR_MESSAGE_ADD_DELETE_TEXT: { - eventsDAO.markExported(event.eventId); - } - break; - case CONVERSATION_OTR_MESSAGE_ADD_ASSET_DATA: { - RemoteMessage msg = mapper.readValue(event.payload, RemoteMessage.class); - if (!uniques.add(msg.getMessageId())) continue; - - log(event, name, participants, msg); - - if (eventsDAO.markExported(event.eventId) > 0) - count++; - } - break; - } - } catch (Exception ex) { - Logger.exception(ex, "Export exception %s %s", event.conversationId, event.eventId); - } - } - } - Logger.info("Finished exporting %d messages to Kibana", count); - } - - private void log(Event event, String conversation, List participants, TextMessage msg) throws Exception { - Kibana kibana = new Kibana(); - kibana.id = event.eventId; - kibana.type = event.type; - kibana.conversationID = msg.getConversationId(); - kibana.conversationName = conversation; - kibana.participants = participants.stream().map(x -> x.handle != null ? x.handle : x.id.toString()).collect(Collectors.toList()); - kibana.messageID = msg.getMessageId(); - kibana.sender = name(msg.getUserId()); - kibana.text = msg.getText(); - kibana.sent = date(msg.getTime()); - - _Log log = new _Log(); - log.securehold = kibana; - System.out.println(mapper.writeValueAsString(log)); - } - - private void log(Event event, String conversation, List participants, RemoteMessage msg) throws Exception { - Kibana kibana = new Kibana(); - kibana.id = event.eventId; - kibana.type = event.type; - kibana.conversationID = msg.getConversationId(); - kibana.conversationName = conversation; - kibana.participants = participants.stream().map(x -> x.handle != null ? x.handle : x.id.toString()).collect(Collectors.toList()); - kibana.messageID = msg.getMessageId(); - kibana.sender = name(msg.getUserId()); - kibana.text = msg.getAssetId(); - kibana.sent = date(msg.getTime()); - - _Log log = new _Log(); - log.securehold = kibana; - System.out.println(mapper.writeValueAsString(log)); - } - - private void log(Event event, String conversation, List participants, SystemMessage msg, String text) throws Exception { - Kibana kibana = new Kibana(); - kibana.id = event.eventId; - kibana.type = event.type; - kibana.conversationID = msg.convId; - kibana.conversationName = conversation; - kibana.participants = participants.stream().map(x -> x.handle != null ? x.handle : x.id.toString()).collect(Collectors.toList()); - kibana.messageID = msg.id; - kibana.sender = name(msg.from); - kibana.text = text; - kibana.sent = date(msg.time); - - _Log log = new _Log(); - log.securehold = kibana; - System.out.println(mapper.writeValueAsString(log)); - } - - private String format(Conversation conversation) { - StringBuilder sb = new StringBuilder(); - sb.append(String.format("%s created conversation '%s' with: ", name(conversation.creator), conversation.name)); - for (Member member : conversation.members) { - sb.append(String.format("%s,", name(member.id))); - } - return sb.toString(); - } - - private String name(UUID userId) { - User user = cache.getUser(userId); - return user.handle != null ? user.handle : user.id.toString(); - } - - public static long date(String date) throws ParseException { - SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); - Date ret = parser.parse(date); - return ret.getTime(); - } - - static class Kibana { - public UUID id; - public String type; - public UUID conversationID; - public String conversationName; - public List participants; - public long sent; - public String sender; - public UUID messageID; - public String text; - } - - static class _Log { - public Kibana securehold; - } -} diff --git a/src/main/java/com/wire/bots/hold/utils/HoldClientRepo.java b/src/main/java/com/wire/bots/hold/utils/HoldClientRepo.java index 798f8c7..b03f1b3 100644 --- a/src/main/java/com/wire/bots/hold/utils/HoldClientRepo.java +++ b/src/main/java/com/wire/bots/hold/utils/HoldClientRepo.java @@ -27,7 +27,7 @@ public HoldClientRepo(Jdbi jdbi, CryptoFactory cf, Client httpClient) { public WireClient getClient(UUID userId, String deviceId, UUID convId) throws CryptoException { Crypto crypto = cf.create(userId); final LHAccess single = jdbi.onDemand(AccessDAO.class).get(userId); - final API api = new LegalHoldAPI(httpClient, convId, single.token); + final API api = new API(httpClient, convId, single.token); return new HoldWireClient(userId, deviceId, convId, crypto, api); } } diff --git a/src/main/java/com/wire/bots/hold/utils/HoldWireClient.java b/src/main/java/com/wire/bots/hold/utils/HoldWireClient.java index 5fddc09..dfd2a05 100644 --- a/src/main/java/com/wire/bots/hold/utils/HoldWireClient.java +++ b/src/main/java/com/wire/bots/hold/utils/HoldWireClient.java @@ -4,14 +4,12 @@ import com.wire.xenon.WireClient; import com.wire.xenon.WireClientBase; import com.wire.xenon.assets.IAsset; -import com.wire.xenon.backend.models.Conversation; import com.wire.xenon.backend.models.User; import com.wire.xenon.crypto.Crypto; import com.wire.xenon.models.AssetKey; import com.wire.xenon.models.otr.PreKey; import java.util.ArrayList; -import java.util.Collection; import java.util.UUID; public class HoldWireClient extends WireClientBase implements WireClient { @@ -20,7 +18,7 @@ public class HoldWireClient extends WireClientBase implements WireClient { private final UUID convId; private final String deviceId; - HoldWireClient(UUID userId, String deviceId, UUID convId, Crypto crypto, API api) { + public HoldWireClient(UUID userId, String deviceId, UUID convId, Crypto crypto, API api) { super(api, crypto, null); this.userId = userId; this.convId = convId; diff --git a/src/main/java/com/wire/bots/hold/utils/LegalHoldAPI.java b/src/main/java/com/wire/bots/hold/utils/LegalHoldAPI.java deleted file mode 100644 index 39c227d..0000000 --- a/src/main/java/com/wire/bots/hold/utils/LegalHoldAPI.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.wire.bots.hold.utils; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.wire.helium.API; -import com.wire.xenon.backend.models.Conversation; -import com.wire.xenon.backend.models.Member; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.HttpHeaders; -import java.util.List; -import java.util.UUID; - -public class LegalHoldAPI extends API { - private final WebTarget conversationsPath; - - private final UUID convId; - private final String token; - - public LegalHoldAPI(Client client, UUID convId, String token) { - super(client, convId, token); - - this.convId = convId; - this.token = token; - - WebTarget target = client - .target(host()); - - conversationsPath = target.path("legalhold/conversations"); - } - - @Override - public Conversation getConversation() { - _Conv conv = conversationsPath. - path(convId.toString()). - request(). - header(HttpHeaders.AUTHORIZATION, bearer(token)). - get(_Conv.class); - - Conversation ret = new Conversation(); - ret.name = conv.name; - ret.id = conv.id; - ret.members = conv.members.others; - return ret; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - public static class _Conv { - @JsonProperty - public UUID id; - - @JsonProperty - public String name; - - @JsonProperty - public _Members members; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - static class _Members { - @JsonProperty - public List others; - } -} diff --git a/src/main/resources/db/migration/V106__events_drop_export.sql b/src/main/resources/db/migration/V106__events_drop_export.sql new file mode 100644 index 0000000..66f3f20 --- /dev/null +++ b/src/main/resources/db/migration/V106__events_drop_export.sql @@ -0,0 +1,2 @@ +alter table Events +DROP column EXPORTED; diff --git a/src/test/java/com/wire/bots/hold/DatabaseTest.java b/src/test/java/com/wire/bots/hold/DatabaseTest.java index f938bf5..c6c05d4 100644 --- a/src/test/java/com/wire/bots/hold/DatabaseTest.java +++ b/src/test/java/com/wire/bots/hold/DatabaseTest.java @@ -70,10 +70,8 @@ public void eventsTextMessageTest() throws JsonProcessingException { assert textMessage.getMessageId().equals(message.getMessageId()); - List events = eventsDAO.listAllUnxported(convId); + List events = eventsDAO.listAll(convId); assert events.size() == 2; - - List events1 = eventsDAO.listAllUnxported(); } @Test diff --git a/src/test/java/com/wire/bots/hold/Serialization.java b/src/test/java/com/wire/bots/hold/Serialization.java index 3e70cb4..81f6906 100644 --- a/src/test/java/com/wire/bots/hold/Serialization.java +++ b/src/test/java/com/wire/bots/hold/Serialization.java @@ -1,23 +1,13 @@ package com.wire.bots.hold; import com.fasterxml.jackson.databind.ObjectMapper; -import com.wire.bots.hold.tasks.ExportTask; import com.wire.xenon.models.ImageMessage; import com.wire.xenon.models.TextMessage; import org.junit.Test; import java.io.IOException; -import java.text.ParseException; public class Serialization { - - @Test - public void testDateParser() throws ParseException { - String str = "2022-05-28T04:26:07.325Z"; - long date = ExportTask.date(str); - assert date == 1653704767000L; - } - @Test public void imageMessage() throws IOException { String payload = "{" +