Skip to content

Commit

Permalink
Rbroughan/destination overrides (#8531)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Aug 25, 2023
1 parent f0f15f5 commit 2d9d461
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 9 deletions.
2 changes: 2 additions & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,6 @@ object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES")
object ConnectorOAuthConsentDisabled : Permanent<Boolean>(key = "connectors.oauth.disableOAuthConsent", default = false)

object AddSchedulingJitter : Temporary<Boolean>(key = "platform.add-scheduling-jitter", default = false)

object DestResourceOverrides : Temporary<String>(key = "dest-resource-overrides", default = "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.featureflag.Destination;
import io.airbyte.featureflag.DestinationDefinition;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.FieldSelectionWorkspaces.DestResourceOverrides;
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.Source;
import io.airbyte.featureflag.SourceDefinition;
Expand Down Expand Up @@ -174,15 +175,17 @@ private SyncResourceRequirements getSyncResourceRequirements(final UUID workspac
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition,
final boolean isReset) {
final String variant = getResourceRequirementsVariant(workspaceId, standardSync, sourceDefinition, destinationDefinition);
final var ffContext = buildFeatureFlagContext(workspaceId, standardSync, sourceDefinition, destinationDefinition);
final String variant = featureFlagClient.stringVariation(UseResourceRequirementsVariant.INSTANCE, ffContext);

// Note on use of sourceType, throughput is driven by the source, if the source is slow, the rest is
// going to be slow. With this in mind, we align the resources given to the orchestrator and the
// destination based on the source to avoid oversizing orchestrator and destination when the source
// is slow.
final Optional<String> sourceType = getSourceType(sourceDefinition);
final ResourceRequirements mergedOrchestratorResourceReq = getOrchestratorResourceRequirements(standardSync, sourceType, variant);
final ResourceRequirements mergedDstResourceReq = getDestinationResourceRequirements(standardSync, destinationDefinition, sourceType, variant);
final ResourceRequirements mergedDstResourceReq =
getDestinationResourceRequirements(standardSync, destinationDefinition, sourceType, variant, ffContext);

final var syncResourceRequirements = new SyncResourceRequirements()
.withConfigKey(new SyncResourceRequirementsKey().withVariant(variant).withSubType(sourceType.orElse(null)))
Expand All @@ -204,10 +207,10 @@ private SyncResourceRequirements getSyncResourceRequirements(final UUID workspac
return syncResourceRequirements;
}

private String getResourceRequirementsVariant(final UUID workspaceId,
final StandardSync standardSync,
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition) {
private Context buildFeatureFlagContext(final UUID workspaceId,
final StandardSync standardSync,
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition) {
final List<Context> contextList = new ArrayList<>();
addIfNotNull(contextList, workspaceId, Workspace::new);
addIfNotNull(contextList, standardSync.getConnectionId(), Connection::new);
Expand All @@ -216,7 +219,7 @@ private String getResourceRequirementsVariant(final UUID workspaceId,
addIfNotNull(contextList, sourceDefinition != null ? sourceDefinition.getSourceDefinitionId() : null, SourceDefinition::new);
addIfNotNull(contextList, standardSync.getDestinationId(), Destination::new);
addIfNotNull(contextList, destinationDefinition.getDestinationDefinitionId(), DestinationDefinition::new);
return featureFlagClient.stringVariation(UseResourceRequirementsVariant.INSTANCE, new Multi(contextList));
return new Multi(contextList);
}

private static void addIfNotNull(final List<Context> contextList, final UUID uuid, final Function<UUID, Context> supplier) {
Expand Down Expand Up @@ -247,17 +250,34 @@ private ResourceRequirements getSourceResourceRequirements(final StandardSync st
JobType.SYNC);
}

private ResourceRequirements getDestinationResourceOverrides(final Context ffCtx) {
final String destOverrides = featureFlagClient.stringVariation(DestResourceOverrides.INSTANCE, ffCtx);
try {
return ResourceRequirementsUtils.parse(destOverrides);
} catch (final Exception e) {
log.warn("Could not parse DESTINATION resource overrides from feature flag string: '{}'", destOverrides);
log.warn("Error parsing DESTINATION resource overrides: {}", e.getMessage());
return null;
}
}

private ResourceRequirements getDestinationResourceRequirements(final StandardSync standardSync,
final StandardDestinationDefinition destinationDefinition,
final Optional<String> sourceType,
final String variant) {
final String variant,
final Context ffContext) {
final ResourceRequirements defaultDstRssReqs =
resourceRequirementsProvider.getResourceRequirements(ResourceRequirementsType.DESTINATION, sourceType, variant);
return ResourceRequirementsUtils.getResourceRequirements(

final var mergedRssReqs = ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
destinationDefinition.getResourceRequirements(),
defaultDstRssReqs,
JobType.SYNC);

final var overrides = getDestinationResourceOverrides(ffContext);

return ResourceRequirementsUtils.getResourceRequirements(overrides, mergedRssReqs);
}

private Optional<String> getSourceType(final StandardSourceDefinition sourceDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.persistence.job;

import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.JobTypeResourceLimit;
import io.airbyte.config.JobTypeResourceLimit.JobType;
Expand Down Expand Up @@ -137,4 +138,19 @@ public static Optional<ResourceRequirements> getResourceRequirementsForJobType(f
: Optional.of(jobTypeResourceRequirement.get(0));
}

/**
* Utility for deserializing from a raw json string.
*
* @param rawOverrides A json string to be parsed.
* @return ResourceRequirements parsed from the string.
*/
public static ResourceRequirements parse(final String rawOverrides) {
if (rawOverrides.isEmpty()) {
return null;
}

final var json = Jsons.deserialize(rawOverrides);
return Jsons.object(json, ResourceRequirements.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.config.SyncResourceRequirements;
import io.airbyte.config.SyncResourceRequirementsKey;
import io.airbyte.config.provider.ResourceRequirementsProvider;
import io.airbyte.featureflag.FieldSelectionWorkspaces.DestResourceOverrides;
import io.airbyte.featureflag.TestClient;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -51,12 +52,21 @@
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.ArgumentCaptor;

@SuppressWarnings("PMD.AvoidDuplicateLiterals")
class DefaultJobCreatorTest {

private static final String DEFAULT_VARIANT = "default";
Expand Down Expand Up @@ -507,6 +517,131 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
verify(jobPersistence, times(1)).enqueueJob(expectedScope, expectedJobConfig);
}

@ParameterizedTest
@MethodSource("resourceOverrideMatrix")
void testDestinationResourceReqsOverrides(final String cpuReqOverride,
final String cpuLimitOverride,
final String memReqOverride,
final String memLimitOverride)
throws IOException {
final var overrides = new HashMap<>();
if (cpuReqOverride != null) {
overrides.put("cpu_request", cpuReqOverride);
}
if (cpuLimitOverride != null) {
overrides.put("cpu_limit", cpuLimitOverride);
}
if (memReqOverride != null) {
overrides.put("memory_request", memReqOverride);
}
if (memLimitOverride != null) {
overrides.put("memory_limit", memLimitOverride);
}

final ResourceRequirements originalReqs = new ResourceRequirements()
.withCpuLimit("0.8")
.withCpuRequest("0.8")
.withMemoryLimit("800Mi")
.withMemoryRequest("800Mi");

final var jobCreator = new DefaultJobCreator(jobPersistence, resourceRequirementsProvider,
new TestClient(Map.of(DestResourceOverrides.INSTANCE.getKey(), Jsons.serialize(overrides))));

jobCreator.createSyncJob(
SOURCE_CONNECTION,
DESTINATION_CONNECTION,
STANDARD_SYNC,
SOURCE_IMAGE_NAME,
SOURCE_PROTOCOL_VERSION,
DESTINATION_IMAGE_NAME,
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)),
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(originalReqs)))),
SOURCE_DEFINITION_VERSION,
DESTINATION_DEFINITION_VERSION,
WORKSPACE_ID);

final ArgumentCaptor<JobConfig> configCaptor = ArgumentCaptor.forClass(JobConfig.class);
verify(jobPersistence, times(1)).enqueueJob(any(), configCaptor.capture());
final var destConfigValues = configCaptor.getValue().getSync().getSyncResourceRequirements().getDestination();

final var expectedCpuReq = StringUtils.isNotBlank(cpuReqOverride) ? cpuReqOverride : originalReqs.getCpuRequest();
assertEquals(expectedCpuReq, destConfigValues.getCpuRequest());

final var expectedCpuLimit = StringUtils.isNotBlank(cpuLimitOverride) ? cpuLimitOverride : originalReqs.getCpuLimit();
assertEquals(expectedCpuLimit, destConfigValues.getCpuLimit());

final var expectedMemReq = StringUtils.isNotBlank(memReqOverride) ? memReqOverride : originalReqs.getMemoryRequest();
assertEquals(expectedMemReq, destConfigValues.getMemoryRequest());

final var expectedMemLimit = StringUtils.isNotBlank(memLimitOverride) ? memLimitOverride : originalReqs.getMemoryLimit();
assertEquals(expectedMemLimit, destConfigValues.getMemoryLimit());
}

private static Stream<Arguments> resourceOverrideMatrix() {
return Stream.of(
Arguments.of("0.7", "0.4", "1000Mi", "2000Mi"),
Arguments.of("0.3", null, "1000Mi", null),
Arguments.of(null, null, null, null),
Arguments.of(null, "0.4", null, null),
Arguments.of("3", "3", "3000Mi", "3000Mi"),
Arguments.of("4", "5", "6000Mi", "7000Mi"));
}

@ParameterizedTest
@MethodSource("weirdnessOverrideMatrix")
void ignoresOverridesIfJsonStringWeird(final String weirdness) throws IOException {
final ResourceRequirements originalReqs = new ResourceRequirements()
.withCpuLimit("0.8")
.withCpuRequest("0.8")
.withMemoryLimit("800Mi")
.withMemoryRequest("800Mi");

final var jobCreator = new DefaultJobCreator(jobPersistence, resourceRequirementsProvider,
new TestClient(Map.of(DestResourceOverrides.INSTANCE.getKey(), Jsons.serialize(weirdness))));

jobCreator.createSyncJob(
SOURCE_CONNECTION,
DESTINATION_CONNECTION,
STANDARD_SYNC,
SOURCE_IMAGE_NAME,
SOURCE_PROTOCOL_VERSION,
DESTINATION_IMAGE_NAME,
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)),
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(originalReqs)))),
SOURCE_DEFINITION_VERSION,
DESTINATION_DEFINITION_VERSION,
WORKSPACE_ID);

final ArgumentCaptor<JobConfig> configCaptor = ArgumentCaptor.forClass(JobConfig.class);
verify(jobPersistence, times(1)).enqueueJob(any(), configCaptor.capture());
final var destConfigValues = configCaptor.getValue().getSync().getSyncResourceRequirements().getDestination();

assertEquals(originalReqs.getCpuRequest(), destConfigValues.getCpuRequest());
assertEquals(originalReqs.getCpuLimit(), destConfigValues.getCpuLimit());
assertEquals(originalReqs.getMemoryRequest(), destConfigValues.getMemoryRequest());
assertEquals(originalReqs.getMemoryLimit(), destConfigValues.getMemoryLimit());
}

private static Stream<Arguments> weirdnessOverrideMatrix() {
return Stream.of(
Arguments.of("0.7"),
Arguments.of("0.5, 1, 1000Mi, 2000Mi"),
Arguments.of("cat burglar"),
Arguments.of("{ \"cpu_limit\": \"2\", \"cpu_request\": \"1\" "),
Arguments.of("null"),
Arguments.of("undefined"),
Arguments.of(""),
Arguments.of("{}"));
}

@Test
void testCreateResetConnectionJob() throws IOException {
final Optional<String> expectedSourceType = Optional.empty();
Expand Down

0 comments on commit 2d9d461

Please sign in to comment.