Skip to content

Commit

Permalink
Skip non-normalization operations on resets (#8654)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfsiega-airbyte committed Sep 1, 2023
1 parent 6fac09f commit 5c257c3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,13 @@ public JobInfoRead createJob(final JobCreate jobCreate) throws JsonValidationExc
final List<StandardSyncOperation> standardSyncOperations = Lists.newArrayList();
for (final var operationId : standardSync.getOperationIds()) {
final StandardSyncOperation standardSyncOperation = configRepository.getStandardSyncOperation(operationId);
standardSyncOperations.add(standardSyncOperation);
// NOTE: we must run normalization operations during resets, because we rely on them to clear the
// normalized tables. However, we don't want to run other operations (dbt, webhook) because those
// are meant to transform the data after the sync but there's no data to transform. Webhook
// operations particularly will fail because we don't populate some required config during resets.
if (StandardSyncOperation.OperatorType.NORMALIZATION.equals(standardSyncOperation.getOperatorType())) {
standardSyncOperations.add(standardSyncOperation);
}
}

final Optional<Long> jobIdOptional =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,15 @@
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobTypeResourceLimit;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -222,6 +225,17 @@ class SchedulerHandlerTest {
.withSupportedDestinationSyncModes(List.of(DestinationSyncMode.OVERWRITE, DestinationSyncMode.APPEND, DestinationSyncMode.APPEND_DEDUP))
.withDocumentationUrl(URI.create("unused")));

private static final StandardSyncOperation NORMALIZATION_OPERATION = new StandardSyncOperation()
.withOperatorType(StandardSyncOperation.OperatorType.NORMALIZATION)
.withOperatorNormalization(new OperatorNormalization());

private static final UUID NORMALIZATION_OPERATION_ID = UUID.randomUUID();

public static final StandardSyncOperation WEBHOOK_OPERATION = new StandardSyncOperation()
.withOperatorType(StandardSyncOperation.OperatorType.WEBHOOK)
.withOperatorWebhook(new OperatorWebhook());
private static final UUID WEBHOOK_OPERATION_ID = UUID.randomUUID();

private SchedulerHandler schedulerHandler;
private ConfigRepository configRepository;
private SecretsRepositoryWriter secretsRepositoryWriter;
Expand Down Expand Up @@ -322,7 +336,10 @@ void createJob() throws JsonValidationException, ConfigNotFoundException, IOExce
@Test
@DisplayName("Test reset job creation")
void createResetJob() throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = new StandardSync().withDestinationId(DESTINATION_ID);
Mockito.when(configRepository.getStandardSyncOperation(NORMALIZATION_OPERATION_ID)).thenReturn(NORMALIZATION_OPERATION);
Mockito.when(configRepository.getStandardSyncOperation(WEBHOOK_OPERATION_ID)).thenReturn(WEBHOOK_OPERATION);
final StandardSync standardSync =
new StandardSync().withDestinationId(DESTINATION_ID).withOperationIds(List.of(NORMALIZATION_OPERATION_ID, WEBHOOK_OPERATION_ID));
Mockito.when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
final DestinationConnection destination = new DestinationConnection()
.withDestinationId(DESTINATION_ID)
Expand All @@ -344,7 +361,7 @@ void createResetJob() throws JsonValidationException, ConfigNotFoundException, I
Mockito
.when(jobCreator.createResetConnectionJob(destination, standardSync, destinationDefinition, actorDefinitionVersion, DOCKER_IMAGE_NAME,
destinationVersion,
false, List.of(),
false, List.of(NORMALIZATION_OPERATION),
streamsToReset, WORKSPACE_ID))
.thenReturn(Optional.of(JOB_ID));

Expand Down

0 comments on commit 5c257c3

Please sign in to comment.