Skip to content

Commit

Permalink
Disable public IP when network is specified (#565)
Browse files Browse the repository at this point in the history
  • Loading branch information
aksharauke authored Jun 28, 2023
1 parent b0da55e commit e4e33b1
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ func getMysqlSourceStreamConfig(dbList []profiles.LogicalShard, tableList []stri
for _, db := range dbList {
//create include db object
includeDb := &datastreampb.MysqlDatabase{
Database: db.DbName,
Database: db.DbName,
MysqlTables: mysqlTables,
}
includeDbList = append(includeDbList, includeDb)
}
//TODO: Clean up fmt.Printf logs and replace them with zap logger.
fmt.Printf("Include DB List for datastream: %+v\n", includeDbList)
mysqlSrcCfg := &datastreampb.MysqlSourceConfig{
IncludeObjects: &datastreampb.MysqlRdbms{MysqlDatabases: includeDbList},
IncludeObjects: &datastreampb.MysqlRdbms{MysqlDatabases: includeDbList},
MaxConcurrentBackfillTasks: 50,
}
return &datastreampb.SourceConfig_MysqlSourceConfig{MysqlSourceConfig: mysqlSrcCfg}
Expand Down Expand Up @@ -242,7 +242,7 @@ func getPostgreSQLSourceStreamConfig(properties string) (*datastreampb.SourceCon
func getSourceStreamConfig(srcCfg *datastreampb.SourceConfig, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, datastreamCfg DatastreamCfg) error {
switch sourceProfile.Driver {
case constants.MYSQL:
// For MySQL, it supports sharded migrations and batching databases in a physical machine into a single
// For MySQL, it supports sharded migrations and batching databases in a physical machine into a single
//Datastream, so dbList is passed.
srcCfg.SourceStreamConfig = getMysqlSourceStreamConfig(dbList, datastreamCfg.tableList)
return nil
Expand Down Expand Up @@ -445,16 +445,22 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
} else {
dataflowHostProjectId = dataflowCfg.HostProjectId
}

dataflowSubnetwork := ""

// If custom network is not selected, use public IP. Typical for internal testing flow.
workerIpAddressConfig := dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC

if dataflowCfg.Network != "" {
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
if dataflowCfg.Subnetwork == "" {
return fmt.Errorf("if network is specified, subnetwork cannot be empty")
} else {
dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork)
}
}

launchParameters := createLaunchParameters(dataflowCfg, inputFilePattern, project, datastreamCfg, instance, dbName, streamingCfg, dataflowSubnetwork)
launchParameters := createLaunchParameters(dataflowCfg, inputFilePattern, project, datastreamCfg, instance, dbName, streamingCfg, dataflowSubnetwork, workerIpAddressConfig)

req := &dataflowpb.LaunchFlexTemplateRequest{
ProjectId: project,
Expand Down Expand Up @@ -489,7 +495,7 @@ func storeGeneratedResources(conv *internal.Conv, datastreamCfg DatastreamCfg, r
" will have to be manually cleaned up via the UI. HarbourBridge will not delete them post completion of the migration.")
}

func createLaunchParameters(dataflowCfg DataflowCfg, inputFilePattern string, project string, datastreamCfg DatastreamCfg, instance string, dbName string, streamingCfg StreamingCfg, dataflowSubnetwork string) *dataflowpb.LaunchFlexTemplateParameter {
func createLaunchParameters(dataflowCfg DataflowCfg, inputFilePattern string, project string, datastreamCfg DatastreamCfg, instance string, dbName string, streamingCfg StreamingCfg, dataflowSubnetwork string, workerIpAddressConfig dataflowpb.WorkerIPAddressConfiguration) *dataflowpb.LaunchFlexTemplateParameter {
return &dataflowpb.LaunchFlexTemplateParameter{
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://dataflow-templates-southamerica-west1/2023-03-07-00_RC00/flex/Cloud_Datastream_to_Spanner"},
Expand All @@ -508,6 +514,7 @@ func createLaunchParameters(dataflowCfg DataflowCfg, inputFilePattern string, pr
EnableStreamingEngine: true,
Network: dataflowCfg.Network,
Subnetwork: dataflowSubnetwork,
IpConfiguration: workerIpAddressConfig,
},
}
}
Expand Down

0 comments on commit e4e33b1

Please sign in to comment.