Skip to content

Commit

Permalink
clean up jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi committed Sep 5, 2023
1 parent b2d83f9 commit f74dd99
Show file tree
Hide file tree
Showing 7 changed files with 672 additions and 419 deletions.
832 changes: 450 additions & 382 deletions backend/gen/go/protos/mgmt/v1alpha1/job.pb.go

Large diffs are not rendered by default.

160 changes: 158 additions & 2 deletions backend/gen/go/protos/mgmt/v1alpha1/job.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions backend/internal/dtomaps/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func ToJobDto(
Status: mgmtv1alpha1.JobStatus(0), // TODO

Check failure on line 33 in backend/internal/dtomaps/jobs.go

View workflow job for this annotation

GitHub Actions / golang-lint

todoCommentWithoutDetail: may want to add detail/assignee to this TODO/FIXME/BUG comment (gocritic)
ConnectionSourceId: inputSourceConnId,
CronSchedule: inputJob.Spec.CronSchedule,
HaltOnNewColumnAddition: *inputJob.Spec.Source.Sql.HaltOnSchemaChange,
ConnectionDestinationIds: inputDestConnIds,
Mappings: mappings,
SourceOptions: &mgmtv1alpha1.JobSourceOptions{
HaltOnNewColumnAddition: *inputJob.Spec.Source.Sql.HaltOnSchemaChange,
},
}
}

func getTransformer(transformerName string) string {
// TODO @alisha handler operator to api transformer mapping
// TODO @alisha handle operator to api transformer mapping
return transformerName
}
8 changes: 6 additions & 2 deletions backend/protos/mgmt/v1alpha1/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ message CreateJobRequest {
string connection_source_id = 2 [(validate.rules).string.uuid = true];
repeated string connection_destination_ids = 3;
optional string cron_schedule = 4;
bool halt_on_new_column_addition = 5;
JobSourceOptions source_options = 5;
repeated JobMapping mappings = 6;
}
message CreateJobResponse {
Expand Down Expand Up @@ -118,6 +118,10 @@ message CancelJobRunRequest {
}
message CancelJobRunResponse {}

message JobSourceOptions {
bool halt_on_new_column_addition = 1;
}

message Job {
string id = 1;

Expand All @@ -131,7 +135,7 @@ message Job {
repeated string connection_destination_ids = 7;
repeated JobMapping mappings = 8;
optional string cron_schedule = 9;
bool halt_on_new_column_addition = 10;
JobSourceOptions source_options = 10;
}

message JobRun {
Expand Down
31 changes: 7 additions & 24 deletions backend/services/mgmt/v1alpha1/job-service/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ func (s *Service) GetJobs(
if err != nil {
return nil, err
}
destConnIds = append(destConnIds, connsNameToIdMap[destConnName])
destConnId, ok := connsNameToIdMap[destConnName]
if ok {
destConnIds = append(destConnIds, destConnId)
}

}

dto := dtomaps.ToJobDto(&job, sourceConnId, destConnIds)
Expand Down Expand Up @@ -103,7 +107,7 @@ func (s *Service) GetJob(
}
sourceConnName, err := getSourceConnectionName(job.Spec.Source)
if err != nil {
return nil, err // TODO @alisha should return job even without connections
return nil, err
}
connNames := []string{sourceConnName}
connNames = append(connNames, destConnNames...)
Expand Down Expand Up @@ -174,16 +178,13 @@ func (s *Service) CreateJob(
return nil, err
}

trueBool := true // TODO @alisha
jobDestinations := []*neosyncdevv1alpha1.JobConfigDestination{}
for _, name := range destConnNames {
jobDestinations = append(jobDestinations, &neosyncdevv1alpha1.JobConfigDestination{
Sql: &neosyncdevv1alpha1.DestinationSql{
ConnectionRef: &neosyncdevv1alpha1.LocalResourceRef{
Name: name,
},
TruncateBeforeInsert: &trueBool, // TODO @alisha
InitDbSchema: &trueBool, // TODO @alisha
},
})
}
Expand All @@ -204,7 +205,7 @@ func (s *Service) CreateJob(
ConnectionRef: neosyncdevv1alpha1.LocalResourceRef{
Name: *sourceConnName,
},
HaltOnSchemaChange: &req.Msg.HaltOnNewColumnAddition,
HaltOnSchemaChange: &req.Msg.SourceOptions.HaltOnNewColumnAddition,
Schemas: schemas,
},
},
Expand Down Expand Up @@ -651,21 +652,3 @@ func getJobById(
}
return &jobs.Items[0], nil
}

func patchJobConfig(
ctx context.Context,
k8sclient *neosync_k8sclient.Client,
jobConfig *neosyncdevv1alpha1.JobConfig,
patch *neosyncdevv1alpha1.JobConfig,
) error {
patchBits, err := json.Marshal(patch)
if err != nil {
return err
}

err = k8sclient.CustomResourceClient.Patch(ctx, jobConfig, runtimeclient.RawPatch(types.MergePatchType, patchBits))
if err != nil {
return err
}
return nil
}
5 changes: 4 additions & 1 deletion frontend/app/new/job/schema/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
CreateJobResponse,
JobMapping,
JobMappingTransformer,
JobSourceOptions,
} from '@/neosync-api-client/mgmt/v1alpha1/job_pb';
import { yupResolver } from '@hookform/resolvers/yup';
import { useRouter } from 'next/navigation';
Expand Down Expand Up @@ -122,7 +123,9 @@ async function createNewJob(formData: FormValues): Promise<CreateJobResponse> {
const body = new CreateJobRequest({
jobName: formData.define.jobName,
cronSchedule: formData.define.cronSchedule,
haltOnNewColumnAddition: false,
sourceOptions: new JobSourceOptions({
haltOnNewColumnAddition: false,
}),
mappings: formData.schema.mappings.map((m) => {
return new JobMapping({
schema: m.schema,
Expand Down
Loading

0 comments on commit f74dd99

Please sign in to comment.