Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NEOS-1587: Exposes MaxInFlight, BatchConfig for SQL Destinations #2885

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,307 changes: 1,195 additions & 1,112 deletions backend/gen/go/protos/mgmt/v1alpha1/job.pb.go

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion backend/protos/mgmt/v1alpha1/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ message PostgresDestinationConnectionOptions {
PostgresOnConflictConfig on_conflict = 3;
// Insert all valid records, skipping any that violate foreign key constraints.
bool skip_foreign_key_violations = 4;
// Configure batching options to handle how much data is sent to your database at once.
BatchConfig batch = 5;
// Determines the maximum number of parallel batched inserts.
optional uint32 max_in_flight = 6 [(buf.validate.field).uint32 = {gte: 1}];
}

message PostgresOnConflictConfig {
Expand All @@ -239,6 +243,10 @@ message MysqlDestinationConnectionOptions {
MysqlOnConflictConfig on_conflict = 3;
// Insert all valid records, skipping any that violate foreign key constraints.
bool skip_foreign_key_violations = 4;
// Configure batching options to handle how much data is sent to your database at once.
BatchConfig batch = 5;
// Determines the maximum number of parallel batched inserts.
optional uint32 max_in_flight = 6 [(buf.validate.field).uint32 = {gte: 1}];
}
message MysqlTruncateTableConfig {
bool truncate_before_insert = 1;
Expand All @@ -255,6 +263,10 @@ message MssqlDestinationConnectionOptions {
MssqlOnConflictConfig on_conflict = 3;
// Insert all valid records, skipping any that violate foreign key constraints.
bool skip_foreign_key_violations = 4;
// Configure batching options to handle how much data is sent to your database at once.
BatchConfig batch = 5;
// Determines the maximum number of parallel batched inserts.
optional uint32 max_in_flight = 6 [(buf.validate.field).uint32 = {gte: 1}];
}
message MssqlTruncateTableConfig {
bool truncate_before_insert = 1;
Expand All @@ -267,7 +279,7 @@ message AwsS3DestinationConnectionOptions {
// The storage class that will be used when objects are written to S3
StorageClass storage_class = 1;
// The maximum number of batched messages to have in flight at a given time. Increase this to improve throughput.
optional uint32 max_in_flight = 2;
optional uint32 max_in_flight = 2 [(buf.validate.field).uint32 = {gte: 1}];
// The maximum period (duration string) to wait on an upload before abandoning it and reattempting.
optional string timeout = 3;
// Configure batching options to more efficiently store records in S3
Expand Down
158 changes: 109 additions & 49 deletions backend/sql/postgresql/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,50 @@ type PostgresDestinationOptions struct {
InitTableSchema bool `json:"initTableSchema"`
OnConflictConfig *PostgresOnConflictConfig `json:"onConflictConfig,omitempty"`
SkipForeignKeyViolations bool `json:"skipForeignKeyViolations"`
MaxInFlight *uint32 `json:"maxInFlight,omitempty"`
Batch *BatchConfig `json:"batch,omitempty"`
}

func (m *PostgresDestinationOptions) ToDto() *mgmtv1alpha1.PostgresDestinationConnectionOptions {
if m.TruncateTableConfig == nil {
m.TruncateTableConfig = &PostgresTruncateTableConfig{}
}
if m.OnConflictConfig == nil {
m.OnConflictConfig = &PostgresOnConflictConfig{}
}
var batchConfig *mgmtv1alpha1.BatchConfig
if m.Batch != nil {
batchConfig = m.Batch.ToDto()
}
return &mgmtv1alpha1.PostgresDestinationConnectionOptions{
TruncateTable: m.TruncateTableConfig.ToDto(),
InitTableSchema: m.InitTableSchema,
OnConflict: m.OnConflictConfig.ToDto(),
SkipForeignKeyViolations: m.SkipForeignKeyViolations,
MaxInFlight: m.MaxInFlight,
Batch: batchConfig,
}
}

func (m *PostgresDestinationOptions) FromDto(dto *mgmtv1alpha1.PostgresDestinationConnectionOptions) {
if dto == nil {
dto = &mgmtv1alpha1.PostgresDestinationConnectionOptions{}
}
m.InitTableSchema = dto.GetInitTableSchema()
if dto.GetOnConflict() != nil {
m.OnConflictConfig = &PostgresOnConflictConfig{}
m.OnConflictConfig.FromDto(dto.GetOnConflict())
}
if dto.GetTruncateTable() != nil {
m.TruncateTableConfig = &PostgresTruncateTableConfig{}
m.TruncateTableConfig.FromDto(dto.GetTruncateTable())
}
m.SkipForeignKeyViolations = dto.GetSkipForeignKeyViolations()
m.MaxInFlight = dto.MaxInFlight
if dto.GetBatch() != nil {
m.Batch = &BatchConfig{}
m.Batch.FromDto(dto.GetBatch())
}
}

type PostgresOnConflictConfig struct {
Expand Down Expand Up @@ -1487,6 +1531,50 @@ type MysqlDestinationOptions struct {
InitTableSchema bool `json:"initTableSchema"`
OnConflictConfig *MysqlOnConflictConfig `json:"onConflict,omitempty"`
SkipForeignKeyViolations bool `json:"skipForeignKeyViolations"`
MaxInFlight *uint32 `json:"maxInFlight,omitempty"`
Batch *BatchConfig `json:"batch,omitempty"`
}

func (m *MysqlDestinationOptions) ToDto() *mgmtv1alpha1.MysqlDestinationConnectionOptions {
if m.TruncateTableConfig == nil {
m.TruncateTableConfig = &MysqlTruncateTableConfig{}
}
if m.OnConflictConfig == nil {
m.OnConflictConfig = &MysqlOnConflictConfig{}
}
var batchConfig *mgmtv1alpha1.BatchConfig
if m.Batch != nil {
batchConfig = m.Batch.ToDto()
}
return &mgmtv1alpha1.MysqlDestinationConnectionOptions{
TruncateTable: m.TruncateTableConfig.ToDto(),
InitTableSchema: m.InitTableSchema,
OnConflict: m.OnConflictConfig.ToDto(),
SkipForeignKeyViolations: m.SkipForeignKeyViolations,
MaxInFlight: m.MaxInFlight,
Batch: batchConfig,
}
}

func (m *MysqlDestinationOptions) FromDto(dto *mgmtv1alpha1.MysqlDestinationConnectionOptions) {
if dto == nil {
dto = &mgmtv1alpha1.MysqlDestinationConnectionOptions{}
}
m.InitTableSchema = dto.GetInitTableSchema()
if dto.GetOnConflict() != nil {
m.OnConflictConfig = &MysqlOnConflictConfig{}
m.OnConflictConfig.FromDto(dto.GetOnConflict())
}
if dto.GetTruncateTable() != nil {
m.TruncateTableConfig = &MysqlTruncateTableConfig{}
m.TruncateTableConfig.FromDto(dto.GetTruncateTable())
}
m.SkipForeignKeyViolations = dto.GetSkipForeignKeyViolations()
m.MaxInFlight = dto.MaxInFlight
if dto.GetBatch() != nil {
m.Batch = &BatchConfig{}
m.Batch.FromDto(dto.GetBatch())
}
}

type MysqlOnConflictConfig struct {
Expand Down Expand Up @@ -1522,6 +1610,8 @@ type MssqlDestinationOptions struct {
InitTableSchema bool `json:"initTableSchema"`
OnConflictConfig *MssqlOnConflictConfig `json:"onConflict,omitempty"`
SkipForeignKeyViolations bool `json:"skipForeignKeyViolations"`
MaxInFlight *uint32 `json:"maxInFlight,omitempty"`
Batch *BatchConfig `json:"batch,omitempty"`
}

func (m *MssqlDestinationOptions) ToDto() *mgmtv1alpha1.MssqlDestinationConnectionOptions {
Expand All @@ -1534,11 +1624,18 @@ func (m *MssqlDestinationOptions) ToDto() *mgmtv1alpha1.MssqlDestinationConnecti
onconflictConfig = m.OnConflictConfig.ToDto()
}

var batchConfig *mgmtv1alpha1.BatchConfig
if m.Batch != nil {
batchConfig = m.Batch.ToDto()
}

return &mgmtv1alpha1.MssqlDestinationConnectionOptions{
TruncateTable: truncateTableConfig,
InitTableSchema: m.InitTableSchema,
OnConflict: onconflictConfig,
SkipForeignKeyViolations: m.SkipForeignKeyViolations,
MaxInFlight: m.MaxInFlight,
Batch: batchConfig,
}
}
func (m *MssqlDestinationOptions) FromDto(dto *mgmtv1alpha1.MssqlDestinationConnectionOptions) {
Expand All @@ -1555,6 +1652,11 @@ func (m *MssqlDestinationOptions) FromDto(dto *mgmtv1alpha1.MssqlDestinationConn
m.TruncateTableConfig.FromDto(dto.GetTruncateTable())
}
m.SkipForeignKeyViolations = dto.GetSkipForeignKeyViolations()
m.MaxInFlight = dto.MaxInFlight
if dto.GetBatch() != nil {
m.Batch = &BatchConfig{}
m.Batch.FromDto(dto.GetBatch())
}
}

type MssqlOnConflictConfig struct {
Expand Down Expand Up @@ -1593,38 +1695,16 @@ func (t *MssqlTruncateTableConfig) FromDto(dto *mgmtv1alpha1.MssqlTruncateTableC

func (j *JobDestinationOptions) ToDto() *mgmtv1alpha1.JobDestinationOptions {
if j.PostgresOptions != nil {
if j.PostgresOptions.TruncateTableConfig == nil {
j.PostgresOptions.TruncateTableConfig = &PostgresTruncateTableConfig{}
}
if j.PostgresOptions.OnConflictConfig == nil {
j.PostgresOptions.OnConflictConfig = &PostgresOnConflictConfig{}
}
return &mgmtv1alpha1.JobDestinationOptions{
Config: &mgmtv1alpha1.JobDestinationOptions_PostgresOptions{
PostgresOptions: &mgmtv1alpha1.PostgresDestinationConnectionOptions{
TruncateTable: j.PostgresOptions.TruncateTableConfig.ToDto(),
InitTableSchema: j.PostgresOptions.InitTableSchema,
OnConflict: j.PostgresOptions.OnConflictConfig.ToDto(),
SkipForeignKeyViolations: j.PostgresOptions.SkipForeignKeyViolations,
},
PostgresOptions: j.PostgresOptions.ToDto(),
},
}
}
if j.MysqlOptions != nil {
if j.MysqlOptions.TruncateTableConfig == nil {
j.MysqlOptions.TruncateTableConfig = &MysqlTruncateTableConfig{}
}
if j.MysqlOptions.OnConflictConfig == nil {
j.MysqlOptions.OnConflictConfig = &MysqlOnConflictConfig{}
}
return &mgmtv1alpha1.JobDestinationOptions{
Config: &mgmtv1alpha1.JobDestinationOptions_MysqlOptions{
MysqlOptions: &mgmtv1alpha1.MysqlDestinationConnectionOptions{
TruncateTable: j.MysqlOptions.TruncateTableConfig.ToDto(),
InitTableSchema: j.MysqlOptions.InitTableSchema,
OnConflict: j.MysqlOptions.OnConflictConfig.ToDto(),
SkipForeignKeyViolations: j.MysqlOptions.SkipForeignKeyViolations,
},
MysqlOptions: j.MysqlOptions.ToDto(),
},
}
}
Expand Down Expand Up @@ -1719,33 +1799,13 @@ func (j *JobDestinationOptions) FromDto(dto *mgmtv1alpha1.JobDestinationOptions)
if dto == nil {
dto = &mgmtv1alpha1.JobDestinationOptions{}
}
switch config := dto.Config.(type) {
switch config := dto.GetConfig().(type) {
case *mgmtv1alpha1.JobDestinationOptions_PostgresOptions:
truncateCfg := &PostgresTruncateTableConfig{}
truncateCfg.FromDto(config.PostgresOptions.TruncateTable)
j.PostgresOptions = &PostgresDestinationOptions{
InitTableSchema: config.PostgresOptions.InitTableSchema,
TruncateTableConfig: truncateCfg,
SkipForeignKeyViolations: config.PostgresOptions.SkipForeignKeyViolations,
}
if config.PostgresOptions.OnConflict != nil {
onConflictCfg := &PostgresOnConflictConfig{}
onConflictCfg.FromDto(config.PostgresOptions.OnConflict)
j.PostgresOptions.OnConflictConfig = onConflictCfg
}
j.PostgresOptions = &PostgresDestinationOptions{}
j.PostgresOptions.FromDto(config.PostgresOptions)
case *mgmtv1alpha1.JobDestinationOptions_MysqlOptions:
truncateCfg := &MysqlTruncateTableConfig{}
truncateCfg.FromDto(config.MysqlOptions.TruncateTable)
j.MysqlOptions = &MysqlDestinationOptions{
InitTableSchema: config.MysqlOptions.InitTableSchema,
TruncateTableConfig: truncateCfg,
SkipForeignKeyViolations: config.MysqlOptions.SkipForeignKeyViolations,
}
if config.MysqlOptions.OnConflict != nil {
onConflictCfg := &MysqlOnConflictConfig{}
onConflictCfg.FromDto(config.MysqlOptions.OnConflict)
j.MysqlOptions.OnConflictConfig = onConflictCfg
}
j.MysqlOptions = &MysqlDestinationOptions{}
j.MysqlOptions.FromDto(config.MysqlOptions)
case *mgmtv1alpha1.JobDestinationOptions_AwsS3Options:
j.AwsS3Options = &AwsS3DestinationOptions{}
j.AwsS3Options.FromDto(config.AwsS3Options)
Expand Down
76 changes: 76 additions & 0 deletions docs/openapi/mgmt/v1alpha1/job.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,7 @@ components:
maxInFlight:
type: integer
title: max_in_flight
minimum: 1
description: The maximum number of batched messages to have in flight at a given time. Increase this to improve throughput.
timeout:
type: string
Expand Down Expand Up @@ -3196,6 +3197,21 @@ components:
description: MongoDB connection options for a job source
mgmt.v1alpha1.MssqlDestinationConnectionOptions:
type: object
allOf:
- anyOf:
- required:
- maxInFlight
- not:
anyOf:
- required:
- maxInFlight
anyOf:
- required:
- maxInFlight
- not:
anyOf:
- required:
- maxInFlight
properties:
truncateTable:
allOf:
Expand All @@ -3214,6 +3230,16 @@ components:
type: boolean
title: skip_foreign_key_violations
description: Insert all valid records, skipping any that violate foreign key constraints.
batch:
allOf:
- title: batch
description: Configure batching options to handle how much data is sent to your database at once.
- $ref: '#/components/schemas/mgmt.v1alpha1.BatchConfig'
maxInFlight:
type: integer
title: max_in_flight
minimum: 1
description: Determines the maximum number of parallel batched inserts.
title: MssqlDestinationConnectionOptions
additionalProperties: false
mgmt.v1alpha1.MssqlOnConflictConfig:
Expand Down Expand Up @@ -3303,6 +3329,21 @@ components:
additionalProperties: false
mgmt.v1alpha1.MysqlDestinationConnectionOptions:
type: object
allOf:
- anyOf:
- required:
- maxInFlight
- not:
anyOf:
- required:
- maxInFlight
anyOf:
- required:
- maxInFlight
- not:
anyOf:
- required:
- maxInFlight
properties:
truncateTable:
allOf:
Expand All @@ -3319,6 +3360,16 @@ components:
type: boolean
title: skip_foreign_key_violations
description: Insert all valid records, skipping any that violate foreign key constraints.
batch:
allOf:
- title: batch
description: Configure batching options to handle how much data is sent to your database at once.
- $ref: '#/components/schemas/mgmt.v1alpha1.BatchConfig'
maxInFlight:
type: integer
title: max_in_flight
minimum: 1
description: Determines the maximum number of parallel batched inserts.
title: MysqlDestinationConnectionOptions
additionalProperties: false
mgmt.v1alpha1.MysqlOnConflictConfig:
Expand Down Expand Up @@ -3659,6 +3710,21 @@ components:
additionalProperties: false
mgmt.v1alpha1.PostgresDestinationConnectionOptions:
type: object
allOf:
- anyOf:
- required:
- maxInFlight
- not:
anyOf:
- required:
- maxInFlight
anyOf:
- required:
- maxInFlight
- not:
anyOf:
- required:
- maxInFlight
properties:
truncateTable:
allOf:
Expand All @@ -3675,6 +3741,16 @@ components:
type: boolean
title: skip_foreign_key_violations
description: Insert all valid records, skipping any that violate foreign key constraints.
batch:
allOf:
- title: batch
description: Configure batching options to handle how much data is sent to your database at once.
- $ref: '#/components/schemas/mgmt.v1alpha1.BatchConfig'
maxInFlight:
type: integer
title: max_in_flight
minimum: 1
description: Determines the maximum number of parallel batched inserts.
title: PostgresDestinationConnectionOptions
additionalProperties: false
mgmt.v1alpha1.PostgresOnConflictConfig:
Expand Down
Loading
Loading