Skip to content

Commit

Permalink
Re-introduce changes to add new labels to sync kubernetes pods (#7719)
Browse files Browse the repository at this point in the history
Now that backwards compatibility is enabled in IntegrationLauncherConfig, we can safely add new properties to enable reporting workspace id and connection id on sync pods. Transforms are not yet supported.
  • Loading branch information
Jack Keller committed Jul 18, 2023
1 parent 84a27b9 commit 55ed46c
Show file tree
Hide file tree
Showing 29 changed files with 151 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,15 @@ public Object getJobInput(final SyncInput input) {
final IntegrationLauncherConfig sourceLauncherConfig = getSourceIntegrationLauncherConfig(
jobId,
attempt,
connectionId,
config,
sourceVersion,
attemptSyncConfig.getSourceConfiguration());

final IntegrationLauncherConfig destinationLauncherConfig = getDestinationIntegrationLauncherConfig(
jobId,
attempt,
connectionId,
config,
destinationVersion,
attemptSyncConfig.getDestinationConfiguration(),
Expand All @@ -218,7 +220,7 @@ public Object getJobInput(final SyncInput input) {
.withResourceRequirements(config.getResourceRequirements())
.withSourceResourceRequirements(config.getSourceResourceRequirements())
.withDestinationResourceRequirements(config.getDestinationResourceRequirements())
.withConnectionId(standardSync.getConnectionId())
.withConnectionId(connectionId)
.withWorkspaceId(config.getWorkspaceId())
.withNormalizeInDestinationContainer(shouldNormalizeInDestination)
.withIsReset(JobConfig.ConfigType.RESET_CONNECTION.equals(jobConfigType));
Expand Down Expand Up @@ -263,6 +265,7 @@ public Object getCheckJobInput(final CheckInput input) {
final IntegrationLauncherConfig sourceLauncherConfig = getSourceIntegrationLauncherConfig(
jobId,
attemptNumber,
connectionId,
jobSyncConfig,
sourceVersion,
sourceConfiguration);
Expand All @@ -271,6 +274,7 @@ public Object getCheckJobInput(final CheckInput input) {
getDestinationIntegrationLauncherConfig(
jobId,
attemptNumber,
connectionId,
jobSyncConfig,
destinationVersion,
destinationConfiguration,
Expand Down Expand Up @@ -361,6 +365,7 @@ private void reportNormalizationInDestinationMetrics(final boolean shouldNormali

private IntegrationLauncherConfig getSourceIntegrationLauncherConfig(final long jobId,
final int attempt,
final UUID connectionId,
final JobSyncConfig config,
@Nullable final ActorDefinitionVersion sourceVersion,
final JsonNode sourceConfiguration)
Expand All @@ -370,6 +375,8 @@ private IntegrationLauncherConfig getSourceIntegrationLauncherConfig(final long
final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(jobId))
.withAttemptId((long) attempt)
.withConnectionId(connectionId)
.withWorkspaceId(config.getWorkspaceId())
.withDockerImage(config.getSourceDockerImage())
.withProtocolVersion(config.getSourceProtocolVersion())
.withIsCustomConnector(config.getIsSourceCustomConnector());
Expand All @@ -383,6 +390,7 @@ private IntegrationLauncherConfig getSourceIntegrationLauncherConfig(final long

private IntegrationLauncherConfig getDestinationIntegrationLauncherConfig(final long jobId,
final int attempt,
final UUID connectionId,
final JobSyncConfig config,
final ActorDefinitionVersion destinationVersion,
final JsonNode destinationConfiguration,
Expand All @@ -400,6 +408,8 @@ private IntegrationLauncherConfig getDestinationIntegrationLauncherConfig(final
return new IntegrationLauncherConfig()
.withJobId(String.valueOf(jobId))
.withAttemptId((long) attempt)
.withConnectionId(connectionId)
.withWorkspaceId(config.getWorkspaceId())
.withDockerImage(config.getDestinationDockerImage())
.withProtocolVersion(config.getDestinationProtocolVersion())
.withIsCustomConnector(config.getIsDestinationCustomConnector())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne
ConfigType.CHECK_CONNECTION_SOURCE,
jobReportingContext,
source.getSourceDefinitionId(),
() -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, taskQueue, jobCheckConnectionConfig),
() -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, source.getWorkspaceId(), taskQueue, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
source.getWorkspaceId(),
source.getSourceId());
Expand Down Expand Up @@ -137,7 +137,7 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
ConfigType.CHECK_CONNECTION_DESTINATION,
jobReportingContext,
destination.getDestinationDefinitionId(),
() -> temporalClient.submitCheckConnection(jobId, 0, taskQueue, jobCheckConnectionConfig),
() -> temporalClient.submitCheckConnection(jobId, 0, destination.getWorkspaceId(), taskQueue, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
destination.getWorkspaceId(),
destination.getDestinationId());
Expand Down Expand Up @@ -173,7 +173,7 @@ public SynchronousResponse<UUID> createDiscoverSchemaJob(final SourceConnection
ConfigType.DISCOVER_SCHEMA,
jobReportingContext,
source.getSourceDefinitionId(),
() -> temporalClient.submitDiscoverSchema(jobId, 0, taskQueue, jobDiscoverCatalogConfig),
() -> temporalClient.submitDiscoverSchema(jobId, 0, source.getWorkspaceId(), taskQueue, jobDiscoverCatalogConfig),
ConnectorJobOutput::getDiscoverCatalogId,
source.getWorkspaceId(),
source.getSourceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
when(stateHandler.getState(new ConnectionIdRequestBody().connectionId(CONNECTION_ID)))
.thenReturn(new ConnectionState()
.stateType(ConnectionStateType.LEGACY)
.state(STATE.getState()));
.state(STATE.getState())
.connectionId(CONNECTION_ID));

final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withWorkspaceId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withDestinationDockerImage("destinationDockerImage")
.withSourceDockerImage("sourceDockerImage")
.withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class));
Expand All @@ -180,14 +181,14 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
when(job.getScope()).thenReturn(CONNECTION_ID.toString());

final StandardSyncInput expectedStandardSyncInput = new StandardSyncInput()
.withConnectionId(CONNECTION_ID)
.withWorkspaceId(jobSyncConfig.getWorkspaceId())
.withSourceId(SOURCE_ID)
.withDestinationId(DESTINATION_ID)
.withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH_AND_INJECTED_CONFIG)
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
.withState(STATE)
.withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog())
.withWorkspaceId(jobSyncConfig.getWorkspaceId())
.withIsReset(false);

final JobRunConfig expectedJobRunConfig = new JobRunConfig()
Expand All @@ -197,11 +198,15 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_NUMBER)
.withConnectionId(CONNECTION_ID)
.withWorkspaceId(WORKSPACE_ID)
.withDockerImage(jobSyncConfig.getSourceDockerImage());

final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_NUMBER)
.withConnectionId(CONNECTION_ID)
.withWorkspaceId(jobSyncConfig.getWorkspaceId())
.withDockerImage(jobSyncConfig.getDestinationDockerImage())
.withAdditionalEnvironmentVariables(Collections.emptyMap());

Expand Down Expand Up @@ -235,10 +240,11 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException, JsonValid
when(stateHandler.getState(new ConnectionIdRequestBody().connectionId(CONNECTION_ID)))
.thenReturn(new ConnectionState()
.stateType(ConnectionStateType.LEGACY)
.state(STATE.getState()));
.state(STATE.getState())
.connectionId(CONNECTION_ID));

final JobResetConnectionConfig jobResetConfig = new JobResetConnectionConfig()
.withWorkspaceId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withDestinationDockerImage("destinationDockerImage")
.withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class));

Expand All @@ -250,14 +256,14 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException, JsonValid
when(job.getScope()).thenReturn(CONNECTION_ID.toString());

final StandardSyncInput expectedStandardSyncInput = new StandardSyncInput()
.withConnectionId(CONNECTION_ID)
.withWorkspaceId(jobResetConfig.getWorkspaceId())
.withSourceId(SOURCE_ID)
.withDestinationId(DESTINATION_ID)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
.withState(STATE)
.withCatalog(jobResetConfig.getConfiguredAirbyteCatalog())
.withWorkspaceId(jobResetConfig.getWorkspaceId())
.withWebhookOperationConfigs(jobResetConfig.getWebhookOperationConfigs())
.withIsReset(true);

Expand All @@ -268,11 +274,15 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException, JsonValid
final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_NUMBER)
.withConnectionId(CONNECTION_ID)
.withWorkspaceId(jobResetConfig.getWorkspaceId())
.withDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB);

final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_NUMBER)
.withConnectionId(CONNECTION_ID)
.withWorkspaceId(jobResetConfig.getWorkspaceId())
.withDockerImage(jobResetConfig.getDestinationDockerImage())
.withAdditionalEnvironmentVariables(Collections.emptyMap());

Expand Down Expand Up @@ -314,7 +324,7 @@ void testGetCheckConnectionInputs() throws JsonValidationException, ConfigNotFou
.thenReturn(SOURCE_CONFIG_WITH_OAUTH);

final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withWorkspaceId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withDestinationDockerImage("destinationDockerImage")
.withSourceDockerImage("sourceDockerImage")
.withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class));
Expand All @@ -329,11 +339,15 @@ void testGetCheckConnectionInputs() throws JsonValidationException, ConfigNotFou
final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_NUMBER)
.withWorkspaceId(WORKSPACE_ID)
.withConnectionId(CONNECTION_ID)
.withDockerImage(jobSyncConfig.getSourceDockerImage());

final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_NUMBER)
.withWorkspaceId(WORKSPACE_ID)
.withConnectionId(CONNECTION_ID)
.withDockerImage(jobSyncConfig.getDestinationDockerImage())
.withAdditionalEnvironmentVariables(Collections.emptyMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ class DefaultSynchronousSchedulerClientTest {
.put("password", "abc")
.build());
private static final SourceConnection SOURCE_CONNECTION = new SourceConnection()
.withWorkspaceId(WORKSPACE_ID)
.withSourceId(UUID1)
.withSourceDefinitionId(UUID2)
.withConfiguration(CONFIGURATION);
private static final DestinationConnection DESTINATION_CONNECTION = new DestinationConnection()
.withWorkspaceId(WORKSPACE_ID)
.withDestinationId(UUID1)
.withDestinationDefinitionId(UUID2)
.withConfiguration(CONFIGURATION);
Expand Down Expand Up @@ -223,7 +225,7 @@ void testCreateSourceCheckConnectionJob() throws IOException {

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(WORKSPACE_ID), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, ACTOR_DEFINITION_VERSION, false);
Expand All @@ -246,7 +248,7 @@ void testCreateSourceCheckConnectionJobWithConfigInjection() throws IOException

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(WORKSPACE_ID), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, ACTOR_DEFINITION_VERSION, false);
Expand All @@ -265,7 +267,7 @@ void testCreateDestinationCheckConnectionJob() throws IOException {

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(WORKSPACE_ID), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, ACTOR_DEFINITION_VERSION, false);
Expand All @@ -277,8 +279,9 @@ void testCreateDestinationCheckConnectionJob() throws IOException {
void testCreateDiscoverSchemaJob() throws IOException {
final UUID expectedCatalogId = UUID.randomUUID();
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId(expectedCatalogId);
when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(DISCOVER_TASK_QUEUE), any(JobDiscoverCatalogConfig.class)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
when(
temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(WORKSPACE_ID), eq(DISCOVER_TASK_QUEUE), any(JobDiscoverCatalogConfig.class)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<UUID> response =
schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, ACTOR_DEFINITION_VERSION, false);
assertEquals(expectedCatalogId, response.getOutput());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,14 @@ public TemporalResponse<ConnectorJobOutput> submitGetSpec(final UUID jobId, fina
*/
public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID jobId,
final int attempt,
final UUID workspaceId,
final String taskQueue,
final JobCheckConnectionConfig config) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);
final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig()
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withWorkspaceId(workspaceId)
.withDockerImage(config.getDockerImage())
.withProtocolVersion(config.getProtocolVersion())
.withIsCustomConnector(config.getIsCustomConnector());
Expand All @@ -416,12 +418,14 @@ public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID job
*/
public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobId,
final int attempt,
final UUID workspaceId,
final String taskQueue,
final JobDiscoverCatalogConfig config) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);
final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig()
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withWorkspaceId(workspaceId)
.withDockerImage(config.getDockerImage())
.withProtocolVersion(config.getProtocolVersion())
.withIsCustomConnector(config.getIsCustomConnector());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void testSubmitCheckConnection() {
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration());

temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, CHECK_TASK_QUEUE, checkConnectionConfig);
temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, WORKSPACE_ID, CHECK_TASK_QUEUE, checkConnectionConfig);
checkConnectionWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
verify(workflowClient).newWorkflowStub(CheckConnectionWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION));
Expand All @@ -266,7 +266,7 @@ void testSubmitDiscoverSchema() {
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration());

temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, DISCOVER_TASK_QUEUE, checkConnectionConfig);
temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, WORKSPACE_ID, DISCOVER_TASK_QUEUE, checkConnectionConfig);
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
verify(workflowClient).newWorkflowStub(DiscoverCatalogWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public boolean transform(final String jobId,
CUSTOM_STEP,
jobId,
attempt,
null, // TODO: Provide connectionId
null, // TODO: Provide workspaceId
jobRoot,
dbtConfig.getDockerImage(),
false,
Expand Down
Loading

0 comments on commit 55ed46c

Please sign in to comment.