Skip to content

Commit

Permalink
add acceptance tests for schema management (#7209)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfsiega-airbyte committed Jul 19, 2023
1 parent 0357a1e commit d220ce1
Show file tree
Hide file tree
Showing 14 changed files with 582 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAut
new ConnectionUpdate().connectionId(connectionRead.getConnectionId());

if (shouldAutoPropagate(diff, sourceAutoPropagateChange.getWorkspaceId(), connectionRead)) {
applySchemaChange(sourceAutoPropagateChange.getWorkspaceId(),
applySchemaChange(updateObject.getConnectionId(),
sourceAutoPropagateChange.getWorkspaceId(),
updateObject,
syncCatalog,
sourceAutoPropagateChange.getCatalog(),
Expand Down Expand Up @@ -594,6 +595,7 @@ private boolean shouldAutoPropagate(final CatalogDiff diff, final UUID workspace
}

private void applySchemaChange(final UUID connectionId,
final UUID workspaceId,
final ConnectionUpdate updateObject,
final io.airbyte.api.model.generated.AirbyteCatalog currentSyncCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
Expand All @@ -606,7 +608,8 @@ private void applySchemaChange(final UUID connectionId,
currentSyncCatalog,
newCatalog,
transformations,
nonBreakingChangesPreference);
nonBreakingChangesPreference,
featureFlagClient, workspaceId);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,34 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.SyncMode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.featureflag.AutoPropagateNewStreams;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Workspace;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.ws.rs.NotSupportedException;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Helper that generates the catalogs to be auto-propagated.
*/
@Slf4j
public class AutoPropagateSchemaChangeHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(AutoPropagateSchemaChangeHelper.class);
/**
 * Names for the situations a stream can be in when default sync modes are chosen for it.
 *
 * NOTE(review): this enum is not referenced in the visible part of this file. The constant names
 * appear to mirror the four branches of configureDefaultSyncModesForNewStream - confirm before
 * relying on that correspondence.
 */
private enum DefaultSyncModeCase {
// Presumably: the stream declares both a source-defined cursor and a source-defined primary key.
SOURCE_CURSOR_AND_PRIMARY_KEY,
// Presumably: source-defined cursor, no primary key, and Full Refresh is a supported sync mode.
SOURCE_CURSOR_NO_PRIMARY_KEY_SUPPORTS_FULL_REFRESH,
// Presumably: source-defined cursor, no primary key, and Full Refresh is not supported.
SOURCE_CURSOR_NO_PRIMARY_KEY_NO_FULL_REFRESH,
// Presumably: the stream declares no source-defined cursor.
NO_SOURCE_CURSOR
}

/**
* This is auto propagating schema changes, it replaces the stream in the old catalog by using the
Expand All @@ -41,7 +50,9 @@ public class AutoPropagateSchemaChangeHelper {
public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
final AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final NonBreakingChangesPreference nonBreakingChangesPreference) {
final NonBreakingChangesPreference nonBreakingChangesPreference,
final FeatureFlagClient featureFlagClient,
final UUID workspaceId) {
final AirbyteCatalog copiedOldCatalog = Jsons.clone(oldCatalog);
final Map<StreamDescriptor, AirbyteStreamAndConfiguration> oldCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(copiedOldCatalog);
final Map<StreamDescriptor, AirbyteStreamAndConfiguration> newCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(newCatalog);
Expand All @@ -57,7 +68,17 @@ public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
}
case ADD_STREAM -> {
if (nonBreakingChangesPreference.equals(NonBreakingChangesPreference.PROPAGATE_FULLY)) {
oldCatalogPerStream.put(streamDescriptor, newCatalogPerStream.get(streamDescriptor));
final var streamAndConfigurationToAdd = newCatalogPerStream.get(streamDescriptor);
if (featureFlagClient.boolVariation(AutoPropagateNewStreams.INSTANCE, new Workspace(workspaceId))) {
// If we're propagating it, we want to enable it! Otherwise, it'll just get dropped when we update
// the catalog.
streamAndConfigurationToAdd.getConfig()
.selected(true);
configureDefaultSyncModesForNewStream(streamAndConfigurationToAdd);
}
// TODO(mfsiega-airbyte): handle the case where the chosen sync mode isn't actually one of the
// supported sync modes.
oldCatalogPerStream.put(streamDescriptor, streamAndConfigurationToAdd);
}
}
case REMOVE_STREAM -> {
Expand All @@ -72,6 +93,36 @@ public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
return new AirbyteCatalog().streams(List.copyOf(oldCatalogPerStream.values()));
}

/**
 * Chooses default sync modes for a newly added stream and writes them onto its configuration.
 *
 * <p>
 * The decision is based on what the stream itself declares:
 * <ul>
 * <li>source-defined cursor and source-defined primary key: Incremental | Append-Dedup, with the
 * primary key set to the source-defined one;</li>
 * <li>source-defined cursor, no primary key, Full Refresh supported: Full Refresh | Overwrite;</li>
 * <li>source-defined cursor, no primary key, Full Refresh not supported: Incremental | Append;</li>
 * <li>no source-defined cursor: Full Refresh | Overwrite.</li>
 * </ul>
 *
 * <p>
 * Unset ({@code null}) values are tolerated for the source-defined cursor flag, the source-defined
 * primary key and the supported sync modes; an unset list of supported sync modes is treated as
 * "Full Refresh not supported".
 *
 * @param streamToAdd stream and configuration to update in place; both {@code getStream()} and
 *        {@code getConfig()} are dereferenced and must be non-null.
 */
private static void configureDefaultSyncModesForNewStream(final AirbyteStreamAndConfiguration streamToAdd) {
  // TODO(mfsiega-airbyte): unite this with the default config generation in the CatalogConverter.
  final var stream = streamToAdd.getStream();
  final var config = streamToAdd.getConfig();
  final boolean hasSourceDefinedCursor = Boolean.TRUE.equals(stream.getSourceDefinedCursor());
  final boolean hasSourceDefinedPrimaryKey = stream.getSourceDefinedPrimaryKey() != null && !stream.getSourceDefinedPrimaryKey().isEmpty();
  // Guard against an unset list, consistent with the null checks on the other stream properties above;
  // calling contains() on it directly would throw a NullPointerException.
  final var supportedSyncModes = stream.getSupportedSyncModes();
  final boolean supportsFullRefresh = supportedSyncModes != null && supportedSyncModes.contains(SyncMode.FULL_REFRESH);
  if (hasSourceDefinedCursor && hasSourceDefinedPrimaryKey) { // Source-defined cursor and primary key
    config
        .syncMode(SyncMode.INCREMENTAL)
        .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
        .primaryKey(stream.getSourceDefinedPrimaryKey());
  } else if (hasSourceDefinedCursor && supportsFullRefresh) { // Source-defined cursor but no primary key.
    // NOTE: we prefer Full Refresh | Overwrite to avoid the risk of an Incremental | Append sync
    // blowing up their destination.
    config
        .syncMode(SyncMode.FULL_REFRESH)
        .destinationSyncMode(DestinationSyncMode.OVERWRITE);
  } else if (hasSourceDefinedCursor) { // Source-defined cursor but no primary key *and* no full-refresh supported.
    // If *only* incremental is supported, we go with it.
    config
        .syncMode(SyncMode.INCREMENTAL)
        .destinationSyncMode(DestinationSyncMode.APPEND);
  } else { // No source-defined cursor at all.
    // NOTE(review): Full Refresh is chosen here without checking that the stream lists it as supported.
    config
        .syncMode(SyncMode.FULL_REFRESH)
        .destinationSyncMode(DestinationSyncMode.OVERWRITE);
  }
}

@VisibleForTesting
static Map<StreamDescriptor, AirbyteStreamAndConfiguration> extractStreamAndConfigPerStreamDescriptor(final AirbyteCatalog catalog) {
return catalog.getStreams().stream().collect(Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.model.generated.AirbyteStream;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
Expand Down Expand Up @@ -1621,7 +1622,8 @@ void testAutoPropagateSchemaChangeAddStream() throws IOException, ConfigNotFound

final io.airbyte.api.model.generated.AirbyteCatalog catalogWithDiff =
CatalogConverter.toApi(Jsons.clone(airbyteCatalog), sourceVersion);
catalogWithDiff.addStreamsItem(new AirbyteStreamAndConfiguration().stream(new AirbyteStream().name(A_DIFFERENT_STREAM)));
catalogWithDiff.addStreamsItem(new AirbyteStreamAndConfiguration().stream(new AirbyteStream().name(A_DIFFERENT_STREAM))
.config(new AirbyteStreamConfiguration().selected(true)));

final SourceAutoPropagateChange request = new SourceAutoPropagateChange()
.sourceId(source.getSourceId())
Expand Down
Loading

0 comments on commit d220ce1

Please sign in to comment.