From 9d31f8d6628f02fba5c799e736a65e9da6e7de1a Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Mon, 21 Aug 2023 11:52:04 -0700 Subject: [PATCH] remove outdated check flags (#8424) --- .../src/main/kotlin/FlagDefinitions.kt | 5 -- .../oauth/flows/HubspotOAuthFlowTest.java | 3 +- .../ConnectionManagerWorkflowImpl.java | 70 ++++--------------- .../FeatureFlagFetchActivityImpl.java | 22 +----- .../ConnectionManagerWorkflowTest.java | 8 --- .../FeatureFlagFetchActivityTest.java | 19 +---- 6 files changed, 19 insertions(+), 108 deletions(-) diff --git a/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt b/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt index 9d8979df4ac..0839442621f 100644 --- a/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt +++ b/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt @@ -40,11 +40,6 @@ object AutoPropagateSchema : Temporary(key = "autopropagation.enabled", object AutoPropagateNewStreams : Temporary(key = "autopropagate-new-streams.enabled", default = false) object CanonicalCatalogSchema : Temporary(key = "canonical-catalog-schema", default = false) - -object CheckConnectionUseApiEnabled : Temporary(key = "check-connection-use-api", default = false) - -object CheckConnectionUseChildWorkflowEnabled : Temporary(key = "check-connection-use-child-workflow", default = false) - object ShouldRunOnGkeDataplane : Temporary(key = "should-run-on-gke-dataplane", default = false) object ShouldRunOnExpandedGkeDataplane : Temporary(key = "should-run-on-expanded-gke-dataplane", default = false) diff --git a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/HubspotOAuthFlowTest.java b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/HubspotOAuthFlowTest.java index 1e5c0399111..252160d92b2 100644 --- a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/HubspotOAuthFlowTest.java +++ b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/HubspotOAuthFlowTest.java @@ -21,7 +21,7 @@ protected String getExpectedConsentUrl() { @Override void testGetSourceConsentUrlEmptyOAuthSpec() {} - + @Override void testGetSourceConsentUrl() {} @@ -30,4 +30,5 @@ void testGetDestinationConsentUrl() {} @Override void testGetDestinationConsentUrlEmptyOAuthSpec() {} + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 5b9fa785f38..08a03bd5e9c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -31,8 +31,6 @@ import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.featureflag.CheckConnectionUseApiEnabled; -import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricTags; @@ -44,7 +42,6 @@ import io.airbyte.workers.models.SyncJobCheckConnectionInputs; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; -import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput; import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity; import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity; import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity.LogInput; @@ -115,15 +112,12 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow { private static final String GENERATE_CHECK_INPUT_TAG = "generate_check_input"; - private static final String CHECK_WITH_API_TAG = "check_with_api"; - private static final String CHECK_WITH_CHILD_WORKFLOW_TAG = "check_with_child_workflow"; private static final String SYNC_TASK_QUEUE_ROUTE_RENAME_TAG = "sync_task_queue_route_rename"; private static final String CHECK_RUN_PROGRESS_TAG = "check_run_progress"; private static final String NEW_RETRIES_TAG = "new_retries"; private static final String APPEND_ATTEMPT_LOG_TAG = "append_attempt_log"; private static final String DONT_FAIL_FOR_BACKOFF_SCHEDULE_CONFLICT_TAG = "dont_fail_for_backoff_schedule_conflict"; private static final int GENERATE_CHECK_INPUT_CURRENT_VERSION = 1; - private static final int CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION = 1; private static final int SYNC_TASK_QUEUE_ROUTE_RENAME_CURRENT_VERSION = 1; private static final int CHECK_RUN_PROGRESS_VERSION = 1; private static final int NEW_RETRIES_VERSION = 1; @@ -136,7 +130,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String GET_FEATURE_FLAGS_TAG = "get_feature_flags"; private static final int GET_FEATURE_FLAGS_CURRENT_VERSION = 1; - private static final int CHECK_WITH_API_CURRENT_VERSION = 1; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private GenerateInputActivity getSyncInputActivity; @@ -302,7 +295,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn StandardSyncOutput standardSyncOutput = null; try { - final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs, featureFlags); + final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs); if (syncCheckConnectionResult.isFailed()) { final StandardSyncOutput checkFailureOutput = syncCheckConnectionResult.buildFailureOutput(); workflowState.setFailed(getFailStatus(checkFailureOutput)); @@ -517,10 +510,6 @@ private boolean shouldRunCheckInputGeneration() { return generateCheckInputVersion >= GENERATE_CHECK_INPUT_CURRENT_VERSION; } - private ConnectorJobOutput getCheckResponse(final CheckConnectionInput checkInput) { - return runMandatoryActivityWithOutput(checkActivity::runWithJobOutput, checkInput); - } - private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final JobInput jobInputs) { final StandardSyncInput syncInput = jobInputs.getSyncInput(); final JsonNode sourceConfig = syncInput.getSourceConfiguration(); @@ -546,8 +535,7 @@ private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final JobIn } private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConfig, - @Nullable final JobInput jobInputs, - final Map featureFlags) { + @Nullable final JobInput jobInputs) { final SyncCheckConnectionResult checkConnectionResult = new SyncCheckConnectionResult(jobRunConfig); final JobCheckFailureInput jobStateInput = @@ -576,29 +564,11 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf } else { log.info("SOURCE CHECK: Starting"); final ConnectorJobOutput sourceCheckResponse; - final int checkWithApiVersion = - Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION); - final int checkWithChildWorkflowVersion = - Workflow.getVersion(CHECK_WITH_CHILD_WORKFLOW_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION); - if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) { - sourceCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToSource, - checkInputs.getSourceCheckConnectionInput().getActorId()); - } else if (checkWithChildWorkflowVersion >= CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION - && featureFlags.get(CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey())) { - // Retrieve source definition; - - sourceCheckResponse = runCheckInChildWorkflow(jobRunConfig, sourceLauncherConfig, new StandardCheckConnectionInput() - .withActorType(ActorType.SOURCE) - .withActorId(checkInputs.getSourceCheckConnectionInput().getActorId()) - .withConnectionConfiguration(checkInputs.getSourceCheckConnectionInput().getConnectionConfiguration()) - .withResourceRequirements(checkInputs.getSourceCheckConnectionInput().getResourceRequirements())); - } else { - final CheckConnectionInput checkSourceInput = new CheckConnectionInput( - jobRunConfig, - sourceLauncherConfig, - checkInputs.getSourceCheckConnectionInput()); - sourceCheckResponse = getCheckResponse(checkSourceInput); - } + sourceCheckResponse = runCheckInChildWorkflow(jobRunConfig, sourceLauncherConfig, new StandardCheckConnectionInput() + .withActorType(ActorType.SOURCE) + .withActorId(checkInputs.getSourceCheckConnectionInput().getActorId()) + .withConnectionConfiguration(checkInputs.getSourceCheckConnectionInput().getConnectionConfiguration()) + .withResourceRequirements(checkInputs.getSourceCheckConnectionInput().getResourceRequirements())); if (SyncCheckConnectionResult.isOutputFailed(sourceCheckResponse)) { checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.SOURCE); @@ -614,27 +584,11 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf } else { log.info("DESTINATION CHECK: Starting"); final ConnectorJobOutput destinationCheckResponse; - final int checkWithApiVersion = - Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION); - final int checkWithChildWorkflowVersion = - Workflow.getVersion(CHECK_WITH_CHILD_WORKFLOW_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION); - if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) { - destinationCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToDestination, - checkInputs.getDestinationCheckConnectionInput().getActorId()); - } else if (checkWithChildWorkflowVersion >= CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION - && featureFlags.get(CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey())) { - destinationCheckResponse = runCheckInChildWorkflow(jobRunConfig, checkInputs.getDestinationLauncherConfig(), - new StandardCheckConnectionInput() - .withActorType(ActorType.DESTINATION) - .withActorId(checkInputs.getDestinationCheckConnectionInput().getActorId()) - .withConnectionConfiguration(checkInputs.getDestinationCheckConnectionInput().getConnectionConfiguration())); - } else { - final CheckConnectionInput checkDestinationInput = new CheckConnectionInput( - jobRunConfig, - checkInputs.getDestinationLauncherConfig(), - checkInputs.getDestinationCheckConnectionInput()); - destinationCheckResponse = getCheckResponse(checkDestinationInput); - } + destinationCheckResponse = runCheckInChildWorkflow(jobRunConfig, checkInputs.getDestinationLauncherConfig(), + new StandardCheckConnectionInput() + .withActorType(ActorType.DESTINATION) + .withActorId(checkInputs.getDestinationCheckConnectionInput().getActorId()) + .withConnectionConfiguration(checkInputs.getDestinationCheckConnectionInput().getConnectionConfiguration())); if (SyncCheckConnectionResult.isOutputFailed(destinationCheckResponse)) { checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.DESTINATION); checkConnectionResult.setFailureOutput(destinationCheckResponse); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java index 4383cf1a0fc..7860ffcbaf0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java @@ -8,15 +8,8 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.WorkspaceRead; -import io.airbyte.featureflag.CheckConnectionUseApiEnabled; -import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled; -import io.airbyte.featureflag.FeatureFlagClient; -import io.airbyte.featureflag.Flag; -import io.airbyte.featureflag.Workspace; import jakarta.inject.Singleton; import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -28,12 +21,9 @@ public class FeatureFlagFetchActivityImpl implements FeatureFlagFetchActivity { private final WorkspaceApi workspaceApi; - private final FeatureFlagClient featureFlagClient; - public FeatureFlagFetchActivityImpl(final WorkspaceApi workspaceApi, - final FeatureFlagClient featureFlagClient) { + public FeatureFlagFetchActivityImpl(final WorkspaceApi workspaceApi) { this.workspaceApi = workspaceApi; - this.featureFlagClient = featureFlagClient; } /** @@ -53,17 +43,9 @@ public UUID getWorkspaceId(final UUID connectionId) { @Override public FeatureFlagFetchOutput getFeatureFlags(final FeatureFlagFetchInput input) { - final UUID workspaceId = getWorkspaceId(input.getConnectionId()); - // No feature flags are currently in use. // To get value for a feature flag with the workspace context, add it to the workspaceFlags list. - final List> workspaceFlags = List.of(CheckConnectionUseApiEnabled.INSTANCE, CheckConnectionUseChildWorkflowEnabled.INSTANCE); - final Map featureFlags = new HashMap<>(); - for (final Flag flag : workspaceFlags) { - featureFlags.put(flag.getKey(), featureFlagClient.boolVariation(flag, new Workspace(workspaceId))); - } - - return new FeatureFlagFetchOutput(featureFlags); + return new FeatureFlagFetchOutput(new HashMap<>()); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 9cc367db42f..b673b210e75 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -31,8 +31,6 @@ import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; import io.airbyte.config.StandardSyncInput; -import io.airbyte.featureflag.CheckConnectionUseApiEnabled; -import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.models.JobInput; @@ -48,7 +46,6 @@ import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.GetMaxAttemptOutput; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput; import io.airbyte.workers.temporal.scheduling.activities.FeatureFlagFetchActivity; -import io.airbyte.workers.temporal.scheduling.activities.FeatureFlagFetchActivity.FeatureFlagFetchOutput; import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber; import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivityImpl; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity; @@ -98,7 +95,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -247,10 +243,6 @@ void setUp() throws Exception { when(mRouteToSyncTaskQueueActivity.routeToCheckConnection(Mockito.any())) .thenReturn(new RouteToSyncTaskQueueOutput(TemporalJobType.CHECK_CONNECTION.name())); - when(mFeatureFlagFetchActivity.getFeatureFlags(Mockito.any())) - .thenReturn(new FeatureFlagFetchOutput(Map.of(CheckConnectionUseApiEnabled.INSTANCE.getKey(), false, - CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey(), true))); - when(mCheckRunProgressActivity.checkProgress(Mockito.any())) .thenReturn(new CheckRunProgressActivity.Output(false)); // false == complete failure final var manager = RetryManager.builder().totalCompleteFailureLimit(1).build(); // just run once diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityTest.java index e6fb2b5e10a..49e92af8ec9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityTest.java @@ -5,16 +5,9 @@ package io.airbyte.workers.temporal.scheduling.activities; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import io.airbyte.api.client.generated.WorkspaceApi; -import io.airbyte.api.client.invoker.generated.ApiException; -import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; -import io.airbyte.api.client.model.generated.WorkspaceRead; -import io.airbyte.featureflag.CheckConnectionUseApiEnabled; -import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled; import io.airbyte.featureflag.FeatureFlagClient; -import io.airbyte.featureflag.TestClient; import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.Assertions; @@ -24,20 +17,15 @@ class FeatureFlagFetchActivityTest { private static final UUID CONNECTION_ID = UUID.randomUUID(); - private static final UUID WORKSPACE_ID = UUID.randomUUID(); FeatureFlagFetchActivity featureFlagFetchActivity; FeatureFlagClient featureFlagClient; @BeforeEach - void setUp() throws ApiException { + void setUp() { final WorkspaceApi workspaceApi = mock(WorkspaceApi.class); - featureFlagClient = mock(TestClient.class); - featureFlagFetchActivity = new FeatureFlagFetchActivityImpl(workspaceApi, featureFlagClient); - - when(workspaceApi.getWorkspaceByConnectionId(new ConnectionIdRequestBody().connectionId(CONNECTION_ID))) - .thenReturn(new WorkspaceRead().workspaceId(WORKSPACE_ID)); + featureFlagFetchActivity = new FeatureFlagFetchActivityImpl(workspaceApi); } @Test @@ -45,8 +33,7 @@ void testGetFeatureFlags() { final FeatureFlagFetchActivity.FeatureFlagFetchInput input = new FeatureFlagFetchActivity.FeatureFlagFetchInput(CONNECTION_ID); final FeatureFlagFetchActivity.FeatureFlagFetchOutput output = featureFlagFetchActivity.getFeatureFlags(input); - Assertions.assertEquals(output.getFeatureFlags(), Map.of(CheckConnectionUseApiEnabled.INSTANCE.getKey(), false, - CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey(), false)); + Assertions.assertEquals(output.getFeatureFlags(), Map.of()); }