From 7681dcebecb9215393f51e9f28ed7de840d178d5 Mon Sep 17 00:00:00 2001
From: darshan-sj
Date: Fri, 5 Apr 2024 15:14:10 +0530
Subject: [PATCH] Introducing migrationProjectId and decoupling Spanner projectId (#802)

* Introducing migrationProjectId and decoupling Spanner projectId
* Test file changes
* Addressing comments and adding UI side changes
* Fixing cleanup command
* reverting whitespace changes
* Not using project id from target profile
* documentation changes
* Addressing comments and correcting LaunchDataflowJob method
---
 cmd/cleanup.go                            | 10 +++-
 cmd/data.go                               | 16 +++++--
 cmd/schema.go                             | 14 +++++-
 cmd/schema_and_data.go                    | 18 +++++--
 cmd/utils.go                              | 26 +++++-----
 common/metrics/dashboard_components.go    |  7 +--
 common/utils/utils.go                     |  2 +-
 conversion/conversion.go                  | 16 +++----
 conversion/conversion_from_source.go      | 41 ++++++++--------
 conversion/conversion_from_source_test.go |  8 ++--
 conversion/conversion_test.go             |  4 +-
 conversion/data_from_database.go          | 52 ++++++++----------
 conversion/get_info.go                    | 58 ++++++++++++-----------
 conversion/mocks.go                       | 20 ++++----
 conversion/resource_generation.go         | 18 +++----
 conversion/resource_generation_test.go    |  2 +-
 docs/cli/data.md                          |  9 +++-
 docs/cli/flags.md                         |  3 ++
 docs/cli/schema-and-data.md               |  9 +++-
 docs/cli/schema.md                        |  9 +++-
 sources/common/infoschema.go              |  2 +-
 sources/dynamodb/schema.go                |  2 +-
 sources/mysql/infoschema.go               | 17 +++----
 sources/mysql/infoschema_test.go          | 10 ++--
 sources/oracle/infoschema.go              | 17 +++----
 sources/oracle/infoschema_test.go         |  2 +-
 sources/postgres/infoschema.go            | 17 +++----
 sources/postgres/infoschema_test.go       | 10 ++--
 sources/spanner/infoschema.go             |  2 +-
 sources/sqlserver/infoschema.go           |  2 +-
 streaming/cleanup.go                      | 24 +++++-----
 streaming/streaming.go                    | 39 +++++++--------
 webv2/api/schema.go                       |  2 +-
 webv2/config.json                         |  1 +
 webv2/config/config_handler.go            |  8 +++-
 webv2/config/config_service.go            |  6 +++
 webv2/profile/profile.go                  |  2 +-
 webv2/session/session_handler.go          |  4 +-
 webv2/session/session_service.go          | 11 +++--
 webv2/session/session_test.go             |  6 +--
 webv2/session/types.go                    | 39 +++++++-------
 webv2/web.go                              | 24 +++++---
 42 files changed, 337 insertions(+), 252 deletions(-)

diff --git a/cmd/cleanup.go b/cmd/cleanup.go
index b96513db4..98c10a927 100644
--- a/cmd/cleanup.go
+++ b/cmd/cleanup.go
@@ -20,10 +20,12 @@ import (
 	"os"
 	"path"
 
+	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
 	"github.com/google/subcommands"
+	"go.uber.org/zap"
 )
 
 // CleanupCmd is the command for cleaning up the migration resources during a minimal
@@ -108,7 +110,13 @@ func (cmd *CleanupCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interf
 		Pubsub:     cmd.pubsub,
 		Monitoring: cmd.monitoring,
 	}
+	getInfo := &utils.GetUtilInfoImpl{}
+	migrationProjectId, err := getInfo.GetProject()
+	if err != nil {
+		logger.Log.Error("Could not get project id from gcloud environment. Inferring migration project id from target profile.", zap.Error(err))
+		migrationProjectId = project
+	}
 	logger.Log.Info(fmt.Sprintf("Initiating job cleanup for jobId: %v \n", cmd.jobId))
-	streaming.InitiateJobCleanup(ctx, cmd.jobId, dataShardIds, jobCleanupOptions, project, instance)
+	streaming.InitiateJobCleanup(ctx, cmd.jobId, dataShardIds, jobCleanupOptions, migrationProjectId, project, instance)
 	return subcommands.ExitSuccess
 }
diff --git a/cmd/data.go b/cmd/data.go
index 15c095eab..56ac28d71 100644
--- a/cmd/data.go
+++ b/cmd/data.go
@@ -48,6 +48,7 @@ type DataCmd struct {
 	targetProfile string
 	sessionJSON   string
 	filePrefix    string // TODO: move filePrefix to global flags
+	project       string
 	WriteLimit    int64
 	dryRun        bool
 	logLevel      string
@@ -84,6 +85,7 @@ func (cmd *DataCmd) SetFlags(f *flag.FlagSet) {
 	f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
 	f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
 	f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
+	f.StringVar(&cmd.project, "project", "", "Flag spcifying default project id for all the generated resources for the migration")
 	f.Int64Var(&cmd.WriteLimit, "write-limit", DefaultWritersLimit, "Write limit for writes to spanner")
 	f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database")
 	f.StringVar(&cmd.logLevel, "log-level", "DEBUG", "Configure the logging level for the command (INFO, DEBUG), defaults to DEBUG")
@@ -114,6 +116,14 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
 		err = fmt.Errorf("error while preparing prerequisites for migration: %v", err)
 		return subcommands.ExitUsageError
 	}
+	if cmd.project == "" {
+		getInfo := &utils.GetUtilInfoImpl{}
+		cmd.project, err = getInfo.GetProject()
+		if err != nil {
+			logger.Log.Error("Could not get project id from gcloud environment or --project flag. 
Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err)) + return subcommands.ExitUsageError + } + } var ( bw *writer.BatchWriter banner string @@ -149,7 +159,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface ) if !cmd.dryRun { now := time.Now() - bw, err = MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil) + bw, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil) if err != nil { err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err) return subcommands.ExitFailure @@ -159,14 +169,14 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface conv.Audit.DryRun = true // If migration type is Minimal Downtime, validate if required resources can be generated if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { - err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) + err := ValidateResourceGenerationHelper(ctx, cmd.project, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) if err != nil { return subcommands.ExitFailure } } convImpl := &conversion.ConvImpl{} - bw, err = convImpl.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) + bw, err = convImpl.DataConv(ctx, cmd.project, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) if err != nil { err = fmt.Errorf("can't finish data conversion for db %s: %v", dbName, err) diff --git a/cmd/schema.go b/cmd/schema.go index 76025ab51..4b2e4c5eb 100644 --- a/cmd/schema.go +++ b/cmd/schema.go @@ -41,6 +41,7 @@ type SchemaCmd struct { target string targetProfile string filePrefix string // TODO: move filePrefix to global flags + project string logLevel string dryRun bool validate bool @@ -74,6 +75,7 @@ func (cmd *SchemaCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)") f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"") f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files") + f.StringVar(&cmd.project, "project", "", "Flag spcifying default project id for all the generated resources for the migration") f.StringVar(&cmd.logLevel, "log-level", "DEBUG", "Configure the logging level for the command (INFO, DEBUG), defaults to DEBUG") f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database") f.BoolVar(&cmd.validate, "validate", false, "Flag for validating if all the required input parameters are present") @@ -100,6 +102,14 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa err = fmt.Errorf("error while preparing prerequisites for migration: %v", err) return subcommands.ExitUsageError } + if cmd.project == "" { + getInfo := &utils.GetUtilInfoImpl{} + cmd.project, err = getInfo.GetProject() + if err != nil { + logger.Log.Error("Could not get project id from gcloud environment or --project flag. 
Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err)) + return subcommands.ExitUsageError + } + } if cmd.validate { return subcommands.ExitSuccess @@ -113,7 +123,7 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa schemaConversionStartTime := time.Now() var conv *internal.Conv convImpl := &conversion.ConvImpl{} - conv, err = convImpl.SchemaConv(sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{}) + conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{}) if err != nil { return subcommands.ExitFailure } @@ -127,7 +137,7 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa conv.Audit.MigrationType = migration.MigrationData_SCHEMA_ONLY.Enum() conv.Audit.SkipMetricsPopulation = os.Getenv("SKIP_METRICS_POPULATION") == "true" if !cmd.dryRun { - _, err = MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil) + _, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil) if err != nil { err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err) return subcommands.ExitFailure diff --git a/cmd/schema_and_data.go b/cmd/schema_and_data.go index 9831b2765..d9af41c7d 100644 --- a/cmd/schema_and_data.go +++ b/cmd/schema_and_data.go @@ -44,6 +44,7 @@ type SchemaAndDataCmd struct { targetProfile string SkipForeignKeys bool filePrefix string // TODO: move filePrefix to global flags + project string WriteLimit int64 dryRun bool logLevel string @@ -79,6 +80,7 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"") f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)") f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files") + f.StringVar(&cmd.project, "project", "", "Flag spcifying default project id for all the generated resources for the migration") f.Int64Var(&cmd.WriteLimit, "write-limit", DefaultWritersLimit, "Write limit for writes to spanner") f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database") f.StringVar(&cmd.logLevel, "log-level", "DEBUG", "Configure the logging level for the command (INFO, DEBUG), defaults to DEBUG") @@ -106,6 +108,14 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ... err = fmt.Errorf("error while preparing prerequisites for migration: %v", err) return subcommands.ExitUsageError } + if cmd.project == "" { + getInfo := &utils.GetUtilInfoImpl{} + cmd.project, err = getInfo.GetProject() + if err != nil { + logger.Log.Error("Could not get project id from gcloud environment or --project flag. Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err)) + return subcommands.ExitUsageError + } + } if cmd.validate { return subcommands.ExitSuccess } @@ -123,7 +133,7 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ... 
dbURI string ) convImpl := &conversion.ConvImpl{} - conv, err = convImpl.SchemaConv(sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{}) + conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{}) if err != nil { panic(err) } @@ -141,7 +151,7 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ... reportImpl := conversion.ReportImpl{} if !cmd.dryRun { reportImpl.GenerateReport(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix, dbName, ioHelper.Out) - bw, err = MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil) + bw, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil) if err != nil { err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err) return subcommands.ExitFailure @@ -156,14 +166,14 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ... conv.Audit.SchemaConversionDuration = schemaCoversionEndTime.Sub(schemaConversionStartTime) // If migration type is Minimal Downtime, validate if required resources can be generated if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { - err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) + err := ValidateResourceGenerationHelper(ctx, cmd.project, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) if err != nil { logger.Log.Error(err.Error()) return subcommands.ExitFailure } } - bw, err = convImpl.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) + bw, err = convImpl.DataConv(ctx, cmd.project, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) if err != nil { err = fmt.Errorf("can't finish data conversion for db %s: %v", dbName, err) return subcommands.ExitFailure diff --git a/cmd/utils.go b/cmd/utils.go index 3f0501209..9753ee291 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -127,7 +127,7 @@ func PrepareMigrationPrerequisites(sourceProfileString, targetProfileString, sou } // MigrateData creates database and populates data in it. 
-func MigrateDatabase(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, dbName string, ioHelper *utils.IOStreams, cmd interface{}, conv *internal.Conv, migrationError *error) (*writer.BatchWriter, error) { +func MigrateDatabase(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, dbName string, ioHelper *utils.IOStreams, cmd interface{}, conv *internal.Conv, migrationError *error) (*writer.BatchWriter, error) { var ( bw *writer.BatchWriter err error @@ -148,9 +148,9 @@ func MigrateDatabase(ctx context.Context, targetProfile profiles.TargetProfile, case *SchemaCmd: err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient) case *DataCmd: - bw, err = migrateData(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v) + bw, err = migrateData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v) case *SchemaAndDataCmd: - bw, err = migrateSchemaAndData(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v) + bw, err = migrateSchemaAndData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v) } if err != nil { err = fmt.Errorf("can't migrate database: %v", err) @@ -176,7 +176,7 @@ func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, so return nil } -func migrateData(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, +func migrateData(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, cmd *DataCmd) (*writer.BatchWriter, error) { var ( bw *writer.BatchWriter @@ -193,14 +193,14 @@ func migrateData(ctx context.Context, targetProfile profiles.TargetProfile, sour // If migration type is Minimal Downtime, validate if required resources can be generated if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { - err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) + err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) if err != nil { return nil, err } } c := &conversion.ConvImpl{} - bw, err = c.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) + bw, err = c.DataConv(ctx, migrationProjectId, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) if err != nil { err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err) @@ -218,7 +218,7 @@ func migrateData(ctx context.Context, targetProfile profiles.TargetProfile, sour return bw, nil } -func migrateSchemaAndData(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, +func migrateSchemaAndData(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, client 
*sp.Client, cmd *SchemaAndDataCmd) (*writer.BatchWriter, error) { spA := spanneraccessor.SpannerAccessorImpl{} adminClientImpl, err := spanneradmin.NewAdminClientImpl(ctx) @@ -235,14 +235,14 @@ func migrateSchemaAndData(ctx context.Context, targetProfile profiles.TargetProf // If migration type is Minimal Downtime, validate if required resources can be generated if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { - err := ValidateResourceGenerationHelper(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) + err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile, conv) if err != nil { return nil, err } } convImpl := &conversion.ConvImpl{} - bw, err := convImpl.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) + bw, err := convImpl.DataConv(ctx, migrationProjectId, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{}) if err != nil { err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err) @@ -256,8 +256,8 @@ func migrateSchemaAndData(ctx context.Context, targetProfile profiles.TargetProf return bw, nil } -func ValidateResourceGenerationHelper(ctx context.Context, projectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error { - spClient, err:= spinstanceadmin.NewInstanceAdminClientImpl(ctx) +func ValidateResourceGenerationHelper(ctx context.Context, migrationProjectId string, spannerProjectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error { + spClient, err := spinstanceadmin.NewInstanceAdminClientImpl(ctx) if err != nil { return err } @@ -271,9 +271,9 @@ func ValidateResourceGenerationHelper(ctx context.Context, projectId string, ins } validateResource := conversion.NewValidateResourcesImpl(&spanneraccessor.SpannerAccessorImpl{}, spClient, &datastream_accessor.DatastreamAccessorImpl{}, dsClient, &storageaccessor.StorageAccessorImpl{}, storageclient) - err = validateResource.ValidateResourceGeneration(ctx, projectId, instanceId, sourceProfile, conv) + err = validateResource.ValidateResourceGeneration(ctx, migrationProjectId, spannerProjectId, instanceId, sourceProfile, conv) if err != nil { return err } return nil -} \ No newline at end of file +} diff --git a/common/metrics/dashboard_components.go b/common/metrics/dashboard_components.go index b0e01a530..72a0e63b0 100644 --- a/common/metrics/dashboard_components.go +++ b/common/metrics/dashboard_components.go @@ -39,11 +39,12 @@ var dashboardClient *dashboard.DashboardsClient // MonitoringMetricsResources contains information required to create the monitoring dashboard type MonitoringMetricsResources struct { - ProjectId string + MigrationProjectId string DataflowJobId string DatastreamId string JobMetadataGcsBucket string PubsubSubscriptionId string + SpannerProjectId string SpannerInstanceId string SpannerDatabaseId string ShardToShardResourcesMap map[string]internal.ShardResources @@ -258,7 +259,7 @@ func createAggIndependentTopMetrics(resourceIds MonitoringMetricsResources) []*d func createAggIndependentBottomMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile { shardToDashboardMappingText := "" for shardId, shardResource := range 
resourceIds.ShardToShardResourcesMap { - shardUrl := fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", shardResource.MonitoringResources.DashboardName, resourceIds.ProjectId) + shardUrl := fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", shardResource.MonitoringResources.DashboardName, resourceIds.MigrationProjectId) shardString := fmt.Sprintf("Shard [%s](%s)", shardId, shardUrl) if shardToDashboardMappingText == "" { shardToDashboardMappingText = shardString @@ -399,7 +400,7 @@ func getCreateMonitoringDashboardRequest( Layout: &layout, } req := &dashboardpb.CreateDashboardRequest{ - Parent: "projects/" + resourceIds.ProjectId, + Parent: "projects/" + resourceIds.MigrationProjectId, Dashboard: &db, } return req diff --git a/common/utils/utils.go b/common/utils/utils.go index 956bdc65c..9a3b0edd5 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -182,7 +182,7 @@ func PreloadGCSFiles(tables []ManifestTable) ([]ManifestTable, error) { return tables, nil } -// GetProject returns the cloud project we should use for accessing Spanner. +// GetProject returns the cloud project we should use by default to create resources. // Use environment variable GCLOUD_PROJECT if it is set. // Otherwise, use the default project returned from gcloud. func (gui *GetUtilInfoImpl) GetProject() (string, error) { diff --git a/conversion/conversion.go b/conversion/conversion.go index 1a987493a..fb3459813 100644 --- a/conversion/conversion.go +++ b/conversion/conversion.go @@ -66,17 +66,17 @@ func GetDatastreamClient(ctx context.Context) *datastream.Client { } type ConvInterface interface { - SchemaConv(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, schemaFromSource SchemaFromSourceInterface) (*internal.Conv, error) - DataConv(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, writeLimit int64, dataFromSource DataFromSourceInterface) (*writer.BatchWriter, error) + SchemaConv(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, schemaFromSource SchemaFromSourceInterface) (*internal.Conv, error) + DataConv(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, writeLimit int64, dataFromSource DataFromSourceInterface) (*writer.BatchWriter, error) } -type ConvImpl struct {} +type ConvImpl struct{} // SchemaConv performs the schema conversion // The SourceProfile param provides the connection details to use the go SQL library. 
-func (ci *ConvImpl) SchemaConv(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, schemaFromSource SchemaFromSourceInterface) (*internal.Conv, error) { +func (ci *ConvImpl) SchemaConv(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, schemaFromSource SchemaFromSourceInterface) (*internal.Conv, error) { switch sourceProfile.Driver { case constants.POSTGRES, constants.MYSQL, constants.DYNAMODB, constants.SQLSERVER, constants.ORACLE: - return schemaFromSource.schemaFromDatabase(sourceProfile, targetProfile, &GetInfoImpl{}, &common.ProcessSchemaImpl{}) + return schemaFromSource.schemaFromDatabase(migrationProjectId, sourceProfile, targetProfile, &GetInfoImpl{}, &common.ProcessSchemaImpl{}) case constants.PGDUMP, constants.MYSQLDUMP: return schemaFromSource.SchemaFromDump(sourceProfile.Driver, targetProfile.Conn.Sp.Dialect, ioHelper, &ProcessDumpByDialectImpl{}) default: @@ -86,7 +86,7 @@ func (ci *ConvImpl) SchemaConv(sourceProfile profiles.SourceProfile, targetProfi // DataConv performs the data conversion // The SourceProfile param provides the connection details to use the go SQL library. -func (ci *ConvImpl) DataConv(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, writeLimit int64, dataFromSource DataFromSourceInterface) (*writer.BatchWriter, error) { +func (ci *ConvImpl) DataConv(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, writeLimit int64, dataFromSource DataFromSourceInterface) (*writer.BatchWriter, error) { config := writer.BatchWriterConfig{ BytesLimit: 100 * 1000 * 1000, WriteLimit: writeLimit, @@ -95,7 +95,7 @@ func (ci *ConvImpl) DataConv(ctx context.Context, sourceProfile profiles.SourceP } switch sourceProfile.Driver { case constants.POSTGRES, constants.MYSQL, constants.DYNAMODB, constants.SQLSERVER, constants.ORACLE: - return dataFromSource.dataFromDatabase(ctx, sourceProfile, targetProfile, config, conv, client, &GetInfoImpl{}, &DataFromDatabaseImpl{}, &SnapshotMigrationImpl{}) + return dataFromSource.dataFromDatabase(ctx, migrationProjectId, sourceProfile, targetProfile, config, conv, client, &GetInfoImpl{}, &DataFromDatabaseImpl{}, &SnapshotMigrationImpl{}) case constants.PGDUMP, constants.MYSQLDUMP: if conv.SpSchema.CheckInterleaved() { return nil, fmt.Errorf("spanner migration tool does not currently support data conversion from dump files\nif the schema contains interleaved tables. Suggest using direct access to source database\ni.e. using drivers postgres and mysql") @@ -112,7 +112,7 @@ type ReportInterface interface { GenerateReport(driver string, badWrites map[string]int64, BytesRead int64, banner string, conv *internal.Conv, reportFileName string, dbName string, out *os.File) } -type ReportImpl struct {} +type ReportImpl struct{} // Report generates a report of schema and data conversion. 
func (r *ReportImpl) GenerateReport(driver string, badWrites map[string]int64, BytesRead int64, banner string, conv *internal.Conv, reportFileName string, dbName string, out *os.File) { diff --git a/conversion/conversion_from_source.go b/conversion/conversion_from_source.go index dcac1249f..612af033b 100644 --- a/conversion/conversion_from_source.go +++ b/conversion/conversion_from_source.go @@ -21,6 +21,8 @@ import ( "strings" sp "cloud.google.com/go/spanner" + storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage" + storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils" @@ -31,27 +33,25 @@ import ( "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/csv" "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/writer" "github.com/GoogleCloudPlatform/spanner-migration-tool/streaming" - storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage" - storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage" "go.uber.org/zap" ) type SchemaFromSourceInterface interface { - schemaFromDatabase(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) + schemaFromDatabase(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) - } +} type SchemaFromSourceImpl struct{} type DataFromSourceInterface interface { - dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) + dataFromDatabase(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface) (*writer.BatchWriter, error) dataFromCSV(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, populateDataConv PopulateDataConvInterface, csv csv.CsvInterface) (*writer.BatchWriter, error) } type DataFromSourceImpl struct{} -func (sads *SchemaFromSourceImpl) schemaFromDatabase(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) { +func (sads *SchemaFromSourceImpl) schemaFromDatabase(migrationProjectId string, 
sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) { conv := internal.MakeConv() conv.SpDialect = targetProfile.Conn.Sp.Dialect //handle fetching schema differently for sharded migrations, we only connect to the primary shard to @@ -65,13 +65,13 @@ func (sads *SchemaFromSourceImpl) schemaFromDatabase(sourceProfile profiles.Sour //Find Primary Shard Name if sourceProfile.Config.ConfigType == constants.BULK_MIGRATION { schemaSource := sourceProfile.Config.ShardConfigurationBulk.SchemaSource - infoSchema, err = getInfo.getInfoSchemaForShard(schemaSource, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{}) + infoSchema, err = getInfo.getInfoSchemaForShard(migrationProjectId, schemaSource, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{}) if err != nil { return conv, err } } else if sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { schemaSource := sourceProfile.Config.ShardConfigurationDataflow.SchemaSource - infoSchema, err = getInfo.getInfoSchemaForShard(schemaSource, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{}) + infoSchema, err = getInfo.getInfoSchemaForShard(migrationProjectId, schemaSource, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{}) if err != nil { return conv, err } @@ -82,13 +82,13 @@ func (sads *SchemaFromSourceImpl) schemaFromDatabase(sourceProfile profiles.Sour return conv, fmt.Errorf("unknown type of migration, please select one of bulk, dataflow or dms") } case profiles.SourceProfileTypeCloudSQL: - infoSchema, err = getInfo.GetInfoSchemaFromCloudSQL(sourceProfile, targetProfile) + infoSchema, err = getInfo.GetInfoSchemaFromCloudSQL(migrationProjectId, sourceProfile, targetProfile) if err != nil { return conv, err } default: - infoSchema, err = getInfo.GetInfoSchema(sourceProfile, targetProfile) + infoSchema, err = getInfo.GetInfoSchema(migrationProjectId, sourceProfile, targetProfile) if err != nil { return conv, err } @@ -122,7 +122,6 @@ func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string return conv, nil } - func (sads *DataFromSourceImpl) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface) (*writer.BatchWriter, error) { // TODO: refactor of the way we handle getSeekable // to avoid the code duplication here @@ -207,8 +206,7 @@ func (sads *DataFromSourceImpl) dataFromCSV(ctx context.Context, sourceProfile p return batchWriter, nil } - -func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) { +func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) { 
//handle migrating data for sharded migrations differently //sharded migrations are identified via the config= flag, if that flag is not present //carry on with the existing code path in the else block @@ -218,9 +216,9 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProf //We provide an if-else based handling for each within the sharded code branch //This will be determined via the configType, which can be "bulk", "dataflow" or "dms" if sourceProfile.Config.ConfigType == constants.BULK_MIGRATION { - return dataFromDb.dataFromDatabaseForBulkMigration(sourceProfile, targetProfile, config, conv, client, getInfo, snapshotMigration) + return dataFromDb.dataFromDatabaseForBulkMigration(migrationProjectId, sourceProfile, targetProfile, config, conv, client, getInfo, snapshotMigration) } else if sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { - return dataFromDb.dataFromDatabaseForDataflowMigration(targetProfile, ctx, sourceProfile, conv, &common.InfoSchemaImpl{}) + return dataFromDb.dataFromDatabaseForDataflowMigration(migrationProjectId, targetProfile, ctx, sourceProfile, conv, &common.InfoSchemaImpl{}) } else if sourceProfile.Config.ConfigType == constants.DMS_MIGRATION { return dataFromDb.dataFromDatabaseForDMSMigration() } else { @@ -230,12 +228,12 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProf var infoSchema common.InfoSchema var err error if sourceProfile.Ty == profiles.SourceProfileTypeCloudSQL { - infoSchema, err = getInfo.GetInfoSchemaFromCloudSQL(sourceProfile, targetProfile) + infoSchema, err = getInfo.GetInfoSchemaFromCloudSQL(migrationProjectId, sourceProfile, targetProfile) if err != nil { return nil, err } } else { - infoSchema, err = getInfo.GetInfoSchema(sourceProfile, targetProfile) + infoSchema, err = getInfo.GetInfoSchema(migrationProjectId, sourceProfile, targetProfile) if err != nil { return nil, err } @@ -254,7 +252,7 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProf if err != nil { return nil, err } - dfOutput, err := infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo) + dfOutput, err := infoSchema.StartStreamingMigration(ctx, migrationProjectId, client, conv, streamInfo) if err != nil { return nil, err } @@ -263,7 +261,7 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProf streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg) // Fetch and store the GCS bucket associated with the datastream dsClient := GetDatastreamClient(ctx) - gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig) + gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig) if fetchGcsErr != nil { logger.Log.Info("Could not fetch GCS Bucket, hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n") logger.Log.Debug("Error", zap.Error(fetchGcsErr)) @@ -289,11 +287,12 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProf } monitoringResources := metrics.MonitoringMetricsResources{ - ProjectId: targetProfile.Conn.Sp.Project, + MigrationProjectId: migrationProjectId, DataflowJobId: dfOutput.JobID, DatastreamId: streamingCfg.DatastreamCfg.StreamId, JobMetadataGcsBucket: gcsBucket, PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId, + 
SpannerProjectId: targetProfile.Conn.Sp.Project, SpannerInstanceId: targetProfile.Conn.Sp.Instance, SpannerDatabaseId: targetProfile.Conn.Sp.Dbname, ShardId: "", @@ -310,7 +309,7 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, sourceProf fmt.Printf("Monitoring Dashboard: %+v\n", dashboardName) } // store the generated resources locally in conv, this is used as source of truth for persistence and the UI (should change to persisted values) - streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, "", internal.GcsResources{BucketName: gcsBucket}, dashboardName) + streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, migrationProjectId, "", internal.GcsResources{BucketName: gcsBucket}, dashboardName) //persist job and shard level data in the metadata db err = streaming.PersistJobDetails(ctx, targetProfile, sourceProfile, conv, migrationJobId, false) if err != nil { diff --git a/conversion/conversion_from_source_test.go b/conversion/conversion_from_source_test.go index 45a6a1d8f..d165fab39 100644 --- a/conversion/conversion_from_source_test.go +++ b/conversion/conversion_from_source_test.go @@ -154,13 +154,13 @@ func TestSchemaFromDatabase(t *testing.T) { gim := MockGetInfo{} ps := common.MockProcessSchema{} - gim.On("getInfoSchemaForShard", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError) - gim.On("GetInfoSchemaFromCloudSQL", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError) - gim.On("GetInfoSchema", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError) + gim.On("getInfoSchemaForShard", "migration-project-id", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError) + gim.On("GetInfoSchemaFromCloudSQL", "migration-project-id", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError) + gim.On("GetInfoSchema", "migration-project-id", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError) ps.On("ProcessSchema", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.processSchemaError) s := SchemaFromSourceImpl{} - _, err := s.schemaFromDatabase(tc.sourceProfile, targetProfile, &gim, &ps) + _, err := s.schemaFromDatabase("migration-project-id", tc.sourceProfile, targetProfile, &gim, &ps) assert.Equal(t, tc.errorExpected, err != nil, tc.name) } } \ No newline at end of file diff --git a/conversion/conversion_test.go b/conversion/conversion_test.go index 3f40981f0..351ba35c5 100644 --- a/conversion/conversion_test.go +++ b/conversion/conversion_test.go @@ -98,7 +98,7 @@ func TestSchemaConv(t *testing.T) { m := MockSchemaFromSource{} m.On(tc.function, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.output, nil) c := ConvImpl{} - _, err := c.SchemaConv(profiles.SourceProfile{Driver: tc.sourceProfileDriver}, profiles.TargetProfile{}, &utils.IOStreams{}, &m) + _, err := c.SchemaConv("migration-project-id", profiles.SourceProfile{Driver: tc.sourceProfileDriver}, profiles.TargetProfile{}, &utils.IOStreams{}, &m) assert.Equal(t, tc.errorExpected, err != nil, tc.name) if err == nil { 
m.AssertExpectations(t) @@ -185,7 +185,7 @@ func TestDataConv(t *testing.T) { m := MockDataFromSource{} m.On(tc.function, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.output, nil) c := ConvImpl{} - _, err := c.DataConv(ctx, profiles.SourceProfile{Driver: tc.sourceProfileDriver}, profiles.TargetProfile{}, &utils.IOStreams{}, &sp.Client{}, &internal.Conv{}, true, int64(5), &m) + _, err := c.DataConv(ctx, "migration-project-id", profiles.SourceProfile{Driver: tc.sourceProfileDriver}, profiles.TargetProfile{}, &utils.IOStreams{}, &sp.Client{}, &internal.Conv{}, true, int64(5), &m) assert.Equal(t, tc.errorExpected, err != nil, tc.name) if err == nil { m.AssertExpectations(t) diff --git a/conversion/data_from_database.go b/conversion/data_from_database.go index 4355af7fc..c6d22ba2e 100644 --- a/conversion/data_from_database.go +++ b/conversion/data_from_database.go @@ -39,16 +39,14 @@ import ( "go.uber.org/zap" ) -type DataFromDatabaseInterface interface{ +type DataFromDatabaseInterface interface { dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error) - dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) - dataFromDatabaseForBulkMigration(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, gi GetInfoInterface, sm SnapshotMigrationInterface) (*writer.BatchWriter, error) - + dataFromDatabaseForDataflowMigration(migrationProjectId string, targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) + dataFromDatabaseForBulkMigration(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, gi GetInfoInterface, sm SnapshotMigrationInterface) (*writer.BatchWriter, error) } type DataFromDatabaseImpl struct{} - // TODO: Define the data processing logic for DMS migrations here. func (dd *DataFromDatabaseImpl) dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error) { return nil, fmt.Errorf("dms configType is not implemented yet, please use one of 'bulk' or 'dataflow'") @@ -59,7 +57,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDMSMigration() (*writer.Batch // 3. Verify the CFG and update it with SMT defaults // 4. Launch the stream for the physical shard // 5. 
Perform streaming migration via dataflow -func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) { +func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationProjectId string, targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv, is common.InfoSchemaInterface) (*writer.BatchWriter, error) { // Fetch Spanner Region if conv.SpRegion == "" { spInstanceAdmin, err := spinstanceadmin.NewInstanceAdminClientImpl(ctx) @@ -67,7 +65,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err) } spAcc := spanneraccessor.SpannerAccessorImpl{} - spannerRegion, err := spAcc.GetSpannerLeaderLocation(ctx, spInstanceAdmin, "projects/" + targetProfile.Conn.Sp.Project+ "/instances/" + targetProfile.Conn.Sp.Instance) + spannerRegion, err := spAcc.GetSpannerLeaderLocation(ctx, spInstanceAdmin, "projects/"+targetProfile.Conn.Sp.Project+"/instances/"+targetProfile.Conn.Sp.Instance) if err != nil { return nil, fmt.Errorf("unable to fetch Spanner Region for resource creation: %v", err) } @@ -86,7 +84,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi return nil, err } createResources := NewValidateOrCreateResourcesImpl(&datastream_accessor.DatastreamAccessorImpl{}, dsClient, &storageaccessor.StorageAccessorImpl{}, storageClient) - err = createResources.ValidateOrCreateResourcesForShardedMigration(ctx, targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, false, conv.SpRegion, sourceProfile) + err = createResources.ValidateOrCreateResourcesForShardedMigration(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, false, conv.SpRegion, sourceProfile) if err != nil { return nil, fmt.Errorf("unable to create connection profiles: %v", err) } @@ -95,12 +93,12 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi //Set the TmpDir from the sessionState bucket which is derived from the target connection profile for _, dataShard := range sourceProfile.Config.ShardConfigurationDataflow.DataShards { if dataShard.TmpDir == "" { - bucket, rootPath, err := GetBucketFromDatastreamProfile(targetProfile.Conn.Sp.Project, conv.SpRegion, dataShard.DstConnectionProfile.Name) - if err != nil { - return nil, fmt.Errorf("error while getting target bucket: %v", err) + bucket, rootPath, err := GetBucketFromDatastreamProfile(migrationProjectId, conv.SpRegion, dataShard.DstConnectionProfile.Name) + if err != nil { + return nil, fmt.Errorf("error while getting target bucket: %v", err) + } + dataShard.TmpDir = "gs://" + bucket + rootPath } - dataShard.TmpDir = "gs://" + bucket + rootPath - } } updateShardsWithTuningConfigs(sourceProfile.Config.ShardConfigurationDataflow) @@ -138,17 +136,17 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId) - pubsubCfg, err := streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname) + pubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, 
streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname) if err != nil { return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } streamingCfg.PubsubCfg = *pubsubCfg - err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg) + err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, migrationProjectId, streamingCfg.DatastreamCfg) if err != nil { return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap - dfOutput, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv) + dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, targetProfile, streamingCfg, conv) if err != nil { return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } @@ -156,7 +154,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi // Fetch and store the GCS bucket associated with the datastream dsClient := GetDatastreamClient(ctx) - gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig) + gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig) if fetchGcsErr != nil { logger.Log.Info(fmt.Sprintf("Could not fetch GCS Bucket for Shard %s hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n", p.DataShardId)) logger.Log.Debug("Error", zap.Error(fetchGcsErr)) @@ -183,11 +181,12 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi // create monitoring dashboard for a single shard monitoringResources := metrics.MonitoringMetricsResources{ - ProjectId: targetProfile.Conn.Sp.Project, + MigrationProjectId: migrationProjectId, DataflowJobId: dfOutput.JobID, DatastreamId: streamingCfg.DatastreamCfg.StreamId, JobMetadataGcsBucket: gcsBucket, PubsubSubscriptionId: streamingCfg.PubsubCfg.SubscriptionId, + SpannerProjectId: targetProfile.Conn.Sp.Project, SpannerInstanceId: targetProfile.Conn.Sp.Instance, SpannerDatabaseId: targetProfile.Conn.Sp.Dbname, ShardId: p.DataShardId, @@ -203,7 +202,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi dashboardName = strings.Split(respDash.Name, "/")[3] fmt.Printf("Monitoring Dashboard for shard %v: %+v\n", p.DataShardId, dashboardName) } - streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, targetProfile.Conn.Sp.Project, p.DataShardId, internal.GcsResources{BucketName: gcsBucket}, dashboardName) + streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, migrationProjectId, p.DataShardId, internal.GcsResources{BucketName: gcsBucket}, dashboardName) //persist the generated resources in a metadata db err = streaming.PersistResources(ctx, targetProfile, sourceProfile, conv, migrationJobId, p.DataShardId) if err != nil { @@ -219,11 +218,12 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi // create monitoring aggregated dashboard for sharded migration aggMonitoringResources := metrics.MonitoringMetricsResources{ - ProjectId: targetProfile.Conn.Sp.Project, - SpannerInstanceId: targetProfile.Conn.Sp.Instance, - SpannerDatabaseId: targetProfile.Conn.Sp.Dbname, - ShardToShardResourcesMap: 
conv.Audit.StreamingStats.ShardToShardResourcesMap, - MigrationRequestId: conv.Audit.MigrationRequestId, + MigrationProjectId: migrationProjectId, + SpannerProjectId: targetProfile.Conn.Sp.Project, + SpannerInstanceId: targetProfile.Conn.Sp.Instance, + SpannerDatabaseId: targetProfile.Conn.Sp.Dbname, + ShardToShardResourcesMap: conv.Audit.StreamingStats.ShardToShardResourcesMap, + MigrationRequestId: conv.Audit.MigrationRequestId, } aggRespDash, dashboardErr := aggMonitoringResources.CreateDataflowAggMonitoringDashboard(ctx) if dashboardErr != nil { @@ -245,12 +245,12 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(targetProfi // 2. Create a connection profile object for it // 3. Perform a snapshot migration for the shard // 4. Once all shard migrations are complete, return the batch writer object -func (dd *DataFromDatabaseImpl) dataFromDatabaseForBulkMigration(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, gi GetInfoInterface, sm SnapshotMigrationInterface) (*writer.BatchWriter, error) { +func (dd *DataFromDatabaseImpl) dataFromDatabaseForBulkMigration(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, gi GetInfoInterface, sm SnapshotMigrationInterface) (*writer.BatchWriter, error) { var bw *writer.BatchWriter for _, dataShard := range sourceProfile.Config.ShardConfigurationBulk.DataShards { fmt.Printf("Initiating migration for shard: %v\n", dataShard.DbName) - infoSchema, err := gi.getInfoSchemaForShard(dataShard, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{}) + infoSchema, err := gi.getInfoSchemaForShard(migrationProjectId, dataShard, sourceProfile.Driver, targetProfile, &profiles.SourceProfileDialectImpl{}, &GetInfoImpl{}) if err != nil { return nil, err } diff --git a/conversion/get_info.go b/conversion/get_info.go index e33212863..6c34fdc90 100644 --- a/conversion/get_info.go +++ b/conversion/get_info.go @@ -40,14 +40,14 @@ import ( "github.com/jackc/pgx/v5/stdlib" ) -type GetInfoInterface interface{ - getInfoSchemaForShard(shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, sourceProfileDialect profiles.SourceProfileDialectInterface, getInfo GetInfoInterface) (common.InfoSchema, error) - GetInfoSchemaFromCloudSQL(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) - GetInfoSchema(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) +type GetInfoInterface interface { + getInfoSchemaForShard(migrationProjectId string, shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, sourceProfileDialect profiles.SourceProfileDialectInterface, getInfo GetInfoInterface) (common.InfoSchema, error) + GetInfoSchemaFromCloudSQL(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) + GetInfoSchema(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) } type GetInfoImpl struct{} -func (gi *GetInfoImpl) getInfoSchemaForShard(shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, sourceProfileDialect 
profiles.SourceProfileDialectInterface, getInfo GetInfoInterface) (common.InfoSchema, error) { +func (gi *GetInfoImpl) getInfoSchemaForShard(migrationProjectId string, shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, sourceProfileDialect profiles.SourceProfileDialectInterface, getInfo GetInfoInterface) (common.InfoSchema, error) { params := make(map[string]string) params["host"] = shardConnInfo.Host params["user"] = shardConnInfo.User @@ -67,15 +67,14 @@ func (gi *GetInfoImpl) getInfoSchemaForShard(shardConnInfo profiles.DirectConnec //this is done because GetSQLConnectionStr() should not be aware of sharding newSourceProfile := profiles.SourceProfile{Conn: sourceProfileConnection, Ty: profiles.SourceProfileTypeConnection} newSourceProfile.Driver = driver - infoSchema, err := getInfo.GetInfoSchema(newSourceProfile, targetProfile) + infoSchema, err := getInfo.GetInfoSchema(migrationProjectId, newSourceProfile, targetProfile) if err != nil { return nil, err } return infoSchema, nil } - -func (gi *GetInfoImpl) GetInfoSchemaFromCloudSQL(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { +func (gi *GetInfoImpl) GetInfoSchemaFromCloudSQL(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { driver := sourceProfile.Driver switch driver { case constants.MYSQL: @@ -98,10 +97,11 @@ func (gi *GetInfoImpl) GetInfoSchemaFromCloudSQL(sourceProfile profiles.SourcePr return nil, fmt.Errorf("sql.Open: %w", err) } return mysql.InfoSchemaImpl{ - DbName: sourceProfile.ConnCloudSQL.Mysql.Db, - Db: db, - SourceProfile: sourceProfile, - TargetProfile: targetProfile, + DbName: sourceProfile.ConnCloudSQL.Mysql.Db, + Db: db, + MigrationProjectId: migrationProjectId, + SourceProfile: sourceProfile, + TargetProfile: targetProfile, }, nil case constants.POSTGRES: d, err := cloudsqlconn.NewDialer(context.Background(), cloudsqlconn.WithIAMAuthN()) @@ -126,18 +126,18 @@ func (gi *GetInfoImpl) GetInfoSchemaFromCloudSQL(sourceProfile profiles.SourcePr } temp := false return postgres.InfoSchemaImpl{ - Db: db, - SourceProfile: sourceProfile, - TargetProfile: targetProfile, - IsSchemaUnique: &temp, //this is a workaround to set a bool pointer + Db: db, + MigrationProjectId: migrationProjectId, + SourceProfile: sourceProfile, + TargetProfile: targetProfile, + IsSchemaUnique: &temp, //this is a workaround to set a bool pointer }, nil default: return nil, fmt.Errorf("driver %s not supported", driver) } } - -func (gi *GetInfoImpl) GetInfoSchema(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { +func (gi *GetInfoImpl) GetInfoSchema(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { connectionConfig, err := connectionConfig(sourceProfile) if err != nil { return nil, err @@ -151,10 +151,11 @@ func (gi *GetInfoImpl) GetInfoSchema(sourceProfile profiles.SourceProfile, targe return nil, err } return mysql.InfoSchemaImpl{ - DbName: dbName, - Db: db, - SourceProfile: sourceProfile, - TargetProfile: targetProfile, + DbName: dbName, + Db: db, + MigrationProjectId: migrationProjectId, + SourceProfile: sourceProfile, + TargetProfile: targetProfile, }, nil case constants.POSTGRES: db, err := sql.Open(driver, connectionConfig.(string)) @@ -163,10 +164,11 @@ func (gi *GetInfoImpl) GetInfoSchema(sourceProfile profiles.SourceProfile, 
targe } temp := false return postgres.InfoSchemaImpl{ - Db: db, - SourceProfile: sourceProfile, - TargetProfile: targetProfile, - IsSchemaUnique: &temp, //this is a workaround to set a bool pointer + Db: db, + MigrationProjectId: migrationProjectId, + SourceProfile: sourceProfile, + TargetProfile: targetProfile, + IsSchemaUnique: &temp, //this is a workaround to set a bool pointer }, nil case constants.DYNAMODB: mySession := session.Must(session.NewSession()) @@ -194,8 +196,8 @@ func (gi *GetInfoImpl) GetInfoSchema(sourceProfile profiles.SourceProfile, targe if err != nil { return nil, err } - return oracle.InfoSchemaImpl{DbName: strings.ToUpper(dbName), Db: db, SourceProfile: sourceProfile, TargetProfile: targetProfile}, nil + return oracle.InfoSchemaImpl{DbName: strings.ToUpper(dbName), Db: db, MigrationProjectId: migrationProjectId, SourceProfile: sourceProfile, TargetProfile: targetProfile}, nil default: return nil, fmt.Errorf("driver %s not supported", driver) } -} \ No newline at end of file +} diff --git a/conversion/mocks.go b/conversion/mocks.go index 66ea2f2e0..2e81b790e 100644 --- a/conversion/mocks.go +++ b/conversion/mocks.go @@ -32,16 +32,16 @@ type MockGetInfo struct { mock.Mock } -func (mgi *MockGetInfo) getInfoSchemaForShard(shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, s profiles.SourceProfileDialectInterface, g GetInfoInterface) (common.InfoSchema, error) { - args := mgi.Called(shardConnInfo, driver, targetProfile, s, g) +func (mgi *MockGetInfo) getInfoSchemaForShard(migrationShardId string, shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, s profiles.SourceProfileDialectInterface, g GetInfoInterface) (common.InfoSchema, error) { + args := mgi.Called(migrationShardId, shardConnInfo, driver, targetProfile, s, g) return args.Get(0).(common.InfoSchema), args.Error(1) } -func (mgi *MockGetInfo) GetInfoSchemaFromCloudSQL(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { - args := mgi.Called(sourceProfile, targetProfile) +func (mgi *MockGetInfo) GetInfoSchemaFromCloudSQL(migrationShardId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { + args := mgi.Called(migrationShardId, sourceProfile, targetProfile) return args.Get(0).(common.InfoSchema), args.Error(1) } -func (mgi *MockGetInfo) GetInfoSchema(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { - args := mgi.Called(sourceProfile, targetProfile) +func (mgi *MockGetInfo) GetInfoSchema(migrationShardId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { + args := mgi.Called(migrationShardId, sourceProfile, targetProfile) return args.Get(0).(common.InfoSchema), args.Error(1) } @@ -49,8 +49,8 @@ type MockSchemaFromSource struct { mock.Mock } -func (msads *MockSchemaFromSource) schemaFromDatabase(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) { - args := msads.Called(sourceProfile, targetProfile, getInfo, processSchema) +func (msads *MockSchemaFromSource) schemaFromDatabase(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) { + args 
:= msads.Called(migrationProjectId, sourceProfile, targetProfile, getInfo, processSchema) return args.Get(0).(*internal.Conv), args.Error(1) } func (msads *MockSchemaFromSource) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) { @@ -62,8 +62,8 @@ type MockDataFromSource struct { mock.Mock } -func (msads *MockDataFromSource) dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) { - args := msads.Called(ctx, sourceProfile, targetProfile, config, conv, client, getInfo, dataFromDb, snapshotMigration) +func (msads *MockDataFromSource) dataFromDatabase(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error) { + args := msads.Called(ctx, migrationProjectId, sourceProfile, targetProfile, config, conv, client, getInfo, dataFromDb, snapshotMigration) return args.Get(0).(*writer.BatchWriter), args.Error(1) } func (msads *MockDataFromSource) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface) (*writer.BatchWriter, error) { diff --git a/conversion/resource_generation.go b/conversion/resource_generation.go index 407d5f4e6..68785458e 100644 --- a/conversion/resource_generation.go +++ b/conversion/resource_generation.go @@ -53,7 +53,7 @@ type ResourceGenerationImpl struct { } type ValidateOrCreateResourcesInterface interface { - ValidateOrCreateResourcesForShardedMigration(ctx context.Context, projectId string, instanceName string, validateOnly bool, region string, sourceProfile profiles.SourceProfile) error + ValidateOrCreateResourcesForShardedMigration(ctx context.Context, migrationProjectId string, instanceName string, validateOnly bool, region string, sourceProfile profiles.SourceProfile) error } type ValidateOrCreateResourcesImpl struct { @@ -96,14 +96,14 @@ func NewResourceGenerationImpl(dsAcc datastream_accessor.DatastreamAccessor, dsC } // Method to validate if in a minimal downtime migration, required resources can be generated -func (v *ValidateResourcesImpl) ValidateResourceGeneration(ctx context.Context, projectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error { - spannerRegion, err := v.SpAcc.GetSpannerLeaderLocation(ctx, v.SpInstanceAdmin, "projects/"+projectId+"/instances/"+instanceId) +func (v *ValidateResourcesImpl) ValidateResourceGeneration(ctx context.Context, migrationProjectId string, spannerProjectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error { + spannerRegion, err := v.SpAcc.GetSpannerLeaderLocation(ctx, v.SpInstanceAdmin, "projects/"+spannerProjectId+"/instances/"+instanceId) if err != nil { err = fmt.Errorf("unable to fetch Spanner Region: %v", err) return err } conv.SpRegion = spannerRegion - err = v.ValidateOrCreateResources.ValidateOrCreateResourcesForShardedMigration(ctx, 
projectId, instanceId, true, spannerRegion, sourceProfile) + err = v.ValidateOrCreateResources.ValidateOrCreateResourcesForShardedMigration(ctx, migrationProjectId, instanceId, true, spannerRegion, sourceProfile) if err != nil { err = fmt.Errorf("unable to create connection profiles: %v", err) return err @@ -181,7 +181,7 @@ func (r ResourceGenerationImpl) RollbackResourceCreation(ctx context.Context, pr } // Returns source and destination connection profiles to be created -func (r ResourceGenerationImpl) GetConnectionProfilesForResources(ctx context.Context, projectId string, sourceProfile profiles.SourceProfile, region string, validateOnly bool) ([]*ConnectionProfileReq, []*ConnectionProfileReq, error) { +func (r ResourceGenerationImpl) GetConnectionProfilesForResources(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, region string, validateOnly bool) ([]*ConnectionProfileReq, []*ConnectionProfileReq, error) { var sourceProfilesToCreate []*ConnectionProfileReq var dstProfilesToCreate []*ConnectionProfileReq @@ -191,7 +191,7 @@ func (r ResourceGenerationImpl) GetConnectionProfilesForResources(ctx context.Co for _, profile := range sourceProfile.Config.ShardConfigurationDataflow.DataShards { // Check if source profile needs to be created - sourceProfile, err := getSourceConnectionProfileForCreation(ctx, projectId, profile, region, validateOnly, connectionProfiles, r.DsAcc, r.DsClient) + sourceProfile, err := getSourceConnectionProfileForCreation(ctx, migrationProjectId, profile, region, validateOnly, connectionProfiles, r.DsAcc, r.DsClient) if err != nil { return sourceProfilesToCreate, dstProfilesToCreate, err } @@ -200,7 +200,7 @@ func (r ResourceGenerationImpl) GetConnectionProfilesForResources(ctx context.Co } // Check if destination profile needs to be created - dstProfile, err := getDstConnectionProfileForCreation(ctx, projectId, profile, region, validateOnly, connectionProfiles, r.DsAcc, r.DsClient) + dstProfile, err := getDstConnectionProfileForCreation(ctx, migrationProjectId, profile, region, validateOnly, connectionProfiles, r.DsAcc, r.DsClient) if err != nil { return sourceProfilesToCreate, dstProfilesToCreate, err } @@ -214,12 +214,12 @@ func (r ResourceGenerationImpl) GetConnectionProfilesForResources(ctx context.Co // 1. For each datashard, check if source and destination connection profile exists or not // 2. If source connection profile doesn't exists create it or validate if creation is possible. // 3. 
If validation is false and destination connection profile doesn't exists create a corresponding gcs bucket and then a destination connection profile -func (c *ValidateOrCreateResourcesImpl) ValidateOrCreateResourcesForShardedMigration(ctx context.Context, projectId string, instanceName string, validateOnly bool, region string, sourceProfile profiles.SourceProfile) error { +func (c *ValidateOrCreateResourcesImpl) ValidateOrCreateResourcesForShardedMigration(ctx context.Context, migrationProjectId string, instanceName string, validateOnly bool, region string, sourceProfile profiles.SourceProfile) error { var sourceProfilesToCreate []*ConnectionProfileReq var dstProfilesToCreate []*ConnectionProfileReq // Fetches list with resources which do not exist and need to be created - sourceProfilesToCreate, dstProfilesToCreate, err := c.ResourceGenerator.GetConnectionProfilesForResources(ctx, projectId, sourceProfile, region, validateOnly) + sourceProfilesToCreate, dstProfilesToCreate, err := c.ResourceGenerator.GetConnectionProfilesForResources(ctx, migrationProjectId, sourceProfile, region, validateOnly) if err != nil { return fmt.Errorf("resource generation failed %s", err) } diff --git a/conversion/resource_generation_test.go b/conversion/resource_generation_test.go index ca591c744..411bb896e 100644 --- a/conversion/resource_generation_test.go +++ b/conversion/resource_generation_test.go @@ -84,7 +84,7 @@ func TestValidateResourceGeneration(t *testing.T) { var m = conversion.MockValidateOrCreateResources{} m.On("ValidateOrCreateResourcesForShardedMigration", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.createResourcesError) vrg.ValidateOrCreateResources = &m - err := vrg.ValidateResourceGeneration(ctx, "project-id", "instance-id", sourceProfile, conv) + err := vrg.ValidateResourceGeneration(ctx, "migration-project-id", "spanner-project-id", "instance-id", sourceProfile, conv) assert.Equal(t, tc.expectError, err != nil, tc.name) } } diff --git a/docs/cli/data.md b/docs/cli/data.md index ab9bd26dd..1c3d6c71f 100644 --- a/docs/cli/data.md +++ b/docs/cli/data.md @@ -33,7 +33,7 @@ reference of the gCloud version of SMT, please refer [here](https://cloud.google [--dry-run] [--log-level=LOG_LEVEL] [--prefix=PREFIX] [--skip-foreign-keys] [--source-profile=SOURCE_PROFILE] [--target=TARGET] [--target-profile=TARGET_PROFILE] - [--write-limit=WRITE_LIMIT] [GCLOUD_WIDE_FLAG ...] + [--write-limit=WRITE_LIMIT] [--project=PROJECT] [GCLOUD_WIDE_FLAG ...] ## DESCRIPTION @@ -53,7 +53,7 @@ reference of the gCloud version of SMT, please refer [here](https://cloud.google $ ./spanner-migration-tool data --session=./session.json \ --source=MySQL \ --source-profile='host=host,port=3306,user=user,password=pwd,dbName=db,streamingCfg=streaming.json' \ - --target-profile='project=spanner-project,instance=spanner-instance' + --target-profile='project=spanner-project,instance=spanner-instance' --project=migration-project ## REQUIRED FLAGS @@ -97,3 +97,8 @@ Detailed description of optional flags can be found [here](./flags.md). --write-limit=WRITE_LIMIT Number of parallel writers to Cloud Spanner during bulk data migrations (default 40). + + --project=PROJECT + Flag for specifying the name of the Google Cloud Project in which the Spanner migration tool + can create resources required for migration. If the project is not specified, Spanner migration + tool will try to fetch the configured project in the gCloud CLI. 
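The new --project flag documented above separates the migration project (where Dataflow, Datastream, GCS and Pub/Sub resources are created) from the Spanner project named in the target profile. Below is a minimal, self-contained sketch of that resolution order — explicit flag first, then the environment's default project — using stand-in names (resolveMigrationProject, defaultProjectFromEnv, GOOGLE_CLOUD_PROJECT) rather than this repository's helpers:

```go
package main

import (
	"fmt"
	"os"
)

// defaultProjectFromEnv is a stand-in for "whatever project the gcloud CLI
// is configured with"; here it simply reads an environment variable.
func defaultProjectFromEnv() (string, error) {
	if p := os.Getenv("GOOGLE_CLOUD_PROJECT"); p != "" {
		return p, nil
	}
	return "", fmt.Errorf("no default project configured")
}

// resolveMigrationProject returns the project used for migration resources:
// the --project flag wins, otherwise fall back to the environment default.
// The Spanner project stays whatever the target profile says.
func resolveMigrationProject(projectFlag string) (string, error) {
	if projectFlag != "" {
		return projectFlag, nil
	}
	return defaultProjectFromEnv()
}

func main() {
	migrationProject, err := resolveMigrationProject("") // e.g. value of --project
	if err != nil {
		fmt.Println("could not resolve migration project:", err)
		return
	}
	spannerProject := "spanner-project" // from --target-profile 'project=...'
	fmt.Println("migration resources in:", migrationProject)
	fmt.Println("Spanner instance in:   ", spannerProject)
}
```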
diff --git a/docs/cli/flags.md b/docs/cli/flags.md index 2ba759b55..aa07662ba 100644 --- a/docs/cli/flags.md +++ b/docs/cli/flags.md @@ -55,6 +55,9 @@ specified as "key1=value1,key2=value,..." pairs: * **`project`**: Specifies the name of the Google Cloud Project in which the Spanner instance is present. If the project is not specified, Spanner migration tool will try to fetch the configured project in the gCloud CLI. +{: .note } +This project flag can have a different value from the --project flag in the main command. In some cases, you may want to keep the Spanner instance in a separate GCP project from the project where all the migration resources are created. This project flag refers to the project in which the Spanner instance is present, while the --project flag in the main command refers to the project where the tool can create resources (Dataflow jobs, GCS buckets, etc.) for the migration. + * **`dbName`**: Specifies the name of the Spanner database to create. This must be a new database. If dbName is not specified, Spanner migration tool creates a new unique dbName. diff --git a/docs/cli/schema-and-data.md b/docs/cli/schema-and-data.md index bae1abbe7..c30ff4d9f 100644 --- a/docs/cli/schema-and-data.md +++ b/docs/cli/schema-and-data.md @@ -35,7 +35,7 @@ reference of the gCloud version of SMT, please refer [here](https://cloud.google [--log-level=LOG_LEVEL] [--prefix=PREFIX] [--skip-foreign-keys] [--source-profile=SOURCE_PROFILE] [--target=TARGET] [--target-profile=TARGET_PROFILE] [--write-limit=WRITE_LIMIT] - [GCLOUD_WIDE_FLAG ...] + [--project=PROJECT] [GCLOUD_WIDE_FLAG ...] ## DESCRIPTION @@ -54,7 +54,7 @@ reference of the gCloud version of SMT, please refer [here](https://cloud.google --source-profile='host=host,port=3306,user=user,password=pwd,dbN\ ame=db,streamingCfg=streaming.json' \ --target-profile='project=spanner-project,instance=spanner-insta\ - nce' + nce' --project='migration-project' ## REQUIRED FLAGS @@ -95,3 +95,8 @@ Detailed description of optional flags can be found [here](./flags.md). --write-limit=WRITE_LIMIT Number of parallel writers to Cloud Spanner during bulk data migrations (default 40). + + --project=PROJECT + Flag for specifying the name of the Google Cloud Project in which the Spanner migration tool + can create resources required for migration. If the project is not specified, Spanner migration + tool will try to fetch the configured project in the gCloud CLI. \ No newline at end of file diff --git a/docs/cli/schema.md b/docs/cli/schema.md index aa8c383a0..9f52491e0 100644 --- a/docs/cli/schema.md +++ b/docs/cli/schema.md @@ -35,7 +35,7 @@ reference of the gCloud version of SMT, please refer [here](https://cloud.google ./spanner-migration-tool schema --source=SOURCE [--dry-run] [--log-level=LOG_LEVEL] [--prefix=PREFIX] [--source-profile=SOURCE_PROFILE] [--target=TARGET] - [--target-profile=TARGET_PROFILE] [GCLOUD_WIDE_FLAG ...] + [--target-profile=TARGET_PROFILE] [--project=PROJECT] [GCLOUD_WIDE_FLAG ...] ## DESCRIPTION @@ -54,7 +54,7 @@ reference of the gCloud version of SMT, please refer [here](https://cloud.google --source-profile='host=host,port=3306,user=user,password=pwd,dbN\ ame=db' \ --target-profile='project=spanner-project,instance=spanner-insta\ - nce' + nce' --project='migration-project' ## REQUIRED FLAGS @@ -88,3 +88,8 @@ Detailed description of optional flags can be found [here](./flags.md). --target-profile=TARGET_PROFILE Flag for specifying connection profile for target database (e.g., "dialect=postgresql").
+ + --project=PROJECT + Flag for specifying the name of the Google Cloud Project in which the Spanner migration tool + can create resources required for migration. If the project is not specified, Spanner migration + tool will try to fetch the configured project in the gCloud CLI. \ No newline at end of file diff --git a/sources/common/infoschema.go b/sources/common/infoschema.go index ffa6a6cc7..3c1f72823 100644 --- a/sources/common/infoschema.go +++ b/sources/common/infoschema.go @@ -41,7 +41,7 @@ type InfoSchema interface { GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error) - StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error) + StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error) } // SchemaAndName contains the schema and name for a table diff --git a/sources/dynamodb/schema.go b/sources/dynamodb/schema.go index 7c332e30e..4bc38f0ea 100644 --- a/sources/dynamodb/schema.go +++ b/sources/dynamodb/schema.go @@ -219,7 +219,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte // StartStreamingMigration starts the streaming migration process by creating a seperate // worker thread/goroutine for each table's DynamoDB Stream. It catches Ctrl+C signal if // customer wants to stop the process. -func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error) { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error) { fmt.Println("Processing of DynamoDB Streams started...") fmt.Println("Use Ctrl+C to stop the process.") diff --git a/sources/mysql/infoschema.go b/sources/mysql/infoschema.go index d54dbac3d..9a063f297 100644 --- a/sources/mysql/infoschema.go +++ b/sources/mysql/infoschema.go @@ -35,10 +35,11 @@ import ( // InfoSchemaImpl is MySQL specific implementation for InfoSchema. type InfoSchemaImpl struct { - DbName string - Db *sql.DB - SourceProfile profiles.SourceProfile - TargetProfile profiles.TargetProfile + DbName string + Db *sql.DB + MigrationProjectId string + SourceProfile profiles.SourceProfile + TargetProfile profiles.TargetProfile } // GetToDdl implement the common.InfoSchema interface. 
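The common.InfoSchema change above makes the migration project an explicit argument of StartStreamingMigration instead of something each implementation derives from the target profile, and the MySQL InfoSchemaImpl struct gains a matching MigrationProjectId field. A trimmed-down sketch of that shape, with toy types that only mirror the signature (not this repository's actual interface):

```go
package main

import (
	"context"
	"fmt"
)

// streamer mirrors the shape of the updated StartStreamingMigration method:
// the migration project id is now an explicit argument. Types here are
// simplified stand-ins, not the repository's interfaces.
type streamer interface {
	StartStreamingMigration(ctx context.Context, migrationProjectId string, streamInfo map[string]interface{}) error
}

type noopInfoSchema struct {
	dbName             string
	migrationProjectId string // counterpart of the new MigrationProjectId field
}

func (s noopInfoSchema) StartStreamingMigration(ctx context.Context, migrationProjectId string, streamInfo map[string]interface{}) error {
	// A real implementation would hand migrationProjectId to the Dataflow launcher.
	fmt.Printf("launching streaming migration for %s in project %s\n", s.dbName, migrationProjectId)
	return nil
}

func main() {
	var s streamer = noopInfoSchema{dbName: "testdb", migrationProjectId: "migration-project-id"}
	_ = s.StartStreamingMigration(context.Background(), "migration-project-id", nil)
}
```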
@@ -372,12 +373,12 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte if err != nil { return nil, fmt.Errorf("error reading streaming config: %v", err) } - pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db) + pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db) if err != nil { return nil, fmt.Errorf("error creating pubsub resources: %v", err) } streamingCfg.PubsubCfg = *pubsubCfg - streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) + streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) if err != nil { err = fmt.Errorf("error starting datastream: %v", err) return nil, err @@ -388,10 +389,10 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte // StartStreamingMigration is used for automatic triggering of Dataflow job when // performing a streaming migration. -func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg) - dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv) + dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv) if err != nil { err = fmt.Errorf("error starting dataflow: %v", err) return internal.DataflowOutput{}, err diff --git a/sources/mysql/infoschema_test.go b/sources/mysql/infoschema_test.go index eca4bfd2b..fde3f5ff0 100644 --- a/sources/mysql/infoschema_test.go +++ b/sources/mysql/infoschema_test.go @@ -209,7 +209,7 @@ func TestProcessSchemaMYSQL(t *testing.T) { } db := mkMockDB(t, ms) conv := internal.MakeConv() - isi := InfoSchemaImpl{"test", db, profiles.SourceProfile{}, profiles.TargetProfile{}} + isi := InfoSchemaImpl{"test", db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}} commonInfoSchema := common.InfoSchemaImpl{} _, err := commonInfoSchema.GenerateSrcSchema(conv, isi, 1) assert.Nil(t, err) @@ -308,7 +308,7 @@ func TestProcessData(t *testing.T) { func(table string, cols []string, vals []interface{}) { rows = append(rows, spannerData{table: table, cols: cols, vals: vals}) }) - isi := InfoSchemaImpl{"test", db, profiles.SourceProfile{}, profiles.TargetProfile{}} + isi := InfoSchemaImpl{"test", db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}} commonInfoSchema := common.InfoSchemaImpl{} commonInfoSchema.ProcessData(conv, isi, internal.AdditionalDataAttributes{}) assert.Equal(t, @@ -367,7 +367,7 @@ func TestProcessData_MultiCol(t *testing.T) { } db := mkMockDB(t, ms) conv := internal.MakeConv() - isi := InfoSchemaImpl{"test", db, profiles.SourceProfile{}, profiles.TargetProfile{}} + isi := InfoSchemaImpl{"test", db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}} processSchema := common.ProcessSchemaImpl{} err := 
processSchema.ProcessSchema(conv, isi, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) assert.Nil(t, err) @@ -456,7 +456,7 @@ func TestProcessSchema_Sharded(t *testing.T) { } db := mkMockDB(t, ms) conv := internal.MakeConv() - isi := InfoSchemaImpl{"test", db, profiles.SourceProfile{}, profiles.TargetProfile{}} + isi := InfoSchemaImpl{"test", db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}} processSchema := common.ProcessSchemaImpl{} err := processSchema.ProcessSchema(conv, isi, 1, internal.AdditionalSchemaAttributes{IsSharded: true}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) assert.Nil(t, err) @@ -496,7 +496,7 @@ func TestSetRowStats(t *testing.T) { db := mkMockDB(t, ms) conv := internal.MakeConv() conv.SetDataMode() - isi := InfoSchemaImpl{"test", db, profiles.SourceProfile{}, profiles.TargetProfile{}} + isi := InfoSchemaImpl{"test", db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}} commonInfoSchema := common.InfoSchemaImpl{} commonInfoSchema.SetRowStats(conv, isi) assert.Equal(t, int64(5), conv.Stats.Rows["test1"]) diff --git a/sources/oracle/infoschema.go b/sources/oracle/infoschema.go index 3a6cbbb51..a2bb71522 100644 --- a/sources/oracle/infoschema.go +++ b/sources/oracle/infoschema.go @@ -32,10 +32,11 @@ import ( ) type InfoSchemaImpl struct { - DbName string - Db *sql.DB - SourceProfile profiles.SourceProfile - TargetProfile profiles.TargetProfile + DbName string + Db *sql.DB + MigrationProjectId string + SourceProfile profiles.SourceProfile + TargetProfile profiles.TargetProfile } // GetToDdl function below implement the common.InfoSchema interface. @@ -437,12 +438,12 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte if err != nil { return nil, fmt.Errorf("error reading streaming config: %v", err) } - pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User) + pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User) if err != nil { return nil, fmt.Errorf("error creating pubsub resources: %v", err) } streamingCfg.PubsubCfg = *pubsubCfg - streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) + streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) if err != nil { err = fmt.Errorf("error starting datastream: %v", err) return nil, err @@ -453,9 +454,9 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte // StartStreamingMigration is used for automatic triggering of Dataflow job when // performing a streaming migration. 
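The test updates in the hunks above are a direct consequence of the new struct field: every positional composite literal of InfoSchemaImpl needed an extra "migration-project-id" argument. A tiny standalone example (toy struct, illustrative values) of why positional literals force this churn while keyed literals do not:

```go
package main

import "fmt"

// infoSchema is a toy stand-in for a struct that just gained a new field.
type infoSchema struct {
	DbName             string
	MigrationProjectId string
	SpannerProjectId   string
}

func main() {
	// Positional literal: must list every field in order, so it fails to
	// compile whenever a field is added — which is why the test literals
	// above all had to change.
	positional := infoSchema{"test", "migration-project-id", "spanner-project-id"}

	// Keyed literal: unaffected by new fields, at the cost of verbosity.
	keyed := infoSchema{
		DbName:             "test",
		MigrationProjectId: "migration-project-id",
	}
	fmt.Println(positional, keyed)
}
```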
-func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg) - dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv) + dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv) if err != nil { return internal.DataflowOutput{}, err } diff --git a/sources/oracle/infoschema_test.go b/sources/oracle/infoschema_test.go index 4c81307cc..8e6538474 100644 --- a/sources/oracle/infoschema_test.go +++ b/sources/oracle/infoschema_test.go @@ -159,7 +159,7 @@ func TestProcessSchemaOracle(t *testing.T) { db := mkMockDB(t, ms) conv := internal.MakeConv() processSchema := common.ProcessSchemaImpl{} - err := processSchema.ProcessSchema(conv, InfoSchemaImpl{"test", db, profiles.SourceProfile{}, profiles.TargetProfile{}}, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) + err := processSchema.ProcessSchema(conv, InfoSchemaImpl{"test", db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}}, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) assert.Nil(t, err) expectedSchema := map[string]ddl.CreateTable{ "USER": { diff --git a/sources/postgres/infoschema.go b/sources/postgres/infoschema.go index bd5756d3f..37405be92 100644 --- a/sources/postgres/infoschema.go +++ b/sources/postgres/infoschema.go @@ -39,10 +39,11 @@ import ( // InfoSchemaImpl postgres specific implementation for InfoSchema. 
type InfoSchemaImpl struct { - Db *sql.DB - SourceProfile profiles.SourceProfile - TargetProfile profiles.TargetProfile - IsSchemaUnique *bool + Db *sql.DB + MigrationProjectId string + SourceProfile profiles.SourceProfile + TargetProfile profiles.TargetProfile + IsSchemaUnique *bool } func (isi InfoSchemaImpl) populateSchemaIsUnique(schemaAndNames []common.SchemaAndName) { @@ -74,12 +75,12 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte if err != nil { return nil, fmt.Errorf("error reading streaming config: %v", err) } - pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname) + pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname) if err != nil { return nil, fmt.Errorf("error creating pubsub resources: %v", err) } streamingCfg.PubsubCfg = *pubsubCfg - streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) + streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails) if err != nil { err = fmt.Errorf("error starting datastream: %v", err) return nil, err @@ -90,10 +91,10 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte // StartStreamingMigration is used for automatic triggering of Dataflow job when // performing a streaming migration. -func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg) - dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv) + dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv) if err != nil { err = fmt.Errorf("error starting dataflow: %v", err) return internal.DataflowOutput{}, err diff --git a/sources/postgres/infoschema_test.go b/sources/postgres/infoschema_test.go index 381ceecc4..6e2c2d8fa 100644 --- a/sources/postgres/infoschema_test.go +++ b/sources/postgres/infoschema_test.go @@ -230,7 +230,7 @@ func TestProcessSchema(t *testing.T) { db := mkMockDB(t, ms) conv := internal.MakeConv() processSchema := common.ProcessSchemaImpl{} - err := processSchema.ProcessSchema(conv, InfoSchemaImpl{db, profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) + err := processSchema.ProcessSchema(conv, InfoSchemaImpl{db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) assert.Nil(t, err) expectedSchema := map[string]ddl.CreateTable{ "user": ddl.CreateTable{ @@ -366,7 +366,7 @@ func TestProcessData(t *testing.T) { rows = append(rows, spannerData{table: table, cols: cols, vals: vals}) }) commonInfoSchema := common.InfoSchemaImpl{} - 
commonInfoSchema.ProcessData(conv, InfoSchemaImpl{db, profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, internal.AdditionalDataAttributes{}) + commonInfoSchema.ProcessData(conv, InfoSchemaImpl{db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, internal.AdditionalDataAttributes{}) assert.Equal(t, []spannerData{ @@ -508,7 +508,7 @@ func TestConvertSqlRow_MultiCol(t *testing.T) { db := mkMockDB(t, ms) conv := internal.MakeConv() processSchema := common.ProcessSchemaImpl{} - err := processSchema.ProcessSchema(conv, InfoSchemaImpl{db, profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) + err := processSchema.ProcessSchema(conv, InfoSchemaImpl{db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, 1, internal.AdditionalSchemaAttributes{}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) assert.Nil(t, err) conv.SetDataMode() var rows []spannerData @@ -517,7 +517,7 @@ func TestConvertSqlRow_MultiCol(t *testing.T) { rows = append(rows, spannerData{table: table, cols: cols, vals: vals}) }) commonInfoSchema := common.InfoSchemaImpl{} - commonInfoSchema.ProcessData(conv, InfoSchemaImpl{db, profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, internal.AdditionalDataAttributes{}) + commonInfoSchema.ProcessData(conv, InfoSchemaImpl{db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}, internal.AdditionalDataAttributes{}) assert.Equal(t, []spannerData{ {table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), "0"}}, {table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), "-9223372036854775808"}}}, @@ -545,7 +545,7 @@ func TestSetRowStats(t *testing.T) { conv := internal.MakeConv() conv.SetDataMode() commonInfoSchema := common.InfoSchemaImpl{} - commonInfoSchema.SetRowStats(conv, InfoSchemaImpl{db, profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}) + commonInfoSchema.SetRowStats(conv, InfoSchemaImpl{db, "migration-project-id", profiles.SourceProfile{}, profiles.TargetProfile{}, newFalsePtr()}) assert.Equal(t, int64(5), conv.Stats.Rows["test1"]) assert.Equal(t, int64(142), conv.Stats.Rows["test2"]) assert.Equal(t, int64(0), conv.Unexpecteds()) diff --git a/sources/spanner/infoschema.go b/sources/spanner/infoschema.go index 0f191b7b8..c3270fcfa 100644 --- a/sources/spanner/infoschema.go +++ b/sources/spanner/infoschema.go @@ -79,7 +79,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte return nil, nil } -func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { return internal.DataflowOutput{}, nil } diff --git a/sources/sqlserver/infoschema.go b/sources/sqlserver/infoschema.go index a5e226647..50f46d157 100644 --- a/sources/sqlserver/infoschema.go +++ b/sources/sqlserver/infoschema.go @@ -58,7 +58,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte return nil, nil } -func (isi 
InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) { return internal.DataflowOutput{}, nil } diff --git a/streaming/cleanup.go b/streaming/cleanup.go index 4f782f4b5..2982c0dbe 100644 --- a/streaming/cleanup.go +++ b/streaming/cleanup.go @@ -43,11 +43,11 @@ type JobCleanupOptions struct { Monitoring bool } -func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds []string, jobCleanupOptions JobCleanupOptions, project string, instance string) { +func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds []string, jobCleanupOptions JobCleanupOptions, migrationProjectId string, spannerProjectId string, instance string) { //initiate resource cleanup if jobCleanupOptions.Dataflow { //fetch dataflow resources - dataflowResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATAFLOW_RESOURCE, dataShardIds, project, instance) + dataflowResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATAFLOW_RESOURCE, dataShardIds, spannerProjectId, instance) if err != nil { logger.Log.Debug(fmt.Sprintf("Unable to fetch dataflow resources for jobId: %s: %v\n", migrationJobId, err)) } @@ -60,13 +60,13 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds if err != nil { logger.Log.Debug("Unable to read Dataflow metadata for deletion\n") } else { - cleanupDataflowJob(ctx, dataflowResources, project) + cleanupDataflowJob(ctx, dataflowResources, migrationProjectId) } } } if jobCleanupOptions.Datastream { //fetch dataflow resources - datastreamResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATASTREAM_RESOURCE, dataShardIds, project, instance) + datastreamResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATASTREAM_RESOURCE, dataShardIds, spannerProjectId, instance) if err != nil { logger.Log.Debug(fmt.Sprintf("Unable to fetch datastream resources for jobId: %s: %v\n", migrationJobId, err)) } @@ -79,13 +79,13 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds if err != nil { logger.Log.Debug("Unable to read Datastream metadata for deletion\n") } else { - cleanupDatastream(ctx, datastreamResources, project) + cleanupDatastream(ctx, datastreamResources, migrationProjectId) } } } if jobCleanupOptions.Pubsub { //fetch pubsub resources - pubsubResourcesList, err := FetchResources(ctx, migrationJobId, constants.PUBSUB_RESOURCE, dataShardIds, project, instance) + pubsubResourcesList, err := FetchResources(ctx, migrationJobId, constants.PUBSUB_RESOURCE, dataShardIds, spannerProjectId, instance) if err != nil { logger.Log.Debug(fmt.Sprintf("Unable to fetch pubsub resources for jobId: %s: %v\n", migrationJobId, err)) } @@ -98,17 +98,17 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds if err != nil { logger.Log.Debug("Unable to read Pubsub metadata for deletion\n") } else { - cleanupPubsubResources(ctx, pubsubResources, project) + cleanupPubsubResources(ctx, pubsubResources, migrationProjectId) } } } if jobCleanupOptions.Monitoring { //fetch monitoring resources - shardMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.MONITORING_RESOURCE, dataShardIds, project, 
instance) + shardMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.MONITORING_RESOURCE, dataShardIds, spannerProjectId, instance) if err != nil { logger.Log.Debug(fmt.Sprintf("Unable to fetch shard monitoring resources for jobId: %s: %v\n", migrationJobId, err)) } - jobMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.AGG_MONITORING_RESOURCE, dataShardIds, project, instance) + jobMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.AGG_MONITORING_RESOURCE, dataShardIds, spannerProjectId, instance) if err != nil { logger.Log.Debug(fmt.Sprintf("Unable to fetch aggregate monitoring resources for jobId: %s: %v\n", migrationJobId, err)) } @@ -122,14 +122,14 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds if err != nil { logger.Log.Debug("Unable to read monitoring metadata for deletion\n") } else { - cleanupMonitoringDashboard(ctx, monitoringResources, project) + cleanupMonitoringDashboard(ctx, monitoringResources, migrationProjectId) } } } } -func FetchResources(ctx context.Context, migrationJobId string, resourceType string, dataShardIds []string, project string, instance string) ([]SmtResource, error) { - dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, constants.METADATA_DB) +func FetchResources(ctx context.Context, migrationJobId string, resourceType string, dataShardIds []string, spannerProjectId string, instance string) ([]SmtResource, error) { + dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, instance, constants.METADATA_DB) client, err := utils.GetClient(ctx, dbURI) if err != nil { err = fmt.Errorf("can't create client for db %s: %v", dbURI, err) diff --git a/streaming/streaming.go b/streaming/streaming.go index 8a0e44677..471ec5f83 100644 --- a/streaming/streaming.go +++ b/streaming/streaming.go @@ -521,8 +521,8 @@ func createNotificationOnBucket(ctx context.Context, storageClient *storage.Clie } // LaunchStream populates the parameters from the streaming config and triggers a stream on Cloud Datastream. -func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, projectID string, datastreamCfg DatastreamCfg) error { - projectNumberResource := GetProjectNumberResource(ctx, fmt.Sprintf("projects/%s", projectID)) +func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, migrationProjectId string, datastreamCfg DatastreamCfg) error { + projectNumberResource := GetProjectNumberResource(ctx, fmt.Sprintf("projects/%s", migrationProjectId)) fmt.Println("Launching stream ", fmt.Sprintf("%s/locations/%s", projectNumberResource, datastreamCfg.StreamLocation)) dsClient, err := datastream.NewClient(ctx) if err != nil { @@ -602,15 +602,15 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbL } // LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job. 
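In the cleanup changes above, the two project ids serve different purposes: the job's resource metadata is read from the metadata database in the Spanner project, while the Dataflow, Datastream, Pub/Sub and monitoring resources themselves are deleted from the migration project. A small sketch of how the two resource names diverge (string formatting only, no client calls; the metadata database name is a stand-in for constants.METADATA_DB):

```go
package main

import "fmt"

// buildCleanupTargets shows which project id each piece of cleanup state
// lives under, mirroring the split introduced in streaming/cleanup.go.
func buildCleanupTargets(migrationProjectId, spannerProjectId, instance, jobName, location string) (metadataDbURI, dataflowJobName string) {
	// Resource metadata for the migration job is read from the metadata DB,
	// which sits in the Spanner project ("migration_metadata" is illustrative).
	metadataDbURI = fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, instance, "migration_metadata")
	// The resources being torn down (here, a Dataflow job) were created in
	// the migration project.
	dataflowJobName = fmt.Sprintf("projects/%s/locations/%s/jobs/%s", migrationProjectId, location, jobName)
	return metadataDbURI, dataflowJobName
}

func main() {
	db, job := buildCleanupTargets("migration-project", "spanner-project", "spanner-instance", "smt-job", "us-central1")
	fmt.Println("metadata read from:", db)
	fmt.Println("job deleted from:  ", job)
}
```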
-func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) { - project, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{}) +func LaunchDataflowJob(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) { + spannerProjectId, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{}) dataflowCfg := streamingCfg.DataflowCfg datastreamCfg := streamingCfg.DatastreamCfg // Rate limit this function to match DataFlow createJob Quota. DATA_FLOW_RL.Take() - fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", project, "-", dataflowCfg.Location) + fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", migrationProjectId, "-", dataflowCfg.Location) c, err := dataflow.NewFlexTemplatesClient(ctx) if err != nil { @@ -627,7 +627,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile defer dsClient.Close() // Fetch the GCS path from the destination connection profile. - dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", project, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name) + dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", migrationProjectId, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name) res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf}) if err != nil { return internal.DataflowOutput{}, fmt.Errorf("could not get connection profiles: %v", err) @@ -641,19 +641,19 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile // Initiate runtime environment flags and overrides. var ( - dataflowProjectId = project - dataflowVpcHostProjectId = project + dataflowProjectId = migrationProjectId + dataflowVpcHostProjectId = migrationProjectId gcsTemplatePath = DEFAULT_TEMPLATE_PATH dataflowSubnetwork = "" workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC dataflowUserLabels = make(map[string]string) machineType = "n1-standard-2" ) - // If project override present, use that otherwise default to Spanner project. Useful when customers want to run Dataflow in separate project. + // If project override present, use that otherwise default to Migration project. Useful when customers want to run Dataflow in separate project. if dataflowCfg.ProjectId != "" { dataflowProjectId = dataflowCfg.ProjectId } - // If VPC Host project override present, use that otherwise default to Spanner project. + // If VPC Host project override present, use that otherwise default to Migration project. 
if dataflowCfg.VpcHostProjectId != "" { dataflowVpcHostProjectId = dataflowCfg.VpcHostProjectId } @@ -706,13 +706,14 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath}, Parameters: map[string]string{ "inputFilePattern": utils.ConcatDirectoryPath(inputFilePattern, "data"), - "streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId), + "streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId), + "projectId": spannerProjectId, "instanceId": instance, "databaseId": dbName, "sessionFilePath": streamingCfg.TmpDir + "session.json", "deadLetterQueueDirectory": inputFilePattern + "dlq", "transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json", - "gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", project, streamingCfg.PubsubCfg.SubscriptionId), + "gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.PubsubCfg.SubscriptionId), }, Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{ MaxWorkers: maxWorkers, @@ -750,7 +751,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile return internal.DataflowOutput{JobID: respDf.Job.Id, GCloudCmd: gcloudDfCmd}, nil } -func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJobId, gcloudDataflowCmd, project, dataShardId string, gcsBucket internal.GcsResources, dashboardName string) { +func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJobId, gcloudDataflowCmd, migrationProjectId, dataShardId string, gcsBucket internal.GcsResources, dashboardName string) { datastreamCfg := streamingCfg.DatastreamCfg dataflowCfg := streamingCfg.DataflowCfg conv.Audit.StreamingStats.DatastreamResources = internal.DatastreamResources{DatastreamName: datastreamCfg.StreamId, Region: datastreamCfg.StreamLocation} @@ -774,8 +775,8 @@ func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJ conv.Audit.StreamingStats.ShardToShardResourcesMap[dataShardId] = shardResources resourceMutex.Unlock() } - fullStreamName := fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId) - dfJobDetails := fmt.Sprintf("project: %s, location: %s, name: %s, id: %s", project, dataflowCfg.Location, dataflowCfg.JobName, dfJobId) + fullStreamName := fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId) + dfJobDetails := fmt.Sprintf("project: %s, location: %s, name: %s, id: %s", migrationProjectId, dataflowCfg.Location, dataflowCfg.JobName, dfJobId) logger.Log.Info("\n------------------------------------------\n") logger.Log.Info("The Datastream stream: " + fullStreamName + " ,the Dataflow job: " + dfJobDetails + " the Pubsub topic: " + streamingCfg.PubsubCfg.TopicId + " ,the subscription: " + streamingCfg.PubsubCfg.SubscriptionId + @@ -863,7 +864,7 @@ func GetProjectNumberResource(ctx context.Context, projectID string) string { } -func StartDatastream(ctx context.Context, streamingCfg StreamingCfg, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, schemaDetails map[string]internal.SchemaDetails) (StreamingCfg, error) { +func StartDatastream(ctx context.Context, 
migrationProjectId string, streamingCfg StreamingCfg, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, schemaDetails map[string]internal.SchemaDetails) (StreamingCfg, error) { driver := sourceProfile.Driver var dbList []profiles.LogicalShard switch driver { @@ -874,14 +875,14 @@ func StartDatastream(ctx context.Context, streamingCfg StreamingCfg, sourceProfi case constants.POSTGRES: dbList = append(dbList, profiles.LogicalShard{DbName: streamingCfg.DatastreamCfg.Properties}) } - err := LaunchStream(ctx, sourceProfile, dbList, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg) + err := LaunchStream(ctx, sourceProfile, dbList, migrationProjectId, streamingCfg.DatastreamCfg) if err != nil { return streamingCfg, fmt.Errorf("error launching stream: %v", err) } return streamingCfg, nil } -func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) { +func StartDataflow(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) { sc, err := storageclient.NewStorageClientImpl(ctx) if err != nil { return internal.DataflowOutput{}, err @@ -906,7 +907,7 @@ func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, st if err != nil { return internal.DataflowOutput{}, fmt.Errorf("error while writing to GCS: %v", err) } - dfOutput, err := LaunchDataflowJob(ctx, targetProfile, streamingCfg, conv) + dfOutput, err := LaunchDataflowJob(ctx, migrationProjectId, targetProfile, streamingCfg, conv) if err != nil { return internal.DataflowOutput{}, fmt.Errorf("error launching dataflow: %v", err) } diff --git a/webv2/api/schema.go b/webv2/api/schema.go index 44eeb0996..8844927c8 100644 --- a/webv2/api/schema.go +++ b/webv2/api/schema.go @@ -47,7 +47,7 @@ func init() { utilities.InitObjectId() sessionState.Conv = internal.MakeConv() config := config.TryInitializeSpannerConfig() - session.SetSessionStorageConnectionState(config.GCPProjectID, config.SpannerInstanceID) + session.SetSessionStorageConnectionState(config.GCPProjectID, config.SpannerProjectID, config.SpannerInstanceID) } // ConvertSchemaSQL converts source database to Spanner when using diff --git a/webv2/config.json b/webv2/config.json index 8c5d5e544..ab0352159 100644 --- a/webv2/config.json +++ b/webv2/config.json @@ -1,4 +1,5 @@ { "GCPProjectID": "", + "SpannerProjectID": "", "SpannerInstanceID": "" } \ No newline at end of file diff --git a/webv2/config/config_handler.go b/webv2/config/config_handler.go index 11935e986..c6758ae07 100644 --- a/webv2/config/config_handler.go +++ b/webv2/config/config_handler.go @@ -27,6 +27,7 @@ import ( // Config represents Spanner Configuration for Spanner Session Management. 
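The LaunchDataflowJob hunk above is where the decoupling is most visible: the Datastream stream and Pub/Sub subscription paths are rooted in the migration project, while the new "projectId" template parameter points the Dataflow template at the Spanner project from the target profile. A minimal sketch of that parameter split (plain string formatting with illustrative values, no Dataflow client):

```go
package main

import "fmt"

// templateParameters mirrors the split in the flex-template parameters:
// stream and subscription paths live in the migration project, while the
// Spanner writes target the Spanner project.
func templateParameters(migrationProjectId, spannerProjectId, location, streamId, subscriptionId, instance, dbName string) map[string]string {
	return map[string]string{
		"streamName":            fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, location, streamId),
		"gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, subscriptionId),
		"projectId":             spannerProjectId, // Spanner project, from the target profile
		"instanceId":            instance,
		"databaseId":            dbName,
	}
}

func main() {
	params := templateParameters("migration-project", "spanner-project", "us-central1", "smt-stream", "smt-sub", "spanner-instance", "mydb")
	for k, v := range params {
		fmt.Printf("%s = %s\n", k, v)
	}
}
```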
type Config struct { GCPProjectID string `json:"GCPProjectID"` + SpannerProjectID string `json:"SpannerProjectID"` SpannerInstanceID string `json:"SpannerInstanceID"` } @@ -63,11 +64,14 @@ func SetSpannerConfig(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } + if c.SpannerProjectID == "" { + c.SpannerProjectID = c.GCPProjectID + } SaveSpannerConfig(c) - isDbCreated, isConfigValid := session.SetSessionStorageConnectionState(c.GCPProjectID, c.SpannerInstanceID) + isDbCreated, isConfigValid := session.SetSessionStorageConnectionState(c.GCPProjectID, c.SpannerProjectID, c.SpannerInstanceID) configWithMetadata := ConfigWithMetadata{ - Config: Config{c.GCPProjectID, c.SpannerInstanceID}, + Config: Config{c.GCPProjectID, c.SpannerProjectID, c.SpannerInstanceID}, IsMetadataDbCreated: isDbCreated, IsConfigValid: isConfigValid, } diff --git a/webv2/config/config_service.go b/webv2/config/config_service.go index e416d6d7f..42e23887d 100644 --- a/webv2/config/config_service.go +++ b/webv2/config/config_service.go @@ -63,12 +63,18 @@ func TryInitializeSpannerConfig() Config { //Try load spanner config from environment variables and save to config if err != nil || c.GCPProjectID == "" || c.SpannerInstanceID == "" { projectId := os.Getenv("GCPProjectID") + spProjectId := os.Getenv("SpannerProjectID") spInstanceId := os.Getenv("SpannerInstanceID") + if spProjectId == "" { + spProjectId = projectId + } + if projectId == "" || spInstanceId == "" { fmt.Println("Note: To store the sessions please set the environment variables 'GCPProjectID' and 'SpannerInstanceID'. You would set these as part of the migration workflow if you are using the Spanner migration tool Web UI.") } else { c.GCPProjectID = projectId + c.SpannerProjectID = spProjectId c.SpannerInstanceID = spInstanceId SaveSpannerConfig(c) } diff --git a/webv2/profile/profile.go b/webv2/profile/profile.go index 6ddd11acc..4de2377d4 100644 --- a/webv2/profile/profile.go +++ b/webv2/profile/profile.go @@ -264,7 +264,7 @@ func CleanUpStreamingJobs(w http.ResponseWriter, r *http.Request) { Pubsub: true, Monitoring: true, } - streaming.InitiateJobCleanup(ctx, sessionState.Conv.Audit.MigrationRequestId, nil, jobCleanupOptions, sessionState.GCPProjectID, sessionState.SpannerInstanceID) + streaming.InitiateJobCleanup(ctx, sessionState.Conv.Audit.MigrationRequestId, nil, jobCleanupOptions, sessionState.GCPProjectID, sessionState.SpannerProjectId, sessionState.SpannerInstanceID) } type connectionProfileReq struct { diff --git a/webv2/session/session_handler.go b/webv2/session/session_handler.go index 2697e7162..8eabdcf70 100644 --- a/webv2/session/session_handler.go +++ b/webv2/session/session_handler.go @@ -244,8 +244,8 @@ func getLocalConv(versionId string) (ConvWithMetadata, error) { func getMetadataDbUri() string { sessionState := GetSessionState() - if sessionState.GCPProjectID == "" || sessionState.SpannerInstanceID == "" { + if sessionState.SpannerProjectId == "" || sessionState.SpannerInstanceID == "" { return "" } - return helpers.GetSpannerUri(sessionState.GCPProjectID, sessionState.SpannerInstanceID) + return helpers.GetSpannerUri(sessionState.SpannerProjectId, sessionState.SpannerInstanceID) } diff --git a/webv2/session/session_service.go b/webv2/session/session_service.go index c30e9d8a6..d638aa5a6 100644 --- a/webv2/session/session_service.go +++ b/webv2/session/session_service.go @@ -55,18 +55,19 @@ func (ss *SessionService) GetConvWithMetadata(versionId 
string) (ConvWithMetadat return ss.store.GetConvWithMetadata(ss.context, versionId) } -func SetSessionStorageConnectionState(projectId string, spInstanceId string) (bool, bool) { +func SetSessionStorageConnectionState(migrationProjectId string, spProjectId string, spInstanceId string) (bool, bool) { sessionState := GetSessionState() - sessionState.GCPProjectID = projectId + sessionState.GCPProjectID = migrationProjectId + sessionState.SpannerProjectId = spProjectId sessionState.SpannerInstanceID = spInstanceId - if projectId == "" || spInstanceId == "" { + if spProjectId == "" || spInstanceId == "" { sessionState.IsOffline = true return false, false } else { - if isDbCreated := helpers.CheckOrCreateMetadataDb(projectId, spInstanceId); isDbCreated { + if isDbCreated := helpers.CheckOrCreateMetadataDb(spProjectId, spInstanceId); isDbCreated { sessionState.IsOffline = false isConfigValid := isDbCreated - migrateMetadataDb(projectId, spInstanceId) + migrateMetadataDb(spProjectId, spInstanceId) return isDbCreated, isConfigValid } else { sessionState.IsOffline = true diff --git a/webv2/session/session_test.go b/webv2/session/session_test.go index 57e11de49..d738bffdd 100644 --- a/webv2/session/session_test.go +++ b/webv2/session/session_test.go @@ -342,7 +342,7 @@ func TestIsOfflineSession(t *testing.T) { func TestSetSessionStorageConnectionState(t *testing.T) { - dbCreated, configValid := session.SetSessionStorageConnectionState("", "") + dbCreated, configValid := session.SetSessionStorageConnectionState("", "", "") if dbCreated != false { t.Errorf("Expected dbCreated to be false, but got %v", dbCreated) } @@ -353,7 +353,7 @@ func TestSetSessionStorageConnectionState(t *testing.T) { t.Error("Expected IsOffline to be true, but got false") } - dbCreated, configValid = session.SetSessionStorageConnectionState("my-project-id", "") + dbCreated, configValid = session.SetSessionStorageConnectionState("my-project-id", "", "") if dbCreated != false { t.Errorf("Expected dbCreated to be false, but got %v", dbCreated) } @@ -364,7 +364,7 @@ func TestSetSessionStorageConnectionState(t *testing.T) { t.Error("Expected IsOffline to be true, but got false") } - dbCreated, configValid = session.SetSessionStorageConnectionState("", "my-instance-id") + dbCreated, configValid = session.SetSessionStorageConnectionState("", "", "my-instance-id") if dbCreated != false { t.Errorf("Expected dbCreated to be false, but got %v", dbCreated) } diff --git a/webv2/session/types.go b/webv2/session/types.go index a76a20687..a6997c871 100644 --- a/webv2/session/types.go +++ b/webv2/session/types.go @@ -43,26 +43,27 @@ type SourceDBConnDetails struct { // SessionState stores information for the current migration session. 
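On the web UI side, the new SpannerProjectID is optional and falls back to the existing GCPProjectID, as the config handler and config service hunks above (and the prepare-migration handler further below) do. A tiny sketch of that fallback rule with stand-in types:

```go
package main

import "fmt"

// webConfig is a stand-in for the web UI's Spanner configuration.
type webConfig struct {
	GCPProjectID     string // migration project
	SpannerProjectID string // optional; where the Spanner instance lives
}

// effectiveSpannerProject applies the fallback used by the UI handlers:
// if no separate Spanner project is set, reuse the migration project.
func effectiveSpannerProject(c webConfig) string {
	if c.SpannerProjectID == "" {
		return c.GCPProjectID
	}
	return c.SpannerProjectID
}

func main() {
	fmt.Println(effectiveSpannerProject(webConfig{GCPProjectID: "migration-project"}))
	fmt.Println(effectiveSpannerProject(webConfig{GCPProjectID: "migration-project", SpannerProjectID: "spanner-project"}))
}
```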
 type SessionState struct {
-	SourceDB            *sql.DB             // Connection to source database in case of direct connection
-	SourceDBConnDetails SourceDBConnDetails // Connection details for source database
-	DbName              string              // Name of source database
-	Driver              string              // Name of Spanner migration tool driver in use
-	Conv                *internal.Conv      // Current conversion state
-	SessionFile         string              // Path to session file
-	IsOffline           bool                // True if the connection to remote metadata database is invalid
-	GCPProjectID        string
-	SpannerInstanceID   string
-	Dialect             string
-	IsSharded           bool
-	TmpDir              string
+	SourceDB              *sql.DB             // Connection to source database in case of direct connection
+	SourceDBConnDetails   SourceDBConnDetails // Connection details for source database
+	DbName                string              // Name of source database
+	Driver                string              // Name of Spanner migration tool driver in use
+	Conv                  *internal.Conv      // Current conversion state
+	SessionFile           string              // Path to session file
+	IsOffline             bool                // True if the connection to remote metadata database is invalid
+	GCPProjectID          string              // GCP project id where the migration resources are created
+	SpannerProjectId      string              // Project id of the spanner instance
+	SpannerInstanceID     string
+	Dialect               string
+	IsSharded             bool
+	TmpDir                string
 	ShardedDbConnDetails []profiles.DirectConnectionConfig
-	SourceProfileConfig profiles.SourceProfileConfig
-	Region              string
-	SpannerDatabaseName string
-	Bucket              string
-	RootPath            string
-	SessionMetadata     SessionMetadata
-	Error               error
+	SourceProfileConfig   profiles.SourceProfileConfig
+	Region                string
+	SpannerDatabaseName   string
+	Bucket                string
+	RootPath              string
+	SessionMetadata       SessionMetadata
+	Error                 error
 	Counter
 }
 
diff --git a/webv2/web.go b/webv2/web.go
index 0f50716f6..2c620350a 100644
--- a/webv2/web.go
+++ b/webv2/web.go
@@ -529,6 +529,11 @@ func getSourceDestinationSummary(w http.ResponseWriter, r *http.Request) {
 	sessionState := session.GetSessionState()
 	sessionState.Conv.ConvLock.RLock()
 	defer sessionState.Conv.ConvLock.RUnlock()
+	// GetSourceDestinationSummary is called when the user enters prepare migration page
+	// Getting and populating SpannerProjectId if it doesn't exist.
+	if sessionState.SpannerProjectId == "" {
+		sessionState.SpannerProjectId = sessionState.GCPProjectID
+	}
 	var sessionSummary types.SessionSummary
 	databaseType, err := helpers.GetSourceDatabaseFromDriver(sessionState.Driver)
 	if err != nil {
@@ -557,7 +562,7 @@ func getSourceDestinationSummary(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, fmt.Sprintf("Error while creating instance admin client : %v", err), http.StatusBadRequest)
 		return
 	}
-	instanceInfo, err := instanceClient.GetInstance(ctx, &instancepb.GetInstanceRequest{Name: fmt.Sprintf("projects/%s/instances/%s", sessionState.GCPProjectID, sessionState.SpannerInstanceID)})
+	instanceInfo, err := instanceClient.GetInstance(ctx, &instancepb.GetInstanceRequest{Name: fmt.Sprintf("projects/%s/instances/%s", sessionState.SpannerProjectId, sessionState.SpannerInstanceID)})
 	if err != nil {
 		log.Println("get instance error")
 		http.Error(w, fmt.Sprintf("Error while getting instance information : %v", err), http.StatusBadRequest)
@@ -622,6 +627,11 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 	sessionState.Conv.Audit.Progress = internal.Progress{}
 	sessionState.Conv.UI = true
 	sourceProfile, targetProfile, ioHelper, dbName, err := getSourceAndTargetProfiles(sessionState, details)
+	// TODO: Fix UX flow of migration project id
+	migrationProjectId := sessionState.GCPProjectID
+	if sessionState.SpannerProjectId == "" {
+		sessionState.SpannerProjectId = sessionState.GCPProjectID
+	}
 	if err != nil {
 		log.Println("can't get source and target profile")
 		http.Error(w, fmt.Sprintf("Can't get source and target profiles: %v", err), http.StatusBadRequest)
@@ -640,7 +650,7 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 	if details.MigrationMode == helpers.SCHEMA_ONLY {
 		log.Println("Starting schema only migration")
 		sessionState.Conv.Audit.MigrationType = migration.MigrationData_SCHEMA_ONLY.Enum()
-		go cmd.MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, &cmd.SchemaCmd{}, sessionState.Conv, &sessionState.Error)
+		go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, &cmd.SchemaCmd{}, sessionState.Conv, &sessionState.Error)
 	} else if details.MigrationMode == helpers.DATA_ONLY {
 		dataCmd := &cmd.DataCmd{
 			SkipForeignKeys: details.SkipForeignKeys,
@@ -648,7 +658,7 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 		}
 		log.Println("Starting data only migration")
 		sessionState.Conv.Audit.MigrationType = migration.MigrationData_DATA_ONLY.Enum()
-		go cmd.MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, dataCmd, sessionState.Conv, &sessionState.Error)
+		go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, dataCmd, sessionState.Conv, &sessionState.Error)
 	} else {
 		schemaAndDataCmd := &cmd.SchemaAndDataCmd{
 			SkipForeignKeys: details.SkipForeignKeys,
@@ -656,7 +666,7 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 		}
 		log.Println("Starting schema and data migration")
 		sessionState.Conv.Audit.MigrationType = migration.MigrationData_SCHEMA_AND_DATA.Enum()
-		go cmd.MigrateDatabase(ctx, targetProfile, sourceProfile, dbName, &ioHelper, schemaAndDataCmd, sessionState.Conv, &sessionState.Error)
+		go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, schemaAndDataCmd, sessionState.Conv, &sessionState.Error)
 	}
 	w.WriteHeader(http.StatusOK)
 	log.Println("migration completed", "method", r.Method, "path", r.URL.Path, "remoteaddr", r.RemoteAddr)
@@ -669,7 +679,7 @@ func getGeneratedResources(w http.ResponseWriter, r *http.Request) {
 	defer sessionState.Conv.ConvLock.RUnlock()
 	generatedResources.MigrationJobId = sessionState.Conv.Audit.MigrationRequestId
 	generatedResources.DatabaseName = sessionState.SpannerDatabaseName
-	generatedResources.DatabaseUrl = fmt.Sprintf("https://console.cloud.google.com/spanner/instances/%v/databases/%v/details/tables?project=%v", sessionState.SpannerInstanceID, sessionState.SpannerDatabaseName, sessionState.GCPProjectID)
+	generatedResources.DatabaseUrl = fmt.Sprintf("https://console.cloud.google.com/spanner/instances/%v/databases/%v/details/tables?project=%v", sessionState.SpannerInstanceID, sessionState.SpannerDatabaseName, sessionState.SpannerProjectId)
 	generatedResources.BucketName = sessionState.Bucket + sessionState.RootPath
 	generatedResources.BucketUrl = fmt.Sprintf("https://console.cloud.google.com/storage/browser/%v", sessionState.Bucket+sessionState.RootPath)
 	generatedResources.ShardToShardResourcesMap = map[string][]types.ResourceDetails{}
@@ -748,7 +758,7 @@ func getSourceAndTargetProfiles(sessionState *session.SessionState, details type
 	}
 	sessionState.SpannerDatabaseName = details.TargetDetails.TargetDB
-	targetProfileString := fmt.Sprintf("project=%v,instance=%v,dbName=%v,dialect=%v", sessionState.GCPProjectID, sessionState.SpannerInstanceID, details.TargetDetails.TargetDB, sessionState.Dialect)
+	targetProfileString := fmt.Sprintf("project=%v,instance=%v,dbName=%v,dialect=%v", sessionState.SpannerProjectId, sessionState.SpannerInstanceID, details.TargetDetails.TargetDB, sessionState.Dialect)
 	if details.MigrationType == helpers.LOW_DOWNTIME_MIGRATION && !details.IsSharded {
 		fileName := sessionState.Conv.Audit.MigrationRequestId + "-streaming.json"
 		sessionState.Bucket, sessionState.RootPath, err = conversion.GetBucketFromDatastreamProfile(sessionState.GCPProjectID, sessionState.Region, details.TargetDetails.TargetConnectionProfileName)
@@ -1004,7 +1014,7 @@ func init() {
 	utilities.InitObjectId()
 	sessionState.Conv = internal.MakeConv()
 	config := config.TryInitializeSpannerConfig()
-	session.SetSessionStorageConnectionState(config.GCPProjectID, config.SpannerInstanceID)
+	session.SetSessionStorageConnectionState(config.GCPProjectID, config.SpannerProjectID, config.SpannerInstanceID)
 }
 
 // App connects to the web app v2.
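
The sketch below is illustrative only and is not part of the patch. It shows, under stated assumptions, the defaulting rule this change applies in SetSpannerConfig, TryInitializeSpannerConfig, and migrate(): when no Spanner project id is configured, it falls back to the migration (GCP) project id, so single-project setups keep working unchanged. The helper name resolveProjectIds and the example project ids are hypothetical and exist only for illustration.

// Illustrative sketch, not part of the patch. resolveProjectIds is a
// hypothetical helper mirroring the fallback introduced by the change:
// an empty Spanner project id defaults to the migration project id.
package main

import "fmt"

// resolveProjectIds returns the migration project id (where the migration
// resources are created) and the Spanner project id (which hosts the target
// instance and the session metadata database).
func resolveProjectIds(migrationProjectId, spannerProjectId string) (string, string) {
	if spannerProjectId == "" {
		// Backwards compatible: both ids resolve to the same project.
		spannerProjectId = migrationProjectId
	}
	return migrationProjectId, spannerProjectId
}

func main() {
	// Decoupled: Spanner instance lives in a different project than the migration resources.
	fmt.Println(resolveProjectIds("migration-project", "spanner-project"))
	// Only one project configured: the Spanner project id falls back to it.
	fmt.Println(resolveProjectIds("migration-project", ""))
}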