Skip to content

Commit

Permalink
Revert "Revert "remove outdated check flags (#8424)" (#8461)" (#8575)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Aug 29, 2023
1 parent e7d7270 commit 40a9c8e
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 285 deletions.
2 changes: 1 addition & 1 deletion airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ dependencies {
}

test {
maxHeapSize = '2g'
maxHeapSize = '4g'

useJUnitPlatform {
excludeTags("cloud-storage")
Expand Down
4 changes: 0 additions & 4 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ object CanonicalCatalogSchema : Temporary<Boolean>(key = "canonical-catalog-sche

object CatalogCanonicalJson : Temporary<Boolean>(key = "catalog-canonical-json", default = false)

object CheckConnectionUseApiEnabled : Temporary<Boolean>(key = "check-connection-use-api", default = false)

object CheckConnectionUseChildWorkflowEnabled : Temporary<Boolean>(key = "check-connection-use-child-workflow", default = false)

object ShouldRunOnGkeDataplane : Temporary<Boolean>(key = "should-run-on-gke-dataplane", default = false)

object ShouldRunOnExpandedGkeDataplane : Temporary<Boolean>(key = "should-run-on-expanded-gke-dataplane", default = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
Expand Down Expand Up @@ -70,7 +69,6 @@ public List<Object> connectionManagerActivities(
final WorkflowConfigActivity workflowConfigActivity,
final RouteToSyncTaskQueueActivity routeToTaskQueueActivity,
final FeatureFlagFetchActivity featureFlagFetchActivity,
final SubmitCheckConnectionActivity submitCheckConnectionActivity,
final CheckRunProgressActivity checkRunProgressActivity,
final RetryStatePersistenceActivity retryStatePersistenceActivity,
final AppendToAttemptLogActivity appendToAttemptLogActivity) {
Expand All @@ -84,7 +82,6 @@ public List<Object> connectionManagerActivities(
workflowConfigActivity,
routeToTaskQueueActivity,
featureFlagFetchActivity,
submitCheckConnectionActivity,
checkRunProgressActivity,
retryStatePersistenceActivity,
appendToAttemptLogActivity);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricTags;
Expand All @@ -43,9 +41,6 @@
import io.airbyte.workers.models.JobInput;
import io.airbyte.workers.models.SyncJobCheckConnectionInputs;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity.LogInput;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity.LogLevel;
Expand Down Expand Up @@ -115,15 +110,12 @@
public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow {

private static final String GENERATE_CHECK_INPUT_TAG = "generate_check_input";
private static final String CHECK_WITH_API_TAG = "check_with_api";
private static final String CHECK_WITH_CHILD_WORKFLOW_TAG = "check_with_child_workflow";
private static final String SYNC_TASK_QUEUE_ROUTE_RENAME_TAG = "sync_task_queue_route_rename";
private static final String CHECK_RUN_PROGRESS_TAG = "check_run_progress";
private static final String NEW_RETRIES_TAG = "new_retries";
private static final String APPEND_ATTEMPT_LOG_TAG = "append_attempt_log";
private static final String DONT_FAIL_FOR_BACKOFF_SCHEDULE_CONFLICT_TAG = "dont_fail_for_backoff_schedule_conflict";
private static final int GENERATE_CHECK_INPUT_CURRENT_VERSION = 1;
private static final int CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION = 1;
private static final int SYNC_TASK_QUEUE_ROUTE_RENAME_CURRENT_VERSION = 1;
private static final int CHECK_RUN_PROGRESS_VERSION = 1;
private static final int NEW_RETRIES_VERSION = 1;
Expand All @@ -136,7 +128,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow

private static final String GET_FEATURE_FLAGS_TAG = "get_feature_flags";
private static final int GET_FEATURE_FLAGS_CURRENT_VERSION = 1;
private static final int CHECK_WITH_API_CURRENT_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private GenerateInputActivity getSyncInputActivity;
Expand All @@ -147,10 +138,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private AutoDisableConnectionActivity autoDisableConnectionActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private CheckConnectionActivity checkActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private SubmitCheckConnectionActivity submitCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private StreamResetActivity streamResetActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RecordMetricActivity recordMetricActivity;
Expand Down Expand Up @@ -302,7 +289,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
StandardSyncOutput standardSyncOutput = null;

try {
final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs, featureFlags);
final SyncCheckConnectionResult syncCheckConnectionResult = checkConnections(getJobRunConfig(), jobInputs);
if (syncCheckConnectionResult.isFailed()) {
final StandardSyncOutput checkFailureOutput = syncCheckConnectionResult.buildFailureOutput();
workflowState.setFailed(getFailStatus(checkFailureOutput));
Expand Down Expand Up @@ -517,10 +504,6 @@ private boolean shouldRunCheckInputGeneration() {
return generateCheckInputVersion >= GENERATE_CHECK_INPUT_CURRENT_VERSION;
}

private ConnectorJobOutput getCheckResponse(final CheckConnectionInput checkInput) {
return runMandatoryActivityWithOutput(checkActivity::runWithJobOutput, checkInput);
}

private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final JobInput jobInputs) {
final StandardSyncInput syncInput = jobInputs.getSyncInput();
final JsonNode sourceConfig = syncInput.getSourceConfiguration();
Expand All @@ -546,8 +529,7 @@ private SyncJobCheckConnectionInputs getCheckConnectionInputFromSync(final JobIn
}

private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConfig,
@Nullable final JobInput jobInputs,
final Map<String, Boolean> featureFlags) {
@Nullable final JobInput jobInputs) {
final SyncCheckConnectionResult checkConnectionResult = new SyncCheckConnectionResult(jobRunConfig);

final JobCheckFailureInput jobStateInput =
Expand Down Expand Up @@ -576,29 +558,11 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf
} else {
log.info("SOURCE CHECK: Starting");
final ConnectorJobOutput sourceCheckResponse;
final int checkWithApiVersion =
Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION);
final int checkWithChildWorkflowVersion =
Workflow.getVersion(CHECK_WITH_CHILD_WORKFLOW_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION);
if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) {
sourceCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToSource,
checkInputs.getSourceCheckConnectionInput().getActorId());
} else if (checkWithChildWorkflowVersion >= CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION
&& featureFlags.get(CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey())) {
// Retrieve source definition;

sourceCheckResponse = runCheckInChildWorkflow(jobRunConfig, sourceLauncherConfig, new StandardCheckConnectionInput()
.withActorType(ActorType.SOURCE)
.withActorId(checkInputs.getSourceCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getSourceCheckConnectionInput().getConnectionConfiguration())
.withResourceRequirements(checkInputs.getSourceCheckConnectionInput().getResourceRequirements()));
} else {
final CheckConnectionInput checkSourceInput = new CheckConnectionInput(
jobRunConfig,
sourceLauncherConfig,
checkInputs.getSourceCheckConnectionInput());
sourceCheckResponse = getCheckResponse(checkSourceInput);
}
sourceCheckResponse = runCheckInChildWorkflow(jobRunConfig, sourceLauncherConfig, new StandardCheckConnectionInput()
.withActorType(ActorType.SOURCE)
.withActorId(checkInputs.getSourceCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getSourceCheckConnectionInput().getConnectionConfiguration())
.withResourceRequirements(checkInputs.getSourceCheckConnectionInput().getResourceRequirements()));

if (SyncCheckConnectionResult.isOutputFailed(sourceCheckResponse)) {
checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.SOURCE);
Expand All @@ -614,27 +578,11 @@ private SyncCheckConnectionResult checkConnections(final JobRunConfig jobRunConf
} else {
log.info("DESTINATION CHECK: Starting");
final ConnectorJobOutput destinationCheckResponse;
final int checkWithApiVersion =
Workflow.getVersion(CHECK_WITH_API_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_API_CURRENT_VERSION);
final int checkWithChildWorkflowVersion =
Workflow.getVersion(CHECK_WITH_CHILD_WORKFLOW_TAG, Workflow.DEFAULT_VERSION, CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION);
if (checkWithApiVersion >= CHECK_WITH_API_CURRENT_VERSION && featureFlags.get(CheckConnectionUseApiEnabled.INSTANCE.getKey())) {
destinationCheckResponse = runMandatoryActivityWithOutput(submitCheckActivity::submitCheckConnectionToDestination,
checkInputs.getDestinationCheckConnectionInput().getActorId());
} else if (checkWithChildWorkflowVersion >= CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION
&& featureFlags.get(CheckConnectionUseChildWorkflowEnabled.INSTANCE.getKey())) {
destinationCheckResponse = runCheckInChildWorkflow(jobRunConfig, checkInputs.getDestinationLauncherConfig(),
new StandardCheckConnectionInput()
.withActorType(ActorType.DESTINATION)
.withActorId(checkInputs.getDestinationCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getDestinationCheckConnectionInput().getConnectionConfiguration()));
} else {
final CheckConnectionInput checkDestinationInput = new CheckConnectionInput(
jobRunConfig,
checkInputs.getDestinationLauncherConfig(),
checkInputs.getDestinationCheckConnectionInput());
destinationCheckResponse = getCheckResponse(checkDestinationInput);
}
destinationCheckResponse = runCheckInChildWorkflow(jobRunConfig, checkInputs.getDestinationLauncherConfig(),
new StandardCheckConnectionInput()
.withActorType(ActorType.DESTINATION)
.withActorId(checkInputs.getDestinationCheckConnectionInput().getActorId())
.withConnectionConfiguration(checkInputs.getDestinationCheckConnectionInput().getConnectionConfiguration()));
if (SyncCheckConnectionResult.isOutputFailed(destinationCheckResponse)) {
checkConnectionResult.setFailureOrigin(FailureReason.FailureOrigin.DESTINATION);
checkConnectionResult.setFailureOutput(destinationCheckResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,8 @@
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Flag;
import io.airbyte.featureflag.Workspace;
import jakarta.inject.Singleton;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -28,12 +21,9 @@
public class FeatureFlagFetchActivityImpl implements FeatureFlagFetchActivity {

private final WorkspaceApi workspaceApi;
private final FeatureFlagClient featureFlagClient;

public FeatureFlagFetchActivityImpl(final WorkspaceApi workspaceApi,
final FeatureFlagClient featureFlagClient) {
public FeatureFlagFetchActivityImpl(final WorkspaceApi workspaceApi) {
this.workspaceApi = workspaceApi;
this.featureFlagClient = featureFlagClient;
}

/**
Expand All @@ -53,17 +43,9 @@ public UUID getWorkspaceId(final UUID connectionId) {

@Override
public FeatureFlagFetchOutput getFeatureFlags(final FeatureFlagFetchInput input) {
final UUID workspaceId = getWorkspaceId(input.getConnectionId());

// No feature flags are currently in use.
// To get value for a feature flag with the workspace context, add it to the workspaceFlags list.
final List<Flag<Boolean>> workspaceFlags = List.of(CheckConnectionUseApiEnabled.INSTANCE, CheckConnectionUseChildWorkflowEnabled.INSTANCE);
final Map<String, Boolean> featureFlags = new HashMap<>();
for (final Flag<Boolean> flag : workspaceFlags) {
featureFlags.put(flag.getKey(), featureFlagClient.boolVariation(flag, new Workspace(workspaceId)));
}

return new FeatureFlagFetchOutput(featureFlags);
return new FeatureFlagFetchOutput(new HashMap<>());
}

}
Loading

0 comments on commit 40a9c8e

Please sign in to comment.