diff --git a/buildScript/dependencies.gradle b/buildScript/dependencies.gradle
index ca5d6c4ab9..b82a7659eb 100644
--- a/buildScript/dependencies.gradle
+++ b/buildScript/dependencies.gradle
@@ -52,8 +52,8 @@ dependencies {
implementation ('com.googlecode.json-simple:json-simple:1.1.1') { transitive = false }
- // jedis
- implementation ('redis.clients:jedis:3.1.0') { exclude group: 'org.slf4j' }
+ // jedis; support redis versions: 5.0 to 7.2 Family of releases
+ implementation ('redis.clients:jedis:4.4.8') { exclude group: 'org.slf4j' }
// ehcache. exclude core because it contains duplicated older CacheManager
implementation ('net.sf.ehcache:ehcache:2.7.4')
@@ -98,6 +98,9 @@ dependencies {
// duckdb - SQL support for parquet, csv, and tsv files
implementation 'org.duckdb:duckdb_jdbc:1.1.3'
+
+ // embedded redis server; version ~ 6.2
+ implementation 'com.github.codemonstur:embedded-redis:1.4.3'
}
diff --git a/config/ehcache.xml b/config/ehcache.xml
index bc21a18aa2..4741c2cb39 100644
--- a/config/ehcache.xml
+++ b/config/ehcache.xml
@@ -1,5 +1,5 @@
The class abstracts the complexity of connecting to Redis, allowing other components of + * the application to interact with Redis without worrying about low-level connection details.
+ * + *Common use cases include checking the availability of the Redis server, monitoring + * connection health, and ensuring proper connectivity before performing Redis operations.
+ * + * Date: 2024-11-18 + * + * @author loi + * @version $Id: $ + */ +public class RedisService { + public enum Status {ONLINE, FAIL_TO_CONNECT, OFFLINE}; + public static final String REDIS_HOST = "redis.host"; + public static final String MAX_POOL_SIZE = "redis.max.poolsize"; + private static final int REDIS_PORT = AppProperties.getIntProperty("redis.port", 6379); + private static final String MAX_MEM = AppProperties.getProperty("redis.max.mem", "128M"); + private static final String DB_DIR = AppProperties.getProperty("redis.db.dir", System.getProperty("java.io.tmpdir")); + private static final String REDIS_PASSWORD = getRedisPassword(); + private static final Logger.LoggerImpl LOG = Logger.getLogger(); + + // message broker.. Jedis + private static final String redisHost = AppProperties.getProperty(REDIS_HOST, "localhost"); + public static final int maxPoolSize = AppProperties.getIntProperty(MAX_POOL_SIZE, 100); + private static JedisPool jedisPool; + private static Status status = Status.OFFLINE; + + private static String getRedisPassword() { + String passwd = System.getenv("REDIS_PASSWORD"); + if (passwd == null) passwd = AppProperties.getProperty("REDIS_PASSWORD"); + return passwd; + } + + static { + connect(); + } + + static JedisPool createJedisPool() { + try { + JedisPoolConfig pconfig = new JedisPoolConfig(); + pconfig.setTestOnBorrow(true); + pconfig.setMaxTotal(maxPoolSize); + pconfig.setBlockWhenExhausted(true); // wait; if needed + pconfig.setMaxWait(Duration.of(5, ChronoUnit.SECONDS)); + JedisPool pool = new JedisPool(pconfig, redisHost, REDIS_PORT, Protocol.DEFAULT_TIMEOUT, REDIS_PASSWORD); + pool.getResource().close(); // test connection + return pool; + } catch(Exception ignored) {} + return null; + } + + static void startLocal() { + try { + RedisServer redisServer = RedisServer.newRedisServer() + .port(REDIS_PORT) + .setting("maxmemory %s".formatted(MAX_MEM)) + .setting("dir %s".formatted(DB_DIR)) // Directory where redis database files are stored + .setting("dbfilename redis.rdb") // RDB file name + .setting("save 600 1") // RDB file name + .build(); + redisServer.start(); + connect(); + } catch (IOException ignored) {} + } + + public static void connect() throws RuntimeException { + if (jedisPool != null && !jedisPool.isClosed()) { + status = Status.ONLINE; + return; + } + + jedisPool = createJedisPool(); + if (jedisPool == null && redisHost.equals("localhost")) { + // can't connect; will start up embedded version if localhost + startLocal(); + jedisPool = createJedisPool(); + } + if (jedisPool == null) { + LOG.error("Unable to connect to Redis at " + redisHost + ":" + REDIS_PORT); + status = Status.FAIL_TO_CONNECT; + } else { + status = ONLINE; + } + } + + public static void disconnect() { + status = OFFLINE; + if (jedisPool != null) { + jedisPool.close(); + jedisPool = null; + } + } + + public static Status getStatus() { return status; } + + public static Map+ * This method converts the given object into a byte stream, + * encodes the byte stream into a Base64 string, and returns the result. + * The object must implement {@link java.io.Serializable} for this method to work. + *
+ * @param obj the object to serialize; must implement {@link java.io.Serializable} + * @return a Base64-encoded string representing the serialized object, or null + */ + public static String serialize(Object obj) { + if (obj == null) return null; + try { + ByteArrayOutputStream bstream = new ByteArrayOutputStream(); + ObjectOutputStream ostream = new ObjectOutputStream(bstream); + ostream.writeObject(obj); + ostream.flush(); + byte[] bytes = bstream.toByteArray(); + return Base64.getEncoder().encodeToString(bytes); + } catch (Exception e) { + logger.warn(e); + return null; + } + } + + /** + * Deserializes a Base64-encoded string into a Java object. + *+ * This method decodes the provided Base64 string into a byte stream, + * then reconstructs the original object using Java's object serialization mechanism. + *
+ * @param base64 the Base64-encoded string representing the serialized object + * @return the deserialized Java object, or null. + */ + public static Object deserialize(String base64) { + try { + if (base64 == null) return null; + byte[] bytes = Base64.getDecoder().decode(base64); + ByteArrayInputStream bstream = new ByteArrayInputStream(bytes); + ObjectInputStream ostream = new ObjectInputStream(bstream); + return ostream.readObject(); + } catch (Exception e) { + logger.warn(e); + return null; + } + } + + +//==================================================================== +// Functional Helpers +//==================================================================== + + /** + * A function that throws exception + * @param {
+ T apply(P p) throws Exception;
+ }
+
+ public static class Try Try func, P param) {
+ try {
+ return new Try<>(func.apply(param), null);
+ } catch (Exception e) {
+ return new Try<>(null, e);
+ }
+ }
+
+ public static Example usage:
+ * Date: Nov 18, 2024
+ *
+ * @author loi
+ * @version $Id: EhcacheImpl.java,v 1.8 2009/12/16 21:43:25 loi Exp $
+ */
+public class DistribMapCache extends DistributedCache {
+ String mapKey;
+ long lifespanInSecs;
+
+ public DistribMapCache(String mapKey) {
+ this(mapKey, 0);
+ }
+
+ public DistribMapCache(String mapKey, long lifespanInSecs) {
+ this.mapKey = mapKey;
+ this.lifespanInSecs = lifespanInSecs;
+ }
+
+//====================================================================
+// override for Redis Map implementation
+//====================================================================
+
+ String get(Jedis redis, String key) {
+ return redis.hget(mapKey, key);
+ }
+
+ void del(Jedis redis, String key) {
+ redis.hdel(mapKey, key);
+ }
+
+ void set(Jedis redis, String key, String value) {
+ boolean exists = redis.exists(mapKey);
+ redis.hset(mapKey, key, value);
+ if (!exists && lifespanInSecs > 0) {
+ redis.expire(mapKey, lifespanInSecs); // set only when creating a new map; setting here instead of hset to accommodate older version of redis.
+ }
+ }
+
+ void setex(Jedis redis, String key, String value, long lifespanInSecs) {
+ throw new IllegalArgumentException("Cannot set expiry on individual key. Do it as Map");
+ }
+
+ List
+ * While Redis supports various data structures such as lists, sets, and maps,
+ * this implementation is designed to focus solely on string-based storage.
+ * In some cases, the stored strings may represent JSON-formatted data to encapsulate
+ * more complex data structures.
+ *
+ * For storage of plain objects beyond simple strings, this implementation
+ * uses Base64 encoding. Objects are serialized into Base64-encoded strings
+ * for storage and are deserialized back into objects when retrieved from the cache.
+ * This approach ensures compatibility with Redis's string data type while
+ * maintaining flexibility for handling diverse data types.
+ *
+ * Date: Nov 18, 2024
+ * @author loi
+ * @version $Id: EhcacheImpl.java,v 1.8 2009/12/16 21:43:25 loi Exp $
+ */
+public class DistributedCache implements Cache {
+ static final Logger.LoggerImpl LOG = Logger.getLogger();
+ private static final String BASE64 = "BASE64::";
+
+ public void put(CacheKey key, Object value) {
+ put(key, value, 0);
+ }
+
+ public void put(CacheKey key, Object value, int lifespanInSecs) {
+ String keystr = key.getUniqueString();
+ try(Jedis redis = RedisService.getConnection()) {
+ if (redis != null) {
+ if (value == null) {
+ del(redis, keystr);
+ } else {
+ if (lifespanInSecs > 0) {
+ setex(redis, keystr, v2set(value), lifespanInSecs);
+ } else {
+ set(redis, keystr, v2set(value));
+ }
+ }
+ }
+ } catch (Exception ex) { LOG.error(ex); }
+ }
+
+ public Object get(CacheKey key) {
+ try(Jedis redis = RedisService.getConnection()) {
+ return v2get( get(redis, key.getUniqueString()) );
+ } catch (Exception ex) { LOG.error(ex); }
+ return null;
+ }
+
+ public boolean isCached(CacheKey key) {
+ try(Jedis redis = RedisService.getConnection()) {
+ return exists(redis, key.getUniqueString());
+ } catch (Exception ex) { LOG.error(ex); }
+ return false;
+ }
+
+ public List
+ * {@code
+ * ServerEvent serverEvent = new ServerEvent(name, scope, data);
+ * Message.Event eventMessage = new Message.Event(serverEvent);
+ * }
+ *
+ **/
+ public static final class Event extends Message {
+ public static final String TOPIC = "firefly-events";
+
+ public Event(ServerEvent se) {
+ setTopic(TOPIC);
+ setValue(se.getName().getName(), "name");
+ applyIfNotEmpty(se.getTarget().getScope(), (s) -> setValue(s.name(), "target", "scope"));
+ applyIfNotEmpty(se.getTarget().getChannel(), (s) -> setValue(s, "target", "channel"));
+ applyIfNotEmpty(se.getTarget().getConnID(), (s) -> setValue(s, "target", "connID"));
+ applyIfNotEmpty(se.getTarget().getUserKey(), (s) -> setValue(s, "target", "userKey"));
+ setValue(se.getDataType().name(), "dataType");
+ setValue(se.getData(), "data");
+ setValue(se.getFrom(), "from");
+ }
+ public static ServerEvent parse(Message msg) {
+ try {
+ Name name = Name.parse(msg.getValue(null, "name"));
+ if (name == null) return null;
+
+ Scope scope = Scope.valueOf(msg.getValue(null, "target", "scope"));
+ String connID = msg.getValue(null, "target", "connID");
+ String channel = msg.getValue(null, "target", "channel");
+ String userKey = msg.getValue(null, "target", "userKey");
+ DataType dtype = DataType.valueOf(msg.getValue(null, "dataType"));
+ Serializable data = msg.getValue(null, "data");
+ String from = msg.getValue(null, "from");
+ return new ServerEvent(name, new EventTarget(scope, connID, channel, userKey), dtype, data, from);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }
+
+ public static final class JobCompleted extends Message {
+ public static final String TOPIC = "JobCompleted";
+ static final String FROM = AppProperties.getProperty("mail.smtp.from", "donotreply@ipac.caltech.edu");
+ JobInfo job;
+ public JobCompleted(JobInfo jobInfo) {
+ job = jobInfo;
+ setTopic(TOPIC);
+ setHeader(Scope.CHANNEL, jobInfo.getOwner(), TOPIC, FROM);
+ setValue(JobManager.toJsonObject(jobInfo), "jobInfo");
+ var user = ServerContext.getRequestOwner().getUserInfo();
+ if (user != null) {
+ setValue(user.getName(), "user", "name");
+ setValue(user.getEmail(), "user", "email");
+ setValue(user.getLoginName(), "user", "loginName");
+ }
+ var ssoAdpt = ServerContext.getRequestOwner().getSsoAdapter();
+ if (ssoAdpt != null) {
+ setValue(ssoAdpt.getAuthToken(), "user", "authToken");
+ }
+ }
+
+ public MsgHeader getHeader() {
+ Scope scope = Scope.valueOf(getValue(Scope.SELF.name(), "header", "scope"));
+ String to = getValue("", "header", "to");
+ String from = getValue("", "header", "from");
+ String subject = getValue("", "header", "subject");
+ MsgHeader header = new MsgHeader(scope, to, subject);
+ header.setFrom(from);
+ return header;
+ }
+
+ public void setHeader(Scope scope, String to, String subject, String from) {
+ setValue(scope.name(), "header", "scope");
+ if (to != null) setValue(to, "header", "to");
+ if (from != null) setValue(from, "header", "from");
+ if (subject != null) setValue(subject, "header", "subject");
+ }
+ }
+
}
\ No newline at end of file
diff --git a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java
index f02fda0690..8fd31ecf5b 100644
--- a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java
+++ b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java
@@ -4,25 +4,20 @@
package edu.caltech.ipac.firefly.messaging;
+import edu.caltech.ipac.firefly.core.RedisService;
import edu.caltech.ipac.firefly.server.util.Logger;
-import edu.caltech.ipac.util.AppProperties;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
-import redis.clients.jedis.Protocol;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static edu.caltech.ipac.util.StringUtils.applyIfNotEmpty;
/**
* An implementation of publish-subscribe messaging pattern based on Jedis client and Redis backend.
- * This class abstract the use of threads and it ensures that there is only 1 thread used per topic.
- *
+ * This class abstract the use of threads, and it ensures that there is only 1 thread used per topic.
* Topic refer here is equivalent to jedis 'channel'. This is so it will not be confused with channel
* used in Scope.
*
@@ -33,117 +28,38 @@
* @version $Id: $
*/
public class Messenger {
- public enum Status {CONNECTED, FAIL_TO_CONNECT, NOT_USING};
- private static final String REDIS_HOST = AppProperties.getProperty("redis.host", "127.0.0.1");
- private static final int REDIS_PORT = AppProperties.getIntProperty("redis.port", 6379);
- private static final int MAX_POOL_SIZE = AppProperties.getIntProperty("redis.max.poolsize", 25);
- private static final String REDIS_PASSWORD = getRedisPassword();
- private static final Logger.LoggerImpl LOG = Logger.getLogger();
-
- // message broker.. Jedis
- private static JedisPool jedisPool;
- private static Status status = Status.NOT_USING;
+ private static Logger.LoggerImpl LOG = Logger.getLogger();
// to limit one thread per topic
private static ConcurrentHashMap