Skip to content

Commit

Permalink
Enhance schema validation logic for data only migrations (#739)
Browse files Browse the repository at this point in the history
* Enhance schema validation logic for data only migrations

* Address comments

* Add #columns check

* Fix typos
  • Loading branch information
manitgupta committed Jan 4, 2024
1 parent 4068e96 commit 3fd736f
Showing 1 changed file with 34 additions and 52 deletions.
86 changes: 34 additions & 52 deletions common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,90 +588,72 @@ func ReadSpannerSchema(ctx context.Context, conv *internal.Conv, client *sp.Clie
}

// CompareSchema compares the spanner schema of two conv objects and returns specific error if they don't match
func CompareSchema(conv1, conv2 *internal.Conv) error {
if conv1.SpDialect != conv2.SpDialect {
return fmt.Errorf("spanner dialect don't match")
func CompareSchema(sessionFileConv, actualSpannerConv *internal.Conv) error {
if sessionFileConv.SpDialect != actualSpannerConv.SpDialect {
return fmt.Errorf("spanner dialect don't match: session dialect %v, spanner dialect %v", sessionFileConv.SpDialect, actualSpannerConv.SpDialect)
}
for _, sessionTable := range conv1.SpSchema {
spannerTableId, _ := internal.GetTableIdFromSpName(conv2.SpSchema, sessionTable.Name)
spannerTable := conv2.SpSchema[spannerTableId]
for _, sessionTable := range sessionFileConv.SpSchema {
spannerTableId, err := internal.GetTableIdFromSpName(actualSpannerConv.SpSchema, sessionTable.Name)
if err != nil {
return fmt.Errorf("table %v not found in the spanner database schema but found in the session file. If this table does not need to be migrated, please exclude it during the schema conversion and migration process", sessionTable.Name)
}
spannerTable := actualSpannerConv.SpSchema[spannerTableId]
sessionTableParentName := sessionFileConv.SpSchema[sessionTable.ParentId].Name
spannerTableParentName := actualSpannerConv.SpSchema[spannerTable.ParentId].Name

//table names should match
if sessionTable.Name != spannerTable.Name {
return fmt.Errorf("table name don't match: session table %v, spanner table %v", sessionTable.Name, spannerTable.Name)
}

//parent table names should match
if sessionTableParentName != spannerTableParentName {
return fmt.Errorf("parent table name don't match: session table %v, parent session table name: %v, spanner table %v, parent spanner table name: %v", sessionTable.Name, sessionTableParentName, spannerTable.Name, spannerTableParentName)
}

sessionTableParentName := conv1.SpSchema[sessionTable.ParentId].Name
spannerTableParentName := conv2.SpSchema[spannerTable.ParentId].Name
//number of columns should match
if len(sessionTable.ColDefs) != len(spannerTable.ColDefs) {
return fmt.Errorf("number of columns don't match: session table %v, spanner table %v", sessionTable.Name, spannerTable.Name)
}

if sessionTable.Name != spannerTable.Name || sessionTableParentName != spannerTableParentName ||
len(sessionTable.PrimaryKeys) != len(spannerTable.PrimaryKeys) || len(sessionTable.ColDefs) != len(spannerTable.ColDefs) ||
len(sessionTable.Indexes) != len(spannerTable.Indexes) {
return fmt.Errorf("table detail for table %v don't match", sessionTable.Name)
//primary keys should be of the same length
if len(sessionTable.PrimaryKeys) != len(spannerTable.PrimaryKeys) {
return fmt.Errorf("primary keys don't match: session table primary key length %v: %v, spanner table primary key length %v: %v", sessionTable.Name, len(sessionTable.PrimaryKeys), spannerTable.Name, len(spannerTable.PrimaryKeys))
}

// Sorts both primary key slices based on primary key order
sortKeysByOrder(sessionTable.PrimaryKeys)
sortKeysByOrder(spannerTable.PrimaryKeys)

//primary keys should be of the same order
for idx, sessionPk := range sessionTable.PrimaryKeys {
sessionTablePkCol := sessionTable.ColDefs[sessionPk.ColId]
correspondingSpColId, _ := internal.GetColIdFromSpName(spannerTable.ColDefs, sessionTablePkCol.Name)
spannerTablePkCol := spannerTable.ColDefs[correspondingSpColId]

if sessionTablePkCol.Name != spannerTablePkCol.Name || sessionTable.PrimaryKeys[idx].Desc != spannerTable.PrimaryKeys[idx].Desc {
return fmt.Errorf("primary keys for table %v don't match", sessionTable.Name)
return fmt.Errorf("primary keys for table %v are not identical: session table primary key %v, spanner table primary key %v", sessionTable.Name, sessionTable.PrimaryKeys, spannerTable.PrimaryKeys)
}
}

//columns should be identical in terms of data type, name, length, nullability
for _, sessionColDef := range sessionTable.ColDefs {
correspondingSpColId, _ := internal.GetColIdFromSpName(spannerTable.ColDefs, sessionColDef.Name)
spannerColDef := spannerTable.ColDefs[correspondingSpColId]

// In case of PostgreSQL dialect, Spanner by default adds is_nullable = false to all the columns that are a part of primary key.
// Therefore, we cannot compare NotNull attributes for these columns.
if conv1.SpDialect == constants.DIALECT_POSTGRESQL && FindInPrimaryKey(sessionColDef.Id, sessionTable.PrimaryKeys) {
if sessionFileConv.SpDialect == constants.DIALECT_POSTGRESQL && FindInPrimaryKey(sessionColDef.Id, sessionTable.PrimaryKeys) {
if sessionColDef.Name != spannerColDef.Name ||
sessionColDef.T.IsArray != spannerColDef.T.IsArray || sessionColDef.T.Len != spannerColDef.T.Len || sessionColDef.T.Name != spannerColDef.T.Name {
return fmt.Errorf("column detail for table %v don't match", sessionTable.Name)
return fmt.Errorf("column detail for table %v don't match: session column name: %v, spanner column: %v", sessionTable.Name, sessionColDef, spannerColDef)
}

} else {
if sessionColDef.Name != spannerColDef.Name ||
sessionColDef.T.IsArray != spannerColDef.T.IsArray || sessionColDef.T.Len != spannerColDef.T.Len || sessionColDef.T.Name != spannerColDef.T.Name || sessionColDef.NotNull != spannerColDef.NotNull {
return fmt.Errorf("column detail for table %v don't match", sessionTable.Name)
}
}
}
for _, sessionTableIndex := range sessionTable.Indexes {
found := 0
for _, spannerTableIndex := range spannerTable.Indexes {
if sessionTableIndex.Name == spannerTableIndex.Name {
found = 1

sessionTableName := conv1.SpSchema[sessionTableIndex.TableId].Name
spannerTableName := conv2.SpSchema[spannerTableIndex.TableId].Name

// Sorts both primary key slices based on index key order
sortKeysByOrder(sessionTableIndex.Keys)
sortKeysByOrder(spannerTableIndex.Keys)

if sessionTableName != spannerTableName || sessionTableIndex.Unique != spannerTableIndex.Unique ||
len(sessionTableIndex.Keys) != len(spannerTableIndex.Keys) {
return fmt.Errorf("index %v - details don't match", sessionTableIndex.Name)
}

for idx, indexKey := range sessionTableIndex.Keys {
sessionIndexColumn := sessionTable.ColDefs[indexKey.ColId]
spannerIndexColumnId, _ := internal.GetColIdFromSpName(spannerTable.ColDefs, sessionIndexColumn.Name)
spannerIndexColumn := spannerTable.ColDefs[spannerIndexColumnId]

if sessionIndexColumn.Name != spannerIndexColumn.Name ||
sessionTableIndex.Keys[idx].Desc != spannerTableIndex.Keys[idx].Desc {
return fmt.Errorf("index %v - keys don't match", sessionTableIndex.Name)
}
}
break
return fmt.Errorf("column detail for table %v don't match: session column: %v, spanner column: %v", sessionTable.Name, sessionColDef, spannerColDef)
}
}
if found == 0 {
return fmt.Errorf("index %v not found in spanner schema", sessionTableIndex.Name)
}
}
}
return nil
Expand Down

0 comments on commit 3fd736f

Please sign in to comment.