diff --git a/backend/sql/postgresql/models/models.go b/backend/sql/postgresql/models/models.go index 1e820ca3fb..0fa9f2f4d6 100644 --- a/backend/sql/postgresql/models/models.go +++ b/backend/sql/postgresql/models/models.go @@ -1449,6 +1449,50 @@ type PostgresDestinationOptions struct { InitTableSchema bool `json:"initTableSchema"` OnConflictConfig *PostgresOnConflictConfig `json:"onConflictConfig,omitempty"` SkipForeignKeyViolations bool `json:"skipForeignKeyViolations"` + MaxInFlight *uint32 `json:"maxInFlight,omitempty"` + Batch *BatchConfig `json:"batch,omitempty"` +} + +func (m *PostgresDestinationOptions) ToDto() *mgmtv1alpha1.PostgresDestinationConnectionOptions { + if m.TruncateTableConfig == nil { + m.TruncateTableConfig = &PostgresTruncateTableConfig{} + } + if m.OnConflictConfig == nil { + m.OnConflictConfig = &PostgresOnConflictConfig{} + } + var batchConfig *mgmtv1alpha1.BatchConfig + if m.Batch != nil { + batchConfig = m.Batch.ToDto() + } + return &mgmtv1alpha1.PostgresDestinationConnectionOptions{ + TruncateTable: m.TruncateTableConfig.ToDto(), + InitTableSchema: m.InitTableSchema, + OnConflict: m.OnConflictConfig.ToDto(), + SkipForeignKeyViolations: m.SkipForeignKeyViolations, + MaxInFlight: m.MaxInFlight, + Batch: batchConfig, + } +} + +func (m *PostgresDestinationOptions) FromDto(dto *mgmtv1alpha1.PostgresDestinationConnectionOptions) { + if dto == nil { + dto = &mgmtv1alpha1.PostgresDestinationConnectionOptions{} + } + m.InitTableSchema = dto.GetInitTableSchema() + if dto.GetOnConflict() != nil { + m.OnConflictConfig = &PostgresOnConflictConfig{} + m.OnConflictConfig.FromDto(dto.GetOnConflict()) + } + if dto.GetTruncateTable() != nil { + m.TruncateTableConfig = &PostgresTruncateTableConfig{} + m.TruncateTableConfig.FromDto(dto.GetTruncateTable()) + } + m.SkipForeignKeyViolations = dto.GetSkipForeignKeyViolations() + m.MaxInFlight = dto.MaxInFlight + if dto.GetBatch() != nil { + m.Batch = &BatchConfig{} + m.Batch.FromDto(dto.GetBatch()) + } } type PostgresOnConflictConfig struct { @@ -1487,6 +1531,50 @@ type MysqlDestinationOptions struct { InitTableSchema bool `json:"initTableSchema"` OnConflictConfig *MysqlOnConflictConfig `json:"onConflict,omitempty"` SkipForeignKeyViolations bool `json:"skipForeignKeyViolations"` + MaxInFlight *uint32 `json:"maxInFlight,omitempty"` + Batch *BatchConfig `json:"batch,omitempty"` +} + +func (m *MysqlDestinationOptions) ToDto() *mgmtv1alpha1.MysqlDestinationConnectionOptions { + if m.TruncateTableConfig == nil { + m.TruncateTableConfig = &MysqlTruncateTableConfig{} + } + if m.OnConflictConfig == nil { + m.OnConflictConfig = &MysqlOnConflictConfig{} + } + var batchConfig *mgmtv1alpha1.BatchConfig + if m.Batch != nil { + batchConfig = m.Batch.ToDto() + } + return &mgmtv1alpha1.MysqlDestinationConnectionOptions{ + TruncateTable: m.TruncateTableConfig.ToDto(), + InitTableSchema: m.InitTableSchema, + OnConflict: m.OnConflictConfig.ToDto(), + SkipForeignKeyViolations: m.SkipForeignKeyViolations, + MaxInFlight: m.MaxInFlight, + Batch: batchConfig, + } +} + +func (m *MysqlDestinationOptions) FromDto(dto *mgmtv1alpha1.MysqlDestinationConnectionOptions) { + if dto == nil { + dto = &mgmtv1alpha1.MysqlDestinationConnectionOptions{} + } + m.InitTableSchema = dto.GetInitTableSchema() + if dto.GetOnConflict() != nil { + m.OnConflictConfig = &MysqlOnConflictConfig{} + m.OnConflictConfig.FromDto(dto.GetOnConflict()) + } + if dto.GetTruncateTable() != nil { + m.TruncateTableConfig = &MysqlTruncateTableConfig{} + m.TruncateTableConfig.FromDto(dto.GetTruncateTable()) + } + m.SkipForeignKeyViolations = dto.GetSkipForeignKeyViolations() + m.MaxInFlight = dto.MaxInFlight + if dto.GetBatch() != nil { + m.Batch = &BatchConfig{} + m.Batch.FromDto(dto.GetBatch()) + } } type MysqlOnConflictConfig struct { @@ -1522,6 +1610,8 @@ type MssqlDestinationOptions struct { InitTableSchema bool `json:"initTableSchema"` OnConflictConfig *MssqlOnConflictConfig `json:"onConflict,omitempty"` SkipForeignKeyViolations bool `json:"skipForeignKeyViolations"` + MaxInFlight *uint32 `json:"maxInFlight,omitempty"` + Batch *BatchConfig `json:"batch,omitempty"` } func (m *MssqlDestinationOptions) ToDto() *mgmtv1alpha1.MssqlDestinationConnectionOptions { @@ -1534,11 +1624,18 @@ func (m *MssqlDestinationOptions) ToDto() *mgmtv1alpha1.MssqlDestinationConnecti onconflictConfig = m.OnConflictConfig.ToDto() } + var batchConfig *mgmtv1alpha1.BatchConfig + if m.Batch != nil { + batchConfig = m.Batch.ToDto() + } + return &mgmtv1alpha1.MssqlDestinationConnectionOptions{ TruncateTable: truncateTableConfig, InitTableSchema: m.InitTableSchema, OnConflict: onconflictConfig, SkipForeignKeyViolations: m.SkipForeignKeyViolations, + MaxInFlight: m.MaxInFlight, + Batch: batchConfig, } } func (m *MssqlDestinationOptions) FromDto(dto *mgmtv1alpha1.MssqlDestinationConnectionOptions) { @@ -1555,6 +1652,11 @@ func (m *MssqlDestinationOptions) FromDto(dto *mgmtv1alpha1.MssqlDestinationConn m.TruncateTableConfig.FromDto(dto.GetTruncateTable()) } m.SkipForeignKeyViolations = dto.GetSkipForeignKeyViolations() + m.MaxInFlight = dto.MaxInFlight + if dto.GetBatch() != nil { + m.Batch = &BatchConfig{} + m.Batch.FromDto(dto.GetBatch()) + } } type MssqlOnConflictConfig struct { @@ -1593,38 +1695,16 @@ func (t *MssqlTruncateTableConfig) FromDto(dto *mgmtv1alpha1.MssqlTruncateTableC func (j *JobDestinationOptions) ToDto() *mgmtv1alpha1.JobDestinationOptions { if j.PostgresOptions != nil { - if j.PostgresOptions.TruncateTableConfig == nil { - j.PostgresOptions.TruncateTableConfig = &PostgresTruncateTableConfig{} - } - if j.PostgresOptions.OnConflictConfig == nil { - j.PostgresOptions.OnConflictConfig = &PostgresOnConflictConfig{} - } return &mgmtv1alpha1.JobDestinationOptions{ Config: &mgmtv1alpha1.JobDestinationOptions_PostgresOptions{ - PostgresOptions: &mgmtv1alpha1.PostgresDestinationConnectionOptions{ - TruncateTable: j.PostgresOptions.TruncateTableConfig.ToDto(), - InitTableSchema: j.PostgresOptions.InitTableSchema, - OnConflict: j.PostgresOptions.OnConflictConfig.ToDto(), - SkipForeignKeyViolations: j.PostgresOptions.SkipForeignKeyViolations, - }, + PostgresOptions: j.PostgresOptions.ToDto(), }, } } if j.MysqlOptions != nil { - if j.MysqlOptions.TruncateTableConfig == nil { - j.MysqlOptions.TruncateTableConfig = &MysqlTruncateTableConfig{} - } - if j.MysqlOptions.OnConflictConfig == nil { - j.MysqlOptions.OnConflictConfig = &MysqlOnConflictConfig{} - } return &mgmtv1alpha1.JobDestinationOptions{ Config: &mgmtv1alpha1.JobDestinationOptions_MysqlOptions{ - MysqlOptions: &mgmtv1alpha1.MysqlDestinationConnectionOptions{ - TruncateTable: j.MysqlOptions.TruncateTableConfig.ToDto(), - InitTableSchema: j.MysqlOptions.InitTableSchema, - OnConflict: j.MysqlOptions.OnConflictConfig.ToDto(), - SkipForeignKeyViolations: j.MysqlOptions.SkipForeignKeyViolations, - }, + MysqlOptions: j.MysqlOptions.ToDto(), }, } } @@ -1719,33 +1799,13 @@ func (j *JobDestinationOptions) FromDto(dto *mgmtv1alpha1.JobDestinationOptions) if dto == nil { dto = &mgmtv1alpha1.JobDestinationOptions{} } - switch config := dto.Config.(type) { + switch config := dto.GetConfig().(type) { case *mgmtv1alpha1.JobDestinationOptions_PostgresOptions: - truncateCfg := &PostgresTruncateTableConfig{} - truncateCfg.FromDto(config.PostgresOptions.TruncateTable) - j.PostgresOptions = &PostgresDestinationOptions{ - InitTableSchema: config.PostgresOptions.InitTableSchema, - TruncateTableConfig: truncateCfg, - SkipForeignKeyViolations: config.PostgresOptions.SkipForeignKeyViolations, - } - if config.PostgresOptions.OnConflict != nil { - onConflictCfg := &PostgresOnConflictConfig{} - onConflictCfg.FromDto(config.PostgresOptions.OnConflict) - j.PostgresOptions.OnConflictConfig = onConflictCfg - } + j.PostgresOptions = &PostgresDestinationOptions{} + j.PostgresOptions.FromDto(config.PostgresOptions) case *mgmtv1alpha1.JobDestinationOptions_MysqlOptions: - truncateCfg := &MysqlTruncateTableConfig{} - truncateCfg.FromDto(config.MysqlOptions.TruncateTable) - j.MysqlOptions = &MysqlDestinationOptions{ - InitTableSchema: config.MysqlOptions.InitTableSchema, - TruncateTableConfig: truncateCfg, - SkipForeignKeyViolations: config.MysqlOptions.SkipForeignKeyViolations, - } - if config.MysqlOptions.OnConflict != nil { - onConflictCfg := &MysqlOnConflictConfig{} - onConflictCfg.FromDto(config.MysqlOptions.OnConflict) - j.MysqlOptions.OnConflictConfig = onConflictCfg - } + j.MysqlOptions = &MysqlDestinationOptions{} + j.MysqlOptions.FromDto(config.MysqlOptions) case *mgmtv1alpha1.JobDestinationOptions_AwsS3Options: j.AwsS3Options = &AwsS3DestinationOptions{} j.AwsS3Options.FromDto(config.AwsS3Options)