Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/main/java/com/example/InMemoryPriorityQueueService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.example;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class InMemoryPriorityQueueService implements QueueService {
private final Map<String, Queue<Message>> queues;

private long visibilityTimeout;

InMemoryPriorityQueueService() {
this.queues = new ConcurrentHashMap<>();

String propFileName = "config.properties";
Properties confInfo = new Properties();

try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) {
confInfo.load(inStream);
} catch (IOException e) {
e.printStackTrace();
}

this.visibilityTimeout = Integer.parseInt(confInfo.getProperty("visibilityTimeout", "30"));
}

@Override
public void push(String queueUrl, String msgBody) {
Queue<Message> queue = queues.get(queueUrl);
if (queue == null) {
queue = new PriorityQueue<>(Comparator.comparingInt(Message::getPriority).reversed().thenComparingLong(Message::getTimestamp));
queues.put(queueUrl, queue);
}

queue.add(new Message(msgBody, this.extractPriorityFromJson(msgBody)));
}

@Override
public Message pull(String queueUrl) {
Queue<Message> queue = queues.get(queueUrl);
if (queue == null || queue.isEmpty()) {
return null;
}

Message msg = queue.peek();
msg.setReceiptId(UUID.randomUUID().toString());
msg.incrementAttempts();
msg.setVisibleFrom(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(visibilityTimeout));

return new Message(msg.getBody(), msg.getReceiptId(), this.extractPriorityFromJson(msg.getBody()));
}

@Override
public void delete(String queueUrl, String receiptId) {
Queue<Message> queue = queues.get(queueUrl);

if(queue == null || queue.isEmpty()) {
return;
}

long nowTime = now();
for (Iterator<Message> it = queue.iterator(); it.hasNext(); ) {
Message msg = it.next();
if (!msg.isVisibleAt(nowTime) && msg.getReceiptId().equals(receiptId)) {
it.remove();
break;
}
}
}

private int extractPriorityFromJson(String msgBody) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(msgBody);
JsonNode priorityNode = rootNode.get("priority");
if (priorityNode != null && priorityNode.isInt()) {
return priorityNode.intValue();
}
} catch (Exception e) {
e.printStackTrace();
}
return 0; // Default priority if not found or error occurred
}

long now() {
return System.currentTimeMillis();
}
}
26 changes: 25 additions & 1 deletion src/main/java/com/example/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,29 @@ public class Message {

private String msgBody;

private int priority;

private long timestamp;

Message(String msgBody) {
this.msgBody = msgBody;
this.timestamp = System.currentTimeMillis();
}

Message(String msgBody, String receiptId) {
this.msgBody = msgBody;
this.receiptId = receiptId;
this.timestamp = System.currentTimeMillis();
}

Message(String msgBody, int priority) {
this(msgBody);
this.priority = priority;
}

Message(String msgBody, String receiptId, int priority) {
this(msgBody, receiptId);
this.priority = priority;
}

public String getReceiptId() {
Expand Down Expand Up @@ -53,4 +69,12 @@ protected int getAttempts() {
protected void incrementAttempts() {
this.attempts++;
}
}

protected int getPriority() {
return priority;
}

protected long getTimestamp() {
return timestamp;
}
}
178 changes: 178 additions & 0 deletions src/test/java/com/example/InMemoryPriorityQueueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package com.example;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;

public class InMemoryPriorityQueueTest {
private QueueService qs;
private String queueUrl = "https://sqs.ap-1.amazonaws.com/007/MyQueue";

@Before
public void setup() {
qs = new InMemoryPriorityQueueService();
}


@Test
public void testSendMessage(){
String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null }";
qs.push(queueUrl, msgBody);
Message msg = qs.pull(queueUrl);

assertNotNull(msg);
assertEquals(msgBody, msg.getBody());
}

@Test
public void testPullMessageWithDefaultPriority(){
String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null }";

qs.push(queueUrl, msgBody);
Message msg = qs.pull(queueUrl);

assertEquals(msgBody, msg.getBody());
assertEquals(0, msg.getPriority());
assertTrue(msg.getReceiptId() != null && msg.getReceiptId().length() > 0);
}

@Test
public void testPullMessageWithDefinedPriority(){
String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null , \"priority\":100}";

qs.push(queueUrl, msgBody);
Message msg = qs.pull(queueUrl);

assertEquals(msgBody, msg.getBody());
assertEquals(100, msg.getPriority());
assertTrue(msg.getReceiptId() != null && msg.getReceiptId().length() > 0);
}

@Test
public void testPullMultipleMessage(){
String msgBody1 = "{ \"name\":\"John\", \"age\":30, \"car\":null , \"priority\":10}";
String msgBody2 = "{ \"name\":\"John\", \"age\":30, \"car\":null , \"priority\":100}";
String msgBody3 = "{ \"name\":\"John\", \"age\":30, \"car\":null , \"priority\":1}";

// push all 3 messages
qs.push(queueUrl, msgBody1);
qs.push(queueUrl, msgBody2);
qs.push(queueUrl, msgBody3);

// pull 1st message and delete it
Message pulledMsg1 = qs.pull(queueUrl);
qs.delete(queueUrl , pulledMsg1.getReceiptId());

// pull 2nd message and delete it
Message pulledMsg2 = qs.pull(queueUrl);
qs.delete(queueUrl , pulledMsg2.getReceiptId());

// pull 3rd message and delete it
Message pulledMsg3 = qs.pull(queueUrl);
qs.delete(queueUrl , pulledMsg3.getReceiptId());

// assert all 3 pulled messages are not null
assertNotNull(pulledMsg1);
assertNotNull(pulledMsg2);
assertNotNull(pulledMsg3);

// message 1 will be priority as 100
assertEquals(msgBody2, pulledMsg1.getBody());
assertEquals(100, pulledMsg1.getPriority());
assertTrue(pulledMsg1.getReceiptId() != null && pulledMsg1.getReceiptId().length() > 0);

// message 2 will be priority as 10
assertEquals(msgBody1, pulledMsg2.getBody());
assertEquals(10, pulledMsg2.getPriority());
assertTrue(pulledMsg2.getReceiptId() != null && pulledMsg2.getReceiptId().length() > 0);

// message 1 will be priority as 1
assertEquals(msgBody3, pulledMsg3.getBody());
assertEquals(1, pulledMsg3.getPriority());
assertTrue(pulledMsg3.getReceiptId() != null && pulledMsg3.getReceiptId().length() > 0);
}

@Test
public void testPullEmptyQueue(){
Message msg = qs.pull(queueUrl);
assertNull(msg);
}

@Test
public void testDeleteMessage(){
String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null, \"priority\":1 }";

qs.push(queueUrl, msgBody);
Message msg = qs.pull(queueUrl);

qs.delete(queueUrl, msg.getReceiptId());
msg = qs.pull(queueUrl);

assertNull(msg);
}

@Test
public void testFIFO3Msgs(){
String [] msgStrs = {
"{\n" +
" \"name\":\"John1\",\n" +
" \"age\":30,\n" +
" \"priority\":1\n" +
" }",
"{\n" +
" \"name\":\"John2\",\n" +
" \"age\":30,\n" +
" \"priority\":1\n" +
" }",
"{\n" +
" \"name\":\"John3\",\n" +
" \"age\":30,\n" +
" \"priority\":1\n" +
" }"
};

// push all 3 messages
qs.push(queueUrl, msgStrs[0]);
qs.push(queueUrl, msgStrs[1]);
qs.push(queueUrl, msgStrs[2]);

// pull first message
Message msg1 = qs.pull(queueUrl);
qs.delete(queueUrl , msg1.getReceiptId());

// pull second message
Message msg2 = qs.pull(queueUrl);
qs.delete(queueUrl , msg2.getReceiptId());

// pull third message
Message msg3 = qs.pull(queueUrl);
qs.delete(queueUrl , msg3.getReceiptId());

// all messages will be of same priority
assertEquals(1, msg1.getPriority());
assertEquals(1, msg2.getPriority());
assertEquals(1, msg3.getPriority());

// assert first pulled message has msgBody1 and likewise for second & third message
assertEquals(msgStrs[0],msg1.getBody());
assertEquals(msgStrs[1],msg2.getBody());
assertEquals(msgStrs[2],msg3.getBody());
}

@Test
public void testAckTimeout(){

String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null, \"priority\":1 }";
InMemoryPriorityQueueService queueService = new InMemoryPriorityQueueService() {
long now() {
return System.currentTimeMillis() + 1000 * 30 + 1;
}
};

queueService.push(queueUrl, msgBody);
queueService.pull(queueUrl);
Message msg = queueService.pull(queueUrl);
assertTrue(msg != null && msg.getBody() == msgBody);
}
}