Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/main/java/com/example/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class Message {

private String msgBody;

private int priority;

Message(String msgBody) {
this.msgBody = msgBody;
}
Expand All @@ -21,6 +23,16 @@ public class Message {
this.receiptId = receiptId;
}

Message(String msgBody, int priority) {
this(msgBody);
this.priority = priority;
}

Message(String msgBody, String receiptId, int priority) {
this(msgBody, receiptId);
this.priority = priority;
}

public String getReceiptId() {
return this.receiptId;
}
Expand Down Expand Up @@ -53,4 +65,8 @@ protected int getAttempts() {
protected void incrementAttempts() {
this.attempts++;
}

protected int getPriority() {
return priority;
}
}
143 changes: 143 additions & 0 deletions src/main/java/com/example/RedisQueueService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package com.example;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;

public class RedisQueueService implements QueueService {

private final String redisEndpointUrl;
private final String redisApiToken;

private static final String REDIS_ZADD_COMMAND = "zadd";
private static final String REDIS_ZPOPMAX_COMMAND = "zpopmax";
private static final String REDIS_REV_RANGE_COMMAND = "zrevrange";


RedisQueueService() {
String propFileName = "config.properties";
Properties confInfo = new Properties();

try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) {
confInfo.load(inStream);
} catch (IOException e) {
e.printStackTrace();
}

this.redisEndpointUrl = confInfo.getProperty("redisEndpointUrl");
this.redisApiToken = confInfo.getProperty("redisApiToken");
}

@Override
public void push(String queueUrl, String msgBody) {
int priority = extractPriorityFromJson(msgBody);
makePostRequest(redisEndpointUrl + "/"+ REDIS_ZADD_COMMAND + "/" + queueUrl + "/" + priority, msgBody);
}

@Override
public Message pull(String queueUrl) {

String redisResponse = makeGetRequest(redisEndpointUrl + "/"+ REDIS_REV_RANGE_COMMAND+ "/"+ queueUrl+"/0/0");

return createMessageFromRedisResponse(redisResponse);
}

// for now, This deletes peek element from redis sorted set.
// will try to find a solution if we can delete using receipt id
@Override
public void delete(String queueUrl, String receiptId) {
makeGetRequest(redisEndpointUrl + "/"+ REDIS_ZPOPMAX_COMMAND+ "/"+ queueUrl);
}

private int extractPriorityFromJson(String msgBody) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(msgBody);
JsonNode priorityNode = rootNode.get("priority");
if (priorityNode != null && priorityNode.isInt()) {
return priorityNode.intValue();
}
} catch (Exception e) {
e.printStackTrace();
}
return 0; // Default priority if not found or error occurred
}

// this method creates a Message object from response received from redis.
private Message createMessageFromRedisResponse(String response) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
JsonNode resultNode = rootNode.get("result");

if (resultNode != null && resultNode.isArray() && resultNode.size() > 0) {
JsonNode messageNode = mapper.readTree(resultNode.get(0).asText());

int priority = messageNode.has("priority") ? messageNode.get("priority").asInt() : 0;
String messageBody = messageNode.has("content") ? messageNode.get("content").asText() : "";

return new Message(messageBody, UUID.randomUUID().toString(), priority);
} else {
// Queue is empty or no valid message found
return null;
}
} catch (IOException e) {
throw new RuntimeException("Failed to parse JSON response", e);
}
}

private String makeGetRequest(String apiUrl) {
CloseableHttpClient client = HttpClients.createDefault();

HttpGet httpGet = new HttpGet(apiUrl);
httpGet.addHeader("Content-Type", "application/json");
httpGet.addHeader("Authorization", String.format("Bearer %s", redisApiToken));

try {
CloseableHttpResponse response = client.execute(httpGet);

if (response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(String.format("Redis Api failed with status %s", response.getStatusLine().getStatusCode()));
}
return EntityUtils.toString(response.getEntity());
} catch (Exception e) {
throw new RuntimeException("Unable to call Redis api", e);
}
}

private String makePostRequest(String apiUrl, String msgBody) {
CloseableHttpClient client = HttpClients.createDefault();

HttpPost httpPost = new HttpPost(apiUrl);

try {
httpPost.setEntity(new StringEntity(msgBody));
} catch (Exception e) {
throw new RuntimeException("Unable to call Redis api", e);
}

httpPost.addHeader("Content-Type", "application/json");
httpPost.addHeader("Authorization", String.format("Bearer %s", redisApiToken));

try {
CloseableHttpResponse response = client.execute(httpPost);

if (response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(String.format("Redis Api failed with status %s", response.getStatusLine().getStatusCode()));
}
return EntityUtils.toString(response.getEntity());
} catch (Exception e) {
throw new RuntimeException("Unable to call Redis api", e);
}
}
}
8 changes: 7 additions & 1 deletion src/main/resources/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ queueDirectory = nigel-qs
fieldDelimiter = :

# Visibility Timeout (in seconds)
visibilityTimeout = 30
visibilityTimeout = 30

# Redis REST Endpoint Url
redisEndpointUrl = upstash_redis_endpoint

# Redis Oauth token
redisApiToken = upstash_redis_token
133 changes: 133 additions & 0 deletions src/test/java/com/example/RedisQueueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.example;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;

public class RedisQueueTest {
private QueueService qs;
private String queueUrl = "mystash";

@Before
public void setup() {
qs = new RedisQueueService();
}

@Test
public void testSendMessage(){
String msgJson= "{\"content\" : \"Good message!\", \"priority\":1}";
qs.push(queueUrl, msgJson);
Message msg = qs.pull(queueUrl);

assertNotNull(msg);
assertEquals("Good message!", msg.getBody());
assertEquals(1, msg.getPriority());

qs.delete(queueUrl, msg.getReceiptId());
}

@Test
public void testPullMessageWithDefaultPriority(){
String msgJson = "{\"content\" : \"Good message!\"}";

qs.push(queueUrl, msgJson);
Message msg = qs.pull(queueUrl);

assertEquals("Good message!", msg.getBody());
assertEquals(0, msg.getPriority());
assertTrue(msg.getReceiptId() != null && msg.getReceiptId().length() > 0);

qs.delete(queueUrl, msg.getReceiptId());
}

@Test
public void testPullMultipleMessage(){
String msgJson1 = "{\"content\" : \"Message 1\", \"priority\":10}";
String msgJson2 = "{\"content\" : \"Message 2\", \"priority\":100}";
String msgJson3 = "{\"content\" : \"Message 3\", \"priority\":1}";

// push all 3 messages
qs.push(queueUrl, msgJson1);
qs.push(queueUrl, msgJson2);
qs.push(queueUrl, msgJson3);

// pull 1st message and delete it
Message pulledMsg1 = qs.pull(queueUrl);
qs.delete(queueUrl, pulledMsg1.getReceiptId());

// pull 2nd message and delete it
Message pulledMsg2 = qs.pull(queueUrl);
qs.delete(queueUrl, pulledMsg2.getReceiptId());

// pull 3rd message and delete it
Message pulledMsg3 = qs.pull(queueUrl);
qs.delete(queueUrl, pulledMsg3.getReceiptId());

// assert all 3 pulled messages are not null
assertNotNull(pulledMsg1);
assertNotNull(pulledMsg2);
assertNotNull(pulledMsg3);

// message 1 will be priority as 100
assertEquals("Message 2", pulledMsg1.getBody());
assertEquals(100, pulledMsg1.getPriority());
assertTrue(pulledMsg1.getReceiptId() != null && pulledMsg1.getReceiptId().length() > 0);

// message 2 will be priority as 10
assertEquals("Message 1", pulledMsg2.getBody());
assertEquals(10, pulledMsg2.getPriority());
assertTrue(pulledMsg2.getReceiptId() != null && pulledMsg2.getReceiptId().length() > 0);

// message 1 will be priority as 1
assertEquals("Message 3", pulledMsg3.getBody());
assertEquals(1, pulledMsg3.getPriority());
assertTrue(pulledMsg3.getReceiptId() != null && pulledMsg3.getReceiptId().length() > 0);
}

@Test
public void testPullEmptyQueue(){
Message msg = qs.pull(queueUrl);
assertNull(msg);
}

// this test doesn't return message on FIFO manner. I try adding timestamp to fix it.
@Test
public void testFIFO2Msgs(){
String [] msgStrs = {
"{\n" +
" \"content\":\"Message 1\",\n" +
" \"timestamp\": 1646749350000,"+
" \"priority\":1\n" +
" }",
"{\n" +
" \"content\":\"Message 2\",\n" +
" \"timestamp\": 1646749355000,"+
" \"priority\":1\n" +
" }"
};

// push both messages
qs.push(queueUrl, msgStrs[0]);
qs.push(queueUrl, msgStrs[1]);

// pull first message
Message msg1 = qs.pull(queueUrl);
qs.delete(queueUrl, msg1.getReceiptId());

// pull second message
Message msg2 = qs.pull(queueUrl);
qs.delete(queueUrl, msg2.getReceiptId());

// both message will be of same priority
assertEquals(1, msg1.getPriority());
assertEquals(1, msg2.getPriority());

// Ideal FIFO order
// assertEquals("Message 1", msg1.getBody());
// assertEquals("Message 2", msg2.getBody());

assertEquals("Message 2", msg1.getBody());
assertEquals("Message 1", msg2.getBody());
}
}