Skip to content

Commit

Permalink
changed dataflow path (#577)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyakhajanchi authored Jul 10, 2023
1 parent 31f2193 commit 991e81f
Show file tree
Hide file tree
Showing 6 changed files with 665 additions and 27 deletions.
17 changes: 8 additions & 9 deletions streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,16 +498,15 @@ func storeGeneratedResources(conv *internal.Conv, datastreamCfg DatastreamCfg, r
func createLaunchParameters(dataflowCfg DataflowCfg, inputFilePattern string, project string, datastreamCfg DatastreamCfg, instance string, dbName string, streamingCfg StreamingCfg, dataflowSubnetwork string, workerIpAddressConfig dataflowpb.WorkerIPAddressConfiguration) *dataflowpb.LaunchFlexTemplateParameter {
return &dataflowpb.LaunchFlexTemplateParameter{
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://dataflow-templates-southamerica-west1/2023-03-07-00_RC00/flex/Cloud_Datastream_to_Spanner"},
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://dataflow-templates-southamerica-west1/2023-07-04-00_RC00/flex/Cloud_Datastream_to_Spanner"},
Parameters: map[string]string{
"inputFilePattern": inputFilePattern,
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"instanceId": instance,
"databaseId": dbName,
"sessionFilePath": streamingCfg.TmpDir + "session.json",
"deadLetterQueueDirectory": inputFilePattern + "dlq",
// TODO(khajanchi): Uncomment this one dataflow template is released
// "transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
"inputFilePattern": inputFilePattern,
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"instanceId": instance,
"databaseId": dbName,
"sessionFilePath": streamingCfg.TmpDir + "session.json",
"deadLetterQueueDirectory": inputFilePattern + "dlq",
"transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
},
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
MaxWorkers: maxWorkers,
Expand Down
Loading

0 comments on commit 991e81f

Please sign in to comment.