Skip to content

Commit

Permalink
convert events to redis sub/pub
Browse files Browse the repository at this point in the history
  • Loading branch information
loitly committed Dec 21, 2024
1 parent 5d90345 commit 95d27b1
Show file tree
Hide file tree
Showing 27 changed files with 558 additions and 287 deletions.
36 changes: 26 additions & 10 deletions src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ static void startLocal() {
.setting("maxmemory %s".formatted(MAX_MEM))
.setting("dir %s".formatted(DB_DIR)) // Directory where redis database files are stored
.setting("dbfilename redis.rdb") // RDB file name
.setting("save 600 1") // RDB file name
.build();
redisServer.start();
connect();
Expand Down Expand Up @@ -138,27 +139,42 @@ public static Map<String, Object> getStats() {
} catch (NoSuchAlgorithmException e) {/* ignore */}
try (Jedis redis = getConnection()) {

var infos = redis.info().split("\r\n");
Arrays.stream(infos).filter(s -> s.contains("version")).findFirst()
.ifPresent(s -> stats.put("version", s.split(":")[1].trim()));

stats.put("active conn", jedisPool.getNumActive());
stats.put("idle conn", jedisPool.getNumIdle());
stats.put("max conn", maxPoolSize);
stats.put("max-wait", jedisPool.getMaxBorrowWaitTimeMillis());
stats.put("avg-wait", jedisPool.getMeanBorrowWaitTimeMillis());
stats.put("cache-size", redis.dbSize());
stats.put("DB_DIR", DB_DIR);
stats.put("MAX_MEM", MAX_MEM);
stats.put("password", passwd);

Arrays.stream(redis.info().split("\r\n")).forEach((s) -> {
if (!s.startsWith("#")) {
String[] kv = s.split(":");
stats.put(kv[0], kv[1]);
}
});
stats.put("db-size", redis.dbSize());
addStat(stats, redis, "maxmemory");
addStat(stats, redis, "save");
addStat(stats, redis, "dir");
addStat(stats, redis, "dbfilename");
addStat(stats, redis, "appendfilename");
stats.put("---MEMORY STATS----", "");
var mem = redis.memoryStats();
stats.put("Total memory used", mem.get("dataset.bytes"));
stats.put("Total memory allocated", mem.get("allocator.allocated"));
stats.put("Fragmented memory", mem.get("fragmentation"));
stats.put("Fragmentation ratio", mem.get("allocator-fragmentation.ratio"));
stats.put("Number of keys stored", mem.get("keys.count"));
stats.put("Avg per key", mem.get("keys.bytes-per-key"));
stats.put("Pct of memory used", mem.get("dataset.percentage"));
stats.put("Peak memory used", mem.get("peak.allocated"));
} catch (Exception ignored) {}
}
return stats;
}

public static void addStat(Map<String, Object> stats, Jedis redis, String key) {
var c = redis.configGet(key);
if (c.size() > 1) stats.put(key, c.get(1));
}

public static String getRedisHostPortDesc() {
return redisHost + ":" + REDIS_PORT + " ("+ getStatus() + ")";
}
Expand Down
130 changes: 130 additions & 0 deletions src/firefly/java/edu/caltech/ipac/firefly/core/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Date: 11/19/24
Expand Down Expand Up @@ -70,6 +74,132 @@ public static Object deserialize(String base64) {
}


//====================================================================
// Functional Helpers
//====================================================================

/**
* A function that throws exception
* @param <T> the return type of this function
*/
@FunctionalInterface
public interface FuncWithEx<T> {
T get() throws Exception;
}

/**
* A callable that throws exception
*/
@FunctionalInterface
public interface CallWithEx {
void run() throws Exception;
}

/**
* A function with parameter that throws exception
*/
@FunctionalInterface
public interface FuncParamWithEx<P, T> {
T apply(P p) throws Exception;
}

public static class Try<T> {
private final T val;
private final Exception ex;

Try(T val, Exception ex) {
this.val = val;
this.ex = ex;
}

/**
* Get the value if no exception was thrown
* @return the value if no exception was thrown, or null
*/
public T get() {
return ex == null ? val : null;
}

public T getOrElse(T defVal) {
return ex == null ? val : defVal;
}

public T getOrElse(Consumer<Exception> onError) {
if (ex == null) return val;
onError.accept(ex);
return null;
}

public static <P, T> Try<T> it(FuncParamWithEx<P,T> func, P param) {
try {
return new Try<>(func.apply(param), null);
} catch (Exception e) {
return new Try<>(null, e);
}
}

public static <T> Try<T> it(CallWithEx func) {
try {
func.run();
return new Try<>(null, null);
} catch (Exception e) {
return new Try<>(null, e);
}
}

public static <T> Try<T> it(FuncWithEx<T> func) {
try {
return new Try<>(func.get(), null);
} catch (Exception e) {
return new Try<>(null, e);
}
}

/**
* Execute the given function until it passes test, then return the result
* @param func the function to execute
* @param test test the returned value
* @param tries the number of times to try
* @return Try results
*/
public static <T> Try<T> until(FuncWithEx<T> func, Predicate<T> test, int tries) {
for (int i = 0; i < tries; i++) {
var res = it(func).get();
if (res != null && test.test(res)) return new Try<>(res, null);
}
return new Try<>(null, new IndexOutOfBoundsException("Exceeded max tries"));
}

}


public static class SynchronizedAccess {
private final ConcurrentHashMap<String, ReentrantLock> activeRequests = new ConcurrentHashMap<>();

@FunctionalInterface
public interface LockHandle {
void unlock();
}

/**
* Acquires a lock associated with the given ID. If the lock does not already exist, it is created.
*
* @param id the identifier for the lock
* @return a {@code Runnable} that, when executed, releases the lock and removes it from the active requests
*/
public LockHandle lock(String id) {
ReentrantLock lock = activeRequests.computeIfAbsent(id, k -> new ReentrantLock());
Logger.getLogger().trace("waiting %s: %s\n".formatted(id, lock));
lock.lock();
Logger.getLogger().trace("got lock %s: %s\n".formatted(id, lock));
return () -> {
try {
lock.unlock(); // Ensure lock is released even if an exception occurs
} finally {
if (!lock.isLocked()) activeRequests.remove(id); // Remove the lock from activeRequests if no threads are using it
Logger.getLogger().trace("unlock %s: %s\n".formatted(id, lock));
}
};
}
}
}
102 changes: 82 additions & 20 deletions src/firefly/java/edu/caltech/ipac/firefly/messaging/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,35 @@

import edu.caltech.ipac.firefly.core.background.JobInfo;
import edu.caltech.ipac.firefly.core.background.JobManager;
import edu.caltech.ipac.firefly.data.ServerEvent;
import edu.caltech.ipac.firefly.data.ServerEvent.Scope;
import edu.caltech.ipac.firefly.server.ServerContext;
import edu.caltech.ipac.firefly.util.event.Name;
import edu.caltech.ipac.util.AppProperties;

import java.io.Serializable;

import static edu.caltech.ipac.util.StringUtils.applyIfNotEmpty;
import static edu.caltech.ipac.firefly.data.ServerEvent.*;

/**
* The message object sent or received by Messenger. As implemented, Messenger
* uses JSON when serialize/deserialize is needed. However, this is an internal
* implementation, and it can be changed without breaking Messenger/Message contract.
*
* Date: 2019-03-15
*
* @author loi
* @version $Id: $
*/
public class Message {
private static final String TOPIC_KEY = "msg_topic";
protected JsonHelper helper = new JsonHelper();

public MsgHeader getHeader() {
Scope scope = Scope.valueOf(helper.getValue(Scope.SELF.name(), "header", "scope"));
String to = helper.getValue("", "header", "to");
String from = helper.getValue("", "header", "from");
String subject = helper.getValue("", "header", "subject");
MsgHeader header = new MsgHeader(scope, to, subject);
header.setFrom(from);
return header;
public void setTopic(String topic) {
setValue(topic, TOPIC_KEY);
}

public void setHeader(Scope scope, String to, String subject, String from) {
helper.setValue(scope.name(), "header", "scope");
if (to != null) helper.setValue(to, "header", "to");
if (from != null) helper.setValue(from, "header", "from");
if (subject != null) helper.setValue(subject, "header", "subject");
public String getTopic() {
return getValue(null, TOPIC_KEY);
}

public Message setValue(Object value, String... paths) {
Expand All @@ -63,24 +60,89 @@ public String toJson() {
// Predefined messages
//====================================================================

/**
* Represents a server event message that can be sent or received by the Messenger.
* This class encapsulates the details of a server event, including its name, target, data type, data, and source.
* It provides methods to construct an event from a ServerEvent object and to parse a Message object back into a ServerEvent.
*
* <p>Example usage:</p>
* <pre>
* {@code
* ServerEvent serverEvent = new ServerEvent(name, scope, data);
* Message.Event eventMessage = new Message.Event(serverEvent);
* }
* </pre>
**/
public static final class Event extends Message {
public static final String TOPIC = "firefly-events";

public Event(ServerEvent se) {
setTopic(TOPIC);
setValue(se.getName().getName(), "name");
applyIfNotEmpty(se.getTarget().getScope(), (s) -> setValue(s.name(), "target", "scope"));
applyIfNotEmpty(se.getTarget().getChannel(), (s) -> setValue(s, "target", "channel"));
applyIfNotEmpty(se.getTarget().getConnID(), (s) -> setValue(s, "target", "connID"));
applyIfNotEmpty(se.getTarget().getUserKey(), (s) -> setValue(s, "target", "userKey"));
setValue(se.getDataType().name(), "dataType");
setValue(se.getData(), "data");
setValue(se.getFrom(), "from");
}
public static ServerEvent parse(Message msg) {
try {
Name name = Name.parse(msg.getValue(null, "name"));
if (name == null) return null;

Scope scope = Scope.valueOf(msg.getValue(null, "target", "scope"));
String connID = msg.getValue(null, "target", "connID");
String channel = msg.getValue(null, "target", "channel");
String userKey = msg.getValue(null, "target", "userKey");
DataType dtype = DataType.valueOf(msg.getValue(null, "dataType"));
Serializable data = msg.getValue(null, "data");
String from = msg.getValue(null, "from");
return new ServerEvent(name, new EventTarget(scope, connID, channel, userKey), dtype, data, from);
} catch (Exception e) {
return null;
}
}
}

public static final class JobCompleted extends Message {
public static final String TOPIC = "JobCompleted";
static final String FROM = AppProperties.getProperty("mail.smtp.from", "[email protected]");
JobInfo job;
public JobCompleted(JobInfo jobInfo) {
job = jobInfo;
setTopic(TOPIC);
setHeader(Scope.CHANNEL, jobInfo.getOwner(), TOPIC, FROM);
helper.setValue(JobManager.toJsonObject(jobInfo), "jobInfo");
setValue(JobManager.toJsonObject(jobInfo), "jobInfo");
var user = ServerContext.getRequestOwner().getUserInfo();
if (user != null) {
helper.setValue(user.getName(), "user", "name");
helper.setValue(user.getEmail(), "user", "email");
helper.setValue(user.getLoginName(), "user", "loginName");
setValue(user.getName(), "user", "name");
setValue(user.getEmail(), "user", "email");
setValue(user.getLoginName(), "user", "loginName");
}
var ssoAdpt = ServerContext.getRequestOwner().getSsoAdapter();
if (ssoAdpt != null) {
helper.setValue(ssoAdpt.getAuthToken(), "user", "authToken");
setValue(ssoAdpt.getAuthToken(), "user", "authToken");
}
}

public MsgHeader getHeader() {
Scope scope = Scope.valueOf(getValue(Scope.SELF.name(), "header", "scope"));
String to = getValue("", "header", "to");
String from = getValue("", "header", "from");
String subject = getValue("", "header", "subject");
MsgHeader header = new MsgHeader(scope, to, subject);
header.setFrom(from);
return header;
}

public void setHeader(Scope scope, String to, String subject, String from) {
setValue(scope.name(), "header", "scope");
if (to != null) setValue(to, "header", "to");
if (from != null) setValue(from, "header", "from");
if (subject != null) setValue(subject, "header", "subject");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import static edu.caltech.ipac.util.StringUtils.applyIfNotEmpty;

/**
* An implementation of publish-subscribe messaging pattern based on Jedis client and Redis backend.
* This class abstract the use of threads, and it ensures that there is only 1 thread used per topic.
Expand Down Expand Up @@ -77,16 +79,13 @@ public static void publish(String topic, Message msg) {
}

/**
* Send the given message. The message's subject is used as the topic.
* @param msg the message to send
* Publishes the given message to its topic.
* @param msg the message to be published
*/
public static void publish(Message msg) {
// some firefly's specific logic here...
String topic = msg.getHeader().getSubject();
publish(topic, msg);
public static void publish(Message msg) {
applyIfNotEmpty(msg.getTopic(), (topic) -> publish(topic, msg));
}


/**
* Internal handler class used to manage the one-to-many relationship of Messenger's subscriber and
* Jedis's subscriber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public String getDbUrl() {
public void setPooled(boolean pooled) { isPooled = pooled;}
public boolean testConn(Connection conn) {
try (Statement stmt = conn.createStatement()) {
stmt.execute("SELECT 1"); // works with all DMBS
stmt.execute("SELECT 1 FROM (VALUES (0))"); // HSQL required FROM clause
return true;
} catch (SQLException e) { return false; }
};
Expand Down
Loading

0 comments on commit 95d27b1

Please sign in to comment.