Skip to content

Commit

Permalink
add create new job endpoint - third attempt (#8319)
Browse files Browse the repository at this point in the history
  • Loading branch information
jpefaur committed Aug 14, 2023
1 parent 1611d2a commit 50ce32a
Show file tree
Hide file tree
Showing 50 changed files with 663 additions and 311 deletions.
33 changes: 33 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2591,6 +2591,32 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/WebBackendGeographiesListResult"
/v1/jobs/create:
post:
tags:
- jobs
- internal
summary:
Creates a new job for a given connection. If a job is already running for the connection, it will be
stopped and a new job will be created.
operationId: createJob
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/JobCreate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/JobInfoRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/list:
post:
tags:
Expand Down Expand Up @@ -5786,6 +5812,13 @@ components:
- get_spec
- sync
- reset_connection
JobCreate:
type: object
required:
- connectionId
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
JobListRequestBody:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import static io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper.getUpdatedSchema;
import static io.airbyte.persistence.job.ResourceRequirementsUtils.getResourceRequirementsForJobType;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.api.model.generated.AdvancedAuth;
Expand All @@ -31,6 +33,7 @@
import io.airbyte.api.model.generated.DestinationUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.JobConfigType;
import io.airbyte.api.model.generated.JobCreate;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.LogRead;
Expand All @@ -54,12 +57,14 @@
import io.airbyte.commons.server.converters.OauthModelConverter;
import io.airbyte.commons.server.errors.ValueConflictKnownException;
import io.airbyte.commons.server.handlers.helpers.CatalogConverter;
import io.airbyte.commons.server.handlers.helpers.JobCreationAndStatusUpdateHelper;
import io.airbyte.commons.server.scheduler.EventRunner;
import io.airbyte.commons.server.scheduler.SynchronousJobMetadata;
import io.airbyte.commons.server.scheduler.SynchronousResponse;
import io.airbyte.commons.server.scheduler.SynchronousSchedulerClient;
import io.airbyte.commons.temporal.ErrorCode;
import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
Expand All @@ -72,23 +77,30 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.featureflag.AutoPropagateSchema;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.JobCreator;
import io.airbyte.persistence.job.JobNotifier;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.ResourceRequirementsUtils;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.factory.SyncJobFactory;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.tracker.JobTracker;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.StreamDescriptor;
Expand All @@ -109,7 +121,7 @@
/**
* ScheduleHandler. Javadocs suppressed because api docs should be used as source of truth.
*/
@SuppressWarnings("MissingJavadocMethod")
@SuppressWarnings({"MissingJavadocMethod", "ParameterName"})
@Singleton
@Slf4j
public class SchedulerHandler {
Expand All @@ -133,6 +145,11 @@ public class SchedulerHandler {
private final WebUrlHelper webUrlHelper;
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
private final FeatureFlagClient featureFlagClient;
private final StreamResetPersistence streamResetPersistence;
private final OAuthConfigSupplier oAuthConfigSupplier;
private final JobCreator jobCreator;
private final SyncJobFactory jobFactory;
private final JobCreationAndStatusUpdateHelper jobCreationAndStatusUpdateHelper;

// TODO: Convert to be fully using micronaut
public SchedulerHandler(final ConfigRepository configRepository,
Expand All @@ -147,7 +164,13 @@ public SchedulerHandler(final ConfigRepository configRepository,
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final FeatureFlagClient featureFlagClient) {
final FeatureFlagClient featureFlagClient,
final StreamResetPersistence streamResetPersistence,
final OAuthConfigSupplier oAuthConfigSupplier,
final JobCreator jobCreator,
final SyncJobFactory jobFactory,
final JobNotifier jobNotifier,
final JobTracker jobTracker) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -161,7 +184,13 @@ public SchedulerHandler(final ConfigRepository configRepository,
envVariableFeatureFlags,
webUrlHelper,
actorDefinitionVersionHelper,
featureFlagClient);
featureFlagClient,
streamResetPersistence,
oAuthConfigSupplier,
jobCreator,
jobFactory,
jobNotifier,
jobTracker);
}

@VisibleForTesting
Expand All @@ -177,7 +206,13 @@ public SchedulerHandler(final ConfigRepository configRepository,
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final FeatureFlagClient featureFlagClient) {
final FeatureFlagClient featureFlagClient,
final StreamResetPersistence streamResetPersistence,
final OAuthConfigSupplier oAuthConfigSupplier,
final JobCreator jobCreator,
final SyncJobFactory jobFactory,
final JobNotifier jobNotifier,
final JobTracker jobTracker) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -191,6 +226,15 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.webUrlHelper = webUrlHelper;
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
this.featureFlagClient = featureFlagClient;
this.streamResetPersistence = streamResetPersistence;
this.oAuthConfigSupplier = oAuthConfigSupplier;
this.jobCreator = jobCreator;
this.jobFactory = jobFactory;
this.jobCreationAndStatusUpdateHelper = new JobCreationAndStatusUpdateHelper(
jobPersistence,
configRepository,
jobNotifier,
jobTracker);
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -560,6 +604,65 @@ public JobInfoRead resetConnectionStream(final ConnectionStreamRequestBody conne
return submitResetConnectionStreamsToWorker(connectionStreamRequestBody.getConnectionId(), connectionStreamRequestBody.getStreams());
}

public JobInfoRead createJob(final JobCreate jobCreate) throws JsonValidationException, ConfigNotFoundException, IOException {
// Fail non-terminal jobs first to prevent failing to create a new job
jobCreationAndStatusUpdateHelper.failNonTerminalJobs(jobCreate.getConnectionId());

final StandardSync standardSync = configRepository.getStandardSync(jobCreate.getConnectionId());
final List<StreamDescriptor> streamsToReset = streamResetPersistence.getStreamResets(jobCreate.getConnectionId());
log.info("Found the following streams to reset for connection {}: {}", jobCreate.getConnectionId(), streamsToReset);

if (!streamsToReset.isEmpty()) {
final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId());

final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
destination.getDestinationDefinitionId(),
destination.getDestinationId(),
destination.getWorkspaceId(),
destination.getConfiguration());
destination.setConfiguration(destinationConfiguration);

final StandardDestinationDefinition destinationDef =
configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId());
final ActorDefinitionVersion destinationVersion =
actorDefinitionVersionHelper.getDestinationVersion(destinationDef, destination.getWorkspaceId(), destination.getDestinationId());
final String destinationImageName = destinationVersion.getDockerRepository() + ":" + destinationVersion.getDockerImageTag();

final List<StandardSyncOperation> standardSyncOperations = Lists.newArrayList();
for (final var operationId : standardSync.getOperationIds()) {
final StandardSyncOperation standardSyncOperation = configRepository.getStandardSyncOperation(operationId);
standardSyncOperations.add(standardSyncOperation);
}

final Optional<Long> jobIdOptional =
jobCreator.createResetConnectionJob(
destination,
standardSync,
destinationDef,
destinationVersion,
destinationImageName,
new Version(destinationVersion.getProtocolVersion()),
destinationDef.getCustom(),
standardSyncOperations,
streamsToReset,
destination.getWorkspaceId());

final long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
: jobIdOptional.get();

return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
} else {
final long jobId = jobFactory.create(jobCreate.getConnectionId());

log.info("New job created, with id: " + jobId);
final Job job = jobPersistence.getJob(jobId);
jobCreationAndStatusUpdateHelper.emitJobToReleaseStagesMetric(OssMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, job);

return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}
}

public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException {
return submitCancellationToWorker(jobIdRequestBody.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,16 @@ void setUp() {
final SourceDefinitionsHandler sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class);
final DestinationDefinitionsHandler destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class);
final AirbyteVersion airbyteVersion = mock(AirbyteVersion.class);
jobHistoryHandler = new JobHistoryHandler(jobPersistence, WorkerEnvironment.DOCKER, LogConfigs.EMPTY, connectionsHandler, sourceHandler,
sourceDefinitionsHandler, destinationHandler, destinationDefinitionsHandler, airbyteVersion);
jobHistoryHandler = new JobHistoryHandler(
jobPersistence,
WorkerEnvironment.DOCKER,
LogConfigs.EMPTY,
connectionsHandler,
sourceHandler,
sourceDefinitionsHandler,
destinationHandler,
destinationDefinitionsHandler,
airbyteVersion);
}

@Nested
Expand Down
Loading

0 comments on commit 50ce32a

Please sign in to comment.