Skip to content

Commit

Permalink
upgrade actors default versions based on breaking changes (#8492)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ella Rohm-Ensing committed Aug 23, 2023
1 parent 449b0de commit ebda57d
Show file tree
Hide file tree
Showing 20 changed files with 708 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.server.handlers;

import static io.airbyte.featureflag.ContextKt.ANONYMOUS;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.model.generated.ActorDefinitionIdWithScope;
import io.airbyte.api.model.generated.CustomDestinationDefinitionCreate;
Expand All @@ -23,6 +25,7 @@
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.commons.server.errors.InternalServerKnownException;
import io.airbyte.commons.server.handlers.helpers.ActorDefinitionHandlerHelper;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.ActorType;
Expand All @@ -36,6 +39,7 @@
import io.airbyte.featureflag.DestinationDefinition;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.HideActorDefinitionFromList;
import io.airbyte.featureflag.IngestBreakingChanges;
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.Workspace;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -262,11 +266,16 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
.withCustom(currentDestination.getCustom())
.withResourceRequirements(updatedResourceReqs);

final ActorDefinitionVersion newVersion = actorDefinitionHandlerHelper.defaultDefinitionVersionFromUpdate(currentVersion,
ActorType.DESTINATION, destinationDefinitionUpdate.getDockerImageTag(), currentDestination.getCustom());
final ActorDefinitionVersion newVersion = actorDefinitionHandlerHelper.defaultDefinitionVersionFromUpdate(
currentVersion, ActorType.DESTINATION, destinationDefinitionUpdate.getDockerImageTag(), currentDestination.getCustom());

final List<ActorDefinitionBreakingChange> breakingChangesForDef =
actorDefinitionHandlerHelper.getBreakingChanges(newVersion, ActorType.DESTINATION);
configRepository.writeDestinationDefinitionAndDefaultVersion(newDestination, newVersion, breakingChangesForDef);

actorDefinitionHandlerHelper.persistBreakingChanges(newVersion, ActorType.DESTINATION);
configRepository.writeDestinationDefinitionAndDefaultVersion(newDestination, newVersion);
if (featureFlagClient.boolVariation(IngestBreakingChanges.INSTANCE, new Workspace(ANONYMOUS))) {
configRepository.writeActorDefinitionBreakingChanges(breakingChangesForDef);
}
return buildDestinationDefinitionRead(newDestination, newVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.server.handlers;

import static io.airbyte.featureflag.ContextKt.ANONYMOUS;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.model.generated.ActorDefinitionIdWithScope;
import io.airbyte.api.model.generated.CustomSourceDefinitionCreate;
Expand All @@ -24,6 +26,7 @@
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.commons.server.errors.InternalServerKnownException;
import io.airbyte.commons.server.handlers.helpers.ActorDefinitionHandlerHelper;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.ActorType;
Expand All @@ -36,6 +39,7 @@
import io.airbyte.config.specs.RemoteDefinitionsProvider;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.HideActorDefinitionFromList;
import io.airbyte.featureflag.IngestBreakingChanges;
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.SourceDefinition;
import io.airbyte.featureflag.Workspace;
Expand Down Expand Up @@ -266,8 +270,12 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate
final ActorDefinitionVersion newVersion = actorDefinitionHandlerHelper.defaultDefinitionVersionFromUpdate(
currentVersion, ActorType.SOURCE, sourceDefinitionUpdate.getDockerImageTag(), currentSourceDefinition.getCustom());

actorDefinitionHandlerHelper.persistBreakingChanges(newVersion, ActorType.SOURCE);
configRepository.writeSourceDefinitionAndDefaultVersion(newSource, newVersion);
final List<ActorDefinitionBreakingChange> breakingChangesForDef = actorDefinitionHandlerHelper.getBreakingChanges(newVersion, ActorType.SOURCE);
configRepository.writeSourceDefinitionAndDefaultVersion(newSource, newVersion, breakingChangesForDef);

if (featureFlagClient.boolVariation(IngestBreakingChanges.INSTANCE, new Workspace(ANONYMOUS))) {
configRepository.writeActorDefinitionBreakingChanges(breakingChangesForDef);
}
return buildSourceDefinitionRead(newSource, newVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package io.airbyte.commons.server.handlers.helpers;

import static io.airbyte.featureflag.ContextKt.ANONYMOUS;

import io.airbyte.commons.server.ServerConstants;
import io.airbyte.commons.server.converters.SpecFetcher;
import io.airbyte.commons.server.errors.UnsupportedProtocolVersionException;
Expand All @@ -21,11 +19,7 @@
import io.airbyte.config.ConnectorRegistrySourceDefinition;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.config.persistence.ActorDefinitionVersionResolver;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.specs.RemoteDefinitionsProvider;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.IngestBreakingChanges;
import io.airbyte.featureflag.Workspace;
import io.airbyte.protocol.models.ConnectorSpecification;
import jakarta.inject.Singleton;
import java.io.IOException;
Expand All @@ -47,22 +41,16 @@ public class ActorDefinitionHandlerHelper {
private final SynchronousSchedulerClient synchronousSchedulerClient;
private final AirbyteProtocolVersionRange protocolVersionRange;
private final ActorDefinitionVersionResolver actorDefinitionVersionResolver;
private final ConfigRepository configRepository;
private final RemoteDefinitionsProvider remoteDefinitionsProvider;
private final FeatureFlagClient featureFlagClient;

public ActorDefinitionHandlerHelper(final SynchronousSchedulerClient synchronousSchedulerClient,
final AirbyteProtocolVersionRange airbyteProtocolVersionRange,
final ActorDefinitionVersionResolver actorDefinitionVersionResolver,
final ConfigRepository configRepository,
final RemoteDefinitionsProvider remoteDefinitionsProvider,
final FeatureFlagClient featureFlagClient) {
final RemoteDefinitionsProvider remoteDefinitionsProvider) {
this.synchronousSchedulerClient = synchronousSchedulerClient;
this.protocolVersionRange = airbyteProtocolVersionRange;
this.actorDefinitionVersionResolver = actorDefinitionVersionResolver;
this.configRepository = configRepository;
this.remoteDefinitionsProvider = remoteDefinitionsProvider;
this.featureFlagClient = featureFlagClient;
}

/**
Expand Down Expand Up @@ -175,11 +163,8 @@ private String getAndValidateProtocolVersionFromSpec(final ConnectorSpecificatio
* @param actorType - the actor type
* @throws IOException - if there is an error persisting the breaking changes
*/
public void persistBreakingChanges(final ActorDefinitionVersion actorDefinitionVersion, final ActorType actorType)
public List<ActorDefinitionBreakingChange> getBreakingChanges(final ActorDefinitionVersion actorDefinitionVersion, final ActorType actorType)
throws IOException {
if (!featureFlagClient.boolVariation(IngestBreakingChanges.INSTANCE, new Workspace(ANONYMOUS))) {
return;
}

final String connectorRepository = actorDefinitionVersion.getDockerRepository();
// We always want the most up-to-date version of the list breaking changes, in case they've been
Expand All @@ -200,9 +185,7 @@ public void persistBreakingChanges(final ActorDefinitionVersion actorDefinitionV
default -> throw new IllegalArgumentException("Actor type not supported: " + actorType);
}

if (breakingChanges.isPresent()) {
configRepository.writeActorDefinitionBreakingChanges(breakingChanges.get());
}
return breakingChanges.orElse(List.of());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.commons.server.errors.UnsupportedProtocolVersionException;
import io.airbyte.commons.server.handlers.helpers.ActorDefinitionHandlerHelper;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.ActorType;
Expand Down Expand Up @@ -73,6 +75,8 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class DestinationDefinitionsHandlerTest {

Expand Down Expand Up @@ -147,6 +151,16 @@ private ActorDefinitionVersion generateVersionFromDestinationDefinition(final St
.withAllowedHosts(new AllowedHosts().withHosts(List.of("host1", "host2")));
}

private List<ActorDefinitionBreakingChange> generateBreakingChangesFromDestinationDefinition(final StandardDestinationDefinition destDef) {
final ActorDefinitionBreakingChange breakingChange = new ActorDefinitionBreakingChange()
.withActorDefinitionId(destDef.getDestinationDefinitionId())
.withVersion(new Version("1.0.0"))
.withMessage("This is a breaking change")
.withMigrationDocumentationUrl("https://docs.airbyte.com/migration#1.0.0")
.withUpgradeDeadline("2025-01-21");
return List.of(breakingChange);
}

private ActorDefinitionVersion generateCustomVersionFromDestinationDefinition(final StandardDestinationDefinition destinationDefinition) {
return generateVersionFromDestinationDefinition(destinationDefinition)
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
Expand Down Expand Up @@ -699,9 +713,12 @@ void testCreateCustomDestinationDefinitionShouldCheckProtocolVersion() throws UR
verifyNoMoreInteractions(actorDefinitionHandlerHelper);
}

@Test
@ParameterizedTest
@ValueSource(booleans = {true, false})
@DisplayName("updateDestinationDefinition should correctly update a destinationDefinition")
void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonValidationException {
void testUpdateDestination(final boolean ingestBreakingChangesFF) throws ConfigNotFoundException, IOException, JsonValidationException {
when(featureFlagClient.boolVariation(IngestBreakingChanges.INSTANCE, new Workspace(ANONYMOUS))).thenReturn(ingestBreakingChangesFF);

when(configRepository.getStandardDestinationDefinition(destinationDefinition.getDestinationDefinitionId())).thenReturn(destinationDefinition);
when(configRepository.getActorDefinitionVersion(destinationDefinition.getDefaultVersionId()))
.thenReturn(destinationDefinitionVersion);
Expand All @@ -719,8 +736,10 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa
.withDockerImageTag(newDockerImageTag);

when(actorDefinitionHandlerHelper.defaultDefinitionVersionFromUpdate(destinationDefinitionVersion, ActorType.DESTINATION, newDockerImageTag,
destinationDefinition.getCustom()))
.thenReturn(updatedDestinationDefVersion);
destinationDefinition.getCustom())).thenReturn(updatedDestinationDefVersion);

final List<ActorDefinitionBreakingChange> breakingChanges = generateBreakingChangesFromDestinationDefinition(updatedDestination);
when(actorDefinitionHandlerHelper.getBreakingChanges(updatedDestinationDefVersion, ActorType.DESTINATION)).thenReturn(breakingChanges);

final DestinationDefinitionRead destinationRead =
destinationDefinitionsHandler.updateDestinationDefinition(
Expand All @@ -730,10 +749,11 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa
assertEquals(newDockerImageTag, destinationRead.getDockerImageTag());
verify(actorDefinitionHandlerHelper).defaultDefinitionVersionFromUpdate(destinationDefinitionVersion, ActorType.DESTINATION, newDockerImageTag,
destinationDefinition.getCustom());
verify(actorDefinitionHandlerHelper).persistBreakingChanges(updatedDestinationDefVersion, ActorType.DESTINATION);

verify(configRepository).writeDestinationDefinitionAndDefaultVersion(updatedDestination,
updatedDestinationDefVersion);
verify(actorDefinitionHandlerHelper).getBreakingChanges(updatedDestinationDefVersion, ActorType.DESTINATION);
verify(configRepository).writeDestinationDefinitionAndDefaultVersion(updatedDestination, updatedDestinationDefVersion, breakingChanges);
if (ingestBreakingChangesFF) {
verify(configRepository).writeActorDefinitionBreakingChanges(breakingChanges);
}

verifyNoMoreInteractions(actorDefinitionHandlerHelper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.server.handlers;

import static io.airbyte.featureflag.ContextKt.ANONYMOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -38,6 +39,8 @@
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.commons.server.errors.UnsupportedProtocolVersionException;
import io.airbyte.commons.server.handlers.helpers.ActorDefinitionHandlerHelper;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.ActorType;
Expand All @@ -53,6 +56,7 @@
import io.airbyte.config.specs.RemoteDefinitionsProvider;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.HideActorDefinitionFromList;
import io.airbyte.featureflag.IngestBreakingChanges;
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.SourceDefinition;
import io.airbyte.featureflag.TestClient;
Expand All @@ -72,6 +76,8 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SourceDefinitionsHandlerTest {

Expand Down Expand Up @@ -136,6 +142,16 @@ private ActorDefinitionVersion generateVersionFromSourceDefinition(final Standar
.withSuggestedStreams(new SuggestedStreams().withStreams(List.of("stream1", "stream2")));
}

private List<ActorDefinitionBreakingChange> generateBreakingChangesFromSourceDefinition(final StandardSourceDefinition sourceDefinition) {
final ActorDefinitionBreakingChange breakingChange = new ActorDefinitionBreakingChange()
.withActorDefinitionId(sourceDefinition.getSourceDefinitionId())
.withVersion(new Version("1.0.0"))
.withMessage("This is a breaking change")
.withMigrationDocumentationUrl("https://docs.airbyte.com/migration#1.0.0")
.withUpgradeDeadline("2025-01-21");
return List.of(breakingChange);
}

private ActorDefinitionVersion generateCustomVersionFromSourceDefinition(final StandardSourceDefinition sourceDefinition) {
return generateVersionFromSourceDefinition(sourceDefinition)
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
Expand Down Expand Up @@ -630,9 +646,12 @@ void testCreateCustomSourceDefinitionShouldCheckProtocolVersion() throws URISynt
verifyNoMoreInteractions(actorDefinitionHandlerHelper);
}

@Test
@ParameterizedTest
@ValueSource(booleans = {true, false})
@DisplayName("updateSourceDefinition should correctly update a sourceDefinition")
void testUpdateSource() throws ConfigNotFoundException, IOException, JsonValidationException {
void testUpdateSource(final boolean ingestBreakingChangesFF) throws ConfigNotFoundException, IOException, JsonValidationException {
when(featureFlagClient.boolVariation(IngestBreakingChanges.INSTANCE, new Workspace(ANONYMOUS))).thenReturn(ingestBreakingChangesFF);

when(configRepository.getStandardSourceDefinition(sourceDefinition.getSourceDefinitionId())).thenReturn(sourceDefinition);
when(configRepository.getActorDefinitionVersion(sourceDefinition.getDefaultVersionId()))
.thenReturn(sourceDefinitionVersion);
Expand All @@ -650,8 +669,10 @@ void testUpdateSource() throws ConfigNotFoundException, IOException, JsonValidat
.withDockerImageTag(newDockerImageTag);

when(actorDefinitionHandlerHelper.defaultDefinitionVersionFromUpdate(sourceDefinitionVersion, ActorType.SOURCE, newDockerImageTag,
sourceDefinition.getCustom()))
.thenReturn(updatedSourceDefVersion);
sourceDefinition.getCustom())).thenReturn(updatedSourceDefVersion);

final List<ActorDefinitionBreakingChange> breakingChanges = generateBreakingChangesFromSourceDefinition(updatedSource);
when(actorDefinitionHandlerHelper.getBreakingChanges(updatedSourceDefVersion, ActorType.SOURCE)).thenReturn(breakingChanges);

final SourceDefinitionRead sourceRead =
sourceDefinitionsHandler.updateSourceDefinition(
Expand All @@ -661,9 +682,11 @@ void testUpdateSource() throws ConfigNotFoundException, IOException, JsonValidat
assertEquals(newDockerImageTag, sourceRead.getDockerImageTag());
verify(actorDefinitionHandlerHelper).defaultDefinitionVersionFromUpdate(sourceDefinitionVersion, ActorType.SOURCE, newDockerImageTag,
sourceDefinition.getCustom());
verify(actorDefinitionHandlerHelper).persistBreakingChanges(updatedSourceDefVersion, ActorType.SOURCE);
verify(configRepository).writeSourceDefinitionAndDefaultVersion(updatedSource,
updatedSourceDefVersion);
verify(actorDefinitionHandlerHelper).getBreakingChanges(updatedSourceDefVersion, ActorType.SOURCE);
verify(configRepository).writeSourceDefinitionAndDefaultVersion(updatedSource, updatedSourceDefVersion, breakingChanges);
if (ingestBreakingChangesFF) {
verify(configRepository).writeActorDefinitionBreakingChanges(breakingChanges);
}

verifyNoMoreInteractions(actorDefinitionHandlerHelper);
}
Expand Down
Loading

0 comments on commit ebda57d

Please sign in to comment.