From 95d27b147f6ebb101a45fe571c23864812648cdd Mon Sep 17 00:00:00 2001 From: loi Date: Fri, 20 Dec 2024 18:23:12 -0800 Subject: [PATCH] convert events to redis sub/pub --- .../ipac/firefly/core/RedisService.java | 36 +++- .../edu/caltech/ipac/firefly/core/Util.java | 130 +++++++++++++ .../ipac/firefly/messaging/Message.java | 102 ++++++++-- .../ipac/firefly/messaging/Messenger.java | 13 +- .../ipac/firefly/server/db/DbInstance.java | 2 +- .../ipac/firefly/server/db/DbMonitor.java | 11 +- .../ipac/firefly/server/db/DuckDbAdapter.java | 4 +- .../firefly/server/db/EmbeddedDbUtil.java | 5 +- .../firefly/server/db/spring/JdbcFactory.java | 9 +- .../server/events/CacheEventWorker.java | 3 + .../server/events/MessageEventWorker.java | 36 ++++ .../server/events/ReplicatedQueueList.java | 26 +-- .../server/events/ServerEventManager.java | 46 +++-- .../server/events/ServerEventQueue.java | 5 + .../server/events/WebsocketConnector.java | 4 +- .../server/query/EmbeddedDbProcessor.java | 7 +- .../firefly/server/query/SearchProcessor.java | 26 --- .../caltech/ipac/firefly/util/event/Name.java | 10 +- .../ipac/table/io/TableParseHandler.java | 10 +- .../edu/caltech/ipac/util/StringUtils.java | 78 -------- .../ipac/firefly/core/RedisServiceTest.java | 14 +- .../caltech/ipac/firefly/core/UtilTest.java | 179 ++++++++++++++++++ .../ipac/firefly/messaging/MessengerTest.java | 2 +- .../caltech/ipac/table/DuckDbAdapterTest.java | 49 ----- .../ipac/table/EmbeddedDbUtilTest.java | 27 --- .../cache/CachePeerProviderFactoryTest.java | 2 +- .../caltech/ipac/util/cache/CacheTest.java | 9 +- 27 files changed, 558 insertions(+), 287 deletions(-) create mode 100644 src/firefly/java/edu/caltech/ipac/firefly/server/events/MessageEventWorker.java create mode 100644 src/firefly/test/edu/caltech/ipac/firefly/core/UtilTest.java diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java b/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java index 6383a3d612..160c184351 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java @@ -89,6 +89,7 @@ static void startLocal() { .setting("maxmemory %s".formatted(MAX_MEM)) .setting("dir %s".formatted(DB_DIR)) // Directory where redis database files are stored .setting("dbfilename redis.rdb") // RDB file name + .setting("save 600 1") // RDB file name .build(); redisServer.start(); connect(); @@ -138,27 +139,42 @@ public static Map getStats() { } catch (NoSuchAlgorithmException e) {/* ignore */} try (Jedis redis = getConnection()) { + var infos = redis.info().split("\r\n"); + Arrays.stream(infos).filter(s -> s.contains("version")).findFirst() + .ifPresent(s -> stats.put("version", s.split(":")[1].trim())); + stats.put("active conn", jedisPool.getNumActive()); stats.put("idle conn", jedisPool.getNumIdle()); stats.put("max conn", maxPoolSize); stats.put("max-wait", jedisPool.getMaxBorrowWaitTimeMillis()); stats.put("avg-wait", jedisPool.getMeanBorrowWaitTimeMillis()); - stats.put("cache-size", redis.dbSize()); - stats.put("DB_DIR", DB_DIR); - stats.put("MAX_MEM", MAX_MEM); stats.put("password", passwd); - - Arrays.stream(redis.info().split("\r\n")).forEach((s) -> { - if (!s.startsWith("#")) { - String[] kv = s.split(":"); - stats.put(kv[0], kv[1]); - } - }); + stats.put("db-size", redis.dbSize()); + addStat(stats, redis, "maxmemory"); + addStat(stats, redis, "save"); + addStat(stats, redis, "dir"); + addStat(stats, redis, "dbfilename"); + addStat(stats, redis, "appendfilename"); + stats.put("---MEMORY STATS----", ""); + var mem = redis.memoryStats(); + stats.put("Total memory used", mem.get("dataset.bytes")); + stats.put("Total memory allocated", mem.get("allocator.allocated")); + stats.put("Fragmented memory", mem.get("fragmentation")); + stats.put("Fragmentation ratio", mem.get("allocator-fragmentation.ratio")); + stats.put("Number of keys stored", mem.get("keys.count")); + stats.put("Avg per key", mem.get("keys.bytes-per-key")); + stats.put("Pct of memory used", mem.get("dataset.percentage")); + stats.put("Peak memory used", mem.get("peak.allocated")); } catch (Exception ignored) {} } return stats; } + public static void addStat(Map stats, Jedis redis, String key) { + var c = redis.configGet(key); + if (c.size() > 1) stats.put(key, c.get(1)); + } + public static String getRedisHostPortDesc() { return redisHost + ":" + REDIS_PORT + " ("+ getStatus() + ")"; } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/Util.java b/src/firefly/java/edu/caltech/ipac/firefly/core/Util.java index f23567824b..ef3c7b9d5e 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/Util.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/Util.java @@ -11,6 +11,10 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Base64; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * Date: 11/19/24 @@ -70,6 +74,132 @@ public static Object deserialize(String base64) { } +//==================================================================== +// Functional Helpers +//==================================================================== + /** + * A function that throws exception + * @param the return type of this function + */ + @FunctionalInterface + public interface FuncWithEx { + T get() throws Exception; + } + + /** + * A callable that throws exception + */ + @FunctionalInterface + public interface CallWithEx { + void run() throws Exception; + } + /** + * A function with parameter that throws exception + */ + @FunctionalInterface + public interface FuncParamWithEx { + T apply(P p) throws Exception; + } + + public static class Try { + private final T val; + private final Exception ex; + + Try(T val, Exception ex) { + this.val = val; + this.ex = ex; + } + + /** + * Get the value if no exception was thrown + * @return the value if no exception was thrown, or null + */ + public T get() { + return ex == null ? val : null; + } + + public T getOrElse(T defVal) { + return ex == null ? val : defVal; + } + + public T getOrElse(Consumer onError) { + if (ex == null) return val; + onError.accept(ex); + return null; + } + + public static Try it(FuncParamWithEx func, P param) { + try { + return new Try<>(func.apply(param), null); + } catch (Exception e) { + return new Try<>(null, e); + } + } + + public static Try it(CallWithEx func) { + try { + func.run(); + return new Try<>(null, null); + } catch (Exception e) { + return new Try<>(null, e); + } + } + + public static Try it(FuncWithEx func) { + try { + return new Try<>(func.get(), null); + } catch (Exception e) { + return new Try<>(null, e); + } + } + + /** + * Execute the given function until it passes test, then return the result + * @param func the function to execute + * @param test test the returned value + * @param tries the number of times to try + * @return Try results + */ + public static Try until(FuncWithEx func, Predicate test, int tries) { + for (int i = 0; i < tries; i++) { + var res = it(func).get(); + if (res != null && test.test(res)) return new Try<>(res, null); + } + return new Try<>(null, new IndexOutOfBoundsException("Exceeded max tries")); + } + + } + + + public static class SynchronizedAccess { + private final ConcurrentHashMap activeRequests = new ConcurrentHashMap<>(); + + @FunctionalInterface + public interface LockHandle { + void unlock(); + } + + /** + * Acquires a lock associated with the given ID. If the lock does not already exist, it is created. + * + * @param id the identifier for the lock + * @return a {@code Runnable} that, when executed, releases the lock and removes it from the active requests + */ + public LockHandle lock(String id) { + ReentrantLock lock = activeRequests.computeIfAbsent(id, k -> new ReentrantLock()); + Logger.getLogger().trace("waiting %s: %s\n".formatted(id, lock)); + lock.lock(); + Logger.getLogger().trace("got lock %s: %s\n".formatted(id, lock)); + return () -> { + try { + lock.unlock(); // Ensure lock is released even if an exception occurs + } finally { + if (!lock.isLocked()) activeRequests.remove(id); // Remove the lock from activeRequests if no threads are using it + Logger.getLogger().trace("unlock %s: %s\n".formatted(id, lock)); + } + }; + } + } } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Message.java b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Message.java index b0e4e93ef2..3133511fa0 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Message.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Message.java @@ -6,38 +6,35 @@ import edu.caltech.ipac.firefly.core.background.JobInfo; import edu.caltech.ipac.firefly.core.background.JobManager; +import edu.caltech.ipac.firefly.data.ServerEvent; import edu.caltech.ipac.firefly.data.ServerEvent.Scope; import edu.caltech.ipac.firefly.server.ServerContext; +import edu.caltech.ipac.firefly.util.event.Name; import edu.caltech.ipac.util.AppProperties; +import java.io.Serializable; + +import static edu.caltech.ipac.util.StringUtils.applyIfNotEmpty; +import static edu.caltech.ipac.firefly.data.ServerEvent.*; + /** * The message object sent or received by Messenger. As implemented, Messenger * uses JSON when serialize/deserialize is needed. However, this is an internal * implementation, and it can be changed without breaking Messenger/Message contract. * - * Date: 2019-03-15 - * * @author loi * @version $Id: $ */ public class Message { + private static final String TOPIC_KEY = "msg_topic"; protected JsonHelper helper = new JsonHelper(); - public MsgHeader getHeader() { - Scope scope = Scope.valueOf(helper.getValue(Scope.SELF.name(), "header", "scope")); - String to = helper.getValue("", "header", "to"); - String from = helper.getValue("", "header", "from"); - String subject = helper.getValue("", "header", "subject"); - MsgHeader header = new MsgHeader(scope, to, subject); - header.setFrom(from); - return header; + public void setTopic(String topic) { + setValue(topic, TOPIC_KEY); } - public void setHeader(Scope scope, String to, String subject, String from) { - helper.setValue(scope.name(), "header", "scope"); - if (to != null) helper.setValue(to, "header", "to"); - if (from != null) helper.setValue(from, "header", "from"); - if (subject != null) helper.setValue(subject, "header", "subject"); + public String getTopic() { + return getValue(null, TOPIC_KEY); } public Message setValue(Object value, String... paths) { @@ -63,24 +60,89 @@ public String toJson() { // Predefined messages //==================================================================== + /** + * Represents a server event message that can be sent or received by the Messenger. + * This class encapsulates the details of a server event, including its name, target, data type, data, and source. + * It provides methods to construct an event from a ServerEvent object and to parse a Message object back into a ServerEvent. + * + *

Example usage:

+ *
+     * {@code
+     * ServerEvent serverEvent = new ServerEvent(name, scope, data);
+     * Message.Event eventMessage = new Message.Event(serverEvent);
+     * }
+     * 
+ **/ + public static final class Event extends Message { + public static final String TOPIC = "firefly-events"; + + public Event(ServerEvent se) { + setTopic(TOPIC); + setValue(se.getName().getName(), "name"); + applyIfNotEmpty(se.getTarget().getScope(), (s) -> setValue(s.name(), "target", "scope")); + applyIfNotEmpty(se.getTarget().getChannel(), (s) -> setValue(s, "target", "channel")); + applyIfNotEmpty(se.getTarget().getConnID(), (s) -> setValue(s, "target", "connID")); + applyIfNotEmpty(se.getTarget().getUserKey(), (s) -> setValue(s, "target", "userKey")); + setValue(se.getDataType().name(), "dataType"); + setValue(se.getData(), "data"); + setValue(se.getFrom(), "from"); + } + public static ServerEvent parse(Message msg) { + try { + Name name = Name.parse(msg.getValue(null, "name")); + if (name == null) return null; + + Scope scope = Scope.valueOf(msg.getValue(null, "target", "scope")); + String connID = msg.getValue(null, "target", "connID"); + String channel = msg.getValue(null, "target", "channel"); + String userKey = msg.getValue(null, "target", "userKey"); + DataType dtype = DataType.valueOf(msg.getValue(null, "dataType")); + Serializable data = msg.getValue(null, "data"); + String from = msg.getValue(null, "from"); + return new ServerEvent(name, new EventTarget(scope, connID, channel, userKey), dtype, data, from); + } catch (Exception e) { + return null; + } + } + } + public static final class JobCompleted extends Message { public static final String TOPIC = "JobCompleted"; static final String FROM = AppProperties.getProperty("mail.smtp.from", "donotreply@ipac.caltech.edu"); JobInfo job; public JobCompleted(JobInfo jobInfo) { job = jobInfo; + setTopic(TOPIC); setHeader(Scope.CHANNEL, jobInfo.getOwner(), TOPIC, FROM); - helper.setValue(JobManager.toJsonObject(jobInfo), "jobInfo"); + setValue(JobManager.toJsonObject(jobInfo), "jobInfo"); var user = ServerContext.getRequestOwner().getUserInfo(); if (user != null) { - helper.setValue(user.getName(), "user", "name"); - helper.setValue(user.getEmail(), "user", "email"); - helper.setValue(user.getLoginName(), "user", "loginName"); + setValue(user.getName(), "user", "name"); + setValue(user.getEmail(), "user", "email"); + setValue(user.getLoginName(), "user", "loginName"); } var ssoAdpt = ServerContext.getRequestOwner().getSsoAdapter(); if (ssoAdpt != null) { - helper.setValue(ssoAdpt.getAuthToken(), "user", "authToken"); + setValue(ssoAdpt.getAuthToken(), "user", "authToken"); } } + + public MsgHeader getHeader() { + Scope scope = Scope.valueOf(getValue(Scope.SELF.name(), "header", "scope")); + String to = getValue("", "header", "to"); + String from = getValue("", "header", "from"); + String subject = getValue("", "header", "subject"); + MsgHeader header = new MsgHeader(scope, to, subject); + header.setFrom(from); + return header; + } + + public void setHeader(Scope scope, String to, String subject, String from) { + setValue(scope.name(), "header", "scope"); + if (to != null) setValue(to, "header", "to"); + if (from != null) setValue(from, "header", "from"); + if (subject != null) setValue(subject, "header", "subject"); + } } + } \ No newline at end of file diff --git a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java index 77c2fc108a..8fd31ecf5b 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java @@ -13,6 +13,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import static edu.caltech.ipac.util.StringUtils.applyIfNotEmpty; + /** * An implementation of publish-subscribe messaging pattern based on Jedis client and Redis backend. * This class abstract the use of threads, and it ensures that there is only 1 thread used per topic. @@ -77,16 +79,13 @@ public static void publish(String topic, Message msg) { } /** - * Send the given message. The message's subject is used as the topic. - * @param msg the message to send + * Publishes the given message to its topic. + * @param msg the message to be published */ - public static void publish(Message msg) { - // some firefly's specific logic here... - String topic = msg.getHeader().getSubject(); - publish(topic, msg); + public static void publish(Message msg) { + applyIfNotEmpty(msg.getTopic(), (topic) -> publish(topic, msg)); } - /** * Internal handler class used to manage the one-to-many relationship of Messenger's subscriber and * Jedis's subscriber diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbInstance.java b/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbInstance.java index f4710aaaa0..a3f43da12d 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbInstance.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbInstance.java @@ -109,7 +109,7 @@ public String getDbUrl() { public void setPooled(boolean pooled) { isPooled = pooled;} public boolean testConn(Connection conn) { try (Statement stmt = conn.createStatement()) { - stmt.execute("SELECT 1"); // works with all DMBS + stmt.execute("SELECT 1 FROM (VALUES (0))"); // HSQL required FROM clause return true; } catch (SQLException e) { return false; } }; diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbMonitor.java b/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbMonitor.java index 3edda7f52a..da4811675c 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbMonitor.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/db/DbMonitor.java @@ -5,13 +5,18 @@ package edu.caltech.ipac.firefly.server.db; import edu.caltech.ipac.firefly.server.util.Logger; +import edu.caltech.ipac.firefly.util.Ref; import edu.caltech.ipac.util.AppProperties; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static edu.caltech.ipac.firefly.core.Util.Try; +import static edu.caltech.ipac.firefly.server.ServerContext.SHORT_TASK_EXEC; /** * Date: 5/3/24 @@ -92,9 +97,11 @@ public static DbAdapter.EmbeddedDbStats getRuntimeStats(boolean doUpdate) { public static void updateDbStats() { LOGGER.trace("DbAdapter -> updateDbStats"); + Ref> t = new Ref<>(); for (DbAdapter.EmbeddedDbInstance db : dbInstances.values()) { - db.updateStats(); + t.set(SHORT_TASK_EXEC.submit(db::updateStats)); } + Try.it(() -> t.get().get(5, TimeUnit.SECONDS)); // run all in parallel, but wait for up to 5 seconds } //==================================================================== diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/db/DuckDbAdapter.java b/src/firefly/java/edu/caltech/ipac/firefly/server/db/DuckDbAdapter.java index cf80c612e1..633785b747 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/db/DuckDbAdapter.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/db/DuckDbAdapter.java @@ -32,6 +32,7 @@ import java.util.Date; import java.util.List; +import static edu.caltech.ipac.firefly.core.Util.Try; import static edu.caltech.ipac.firefly.server.db.DuckDbUDF.*; import static edu.caltech.ipac.firefly.server.db.EmbeddedDbUtil.*; import static edu.caltech.ipac.util.StringUtils.*; @@ -253,7 +254,8 @@ public String interpretError(Throwable e) { if (e instanceof SQLException ex) { JSONObject json = (JSONObject) JSONValue.parse(ex.getMessage().replace("%s:".formatted(ex.getClass().getName()), "")); String msg = json.get("exception_message").toString().split("\n")[0]; - String type = getSafe(() -> json.get("error_subtype").toString(), json.get("exception_type").toString()); + String type = Try.it(() -> json.get("error_subtype").toString()) + .getOrElse(json.get("exception_type").toString()); return type + ":" + msg; } return super.interpretError(e); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/db/EmbeddedDbUtil.java b/src/firefly/java/edu/caltech/ipac/firefly/server/db/EmbeddedDbUtil.java index ab30369b14..f1965c0488 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/db/EmbeddedDbUtil.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/db/EmbeddedDbUtil.java @@ -35,6 +35,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static edu.caltech.ipac.firefly.core.Util.Try; import static edu.caltech.ipac.firefly.server.ServerContext.SHORT_TASK_EXEC; import static edu.caltech.ipac.firefly.data.TableServerRequest.TBL_FILE_PATH; import static edu.caltech.ipac.firefly.data.TableServerRequest.TBL_FILE_TYPE; @@ -413,10 +414,10 @@ public static void dbToDataType(DataType dtype, ResultSet rs) { // into base64 string for storage. //==================================================================== public static Object deserialize(ResultSet rs, String cname) { - return getSafe(() -> Util.deserialize(rs.getString(cname))); + return Try.it(() -> Util.deserialize(rs.getString(cname))).get(); } public static Object deserialize(ResultSet rs, int cidx) { - return getSafe(() -> Util.deserialize(rs.getString(cidx))); + return Try.it(() -> Util.deserialize(rs.getString(cidx))).get(); } //==================================================================== // privates functions diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/db/spring/JdbcFactory.java b/src/firefly/java/edu/caltech/ipac/firefly/server/db/spring/JdbcFactory.java index c2801d296d..98026b2d09 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/db/spring/JdbcFactory.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/db/spring/JdbcFactory.java @@ -20,9 +20,10 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Properties; -import static edu.caltech.ipac.util.StringUtils.getIf; +import static edu.caltech.ipac.firefly.core.Util.Try; /** * Date: Oct 7, 2008 @@ -40,7 +41,8 @@ public class JdbcFactory { * @return */ public static JdbcTemplate getTemplate(DbInstance dbInstance) { - DataSource datasource = getIf( () -> getDataSource(dbInstance), (ds) -> ds != null,3); // return null if failed after 3 tries + DataSource datasource = Try.until( () -> getDataSource(dbInstance), Objects::nonNull,3) + .getOrElse((e) -> logger.info("Failed to get DataSource after 3 tries")); return datasource == null ? null : new JdbcTemplate(datasource); } @@ -65,7 +67,8 @@ public static TransactionTemplate getTransactionTemplate(DataSource dataSource) * @return */ public static SimpleJdbcTemplate getSimpleTemplate(DbInstance dbInstance) { - DataSource datasource = getIf( () -> getDataSource(dbInstance), (ds) -> ds != null,3); // return null if failed after 3 tries + DataSource datasource = Try.until( () -> getDataSource(dbInstance), Objects::nonNull,3) + .getOrElse((e) -> logger.info("Failed to get DataSource after 3 tries")); return datasource == null ? null : new SimpleJdbcTemplate(datasource); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/CacheEventWorker.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/CacheEventWorker.java index 95c1a93f3c..6f050ba1c1 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/CacheEventWorker.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/CacheEventWorker.java @@ -24,7 +24,10 @@ /** * @author Trey Roby + * + * @deprecated - no longer using replicating cache; use {@link edu.caltech.ipac.firefly.server.events.MessageEventWorker} instead */ +@Deprecated public class CacheEventWorker implements ServerEventManager.EventWorker { private static final Cache cache= CacheManager.getDistributed(); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/MessageEventWorker.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/MessageEventWorker.java new file mode 100644 index 0000000000..37ff6ddfa2 --- /dev/null +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/MessageEventWorker.java @@ -0,0 +1,36 @@ +/* + * License information at https://github.com/Caltech-IPAC/firefly/blob/master/License.txt + */ +package edu.caltech.ipac.firefly.server.events; + +import edu.caltech.ipac.firefly.data.ServerEvent; +import edu.caltech.ipac.firefly.messaging.Message; +import edu.caltech.ipac.firefly.messaging.Messenger; + +import static edu.caltech.ipac.firefly.messaging.Message.Event.TOPIC; + +/** + * Use Messenger to deliver events. + */ +public class MessageEventWorker implements ServerEventManager.EventWorker { + + public MessageEventWorker() { + Messenger.subscribe(TOPIC, msg -> { + ServerEvent sev = Message.Event.parse(msg); + if (sev != null) { + processEvent(sev); + } + }); + } + + public void deliver(ServerEvent sev) { + if (sev != null) { + Messenger.publish(new Message.Event(sev)); + } + } + + public void processEvent(final ServerEvent sev) { + ServerEventManager.processEvent(sev); + } +} + diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java index 6c09abb3f4..b4b2641d3e 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java @@ -20,31 +20,21 @@ */ class ReplicatedQueueList { - private static final String HOST_NAME= FileUtil.getHostname(); - private static final StringKey REP_QUEUE_MAP = new StringKey("ReplicatedEventQueueMap"); - private static Cache getCache() { return CacheManager.getLocal(); } + private static final StringKey HOST_NAME= new StringKey(FileUtil.getHostname()); + private static final String REP_QUEUE_MAP = "ReplicatedEventQueueMap"; + private static Cache getCache() { return CacheManager.getDistributedMap(REP_QUEUE_MAP); } synchronized void setQueueListForNode(List list) { Cache cache= getCache(); - List replicatedList= new ArrayList<>(); - - for(ServerEventQueue q : list) { - replicatedList.add(new ServerEventQueue(q.getConnID(),q.getChannel(),q.getUserKey(),null)); - } - Map allListMap= (Map)cache.get(REP_QUEUE_MAP); - if (allListMap==null) allListMap= new HashMap(); - allListMap.put(HOST_NAME,replicatedList); - cache.put(REP_QUEUE_MAP, allListMap); + cache.put(HOST_NAME, list); } synchronized List getCombinedNodeList() { - Cache cache= getCache(); List retList= new ArrayList<>(); - Map allListMap= (Map)cache.get(REP_QUEUE_MAP); - if (allListMap==null) return Collections.emptyList(); - - for(Object v : allListMap.values()) retList.addAll((List)v); - + Cache cache= getCache(); + for(String k : cache.getKeys()) { + retList.addAll((List)cache.get(new StringKey(k))); + } return retList; } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventManager.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventManager.java index 58f42f113f..c0181c03d0 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventManager.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventManager.java @@ -23,14 +23,14 @@ */ public class ServerEventManager { - private static final boolean USE_CACHE_EVENT_WORKER = true; - private static final EventWorker eventWorker = USE_CACHE_EVENT_WORKER ? - new CacheEventWorker() : new SimpleEventWorker(); - private static final List evQueueList= new CopyOnWriteArrayList(); + private static final boolean USE_MESSAGE_EVENT_WORKER = true; + private static final EventWorker eventWorker = USE_MESSAGE_EVENT_WORKER ? + new MessageEventWorker() : new LocalEventWorker(); + private static final List localEventQueues = new CopyOnWriteArrayList<>(); + private static final ReplicatedQueueList allEventQueues = new ReplicatedQueueList(); private static final Logger.LoggerImpl LOG = Logger.getLogger(); private static long totalEventCnt; private static long deliveredEventCnt; - private static ReplicatedQueueList repQueueList= new ReplicatedQueueList(); /** @@ -99,22 +99,30 @@ public static void fireEvent(ServerEvent sev) { public static void addEventQueue(ServerEventQueue queue) { Logger.briefInfo("Channel: create new Queue for: "+ queue.getQueueID() ); - evQueueList.add(queue); - repQueueList.setQueueListForNode(evQueueList); + localEventQueues.add(queue); + allEventQueues.setQueueListForNode(localEventQueues); } - static List getEvQueueList() { - return evQueueList; + /** + * Get the list of ServerEventQueue that are local to this node. + * @return list of ServerEventQueue + */ + static List getLocalEventQueues() { + return localEventQueues; } - static List getAllServerEvQueueList() { - return repQueueList.getCombinedNodeList(); + /** + * Get the list of all ServerEventQueue across all nodes(multiple instances of Firefly). + * @return list of ServerEventQueue + */ + static List getAllEventQueue() { + return allEventQueues.getCombinedNodeList(); } static void processEvent(ServerEvent ev) { totalEventCnt++; boolean delivered = false; - for(ServerEventQueue queue : evQueueList) { + for(ServerEventQueue queue : localEventQueues) { try { if (queue.matches(ev)) { try { @@ -137,8 +145,8 @@ static void processEvent(ServerEvent ev) { } public static void removeEventQueue(ServerEventQueue queue) { - evQueueList.remove(queue); - repQueueList.setQueueListForNode(evQueueList); + localEventQueues.remove(queue); + allEventQueues.setQueueListForNode(localEventQueues); } //==================================================================== @@ -147,7 +155,7 @@ public static void removeEventQueue(ServerEventQueue queue) { public static int getActiveQueueCnt() { int cnt = 0; - for(ServerEventQueue queue : evQueueList) { + for(ServerEventQueue queue : localEventQueues) { if (queue.getEventConnector().isOpen()) { cnt++; } @@ -156,7 +164,7 @@ public static int getActiveQueueCnt() { } public static List getQueueDescriptionList(int limit) { - return evQueueList.stream() + return localEventQueues.stream() .map(ServerEventQueue::convertToDescription) .sorted((d1,d2) -> (int)(d2.lastPutTime()-d1.lastPutTime())) .limit(limit) @@ -173,7 +181,7 @@ public static List getQueueDescriptionList(in public static int getActiveQueueChannelCnt(String channel) { int cnt = 0; if (StringUtils.isEmpty(channel)) return 0; - for(ServerEventQueue queue : evQueueList) { + for(ServerEventQueue queue : localEventQueues) { if (channel.equals(queue.getChannel()) && queue.getEventConnector().isOpen()) { cnt++; } else { @@ -198,10 +206,10 @@ public static long getDeliveredEventCnt() { //==================================================================== public interface EventWorker { - public void deliver(ServerEvent sev); + void deliver(ServerEvent sev); } - private static class SimpleEventWorker implements EventWorker { + private static class LocalEventWorker implements EventWorker { public void deliver(ServerEvent sev) { ServerEventManager.processEvent(sev); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java index a81da0091e..9a524b6673 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java @@ -14,6 +14,11 @@ import java.io.Serializable; /** + * This class manages server events and their delivery to clients. + * It holds information about the connection ID, channel, and user key associated with the event queue. + * The class provides methods to convert events to JSON, parse JSON events, and deliver events to the appropriate clients. + * It also includes functionality to match events based on their scope and target information. + * * @author Trey Roby */ public class ServerEventQueue implements Serializable { diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/WebsocketConnector.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/WebsocketConnector.java index f5f2cb01e7..83f55bb501 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/WebsocketConnector.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/WebsocketConnector.java @@ -142,7 +142,7 @@ public void close() { * @param userKey */ public static void pingClient(String userKey, String eventConnId) { - List conns = ServerEventManager.getEvQueueList(); + List conns = ServerEventManager.getLocalEventQueues(); for (ServerEventQueue seq : conns) { // need to notify clients that are affected by update if (seq.getConnID().equals(eventConnId) || seq.getUserKey().equals(userKey)) { @@ -164,7 +164,7 @@ public static void pingClient(String userKey, String eventConnId) { */ private void updateClientConnections(String type, String channelID, String userKey) { // List conns = ServerEventManager.getEvQueueList(); - List conns = ServerEventManager.getAllServerEvQueueList(); + List conns = ServerEventManager.getAllEventQueue(); for (ServerEventQueue seq : conns) { // need to notify clients that are affected by update if (seq.getChannel().equals(channelID) || seq.getUserKey().equals(userKey)) { diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java b/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java index 1f3125a213..cbe2db4c9b 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java @@ -3,6 +3,7 @@ */ package edu.caltech.ipac.firefly.server.query; +import edu.caltech.ipac.firefly.core.Util; import edu.caltech.ipac.firefly.server.ServCommand; import edu.caltech.ipac.firefly.server.db.DuckDbReadable; import edu.caltech.ipac.firefly.server.util.Logger; @@ -84,7 +85,7 @@ abstract public class EmbeddedDbProcessor implements SearchProcessor, SearchProcessor.CanGetDataFile, SearchProcessor.CanFetchDataGroup, Job.Worker { private static final Logger.LoggerImpl logger = Logger.getLogger(); - private static final SynchronizedAccess GET_DATA_CHECKER = new SynchronizedAccess(); + private static final Util.SynchronizedAccess GET_DATA_CHECKER = new Util.SynchronizedAccess(); private Job job; public void setJob(Job job) { @@ -124,7 +125,7 @@ public DataGroupPart getData(ServerRequest request) throws DataAccessException { // make sure multiple requests for the same data waits for the first one to create before accessing. String uniqueID = this.getUniqueID(request); - var release = GET_DATA_CHECKER.lock(uniqueID); + var locked = GET_DATA_CHECKER.lock(uniqueID); try { var dbAdapter = getDbAdapter(treq); jobExecIf(v -> v.progress(10, "fetching data...")); @@ -170,7 +171,7 @@ public DataGroupPart getData(ServerRequest request) throws DataAccessException { logger.error(e); throw e; } finally { - release.run(); + locked.unlock(); } } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/query/SearchProcessor.java b/src/firefly/java/edu/caltech/ipac/firefly/server/query/SearchProcessor.java index d9024a01ac..6db288dc42 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/query/SearchProcessor.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/query/SearchProcessor.java @@ -24,8 +24,6 @@ import java.io.OutputStream; import java.util.List; import java.util.SortedSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import static edu.caltech.ipac.firefly.data.TableServerRequest.FF_SESSION_ID; import static edu.caltech.ipac.util.StringUtils.applyIfNotEmpty; @@ -103,28 +101,4 @@ interface CanGetDataFile { File getDataFile(TableServerRequest request) throws IpacTableException, IOException, DataAccessException; } - class SynchronizedAccess { - private final ConcurrentHashMap activeRequests = new ConcurrentHashMap<>(); - - /** - * Acquires a lock associated with the given ID. If the lock does not already exist, it is created. - * - * @param id the identifier for the lock - * @return a {@code Runnable} that, when executed, releases the lock and removes it from the active requests - */ - public Runnable lock(String id) { - ReentrantLock lock = activeRequests.computeIfAbsent(id, k -> new ReentrantLock()); - Logger.getLogger().trace("waiting %s: %s\n".formatted(id, lock)); - lock.lock(); - Logger.getLogger().trace("got lock %s: %s\n".formatted(id, lock)); - return () -> { - try { - lock.unlock(); // Ensure lock is released even if an exception occurs - } finally { - if (!lock.isLocked()) activeRequests.remove(id); // Remove the lock from activeRequests if no threads are using it - Logger.getLogger().trace("unlock %s: %s\n".formatted(id, lock)); - } - }; - } - } } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java b/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java index 1e3c162bf7..5d2bb97d82 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java @@ -21,11 +21,10 @@ public class Name implements Serializable { public static final Name EVT_CONN_EST = new Name("EVT_CONN_EST", "Event connection established. Along with this event, you can expect connID and channel in the event's data. ie. {connID: val, channel: val}"); - public static final Name REPORT_USER_ACTION = new Name("REPORT_USER_ACTION", "report a user response"); - public static final Name ACTION = new Name("FLUX_ACTION", "an action message."); + public static final Name REPORT_USER_ACTION = new Name("REPORT_USER_ACTION", "report a user response"); + public static final Name ACTION = new Name("FLUX_ACTION", "an action message."); public static final Name PING = new Name("PING", "keepalive ping"); - private final String _name; private final String _desc; @@ -69,6 +68,11 @@ else if (other!=null && other instanceof Name) { * @return */ public static Name parse(String name) { + if (name==null) return null; + if (EVT_CONN_EST.getName().equals(name)) return EVT_CONN_EST; + if (REPORT_USER_ACTION.getName().equals(name)) return REPORT_USER_ACTION; + if (ACTION.getName().equals(name)) return ACTION; + if (PING.getName().equals(name)) return PING; return new Name(name, "unknown"); } } diff --git a/src/firefly/java/edu/caltech/ipac/table/io/TableParseHandler.java b/src/firefly/java/edu/caltech/ipac/table/io/TableParseHandler.java index 7d04669338..b21d85c9b0 100644 --- a/src/firefly/java/edu/caltech/ipac/table/io/TableParseHandler.java +++ b/src/firefly/java/edu/caltech/ipac/table/io/TableParseHandler.java @@ -22,8 +22,7 @@ import static edu.caltech.ipac.firefly.server.db.DuckDbAdapter.addRow; import static edu.caltech.ipac.firefly.server.db.EmbeddedDbUtil.colIdxWithArrayData; import static edu.caltech.ipac.firefly.core.Util.serialize; -import static edu.caltech.ipac.util.StringUtils.tryIt; - +import static edu.caltech.ipac.firefly.core.Util.Try; /** * Date: 10/23/24 * @@ -139,10 +138,9 @@ public void data(Object[] row) throws IOException { } public void end() { - tryIt(() -> { if (conn != null) conn.commit(); }); - tryIt(() -> { if (appender != null) appender.close(); }); - appender = null; - tryIt(() -> { if (conn != null) conn.close(); }); + if (conn != null) Try.it(() -> conn.commit()); + if (appender != null) Try.it(() -> appender.close()); + if (conn != null) Try.it(() -> conn.close()); } } } diff --git a/src/firefly/java/edu/caltech/ipac/util/StringUtils.java b/src/firefly/java/edu/caltech/ipac/util/StringUtils.java index d8b1b9179b..7ef108e948 100644 --- a/src/firefly/java/edu/caltech/ipac/util/StringUtils.java +++ b/src/firefly/java/edu/caltech/ipac/util/StringUtils.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -721,83 +720,6 @@ public static void applyIfNotEmpty(T v, Consumer f){ if (!isEmpty(v)) f.accept(v); } - /** - * A supplier that throws exception - * @param the return type of this function - */ - public interface FuncWithEx { - R get() throws Exception; - } - - /** - * @param func a function to execute - * @return the value from executing this func. Null if func produces exception - */ - public static R getSafe(FuncWithEx func) { - return getSafe(func, null); - } - - /** - * @param func a function to execute - * @param defaultVal return defaultVal if func produces an exception or the value is null - * @return the value from executing this func - */ - public static R getSafe(FuncWithEx func, R defaultVal) { - try { - R v = func.get(); - return v == null ? defaultVal : v; - } catch (Exception e) { - return defaultVal; - } - } - - /** - * A supplier that throws exception - */ - public interface TryWithEx { - void run() throws Exception; - } - - public static void tryIt(TryWithEx func) { - try { - func.run(); - } catch (Exception e) { - logger.warn("tryIf failed with:" + e.getMessage()); - } - } - - /** - * Execute the given function and return the value if it passes test - * @param func the function to execute - * @param test test the returned value - * @param defaultValue returns when encountering exception or test fail - * @return the value of func if it passes test. otherwise, return null - */ - public static R getIf(FuncWithEx func, Predicate test, R defaultValue) { - try { - var result = func.get(); - if (test.test(result)) return result; - } catch (Exception e) { - logger.info("returning (%s) because %s failed: %s".formatted(defaultValue, func.toString(), e.getMessage())); - } - return defaultValue; - } - - /** - * Execute the given function and return the value if it passes test - * @param func the function to execute - * @param test test the returned value - * @param tries the number of times to try - * @return the value of func if it passes test. otherwise, return null - */ - public static R getIf(FuncWithEx func, Predicate test, int tries) { - for (int i = 0; i < tries; i++) { - R v = getIf(func, test, null); - if (v != null) return v; - } - return null; - } - /** * Return a URL if the input can be successfully parsed as a URL; otherwise, return null. * @param urlString URL string diff --git a/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java b/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java index b8e340c893..0a5d1d8a93 100644 --- a/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java +++ b/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java @@ -8,9 +8,9 @@ import edu.caltech.ipac.firefly.server.util.Logger; import edu.caltech.ipac.util.AppProperties; import org.apache.logging.log4j.Level; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import redis.clients.jedis.Jedis; @@ -28,17 +28,17 @@ */ public class RedisServiceTest extends ConfigTest { - @Before - public void setup() { + @BeforeClass + public static void setup() { RedisService.connect(); if (RedisService.isOffline()) { System.out.println("Messenger is offline; skipping test."); } - if (true) Logger.setLogLevel(Level.TRACE); // for debugging. + if (false) Logger.setLogLevel(Level.TRACE); // for debugging. } - @After - public void teardown() { + @AfterClass + public static void teardown() { RedisService.disconnect(); LOG.trace("tear down"); } diff --git a/src/firefly/test/edu/caltech/ipac/firefly/core/UtilTest.java b/src/firefly/test/edu/caltech/ipac/firefly/core/UtilTest.java new file mode 100644 index 0000000000..5162c60a65 --- /dev/null +++ b/src/firefly/test/edu/caltech/ipac/firefly/core/UtilTest.java @@ -0,0 +1,179 @@ +/* + * License information at https://github.com/Caltech-IPAC/firefly/blob/master/License.txt + */ + +package edu.caltech.ipac.firefly.core; + +/** + * Date: 12/20/24 + * + * @author loi + * @version : $ + */ +import edu.caltech.ipac.firefly.ConfigTest; +import edu.caltech.ipac.firefly.server.util.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.junit.Assert.*; +import static edu.caltech.ipac.firefly.core.Util.*; + +public class UtilTest extends ConfigTest { + + @Test + public void arrayOfBoolean() { + Object in = new Boolean[]{true, false, true}; + String s = serialize(in); + Object d = deserialize(s); + if (d instanceof Boolean[] v) { + Assert.assertArrayEquals(v, (Boolean[]) in); + } else Assert.fail("Deserialized type Boolean mismatch"); + } + + @Test + public void arrayOfDouble() { + Object in = new Double[]{1.0, 2.0, 3.0}; + String s = serialize(in); + Object d = deserialize(s); + if (d instanceof Double[] v) { + Assert.assertArrayEquals(v, (Double[]) in); + } else Assert.fail("Deserialized type Double mismatch"); + } + + @Test + public void arrayOfInt() { + Object in = new Integer[]{1, 2, 3}; + String s = serialize(in); + Object d = deserialize(s); + if (d instanceof Integer[] v) { + Assert.assertArrayEquals(v, (Integer[])in); + } else Assert.fail("Deserialized type Integer mismatch"); + } + + @Test + public void serializeValidObject() { + String original = "testString"; + String serialized = serialize(original); + assertNotNull(serialized); + } + + @Test + public void serializeNullObject() { + String serialized = serialize(null); + assertNull(serialized); + } + + @Test + public void deserializeValidString() { + String original = "testString"; + String serialized = serialize(original); + Object deserialized = deserialize(serialized); + assertEquals(original, deserialized); + } + + @Test + public void deserializeInvalidString() { + Object deserialized = deserialize("invalidBase64"); + assertNull(deserialized); + } + + @Test + public void deserializeNullString() { + Object deserialized = deserialize(null); + assertNull(deserialized); + } + + @Test + public void tryItFuncWithExSuccess() { + assertEquals("success", Try.it(() -> "success").getOrElse("default")); + } + + @Test + public void tryItFuncWithExFailure() { + Try result = Try.it(() -> { throw new Exception("failure"); }); + assertEquals("default", result.getOrElse("default")); + } + + @Test + public void tryItCallWithExSuccess() { + Try result = Try.it(() -> {}); + assertNull(result.get()); + } + + @Test + public void tryItCallWithExFailure() { + Try result = Try.it(() -> { throw new Exception("failure"); }); + assertNull(result.getOrElse((e) -> Logger.getLogger().trace("error"))); + } + + @Test + public void tryItFuncParamWithExSuccess() { + Try result = Try.it((param) -> param, "success"); + assertEquals("success", result.getOrElse("default")); + } + + @Test + public void tryItFuncParamWithExFailure() { + Try result = Try.it((param) -> { throw new Exception("failure"); }, "param"); + assertEquals("default", result.getOrElse("default")); + } + + @Test + public void tryUntilSuccess() { + AtomicInteger count = new AtomicInteger(); + Try result = Try.until(count::getAndIncrement, c -> c == 3, 5); + assertEquals(3, result.get().intValue()); + } + + @Test + public void testSynchronizedAccess() throws InterruptedException { + ArrayList even = new ArrayList<>(); + ArrayList odd = new ArrayList<>(); + + var locker = new Util.SynchronizedAccess(); + Function setResults = (q) -> { + long start = System.currentTimeMillis(); + var locked = locker.lock(q); + try { + Thread.sleep(1_000); + } catch (InterruptedException e) {} + finally { + locked.unlock(); + } + return System.currentTimeMillis() - start; + }; + + int ntimes = 10; + var p = Executors.newFixedThreadPool(ntimes); // when all threads start at the same time, all be blocked. + for(int i = 0; i < ntimes; i++) { + long a = i % 2; + p.submit(() -> { + if (a == 0 ) { + even.add(Math.round(setResults.apply("even")/1000.0)); + } else { + odd.add(Math.round(setResults.apply("odd")/1000.0)); + } + }); + } + p.shutdown(); + if (!p.awaitTermination(10, TimeUnit.SECONDS)) { + System.out.println("Not all tasks completed in time."); + } +// even.forEach(System.out::println); +// odd.forEach(System.out::println); + + assertEquals(ntimes/2, (long) Collections.max(even)); + assertEquals(1L, (long)Collections.min(even)); + assertEquals(ntimes/2, (long)Collections.max(odd)); + assertEquals(1L, (long)Collections.min(odd)); + } + + +} \ No newline at end of file diff --git a/src/firefly/test/edu/caltech/ipac/firefly/messaging/MessengerTest.java b/src/firefly/test/edu/caltech/ipac/firefly/messaging/MessengerTest.java index 9927e4ae21..8b5e068b82 100644 --- a/src/firefly/test/edu/caltech/ipac/firefly/messaging/MessengerTest.java +++ b/src/firefly/test/edu/caltech/ipac/firefly/messaging/MessengerTest.java @@ -42,7 +42,7 @@ public void setup() { System.out.println("Messenger is offline; skipping test."); isOffline = true; } - if (true) Logger.setLogLevel(Level.TRACE); // for debugging. + if (false) Logger.setLogLevel(Level.TRACE); // for debugging. } @After diff --git a/src/firefly/test/edu/caltech/ipac/table/DuckDbAdapterTest.java b/src/firefly/test/edu/caltech/ipac/table/DuckDbAdapterTest.java index 3e6703200f..037c6766ca 100644 --- a/src/firefly/test/edu/caltech/ipac/table/DuckDbAdapterTest.java +++ b/src/firefly/test/edu/caltech/ipac/table/DuckDbAdapterTest.java @@ -16,7 +16,6 @@ import edu.caltech.ipac.firefly.server.query.DecimationProcessor; import edu.caltech.ipac.firefly.server.query.EmbeddedDbProcessor; import edu.caltech.ipac.firefly.server.query.SearchManager; -import edu.caltech.ipac.firefly.server.query.SearchProcessor; import edu.caltech.ipac.firefly.server.query.tables.IpacTableFromSource; import edu.caltech.ipac.firefly.server.util.Logger; import edu.caltech.ipac.firefly.util.FileLoader; @@ -29,13 +28,8 @@ import org.junit.Test; import java.io.File; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -253,49 +247,6 @@ public void testCleanup() throws DataAccessException { assertFalse("DuckDb file is deleted", dbAdapter.getDbFile().exists()); } - @Test - public void testSynchronizedAccess() throws InterruptedException { - ArrayList even = new ArrayList<>(); - ArrayList odd = new ArrayList<>(); - - var locker = new SearchProcessor.SynchronizedAccess(); - Function setResults = (q) -> { - long start = System.currentTimeMillis(); - var release = locker.lock(q); - try { - Thread.sleep(1_000); - } catch (InterruptedException e) {} - finally { - release.run(); - } - return System.currentTimeMillis() - start; - }; - - int ntimes = 10; - var p = Executors.newFixedThreadPool(ntimes); // when all threads start at the same time, all be blocked. - for(int i = 0; i < ntimes; i++) { - long a = i % 2; - p.submit(() -> { - if (a == 0 ) { - even.add(Math.round(setResults.apply("even")/1000.0)); - } else { - odd.add(Math.round(setResults.apply("odd")/1000.0)); - } - }); - } - p.shutdown(); - if (!p.awaitTermination(10, TimeUnit.SECONDS)) { - System.out.println("Not all tasks completed in time."); - } -// even.forEach(System.out::println); -// odd.forEach(System.out::println); - - assertEquals(ntimes/2, (long)Collections.max(even)); - assertEquals(1L, (long)Collections.min(even)); - assertEquals(ntimes/2, (long)Collections.max(odd)); - assertEquals(1L, (long)Collections.min(odd)); - } - @Test public void testLikeSubstitution() { // replace uppercase LIKE diff --git a/src/firefly/test/edu/caltech/ipac/table/EmbeddedDbUtilTest.java b/src/firefly/test/edu/caltech/ipac/table/EmbeddedDbUtilTest.java index 2e0a6aba5b..b1d800a8b3 100644 --- a/src/firefly/test/edu/caltech/ipac/table/EmbeddedDbUtilTest.java +++ b/src/firefly/test/edu/caltech/ipac/table/EmbeddedDbUtilTest.java @@ -308,31 +308,4 @@ public void testAddColumnErrors() { Assert.assertEquals("TABLE out-of-sync; Reload table to resume", e.getMessage()); } } - - @Test - public void testSerializer() { - // array of boolean - Object in = new Boolean[]{true, false, true}; - String s = Util.serialize(in); - Object d = Util.deserialize(s); - if (d instanceof Boolean[] v) { - Assert.assertArrayEquals(v, (Boolean[])in); - } else Assert.fail("Deserialized type Boolean mismatch"); - - // array of double - in = new Double[]{1.0, 2.0, 3.0}; - s = Util.serialize(in); - d = Util.deserialize(s); - if (d instanceof Double[] v) { - Assert.assertArrayEquals(v, (Double[])in); - } else Assert.fail("Deserialized type Double mismatch"); - - // array of integer - in = new Integer[]{1, 2, 3}; - s = Util.serialize(in); - d = Util.deserialize(s); - if (d instanceof Integer[] v) { - Assert.assertArrayEquals(v, (Integer[])in); - } else Assert.fail("Deserialized type Integer mismatch"); - } } \ No newline at end of file diff --git a/src/firefly/test/edu/caltech/ipac/util/cache/CachePeerProviderFactoryTest.java b/src/firefly/test/edu/caltech/ipac/util/cache/CachePeerProviderFactoryTest.java index d9748286c5..c24ab4ac26 100644 --- a/src/firefly/test/edu/caltech/ipac/util/cache/CachePeerProviderFactoryTest.java +++ b/src/firefly/test/edu/caltech/ipac/util/cache/CachePeerProviderFactoryTest.java @@ -87,7 +87,7 @@ public static void setUp() throws InterruptedException { System.out.println("Messenger is offline; skipping all tests in CachePeerProviderFactoryTest."); return; } - if (true) Logger.setLogLevel(Level.TRACE); // for debugging. + if (false) Logger.setLogLevel(Level.TRACE); // for debugging. LOG.debug("Initial setup for PubSub, creating 3 peers"); peer1 = createCM("PubSub", "peer1"); diff --git a/src/firefly/test/edu/caltech/ipac/util/cache/CacheTest.java b/src/firefly/test/edu/caltech/ipac/util/cache/CacheTest.java index 09eee5d751..324cdcaae9 100644 --- a/src/firefly/test/edu/caltech/ipac/util/cache/CacheTest.java +++ b/src/firefly/test/edu/caltech/ipac/util/cache/CacheTest.java @@ -5,9 +5,11 @@ package edu.caltech.ipac.util.cache; import edu.caltech.ipac.firefly.ConfigTest; +import edu.caltech.ipac.firefly.core.RedisService; import edu.caltech.ipac.firefly.data.userdata.UserInfo; import edu.caltech.ipac.firefly.server.util.Logger; import org.apache.logging.log4j.Level; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -28,10 +30,15 @@ public class CacheTest extends ConfigTest { @BeforeClass public static void setUp() throws InterruptedException { + RedisService.connect(); setupServerContext(null); - if (true) Logger.setLogLevel(Level.TRACE); // for debugging. + if (false) Logger.setLogLevel(Level.TRACE); // for debugging. } + @AfterClass + public static void tearDown() { + RedisService.disconnect(); + } @Test public void localCache() {