Skip to content

Commit

Permalink
Introducing migrationProjectId and decoupling Spanner projectId (#802)
Browse files Browse the repository at this point in the history
* Introducing migrationProjectId and decoupling Spanner projectId

* Test file changes

* Addressing comments and adding UI side changes

* Fixing cleanup command

* reverting whitespace changes

* Not using project id from target profile

* documentation changes

* Addressing comments and correcting LaunchDataflowJob method
  • Loading branch information
darshan-sj committed Apr 5, 2024
1 parent c5e5b2e commit 7681dce
Show file tree
Hide file tree
Showing 42 changed files with 337 additions and 252 deletions.
10 changes: 9 additions & 1 deletion cmd/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"os"
"path"

"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
"github.com/google/subcommands"
"go.uber.org/zap"
)

// CleanupCmd is the command for cleaning up the migration resources during a minimal
Expand Down Expand Up @@ -108,7 +110,13 @@ func (cmd *CleanupCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interf
Pubsub: cmd.pubsub,
Monitoring: cmd.monitoring,
}
getInfo := &utils.GetUtilInfoImpl{}
migrationProjectId, err := getInfo.GetProject()
if err != nil {
logger.Log.Error("Could not get project id from gcloud environment. Inferring migration project id from target profile.", zap.Error(err))
migrationProjectId = project
}
logger.Log.Info(fmt.Sprintf("Initiating job cleanup for jobId: %v \n", cmd.jobId))
streaming.InitiateJobCleanup(ctx, cmd.jobId, dataShardIds, jobCleanupOptions, project, instance)
streaming.InitiateJobCleanup(ctx, cmd.jobId, dataShardIds, jobCleanupOptions, migrationProjectId, project, instance)
return subcommands.ExitSuccess
}
16 changes: 13 additions & 3 deletions cmd/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type DataCmd struct {
targetProfile string
sessionJSON string
filePrefix string // TODO: move filePrefix to global flags
project string
WriteLimit int64
dryRun bool
logLevel string
Expand Down Expand Up @@ -84,6 +85,7 @@ func (cmd *DataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.StringVar(&cmd.project, "project", "", "Flag specifying default project id for all the generated resources for the migration")
f.Int64Var(&cmd.WriteLimit, "write-limit", DefaultWritersLimit, "Write limit for writes to spanner")
f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database")
f.StringVar(&cmd.logLevel, "log-level", "DEBUG", "Configure the logging level for the command (INFO, DEBUG), defaults to DEBUG")
Expand Down Expand Up @@ -114,6 +116,14 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
err = fmt.Errorf("error while preparing prerequisites for migration: %v", err)
return subcommands.ExitUsageError
}
if cmd.project == "" {
getInfo := &utils.GetUtilInfoImpl{}
cmd.project, err = getInfo.GetProject()
if err != nil {
logger.Log.Error("Could not get project id from gcloud environment or --project flag. Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err))
return subcommands.ExitUsageError
}
}
var (
bw *writer.BatchWriter
banner string
Expand Down Expand Up @@ -149,7 +159,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
)
if !cmd.dryRun {
now := time.Now()
bw, err = MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
bw, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
if err != nil {
err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err)
return subcommands.ExitFailure
Expand All @@ -159,14 +169,14 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
conv.Audit.DryRun = true
// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
err := ValidateResourceGenerationHelper(ctx, cmd.project, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
if err != nil {
return subcommands.ExitFailure
}
}

convImpl := &conversion.ConvImpl{}
bw, err = convImpl.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
bw, err = convImpl.DataConv(ctx, cmd.project, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})

if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbName, err)
Expand Down
14 changes: 12 additions & 2 deletions cmd/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type SchemaCmd struct {
target string
targetProfile string
filePrefix string // TODO: move filePrefix to global flags
project string
logLevel string
dryRun bool
validate bool
Expand Down Expand Up @@ -74,6 +75,7 @@ func (cmd *SchemaCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.StringVar(&cmd.project, "project", "", "Flag specifying default project id for all the generated resources for the migration")
f.StringVar(&cmd.logLevel, "log-level", "DEBUG", "Configure the logging level for the command (INFO, DEBUG), defaults to DEBUG")
f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database")
f.BoolVar(&cmd.validate, "validate", false, "Flag for validating if all the required input parameters are present")
Expand All @@ -100,6 +102,14 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
err = fmt.Errorf("error while preparing prerequisites for migration: %v", err)
return subcommands.ExitUsageError
}
if cmd.project == "" {
getInfo := &utils.GetUtilInfoImpl{}
cmd.project, err = getInfo.GetProject()
if err != nil {
logger.Log.Error("Could not get project id from gcloud environment or --project flag. Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err))
return subcommands.ExitUsageError
}
}

if cmd.validate {
return subcommands.ExitSuccess
Expand All @@ -113,7 +123,7 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
schemaConversionStartTime := time.Now()
var conv *internal.Conv
convImpl := &conversion.ConvImpl{}
conv, err = convImpl.SchemaConv(sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{})
conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{})
if err != nil {
return subcommands.ExitFailure
}
Expand All @@ -127,7 +137,7 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
conv.Audit.MigrationType = migration.MigrationData_SCHEMA_ONLY.Enum()
conv.Audit.SkipMetricsPopulation = os.Getenv("SKIP_METRICS_POPULATION") == "true"
if !cmd.dryRun {
_, err = MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
_, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
if err != nil {
err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err)
return subcommands.ExitFailure
Expand Down
18 changes: 14 additions & 4 deletions cmd/schema_and_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type SchemaAndDataCmd struct {
targetProfile string
SkipForeignKeys bool
filePrefix string // TODO: move filePrefix to global flags
project string
WriteLimit int64
dryRun bool
logLevel string
Expand Down Expand Up @@ -79,6 +80,7 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.StringVar(&cmd.project, "project", "", "Flag specifying default project id for all the generated resources for the migration")
f.Int64Var(&cmd.WriteLimit, "write-limit", DefaultWritersLimit, "Write limit for writes to spanner")
f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database")
f.StringVar(&cmd.logLevel, "log-level", "DEBUG", "Configure the logging level for the command (INFO, DEBUG), defaults to DEBUG")
Expand Down Expand Up @@ -106,6 +108,14 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
err = fmt.Errorf("error while preparing prerequisites for migration: %v", err)
return subcommands.ExitUsageError
}
if cmd.project == "" {
getInfo := &utils.GetUtilInfoImpl{}
cmd.project, err = getInfo.GetProject()
if err != nil {
logger.Log.Error("Could not get project id from gcloud environment or --project flag. Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err))
return subcommands.ExitUsageError
}
}
if cmd.validate {
return subcommands.ExitSuccess
}
Expand All @@ -123,7 +133,7 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
dbURI string
)
convImpl := &conversion.ConvImpl{}
conv, err = convImpl.SchemaConv(sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{})
conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{})
if err != nil {
panic(err)
}
Expand All @@ -141,7 +151,7 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
reportImpl := conversion.ReportImpl{}
if !cmd.dryRun {
reportImpl.GenerateReport(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix, dbName, ioHelper.Out)
bw, err = MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
bw, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
if err != nil {
err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err)
return subcommands.ExitFailure
Expand All @@ -156,14 +166,14 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
conv.Audit.SchemaConversionDuration = schemaCoversionEndTime.Sub(schemaConversionStartTime)
// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
err := ValidateResourceGenerationHelper(ctx, cmd.project, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
if err != nil {
logger.Log.Error(err.Error())
return subcommands.ExitFailure
}
}

bw, err = convImpl.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
bw, err = convImpl.DataConv(ctx, cmd.project, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbName, err)
return subcommands.ExitFailure
Expand Down
26 changes: 13 additions & 13 deletions cmd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func PrepareMigrationPrerequisites(sourceProfileString, targetProfileString, sou
}

// MigrateData creates database and populates data in it.
func MigrateDatabase(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, dbName string, ioHelper *utils.IOStreams, cmd interface{}, conv *internal.Conv, migrationError *error) (*writer.BatchWriter, error) {
func MigrateDatabase(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, dbName string, ioHelper *utils.IOStreams, cmd interface{}, conv *internal.Conv, migrationError *error) (*writer.BatchWriter, error) {
var (
bw *writer.BatchWriter
err error
Expand All @@ -148,9 +148,9 @@ func MigrateDatabase(ctx context.Context, targetProfile profiles.TargetProfile,
case *SchemaCmd:
err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient)
case *DataCmd:
bw, err = migrateData(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
bw, err = migrateData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
case *SchemaAndDataCmd:
bw, err = migrateSchemaAndData(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
bw, err = migrateSchemaAndData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
}
if err != nil {
err = fmt.Errorf("can't migrate database: %v", err)
Expand All @@ -176,7 +176,7 @@ func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, so
return nil
}

func migrateData(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
func migrateData(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, cmd *DataCmd) (*writer.BatchWriter, error) {
var (
bw *writer.BatchWriter
Expand All @@ -193,14 +193,14 @@ func migrateData(ctx context.Context, targetProfile profiles.TargetProfile, sour

// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
if err != nil {
return nil, err
}
}

c := &conversion.ConvImpl{}
bw, err = c.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
bw, err = c.DataConv(ctx, migrationProjectId, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})

if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
Expand All @@ -218,7 +218,7 @@ func migrateData(ctx context.Context, targetProfile profiles.TargetProfile, sour
return bw, nil
}

func migrateSchemaAndData(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
func migrateSchemaAndData(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, cmd *SchemaAndDataCmd) (*writer.BatchWriter, error) {
spA := spanneraccessor.SpannerAccessorImpl{}
adminClientImpl, err := spanneradmin.NewAdminClientImpl(ctx)
Expand All @@ -235,14 +235,14 @@ func migrateSchemaAndData(ctx context.Context, targetProfile profiles.TargetProf

// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
if err != nil {
return nil, err
}
}

convImpl := &conversion.ConvImpl{}
bw, err := convImpl.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
bw, err := convImpl.DataConv(ctx, migrationProjectId, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})

if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
Expand All @@ -256,8 +256,8 @@ func migrateSchemaAndData(ctx context.Context, targetProfile profiles.TargetProf
return bw, nil
}

func ValidateResourceGenerationHelper(ctx context.Context, projectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error {
spClient, err:= spinstanceadmin.NewInstanceAdminClientImpl(ctx)
func ValidateResourceGenerationHelper(ctx context.Context, migrationProjectId string, spannerProjectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error {
spClient, err := spinstanceadmin.NewInstanceAdminClientImpl(ctx)
if err != nil {
return err
}
Expand All @@ -271,9 +271,9 @@ func ValidateResourceGenerationHelper(ctx context.Context, projectId string, ins
}
validateResource := conversion.NewValidateResourcesImpl(&spanneraccessor.SpannerAccessorImpl{}, spClient, &datastream_accessor.DatastreamAccessorImpl{},
dsClient, &storageaccessor.StorageAccessorImpl{}, storageclient)
err = validateResource.ValidateResourceGeneration(ctx, projectId, instanceId, sourceProfile, conv)
err = validateResource.ValidateResourceGeneration(ctx, migrationProjectId, spannerProjectId, instanceId, sourceProfile, conv)
if err != nil {
return err
}
return nil
}
}
7 changes: 4 additions & 3 deletions common/metrics/dashboard_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ var dashboardClient *dashboard.DashboardsClient

// MonitoringMetricsResources contains information required to create the monitoring dashboard
type MonitoringMetricsResources struct {
ProjectId string
MigrationProjectId string
DataflowJobId string
DatastreamId string
JobMetadataGcsBucket string
PubsubSubscriptionId string
SpannerProjectId string
SpannerInstanceId string
SpannerDatabaseId string
ShardToShardResourcesMap map[string]internal.ShardResources
Expand Down Expand Up @@ -258,7 +259,7 @@ func createAggIndependentTopMetrics(resourceIds MonitoringMetricsResources) []*d
func createAggIndependentBottomMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
shardToDashboardMappingText := ""
for shardId, shardResource := range resourceIds.ShardToShardResourcesMap {
shardUrl := fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", shardResource.MonitoringResources.DashboardName, resourceIds.ProjectId)
shardUrl := fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", shardResource.MonitoringResources.DashboardName, resourceIds.MigrationProjectId)
shardString := fmt.Sprintf("Shard [%s](%s)", shardId, shardUrl)
if shardToDashboardMappingText == "" {
shardToDashboardMappingText = shardString
Expand Down Expand Up @@ -399,7 +400,7 @@ func getCreateMonitoringDashboardRequest(
Layout: &layout,
}
req := &dashboardpb.CreateDashboardRequest{
Parent: "projects/" + resourceIds.ProjectId,
Parent: "projects/" + resourceIds.MigrationProjectId,
Dashboard: &db,
}
return req
Expand Down
Loading

0 comments on commit 7681dce

Please sign in to comment.