Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
public class PinotTableRestletResourceTest extends ControllerTest {
private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
private static final String SEGMENT_GENERATION_AND_PUSH_TASK_TYPE =
MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
private final TableConfigBuilder _offlineBuilder = getOfflineTableBuilder(OFFLINE_TABLE_NAME);
private final TableConfigBuilder _realtimeBuilder = getRealtimeTableBuilder(REALTIME_TABLE_NAME);
private String _createTableUrl;
Expand Down Expand Up @@ -122,6 +124,7 @@ public void beforeMethod()
throws Exception {
DEFAULT_INSTANCE.addDummySchema(REALTIME_TABLE_NAME);
DEFAULT_INSTANCE.addDummySchema(OFFLINE_TABLE_NAME);
ensureTaskQueueResumed();
}

private void registerMinionTasks() {
Expand Down Expand Up @@ -1130,10 +1133,8 @@ public void testTableTasksValidationWithDanglingTasks()

// Create a task manually to simulate dangling task
PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
TaskSchedulingContext context = new TaskSchedulingContext();
context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
Map<String, TaskSchedulingInfo> taskInfo = taskManager.scheduleTasks(context);
String taskName = taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
String taskName = getOrScheduleTask(taskManager, tableName + "_OFFLINE");
assertNotNull(taskName);
waitForTaskState(taskName, TaskState.IN_PROGRESS);

// Now try to create another table with same name (simulating re-creation with dangling tasks)
Expand Down Expand Up @@ -1191,10 +1192,8 @@ public void testTableTasksCleanupWithNonActiveTasks()

// Create some completed tasks
PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
TaskSchedulingContext context = new TaskSchedulingContext();
context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
Map<String, TaskSchedulingInfo> taskInfo = taskManager.scheduleTasks(context);
String taskName = taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
String taskName = getOrScheduleTask(taskManager, tableName + "_OFFLINE");
assertNotNull(taskName);
waitForTaskState(taskName, TaskState.IN_PROGRESS);

// stop the task queue to abort the task
Expand Down Expand Up @@ -1223,6 +1222,90 @@ private static void waitForTaskState(String taskName, TaskState expectedState) {
}, 5000, "Task not scheduled");
}

private static String getScheduledTaskName(Map<String, TaskSchedulingInfo> taskInfo) {
for (TaskSchedulingInfo schedulingInfo : taskInfo.values()) {
List<String> scheduledTaskNames = schedulingInfo.getScheduledTaskNames();
if (scheduledTaskNames != null && !scheduledTaskNames.isEmpty()) {
return scheduledTaskNames.get(0);
}
}
return null;
}

private static String getOrScheduleTask(PinotTaskManager taskManager, String tableNameWithType)
throws IOException {
TaskSchedulingContext context = new TaskSchedulingContext();
context.setTablesToSchedule(Set.of(tableNameWithType));
String[] scheduledTaskName = new String[1];
TestUtils.waitForCondition((aVoid) -> {
Map<String, TaskSchedulingInfo> taskInfo;
taskInfo = taskManager.scheduleTasks(context);
scheduledTaskName[0] = getScheduledTaskName(taskInfo);
if (scheduledTaskName[0] != null) {
return true;
}
TaskSchedulingInfo schedulingInfo = taskInfo.get(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
if (!isQueueStopped(schedulingInfo)) {
return false;
}
try {
ensureTaskQueueResumed();
} catch (IOException e) {
return false;
}
return false;
}, 6000, "Failed to schedule SegmentGenerationAndPushTask for table: " + tableNameWithType);
if (scheduledTaskName[0] != null) {
return scheduledTaskName[0];
}
fail("Failed to schedule SegmentGenerationAndPushTask after retrying queue resume for table: " + tableNameWithType);
return null;
}

private static void ensureTaskQueueResumed()
throws IOException {
sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
.forResumeMinionTaskQueue(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE));
}

private static boolean isQueueStopped(TaskSchedulingInfo schedulingInfo) {
if (schedulingInfo == null || schedulingInfo.getScheduledTaskNames() != null) {
return false;
}
return schedulingInfo.getSchedulingErrors().stream().anyMatch(
error -> error.contains("Unable to start scheduling for task type") && error.contains("may be stopped"));
}

private static void waitForTaskStateOrTaskMissing(String taskName, TaskState expectedState) {
TestUtils.waitForCondition((aVoid) -> {
try {
return sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forMinionTaskState(taskName))
.replace("\"", "")
.equals(expectedState.name());
} catch (IOException e) {
return isTaskStateNotFound(e);
}
}, 8000, "Task not stopped or removed");
}

private static void deleteMinionTaskWithRetry(String taskName) {
TestUtils.waitForCondition((aVoid) -> {
try {
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forDeleteMinionTask(taskName)
+ "?forceDelete=true");
return true;
} catch (IOException e) {
return isTaskStateNotFound(e);
}
}, 15000, "Failed to delete task: " + taskName);
}

private static boolean isTaskStateNotFound(IOException e) {
String message = e.getMessage();
return message != null && (message.contains("status code: 404") || message.contains("Not Found")
|| message.contains("does not exist"));
}
Comment on lines +1303 to +1307
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isTaskStateNotFound detects 404s by substring-matching IOException.getMessage(), which is brittle and may misclassify errors if message formats change. Since the request helpers wrap HttpErrorStatusException as the IOException cause, prefer checking e.getCause() for HttpErrorStatusException and using its status code (or a dedicated helper that returns status codes) to reliably detect 404s.

Copilot uses AI. Check for mistakes.

@Test
public void testTableTasksCleanupWithActiveTasks()
throws Exception {
Expand All @@ -1241,10 +1324,8 @@ public void testTableTasksCleanupWithActiveTasks()

// Create an active/in-progress task
PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
TaskSchedulingContext context = new TaskSchedulingContext();
context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
Map<String, TaskSchedulingInfo> taskInfo = taskManager.scheduleTasks(context);
String taskName = taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
String taskName = getOrScheduleTask(taskManager, tableName + "_OFFLINE");
assertNotNull(taskName);
waitForTaskState(taskName, TaskState.IN_PROGRESS);
try {
// Try to delete table without ignoring active tasks - should fail
Expand All @@ -1259,9 +1340,17 @@ public void testTableTasksCleanupWithActiveTasks()
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName + "?ignoreActiveTasks=true"));
assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + "_OFFLINE] deleted\"}");

// delete task
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forDeleteMinionTask(taskName)
+ "?forceDelete=true");
try {
// delete task
sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
.forStopMinionTaskQueue(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE));
waitForTaskStateOrTaskMissing(taskName, TaskState.STOPPED);
deleteMinionTaskWithRetry(taskName);
} finally {
// resume the task queue again to avoid affecting other tests
sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
.forResumeMinionTaskQueue(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE));
}
}

@Test
Expand Down
Loading