Skip to content

Commit

Permalink
remove outdated check flags (#8424)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Aug 21, 2023
1 parent bd4ae94 commit 9d31f8d
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 108 deletions.
5 changes: 0 additions & 5 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ object AutoPropagateSchema : Temporary<Boolean>(key = "autopropagation.enabled",
object AutoPropagateNewStreams : Temporary<Boolean>(key = "autopropagate-new-streams.enabled", default = false)

object CanonicalCatalogSchema : Temporary<Boolean>(key = "canonical-catalog-schema", default = false)

object CheckConnectionUseApiEnabled : Temporary<Boolean>(key = "check-connection-use-api", default = false)

object CheckConnectionUseChildWorkflowEnabled : Temporary<Boolean>(key = "check-connection-use-child-workflow", default = false)

object ShouldRunOnGkeDataplane : Temporary<Boolean>(key = "should-run-on-gke-dataplane", default = false)

object ShouldRunOnExpandedGkeDataplane : Temporary<Boolean>(key = "should-run-on-expanded-gke-dataplane", default = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected String getExpectedConsentUrl() {

@Override
void testGetSourceConsentUrlEmptyOAuthSpec() {}

@Override
void testGetSourceConsentUrl() {}

Expand All @@ -30,4 +30,5 @@ void testGetDestinationConsentUrl() {}

@Override
void testGetDestinationConsentUrlEmptyOAuthSpec() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricTags;
Expand All @@ -44,7 +42,6 @@
import io.airbyte.workers.models.SyncJobCheckConnectionInputs;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity.LogInput;
Expand Down Expand Up @@ -115,15 +112,12 @@
public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow {

private static final String GENERATE_CHECK_INPUT_TAG = "generate_check_input";
private static final String CHECK_WITH_API_TAG = "check_with_api";
private static final String CHECK_WITH_CHILD_WORKFLOW_TAG = "check_with_child_workflow";
private static final String SYNC_TASK_QUEUE_ROUTE_RENAME_TAG = "sync_task_queue_route_rename";
private static final String CHECK_RUN_PROGRESS_TAG = "check_run_progress";
private static final String NEW_RETRIES_TAG = "new_retries";
private static final String APPEND_ATTEMPT_LOG_TAG = "append_attempt_log";
private static final String DONT_FAIL_FOR_BACKOFF_SCHEDULE_CONFLICT_TAG = "dont_fail_for_backoff_schedule_conflict";
private static final int GENERATE_CHECK_INPUT_CURRENT_VERSION = 1;
private static final int CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION = 1;
private static final int SYNC_TASK_QUEUE_ROUTE_RENAME_CURRENT_VERSION = 1;
private static final int CHECK_RUN_PROGRESS_VERSION = 1;
private static final int NEW_RETRIES_VERSION = 1;
Expand All @@ -136,7 +130,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow

private static final String GET_FEATURE_FLAGS_TAG = "get_feature_flags";
private static final int GET_FEATURE_FLAGS_CURRENT_VERSION = 1;
private static final int CHECK_WITH_API_CURRENT_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private GenerateInputActivity getSyncInputActivity;
Expand Down Expand Up @@ -302,7 +295,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
StandardSyncOutput standardSyncOutput = null;

try {
final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs, featureFlags);
final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs);
if (syncCheckConnectionResult.isFailed()) {
final StandardSyncOutput checkFailureOutput = syncCheckConnectionResult.buildFailureOutput();
workflowState.setFailed(getFailStatus(checkFailureOutput));
Expand Down Expand Up @@ -517,10 +510,6 @@ private boolean shouldRunCheckInputGeneration() {
return generateCheckInputVersion >= GENERATE_CHECK_INPUT_CURRENT_VERSION;
}

private ConnectorJobOutput getCheckResponse(final CheckConnectionInput checkInput) {
return runMandatoryActivityWithOutput(checkActivity::runWithJobOutput, checkInput);
}

private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final JobInput jobInputs) {
final StandardSyncInput syncInput = jobInputs.getSyncInput();
final JsonNode sourceConfig = syncInput.getSourceConfiguration();
Expand All @@ -546,8 +535,7 @@ private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final JobIn
}

private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConfig,
@Nullable final JobInput jobInputs,
final Map<String, Boolean> featureFlags) {
@Nullable final JobInput jobInputs) {
final SyncCheckConnectionResult checkConnectionResult = new SyncCheckConnectionResult(jobRunConfig);

final JobCheckFailureInput jobStateInput =
Expand Down Expand Up @@ -576,29 +564,11 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf
} else {
log.info("SOURCE CHECK: Starting");
final ConnectorJobOutput sourceCheckResponse;
final int checkWithApiVersion =
Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION);
final int checkWithChildWorkflowVersion =
Workflow.getVersion(CHECK_WITH_CHILD_WORKFLOW_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION);
if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) {
sourceCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToSource,
checkInputs.getSourceCheckConnectionInput().getActorId());
} else if (checkWithChildWorkflowVersion >= CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION
&& featureFlags.get(CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey())) {
// Retrieve source definition;

sourceCheckResponse = runCheckInChildWorkflow(jobRunConfig, sourceLauncherConfig, new StandardCheckConnectionInput()
.withActorType(ActorType.SOURCE)
.withActorId(checkInputs.getSourceCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getSourceCheckConnectionInput().getConnectionConfiguration())
.withResourceRequirements(checkInputs.getSourceCheckConnectionInput().getResourceRequirements()));
} else {
final CheckConnectionInput checkSourceInput = new CheckConnectionInput(
jobRunConfig,
sourceLauncherConfig,
checkInputs.getSourceCheckConnectionInput());
sourceCheckResponse = getCheckResponse(checkSourceInput);
}
sourceCheckResponse = runCheckInChildWorkflow(jobRunConfig, sourceLauncherConfig, new StandardCheckConnectionInput()
.withActorType(ActorType.SOURCE)
.withActorId(checkInputs.getSourceCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getSourceCheckConnectionInput().getConnectionConfiguration())
.withResourceRequirements(checkInputs.getSourceCheckConnectionInput().getResourceRequirements()));

if (SyncCheckConnectionResult.isOutputFailed(sourceCheckResponse)) {
checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.SOURCE);
Expand All @@ -614,27 +584,11 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf
} else {
log.info("DESTINATION CHECK: Starting");
final ConnectorJobOutput destinationCheckResponse;
final int checkWithApiVersion =
Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION);
final int checkWithChildWorkflowVersion =
Workflow.getVersion(CHECK_WITH_CHILD_WORKFLOW_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION);
if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) {
destinationCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToDestination,
checkInputs.getDestinationCheckConnectionInput().getActorId());
} else if (checkWithChildWorkflowVersion >= CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION
&& featureFlags.get(CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey())) {
destinationCheckResponse = runCheckInChildWorkflow(jobRunConfig, checkInputs.getDestinationLauncherConfig(),
new StandardCheckConnectionInput()
.withActorType(ActorType.DESTINATION)
.withActorId(checkInputs.getDestinationCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getDestinationCheckConnectionInput().getConnectionConfiguration()));
} else {
final CheckConnectionInput checkDestinationInput = new CheckConnectionInput(
jobRunConfig,
checkInputs.getDestinationLauncherConfig(),
checkInputs.getDestinationCheckConnectionInput());
destinationCheckResponse = getCheckResponse(checkDestinationInput);
}
destinationCheckResponse = runCheckInChildWorkflow(jobRunConfig, checkInputs.getDestinationLauncherConfig(),
new StandardCheckConnectionInput()
.withActorType(ActorType.DESTINATION)
.withActorId(checkInputs.getDestinationCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getDestinationCheckConnectionInput().getConnectionConfiguration()));
if (SyncCheckConnectionResult.isOutputFailed(destinationCheckResponse)) {
checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.DESTINATION);
checkConnectionResult.setFailureOutput(destinationCheckResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,8 @@
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Flag;
import io.airbyte.featureflag.Workspace;
import jakarta.inject.Singleton;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -28,12 +21,9 @@
public class FeatureFlagFetchActivityImpl implements FeatureFlagFetchActivity {

private final WorkspaceApi workspaceApi;
private final FeatureFlagClient featureFlagClient;

public FeatureFlagFetchActivityImpl(final WorkspaceApi workspaceApi,
final FeatureFlagClient featureFlagClient) {
public FeatureFlagFetchActivityImpl(final WorkspaceApi workspaceApi) {
this.workspaceApi = workspaceApi;
this.featureFlagClient = featureFlagClient;
}

/**
Expand All @@ -53,17 +43,9 @@ public UUID getWorkspaceId(final UUID connectionId) {

@Override
public FeatureFlagFetchOutput getFeatureFlags(final FeatureFlagFetchInput input) {
final UUID workspaceId = getWorkspaceId(input.getConnectionId());

// No feature flags are currently in use.
// To get value for a feature flag with the workspace context, add it to the workspaceFlags list.
final List<Flag<Boolean>> workspaceFlags = List.of(CheckConnectionUseApiEnabled.INSTANCE, CheckConnectionUseChildWorkflowEnabled.INSTANCE);
final Map<String, Boolean> featureFlags = new HashMap<>();
for (final Flag<Boolean> flag : workspaceFlags) {
featureFlags.put(flag.getKey(), featureFlagClient.boolVariation(flag, new Workspace(workspaceId)));
}

return new FeatureFlagFetchOutput(featureFlags);
return new FeatureFlagFetchOutput(new HashMap<>());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.models.JobInput;
Expand All @@ -48,7 +46,6 @@
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.GetMaxAttemptOutput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
import io.airbyte.workers.temporal.scheduling.activities.FeatureFlagFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.FeatureFlagFetchActivity.FeatureFlagFetchOutput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivityImpl;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
Expand Down Expand Up @@ -98,7 +95,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -247,10 +243,6 @@ void setUp() throws Exception {
when(mRouteToSyncTaskQueueActivity.routeToCheckConnection(Mockito.any()))
.thenReturn(new RouteToSyncTaskQueueOutput(TemporalJobType.CHECK_CONNECTION.name()));

when(mFeatureFlagFetchActivity.getFeatureFlags(Mockito.any()))
.thenReturn(new FeatureFlagFetchOutput(Map.of(CheckConnectionUseApiEnabled.INSTANCE.getKey(), false,
CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey(), true)));

when(mCheckRunProgressActivity.checkProgress(Mockito.any()))
.thenReturn(new CheckRunProgressActivity.Output(false)); // false == complete failure
final var manager = RetryManager.builder().totalCompleteFailureLimit(1).build(); // just run once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,9 @@
package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Assertions;
Expand All @@ -24,29 +17,23 @@
class FeatureFlagFetchActivityTest {

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final UUID WORKSPACE_ID = UUID.randomUUID();

FeatureFlagFetchActivity featureFlagFetchActivity;
FeatureFlagClient featureFlagClient;

@BeforeEach
void setUp() throws ApiException {
void setUp() {
final WorkspaceApi workspaceApi = mock(WorkspaceApi.class);

featureFlagClient = mock(TestClient.class);
featureFlagFetchActivity = new FeatureFlagFetchActivityImpl(workspaceApi, featureFlagClient);

when(workspaceApi.getWorkspaceByConnectionId(new ConnectionIdRequestBody().connectionId(CONNECTION_ID)))
.thenReturn(new WorkspaceRead().workspaceId(WORKSPACE_ID));
featureFlagFetchActivity = new FeatureFlagFetchActivityImpl(workspaceApi);
}

@Test
void testGetFeatureFlags() {
final FeatureFlagFetchActivity.FeatureFlagFetchInput input = new FeatureFlagFetchActivity.FeatureFlagFetchInput(CONNECTION_ID);

final FeatureFlagFetchActivity.FeatureFlagFetchOutput output = featureFlagFetchActivity.getFeatureFlags(input);
Assertions.assertEquals(output.getFeatureFlags(), Map.of(CheckConnectionUseApiEnabled.INSTANCE.getKey(), false,
CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey(), false));
Assertions.assertEquals(output.getFeatureFlags(), Map.of());

}

Expand Down

0 comments on commit 9d31f8d

Please sign in to comment.