Dataflow version bump and project separation (#805)
* Dataflow version bump and project separation

* fixed the description of projectId
aksharauke committed Apr 1, 2024
1 parent 796e8d8 commit c5e5b2e
Showing 3 changed files with 21 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/reverse-replication/ReverseReplicationUserGuide.md
@@ -111,6 +111,7 @@ The Dataflow job that writes to source database exposes the following per shard
| replication_lag_in_seconds_\<logical shard name\> | Replication lag min, max and count values for the shard |
| metadata_file_create_lag_retry_\<logical shard name\> | Count of file lookup retries done when the job that writes to GCS is lagging |
| mySQL_retry_\<logical shard name\> | Number of retries done when MySQL is not reachable |
| shard_failed_\<logical shard name\> | Published when there is a failure while processing the shard |

These metrics can be used to track the pipeline progress.
However, there is a limit of 100 on the total number of metrics per project, so if this limit is exhausted the Dataflow job logs a message to that effect.
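
To watch these per-shard counters programmatically, the Cloud Monitoring API can be queried for the Dataflow user counters. Below is a minimal Go sketch; the metric type (`dataflow.googleapis.com/job/user_counter`), its `metric_name` label, the shard name `shardA`, and the project id are assumptions to verify against your deployment.

```go
// Minimal sketch: list recent values of one per-shard counter via the
// Cloud Monitoring API. Metric type, label name, shard name, and project
// id below are assumptions; adjust them to your deployment.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
	"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
	"google.golang.org/api/iterator"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	ctx := context.Background()
	client, err := monitoring.NewMetricClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	now := time.Now()
	it := client.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
		Name: "projects/my-dataflow-project", // hypothetical project id
		Filter: `metric.type = "dataflow.googleapis.com/job/user_counter" ` +
			`AND metric.labels.metric_name = "shard_failed_shardA"`, // hypothetical shard
		Interval: &monitoringpb.TimeInterval{
			StartTime: timestamppb.New(now.Add(-1 * time.Hour)),
			EndTime:   timestamppb.New(now),
		},
	})
	// Print each data point's end time and counter value.
	for {
		ts, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		for _, p := range ts.GetPoints() {
			fmt.Println(p.GetInterval().GetEndTime().AsTime(), p.GetValue().GetDoubleValue())
		}
	}
}
```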
@@ -199,6 +200,7 @@ In this case, check if you observe the following:
2. The primary key value was not present in the change stream data
3. When there is no data written to Spanner for a given interval for a given shard, no file is created in GCS. In such a case, the interval is skipped by the writer Dataflow job. This can be verified in the logs by searching for the text ```skipping the file```. If a file is marked as skipped in the logs but exists in GCS, this indicates a data loss scenario; please raise a bug.
4. Check the shard_file_process_progress table in the metadata database. If it is lagging, wait for the pipeline to catch up so that the data gets reverse replicated.
5. Check if the shard_failed_\<logical shard name\> metric is present; this indicates there was a failure while processing the shard. Look at the logs for the failure details (a sketch of such a log query follows this list).
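
For items 3 and 5, the relevant messages can be pulled from Cloud Logging. A minimal Go sketch using the logadmin package follows; the `dataflow_step` resource type and the filter text are assumptions, so adjust them to match your job's actual log entries.

```go
// Minimal sketch: scan Dataflow worker logs for the messages mentioned
// above via the Cloud Logging admin API. The filter string is an
// assumption; adjust resource type and text to your job's logs.
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/logging/logadmin"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := logadmin.NewClient(ctx, "projects/my-dataflow-project") // hypothetical project
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Match either the skipped-file message or shard failure text.
	it := client.Entries(ctx,
		logadmin.Filter(`resource.type="dataflow_step" AND textPayload:("skipping the file" OR "shard_failed")`))
	for {
		e, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(e.Timestamp, e.Payload)
	}
}
```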


#### There is higher load than the expected QPS on the Spanner instance post cutover
3 changes: 2 additions & 1 deletion docs/reverse-replication/RunnigReverseReplication.md
@@ -45,14 +45,15 @@ The script takes in multiple arguments to orchestrate the pipeline. They are:
- `metadataInstance`: Spanner instance name to store change stream metadata. Defaults to the target Spanner instance id.
- `metadataTableSuffix`: The suffix to apply when creating metadata tables. Helpful in case of multiple runs. Default is no suffix.
- `networkTags`: network tags added to the Dataflow jobs' worker and launcher VMs.
- `projectId`: Project id of the Spanner instance.
- `projectId`: the project id for the Dataflow jobs. If spannerProjectId is not specified, this value is used as the Cloud Spanner project id as well.
- `sessionFilePath`: GCS file path for session file generated via Spanner migration tool.
- `serviceAccountEmail`: the email address of the service account to run the job as.
- `skipChangeStreamCreation`: whether to skip the change stream creation. Default is false.
- `skipMetadataDatabaseCreation`: whether to skip metadata database creation. Default is false.
- `sourceDbTimezoneOffset`: the timezone offset with respect to UTC for the source database. Defaults to +00:00.
- `sourceShardsFilePath`: GCS file path for the file containing shard info. Details on the structure are mentioned later.
- `sourceWriterTemplateLocation`: the Dataflow template location for the Source writer job.
- `spannerProjectId`: the project id where Cloud Spanner resides, for the use case when Cloud Spanner is in a different project than where Dataflow runs (see the sketch after this list).
- `spannerReaderTemplateLocation`: the Dataflow template location for the Spanner reader job.
- `startTimestamp`: Timestamp from which the change stream should start reading changes, in RFC 3339 format (e.g. 2024-04-01T00:00:00Z). Defaults to an empty string, which is equivalent to the current timestamp.
- `readerMaxWorkers`: Maximum number of workers for the reader job.
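
As a minimal illustration of how the two project flags interact (this sketch is not part of the commit, and the instance/database values are placeholders): `spannerProjectId` falls back to `projectId` when unset, and the Spanner database URI is built from the effective Spanner project.

```go
// Illustrative sketch of the flag fallback described above: when
// -spannerProjectId is omitted, -projectId is used for Spanner as well.
// Instance and database names below are placeholders.
package main

import (
	"flag"
	"fmt"
)

func main() {
	projectId := flag.String("projectId", "", "Project id for the Dataflow jobs.")
	spannerProjectId := flag.String("spannerProjectId", "", "Project id where Cloud Spanner resides.")
	instanceId := flag.String("instanceId", "my-instance", "Spanner instance id (placeholder).")
	dbName := flag.String("dbName", "my-db", "Spanner database name (placeholder).")
	flag.Parse()

	effective := *spannerProjectId
	if effective == "" {
		effective = *projectId // same fallback the runner applies in prechecks()
	}
	fmt.Printf("projects/%s/instances/%s/databases/%s\n", effective, *instanceId, *dbName)
}
```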
27 changes: 17 additions & 10 deletions reverse_replication/reverse-replication-runner.go
@@ -57,14 +57,15 @@ var (
networkTags string
runIdentifier string
readerMaxWorkers int
spannerProjectId string
)

const (
ALREADY_EXISTS_ERROR = "code = AlreadyExists"
)

func setupGlobalFlags() {
flag.StringVar(&projectId, "projectId", "", "ProjectId.")
flag.StringVar(&projectId, "projectId", "", "ProjectId for Dataflow jobs. If spannerProjectId is not specified, this value is used for Cloud Spanner project id as well.")
flag.StringVar(&dataflowRegion, "dataflowRegion", "", "Region for dataflow jobs.")
flag.StringVar(&jobNamePrefix, "jobNamePrefix", "smt-reverse-replication", "Job name prefix for the dataflow jobs, defaults to smt-reverse-replication. Automatically converted to lower case due to Dataflow name constraints.")
flag.StringVar(&changeStreamName, "changeStreamName", "reverseReplicationStream", "Change stream name, defaults to reverseReplicationStream.")
@@ -90,8 +91,8 @@ func setupGlobalFlags() {
flag.StringVar(&serviceAccountEmail, "serviceAccountEmail", "", "The email address of the service account to run the job as.")
flag.IntVar(&readerWorkers, "readerWorkers", 5, "Number of workers for reader job.")
flag.IntVar(&writerWorkers, "writerWorkers", 5, "Number of workers for writer job.")
flag.StringVar(&spannerReaderTemplateLocation, "spannerReaderTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-06-00_RC00/flex/Spanner_Change_Streams_to_Sharded_File_Sink", "The dataflow template location for the Spanner reader job.")
flag.StringVar(&sourceWriterTemplateLocation, "sourceWriterTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-06-00_RC00/flex/GCS_to_Sourcedb", "The dataflow template location for the Source writer job.")
flag.StringVar(&spannerReaderTemplateLocation, "spannerReaderTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-27-00_RC00/flex/Spanner_Change_Streams_to_Sharded_File_Sink", "The dataflow template location for the Spanner reader job.")
flag.StringVar(&sourceWriterTemplateLocation, "sourceWriterTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-27-00_RC00/flex/GCS_to_Sourcedb", "The dataflow template location for the Source writer job.")
flag.StringVar(&jobsToLaunch, "jobsToLaunch", "both", "Whether to launch the spanner reader job or the source writer job or both. Default is both. Supported values are both, reader, writer.")
flag.BoolVar(&skipChangeStreamCreation, "skipChangeStreamCreation", false, "Whether to skip the change stream creation. Default is false.")
flag.BoolVar(&skipMetadataDatabaseCreation, "skipMetadataDatabaseCreation", false, "Whether to skip metadata database creation. Default is false.")
@@ -101,6 +102,7 @@ func setupGlobalFlags() {
flag.StringVar(&runIdentifier, "runIdentifier", "", "The run identifier for the Dataflow jobs.")
flag.StringVar(&readerShardingCustomParameters, "readerShardingCustomParameters", "", "Any custom parameters to be supplied to custom sharding class.")
flag.IntVar(&readerMaxWorkers, "readerMaxWorkers", 20, "Number of max workers for reader job.")
flag.StringVar(&spannerProjectId, "spannerProjectId", "", "The project id where Cloud Spanner resides, for use case when Cloud Spanner is in a different project than where Dataflow would run.")

}

@@ -175,6 +177,7 @@ func prechecks() error {
return fmt.Errorf("please specify a valid GCS path for readerShardingCustomJarPath, like gs://<>")
}

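// Default the Spanner project id to the Dataflow project id when it is not explicitly set.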
if spannerProjectId == "" {
fmt.Println("Setting the Spanner Project Id to Dataflow project id: ", projectId)
spannerProjectId = projectId
}

return nil
}

@@ -190,7 +197,7 @@ func main() {
return
}

dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, dbName)
dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, instanceId, dbName)

ctx := context.Background()
adminClient, _ := database.NewDatabaseAdminClient(ctx)
@@ -223,7 +230,7 @@ func main() {

if !skipMetadataDatabaseCreation {
createDbReq := &adminpb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectId, metadataInstance),
Parent: fmt.Sprintf("projects/%s/instances/%s", spannerProjectId, metadataInstance),
CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", metadataDatabase),
}

@@ -233,18 +240,18 @@ func main() {
fmt.Printf("Cannot submit create database request for metadata db: %v\n", err)
return
} else {
fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase))
fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase))
}
} else {
if _, err := createDbOp.Wait(ctx); err != nil {
if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
fmt.Printf("create database request failed for metadata db: %v\n", err)
return
} else {
fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase))
fmt.Printf("metadata db %s already exists...skipping creation\n", fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase))
}
} else {
fmt.Println("Created metadata db", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase))
fmt.Println("Created metadata db", fmt.Sprintf("projects/%s/instances/%s/databases/%s", spannerProjectId, metadataInstance, metadataDatabase))
}
}
}
@@ -288,7 +295,7 @@ func main() {
"changeStreamName": changeStreamName,
"instanceId": instanceId,
"databaseId": dbName,
"spannerProjectId": projectId,
"spannerProjectId": spannerProjectId,
"metadataInstance": metadataInstance,
"metadataDatabase": metadataDatabase,
"startTimestamp": startTimestamp,
@@ -356,7 +363,7 @@ func main() {
"sourceDbTimezoneOffset": sourceDbTimezoneOffset,
"metadataTableSuffix": metadataTableSuffix,
"GCSInputDirectoryPath": gcsPath,
"spannerProjectId": projectId,
"spannerProjectId": spannerProjectId,
"metadataInstance": metadataInstance,
"metadataDatabase": metadataDatabase,
"runMode": writerRunMode,
