Skip to content
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dmypy.json
MANIFEST
*.pyc
.python-version
Pipfile
Pipfile.lock

# Generated files
**/bin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.impl;

import com.google.common.base.Throwables;
import com.linkedin.datahub.upgrade.UpgradeReport;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -20,6 +21,8 @@ public void addLine(String line) {
public void addLine(String line, Exception e) {
log.error(line, e);
reportLines.add(line + String.format(": %s", e));
reportLines.add(
String.format("Exception stack trace: %s", Throwables.getStackTraceAsString(e)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package com.linkedin.datahub.upgrade.restoreindices;

import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.ebean.Database;
import io.ebean.ExpressionList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -156,22 +152,8 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
}

@VisibleForTesting
int getRowCount(RestoreIndicesArgs args) {
ExpressionList<EbeanAspectV2> countExp =
_server
.find(EbeanAspectV2.class)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
if (args.aspectName != null) {
countExp = countExp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
}
if (args.urn != null) {
countExp = countExp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
}
if (args.urnLike != null) {
countExp = countExp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
}
return countExp.findCount();
int getRowCount(UpgradeContext context, RestoreIndicesArgs args) {
return _entityService.countAspect(args, context.report()::addLine);
}

@Override
Expand All @@ -184,7 +166,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {

context.report().addLine("Sending MAE from local DB");
long startTime = System.currentTimeMillis();
final int rowCount = getRowCount(args);
final int rowCount = getRowCount(context, args);
context
.report()
.addLine(
Expand Down Expand Up @@ -224,7 +206,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
context.report().addLine("End of data.");
break;
} else {
log.error("Failure processing restore indices batch.", e);
context.report().addLine("Exception while processing batch", e);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade;

import static org.testng.Assert.*;

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeReport;
import java.util.List;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class UpgradeReportTest {

private UpgradeReport upgradeReport;

@BeforeMethod
public void setup() {
upgradeReport = new DefaultUpgradeReport();
}

@Test
public void testAddLine() {
// Given
String line1 = "Starting upgrade";
String line2 = "Processing step 1";
String line3 = "Upgrade completed";

// When
upgradeReport.addLine(line1);
upgradeReport.addLine(line2);
upgradeReport.addLine(line3);

// Then
List<String> lines = upgradeReport.lines();
assertEquals(lines.size(), 3);
assertEquals(lines.get(0), line1);
assertEquals(lines.get(1), line2);
assertEquals(lines.get(2), line3);
}

@Test
public void testAddLineWithException() {
// Given
String errorMessage = "Error occurred during upgrade";
Exception testException = new RuntimeException("Test exception message");

// When
upgradeReport.addLine(errorMessage, testException);

// Then
List<String> lines = upgradeReport.lines();
assertEquals(lines.size(), 2);
assertTrue(lines.get(0).contains(errorMessage));
assertTrue(lines.get(0).contains("Test exception message"));
assertTrue(lines.get(1).contains("Exception stack trace:"));
assertTrue(lines.get(1).contains("RuntimeException"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public void testExecutableWithDefaultArgs() {
// Insert a few test rows
insertTestRows(5, null);

// Mock countAspect to return 5 rows
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down Expand Up @@ -175,6 +178,8 @@ public void testExecutableWithCustomArgs() {
// Insert some test data
insertTestRows(5, "testAspect");

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down Expand Up @@ -209,6 +214,8 @@ public void testExecutableWithUrnLike() {
// Insert data that matches the URN pattern
insertTestRows(3, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down Expand Up @@ -242,6 +249,8 @@ public void testExecutableWithPitEpochMs() {
insertTestRow("urn:li:test:2", "testAspect", 0, oneHourAgo, "testUser"); // Edge of range
insertTestRow("urn:li:test:3", "testAspect", 0, twoHoursAgo, "testUser"); // Outside range

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(2);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand All @@ -267,14 +276,14 @@ public void testExecutableWithAspectNames() {
insertTestRow("urn:li:test:2", "aspect2", 0, now, "testUser");
insertTestRow("urn:li:test:3", "aspect3", 0, now, "testUser");

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

// Verify aspect names parameter
ArgumentCaptor<RestoreIndicesArgs> argsCaptor =
ArgumentCaptor.forClass(RestoreIndicesArgs.class);
verify(mockEntityService).restoreIndices(eq(mockOpContext), argsCaptor.capture(), any());

RestoreIndicesArgs capturedArgs = argsCaptor.getValue();
assertEquals(capturedArgs.aspectNames.size(), 3);
assertTrue(capturedArgs.aspectNames.contains("aspect1"));
Expand All @@ -291,6 +300,8 @@ public void testExecutableWithUrnBasedPagination() {
// Insert enough rows to trigger pagination
insertTestRows(5, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);

// Setup sequential results for pagination
RestoreIndicesResult firstResult = new RestoreIndicesResult();
firstResult.rowsMigrated = 2;
Expand Down Expand Up @@ -374,6 +385,8 @@ public void testExecutableWithError() {
// Insert rows so the query returns data
insertTestRows(10, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(10);

// Force the service to throw an exception
when(mockEntityService.restoreIndices(eq(mockOpContext), any(RestoreIndicesArgs.class), any()))
.thenThrow(new RuntimeException("Test exception"));
Expand All @@ -387,6 +400,28 @@ public void testExecutableWithError() {
assertEquals(result.stepId(), sendMAEStep.id());
}

@Test
public void testUrnBasedPaginationExecutableWithError() {
parsedArgs.put(RestoreIndices.URN_BASED_PAGINATION_ARG_NAME, Optional.of("true"));

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(10);

// Force the service to throw an exception
when(mockEntityService.restoreIndices(eq(mockOpContext), any(RestoreIndicesArgs.class), any()))
.thenThrow(new RuntimeException("Test exception"));

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

// Verify failure
assertTrue(result instanceof DefaultUpgradeStepResult);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
assertEquals(result.stepId(), sendMAEStep.id());

// verify exception reported
verify(mockReport, atLeastOnce()).addLine(anyString(), any(Exception.class));
}

@Test
public void testReportAddedLines() {
// Insert some test data
Expand All @@ -407,6 +442,8 @@ public void testExecutableWithCreateDefaultAspects() {
// Insert test data
insertTestRows(3, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ ListResult<String> listUrns(
@Nonnull
Integer countAspect(@Nonnull final String aspectName, @Nullable String urnLike);

@Nonnull
Integer countAspect(final RestoreIndicesArgs args);

@Nonnull
PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,12 @@ public Integer getCountAspect(
return aspectDao.countAspect(aspectName, urnLike);
}

@Override
public Integer countAspect(@Nonnull RestoreIndicesArgs args, @Nonnull Consumer<String> logger) {
logger.accept(String.format("Args are %s", args));
return aspectDao.countAspect(args);
}

@Nonnull
@Override
public List<RestoreIndicesResult> restoreIndices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,13 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike)
return -1;
}

@Nonnull
@Override
public Integer countAspect(final RestoreIndicesArgs args) {
// Not implemented
return -1;
}

@Nonnull
public PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args) {
// Not implemented
Expand Down
Loading
Loading