Skip to content

Commit

Permalink
Addition initial shell for additional mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Oct 31, 2024
1 parent acfee14 commit fa10347
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 2 deletions.
234 changes: 232 additions & 2 deletions internal/benthos/benthos-builder/builders/sql-util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency"
bb_internal "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/internal"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
)

const (
Expand All @@ -22,7 +23,10 @@ const (
)

type sqlJobSourceOpts struct {
HaltOnNewColumnAddition bool
// Determines if the job should halt if a new column is detected that is not present in the job mappings
HaltOnNewColumnAddition bool
// Newly detected columns are automatically transformed
GenerateNewColumnTransformers bool
SubsetByForeignKeyConstraints bool
SchemaOpt []*schemaOptions
}
Expand All @@ -45,6 +49,8 @@ type tableMapping struct {
Mappings []*mgmtv1alpha1.JobMapping
}

// Based on the source schema and the provided job mappings, the job mappings must be at least a subset of the source schema
// Otherwise, the sync is doomed for failure
func areMappingsSubsetOfSchemas(
groupedSchemas map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
mappings []*mgmtv1alpha1.JobMapping,
Expand Down Expand Up @@ -81,6 +87,7 @@ func areMappingsSubsetOfSchemas(
return true
}

// Builds a map of <schema.table>->column
func getUniqueColMappingsMap(
mappings []*mgmtv1alpha1.JobMapping,
) map[string]map[string]struct{} {
Expand All @@ -98,6 +105,8 @@ func getUniqueColMappingsMap(
return tableColMappings
}

// Based on the source schema, we check each mapped table for newly added columns that are not present in the mappings,
// but are present in the source. If so, halt because this means PII may be leaked.
func shouldHaltOnSchemaAddition(
groupedSchemas map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
mappings []*mgmtv1alpha1.JobMapping,
Expand Down Expand Up @@ -487,8 +496,22 @@ func getSqlJobSourceOpts(
Tables: tableOpts,
})
}
shouldHalt := false
shouldGenerateNewColTransforms := false
switch jobSourceConfig.Postgres.GetNewColumnAdditionStrategy().GetStrategy().(type) {
case *mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob:
shouldHalt = true
case *mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_UseAutoMap:
shouldGenerateNewColTransforms = true
}
// deprecated fallback if no strategy has been defined
if !shouldHalt && !shouldGenerateNewColTransforms {
shouldHalt = jobSourceConfig.Postgres.GetHaltOnNewColumnAddition()
}

return &sqlJobSourceOpts{
HaltOnNewColumnAddition: jobSourceConfig.Postgres.HaltOnNewColumnAddition,
HaltOnNewColumnAddition: shouldHalt,
GenerateNewColumnTransformers: shouldGenerateNewColTransforms,
SubsetByForeignKeyConstraints: jobSourceConfig.Postgres.SubsetByForeignKeyConstraints,
SchemaOpt: schemaOpt,
}, nil
Expand Down Expand Up @@ -651,3 +674,210 @@ func getParsedBatchingConfig(destOpt batchDestinationOption) (batchingConfig, er
}
return output, nil
}

// Based on the source schema and the provided mappings, we find the missing columns (if any) and generate job mappings for them automatically
func getAdditionalJobMappings(
driver string,
groupedSchemas map[string]map[string]*sqlmanager_shared.ColumnInfo,
mappings []*mgmtv1alpha1.JobMapping,
getTableFromKey func(key string) (schema, table string, err error),
logger *slog.Logger,
) ([]*mgmtv1alpha1.JobMapping, error) {
output := []*mgmtv1alpha1.JobMapping{}

tableColMappings := getUniqueColMappingsMap(mappings)
// The mappings are equivalent to the grouped schemas, no work needs to be done
if len(tableColMappings) == len(groupedSchemas) {
return output, nil
}

for schematable, cols := range groupedSchemas {
mappedCols, ok := tableColMappings[schematable]
if !ok {
// todo: we may want to generate mappings for this entire table? However this may be dead code as we get the grouped schemas based on the mappings
logger.Warn("table found in schema data that is not present in job mappings", "table", schematable)
continue
}
if len(cols) == len(mappedCols) {
continue
}
for col, info := range cols {
if _, ok := mappedCols[col]; !ok {
schema, table, err := getTableFromKey(schematable)
if err != nil {
return nil, err
}
// we found a column that is not present in the mappings, let's create a mapping for it
if info.ColumnDefault != "" {
output = append(output, &mgmtv1alpha1.JobMapping{
Schema: schema,
Table: table,
Column: col,
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{
GenerateDefaultConfig: &mgmtv1alpha1.GenerateDefault{},
},
},
},
})
} else if info.IdentityGeneration != nil {
// maybe need to do something special here?
} else if info.IsNullable {
output = append(output, &mgmtv1alpha1.JobMapping{
Schema: schema,
Table: table,
Column: col,
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{
Nullconfig: &mgmtv1alpha1.Null{},
},
},
},
})
} else {
switch driver {
case "postgres":
output = append(output, &mgmtv1alpha1.JobMapping{
Schema: schema,
Table: table,
Column: col,
Transformer: getJmTransformerByPostgresDataType(info),
})
default:
logger.Warn("this driver is not currently supported for additional job mapping by data type")
continue
}
}
}
}
}

return output, nil
}
func getJmTransformerByPostgresDataType(colInfo *sqlmanager_shared.ColumnInfo) *mgmtv1alpha1.JobMappingTransformer {
switch colInfo.DataType {
case "smallint":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{
GenerateInt64Config: &mgmtv1alpha1.GenerateInt64{
Min: shared.Ptr(int64(-32768)),
Max: shared.Ptr(int64(32767)),
},
},
},
}
case "integer":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{
GenerateInt64Config: &mgmtv1alpha1.GenerateInt64{
Min: shared.Ptr(int64(-2147483648)),
Max: shared.Ptr(int64(2147483647)),
},
},
},
}
case "bigint":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{
GenerateInt64Config: &mgmtv1alpha1.GenerateInt64{
Min: shared.Ptr(int64(-9223372036854775808)),
Max: shared.Ptr(int64(9223372036854775807)),
},
},
},
}
case "decimal", "numeric":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateFloat64Config{
GenerateFloat64Config: &mgmtv1alpha1.GenerateFloat64{
Precision: intPtrToInt64Ptr(colInfo.NumericPrecision), // todo: we need to expose scale...
// Min: shared.Ptr(float64(1)),
// Max: shared.Ptr(float64(1)),
// Precision: shared.Ptr(int64(*colInfo.NumericPrecision)),
// // Min: shared.Ptr(int64(-9223372036854775808)),
// // Max: shared.Ptr(int64(9223372036854775807)),
},
},
},
}
case "real", "double precision":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateFloat64Config{
GenerateFloat64Config: &mgmtv1alpha1.GenerateFloat64{
Precision: intPtrToInt64Ptr(colInfo.NumericPrecision),
// Min: shared.Ptr(float64(1)),
// Max: shared.Ptr(float64(1)),
// Precision: shared.Ptr(int64(*colInfo.NumericPrecision)),
// // Min: shared.Ptr(int64(-9223372036854775808)),
// // Max: shared.Ptr(int64(9223372036854775807)),
},
},
},
}

case "smallserial", "serial", "bigserial":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{
GenerateDefaultConfig: &mgmtv1alpha1.GenerateDefault{},
},
},
}
case "money":
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateFloat64Config{
GenerateFloat64Config: &mgmtv1alpha1.GenerateFloat64{
// todo: to adequately support money, we need to know the scale which is set via the lc_monetary setting (but may be properly populated via our query..)
Precision: intPtrToInt64Ptr(colInfo.NumericPrecision),
Min: shared.Ptr(float64(-92233720368547758.08)),
Max: shared.Ptr(float64(92233720368547758.07)),
},
},
},
}
case "text", "bpchar", "character", "character varying": // todo: test to see if this works when (n) has been specified
return &mgmtv1alpha1.JobMappingTransformer{
Config: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateStringConfig{
GenerateStringConfig: &mgmtv1alpha1.GenerateString{}, // todo?
},
},
}
// case "bytea": // todo https://www.postgresql.org/docs/current/datatype-binary.html
case "date":
return &mgmtv1alpha1.JobMappingTransformer{}

case "time without time zone":
return &mgmtv1alpha1.JobMappingTransformer{}

case "time with time zone":
return &mgmtv1alpha1.JobMappingTransformer{}

case "interval":
return &mgmtv1alpha1.JobMappingTransformer{}

case "timestamp without time zone":
return &mgmtv1alpha1.JobMappingTransformer{}

case "timestamp with time zone":
return &mgmtv1alpha1.JobMappingTransformer{}
default:
return &mgmtv1alpha1.JobMappingTransformer{}
}
}

func intPtrToInt64Ptr(input *int) *int64 {
if input == nil {
return nil
}
out := int64(*input)
return &out
}
9 changes: 9 additions & 0 deletions internal/benthos/benthos-builder/builders/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_inte
shouldHaltOnSchemaAddition(groupedColumnInfo, job.Mappings) {
return nil, errors.New(haltOnSchemaAdditionErrMsg)
}
if sqlSourceOpts != nil && sqlSourceOpts.GenerateNewColumnTransformers {
extraMappings, err := getAdditionalJobMappings(b.driver, groupedColumnInfo, job.Mappings, func(key string) (schema string, table string, err error) {
return "", "", nil // todo
}, logger)
if err != nil {
return nil, err
}
job.Mappings = append(job.Mappings, extraMappings...)
}
uniqueSchemas := shared.GetUniqueSchemasFromMappings(job.Mappings)

tableConstraints, err := db.Db.GetTableConstraintsBySchema(ctx, uniqueSchemas)
Expand Down

0 comments on commit fa10347

Please sign in to comment.