Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deprecate input file pattern #843

Merged
merged 4 commits on Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions accessors/dataflow/dataflow_accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ func TestMain(m *testing.M) {

// getParameters returns the Dataflow flex-template launch parameters used
// by the accessor tests, keyed by pipeline option name.
//
// Note: the deprecated "inputFilePattern" option is intentionally absent —
// this PR replaces file-pattern polling with a GCS Pub/Sub notification
// subscription ("gcsPubSubSubscription"), matching the expected gcloud
// command strings asserted elsewhere in this test file.
func getParameters() map[string]string {
	return map[string]string{
		"streamName":                      "my-stream",
		"instanceId":                      "my-instance",
		"databaseId":                      "my-dbName",
		"sessionFilePath":                 "gs://session.json",
		"deadLetterQueueDirectory":        "gs://dlq",
		"transformationContextFilePath":   "gs://transformationContext.json",
		"directoryWatchDurationInMinutes": "480", // Setting directory watch timeout to 8 hours
		"gcsPubSubSubscription":           "projects/my-project/subscriptions/my-subscription",
	}
}

Expand Down Expand Up @@ -106,7 +105,7 @@ func getExpectedGcloudCmd1() string {
"--dataflow-kms-key sample-kms-key --disable-public-ips " +
"--enable-streaming-engine " +
"--parameters databaseId=my-dbName,deadLetterQueueDirectory=gs://dlq," +
"directoryWatchDurationInMinutes=480,inputFilePattern=gs://inputFilePattern," +
"gcsPubSubSubscription=projects/my-project/subscriptions/my-subscription," +
"instanceId=my-instance,sessionFilePath=gs://session.json,streamName=my-stream," +
"transformationContextFilePath=gs://transformationContext.json"
}
Expand Down Expand Up @@ -142,7 +141,8 @@ func getTemplateDfRequest2() *dataflowpb.LaunchFlexTemplateRequest {
}

func getExpectedGcloudCmd2() string {
return "gcloud dataflow flex-template run test-job " +
return ""+
"gcloud dataflow flex-template run test-job " +
"--project=test-project --region=us-central1 " +
"--template-file-gcs-location=gs://template/Cloud_Datastream_to_Spanner " +
"--num-workers 10 --max-workers 50 --service-account-email [email protected] " +
Expand All @@ -153,7 +153,7 @@ func getExpectedGcloudCmd2() string {
"--worker-zone test-worker-zone --enable-streaming-engine " +
"--flexrs-goal FLEXRS_SPEED_OPTIMIZED --staging-location gs://staging-location " +
"--parameters databaseId=my-dbName,deadLetterQueueDirectory=gs://dlq," +
"directoryWatchDurationInMinutes=480,inputFilePattern=gs://inputFilePattern," +
"gcsPubSubSubscription=projects/my-project/subscriptions/my-subscription," +
"instanceId=my-instance,sessionFilePath=gs://session.json,streamName=my-stream," +
"transformationContextFilePath=gs://transformationContext.json"
}
Expand Down
6 changes: 3 additions & 3 deletions docs/troubleshoot/minimal.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ gcloud dataflow flex-template run <jobName> \
--template-file-gcs-location=gs://dataflow-templates-southamerica-west1/2023-09-12-00_RC00/flex/Cloud_Datastream_to_Spanner \
--num-workers 1 --max-workers 50 \
--enable-streaming-engine \
--parameters databaseId=<database id>,deadLetterQueueDirectory=<GCS location of the DLQ directory>,inputFilePattern=<gcs location of the datastream output>,instanceId=<spanner-instance-id>,sessionFilePath=<GCS location of the session json>,streamName=<data stream name>,transformationContextFilePath=<path to transformation context json>
--parameters databaseId=<database id>,deadLetterQueueDirectory=<GCS location of the DLQ directory>,gcsPubSubSubscription=<pubsub subscription being used in a gcs notification policy>,instanceId=<spanner-instance-id>,sessionFilePath=<GCS location of the session json>,streamName=<data stream name>,transformationContextFilePath=<path to transformation context json>

```

Expand All @@ -119,7 +119,7 @@ gcloud dataflow flex-template run <jobname> \
--region=<the region where the dataflow job must run> \
--template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner \
--additional-experiments=use_runner_v2 \
--parameters inputFilePattern=<GCS location of the input file pattern>,streamName=<Datastream name>, \
--parameters gcsPubSubSubscription=<pubsub subscription being used in a gcs notification policy>,streamName=<Datastream name>, \
instanceId=<Spanner Instance Id>,databaseId=<Spanner Database Id>,sessionFilePath=<GCS path to session file>, \
deadLetterQueueDirectory=<GCS path to the DLQ>,runMode=retryDLQ
```
Expand All @@ -128,7 +128,7 @@ The following parameters can be taken from the regular forward migration Dataflo

```sh
region
inputFilePattern
gcsPubSubSubscription
streamName
instanceId
databaseId
Expand Down
1 change: 0 additions & 1 deletion streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,6 @@ func LaunchDataflowJob(ctx context.Context, migrationProjectId string, targetPro
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath},
Parameters: map[string]string{
"inputFilePattern": utils.ConcatDirectoryPath(inputFilePattern, "data"),
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"projectId": spannerProjectId,
"instanceId": instance,
Expand Down
Loading