Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface DedupQueueService extends BaseQueueService {

void sendAll(Map<String, ? extends Collection<?>> messagesByQueue);

//Overloaded sendAll method to send to cassandra
void sendAll(String queue, Collection<?>messages, boolean isFlush);

/**
Expand All @@ -26,6 +27,10 @@ public interface DedupQueueService extends BaseQueueService {
*/
long getMessageCount(String queue);

default long getUncachedSize(String queue){
return 0;
}

/**
* Counts the total number of messages for the specified queue, accurate up to the specified limit. Beyond the
* specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public interface QueueService extends BaseQueueService {

void sendAll(String queue, Collection<?> messages);


void sendAll(Map<String, ? extends Collection<?>> messagesByQueue);

//Overloaded sendAll method to send to cassandra
Expand All @@ -28,6 +27,10 @@ public interface QueueService extends BaseQueueService {
*/
long getMessageCount(String queue);

default long getUncachedSize(String queue){
return 0;
}

/**
* Counts the total number of messages for the specified queue, accurate up to the specified limit. Beyond the
* specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,9 @@ private void startStepFunctionExecution(Map<String, String> parameters, String q

String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval);

// Create the timestamp
String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds

// Check if queueType is "dedupq" and prepend "D" to execution name if true
String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;

String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName ;
// Start the Step Function execution
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;

/**
* Service to interact with AWS Step Functions using AWS SDK v1.
Expand Down Expand Up @@ -42,6 +43,10 @@ public void startExecution(String stateMachineArn, String inputPayload, String e
logger.warn("Input payload is null; using empty JSON object");
inputPayload = "{}"; // Default to empty payload if null
}
// Create the timestamp
String timestamp = String.valueOf(Instant.now().getEpochSecond());
// Append the timestamp to the initial execution name
executionName = sanitizeExecutionName(executionName) + "_" + timestamp;

try {
StartExecutionRequest startExecutionRequest = new StartExecutionRequest()
Expand All @@ -59,4 +64,33 @@ public void startExecution(String stateMachineArn, String inputPayload, String e
throw e;
}
}

/**
* Sanitizes the execution name by replacing invalid characters with underscores
* and truncating if needed.
*/
public String sanitizeExecutionName(String executionName) {
if (executionName == null || executionName.isEmpty()) {
throw new IllegalArgumentException("Execution name cannot be null or empty");
}
executionName = executionName.trim();
// Replace invalid characters with underscores
String sanitized = executionName.replaceAll("[^a-zA-Z0-9\\-_]", "_");

// Check if the sanitized name is empty or consists only of underscores
if (sanitized.isEmpty() || sanitized.replaceAll("_", "").isEmpty()) {
throw new IllegalArgumentException("Execution name cannot contain only invalid characters");
}

// Truncate from the beginning if length exceeds 69 characters
if (sanitized.length() > 69) {
sanitized = sanitized.substring(sanitized.length() - 69);
}

// Log the updated execution name if it has changed
if (!sanitized.equals(executionName)) {
logger.info("Updated execution name: {}", sanitized);
}
return sanitized;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package com.bazaarvoice.emodb.queue.core.stepfn;

import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Field;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;

public class StepFunctionServiceTest {

private StepFunctionService stepFunctionService;

@Mock
private AWSStepFunctions mockStepFunctionsClient;

@BeforeMethod
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
stepFunctionService = new StepFunctionService();

// Use reflection to set the private field stepFunctionsClient
Field field = StepFunctionService.class.getDeclaredField("stepFunctionsClient");
field.setAccessible(true); // Make the private field accessible
field.set(stepFunctionService, mockStepFunctionsClient); // Inject mock
}

@Test
public void testStartExecution_withValidParameters() {
// Arrange
String stateMachineArn = "arn:aws:states:us-east-1:123456789012:stateMachine:exampleStateMachine";
String inputPayload = "{\"key\":\"value\"}";
String executionName = "testExecution";

StartExecutionResult mockResult = new StartExecutionResult()
.withExecutionArn("arn:aws:states:us-east-1:123456789012:execution:exampleExecution");
when(mockStepFunctionsClient.startExecution(any(StartExecutionRequest.class))).thenReturn(mockResult);

// Act
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);

// Assert
ArgumentCaptor<StartExecutionRequest> requestCaptor = ArgumentCaptor.forClass(StartExecutionRequest.class);
verify(mockStepFunctionsClient).startExecution(requestCaptor.capture());

StartExecutionRequest request = requestCaptor.getValue();
assertEquals(request.getStateMachineArn(), stateMachineArn);
assertEquals(request.getInput(), inputPayload);
assertTrue(request.getName().startsWith("testExecution_"));
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "State Machine ARN cannot be null or empty")
public void testStartExecution_withNullStateMachineArn() {
// Arrange
String stateMachineArn = null;
String inputPayload = "{\"key\":\"value\"}";
String executionName = "testExecution";

// Act
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "State Machine ARN cannot be null or empty")
public void testStartExecution_withEmptyStateMachineArn() {
// Arrange
String stateMachineArn = "";
String inputPayload = "{\"key\":\"value\"}";
String executionName = "testExecution";

// Act
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
}

@Test
public void testStartExecution_withNullInputPayload() {
// Arrange
String stateMachineArn = "arn:aws:states:us-east-1:123456789012:stateMachine:exampleStateMachine";
String executionName = "testExecution";

StartExecutionResult mockResult = new StartExecutionResult()
.withExecutionArn("arn:aws:states:us-east-1:123456789012:execution:exampleExecution");
when(mockStepFunctionsClient.startExecution(any(StartExecutionRequest.class))).thenReturn(mockResult);

// Act
stepFunctionService.startExecution(stateMachineArn, null, executionName);

// Assert
ArgumentCaptor<StartExecutionRequest> requestCaptor = ArgumentCaptor.forClass(StartExecutionRequest.class);
verify(mockStepFunctionsClient).startExecution(requestCaptor.capture());

StartExecutionRequest request = requestCaptor.getValue();
assertEquals(request.getStateMachineArn(), stateMachineArn);
assertEquals(request.getInput(), "{}"); // Default to empty payload
}

@Test
public void testSanitizeExecutionName_withInvalidCharacters() {
// Arrange
String invalidExecutionName = "test/execution:name*with?invalid|characters";

// Act
String sanitized = stepFunctionService.sanitizeExecutionName(invalidExecutionName);

// Assert
assertEquals(sanitized, "test_execution_name_with_invalid_characters");
}

@Test
public void testSanitizeExecutionName_withTooLongName() {
// Arrange
String longExecutionName = "ThisIsAVeryLongExecutionNameThatExceedsTheMaximumAllowedLengthOfSixtyNineCharactersAndShouldBeTruncatedAtSomePoint";

// Act
String sanitized = stepFunctionService.sanitizeExecutionName(longExecutionName);

// Assert
assertTrue(sanitized.length() <= 69);
}

// New Test Cases for Edge Cases

@Test
public void testSanitizeExecutionName_withValidName() {
// Arrange
String validExecutionName = "validExecutionName";

// Act
String sanitized = stepFunctionService.sanitizeExecutionName(validExecutionName);


// Assert
assertEquals(sanitized, validExecutionName); // Should return the same name
}

@Test
public void testSanitizeExecutionName_withLeadingAndTrailingSpaces() {
// Arrange
String executionName = " executionName ";

// Act
String sanitized = stepFunctionService.sanitizeExecutionName(executionName);


// Assert
assertEquals(sanitized, "executionName"); // Should trim spaces
}

@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Execution name cannot contain only invalid characters")
public void testSanitizeExecutionName_withOnlyInvalidCharacters() {
// Arrange
String invalidOnly = "*/?|<>"; // Input with only invalid characters

stepFunctionService.sanitizeExecutionName(invalidOnly);
}


@Test
public void testSanitizeExecutionName_withMaximumLength() {
// Arrange
String maxLengthExecutionName = "ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEDHDFHDFHHFCN"; // 69 characters

// Act
String sanitized = stepFunctionService.sanitizeExecutionName(maxLengthExecutionName);


// Assert
assertEquals(sanitized.length(), 69); // Should be exactly 69 characters
}

@Test
public void testSanitizeExecutionName_withMultipleInvalidCharacters() {
// Arrange
String executionName = "test//?invalid//name?with*multiple|invalid:characters";

// Act
String sanitized = stepFunctionService.sanitizeExecutionName(executionName);


// Assert
assertEquals(sanitized, "test___invalid__name_with_multiple_invalid_characters"); // Should replace all invalid characters
}
}