Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming go sample test #752

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,34 +583,58 @@
return nil
}

// LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job.
func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
project, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil)
dataflowCfg := streamingCfg.DataflowCfg
datastreamCfg := streamingCfg.DatastreamCfg
type DataflowAccessor interface {
LaunchFlexTemplate(ctx context.Context, req *dataflowpb.LaunchFlexTemplateRequest, opts ...gax.CallOption) (*dataflowpb.LaunchFlexTemplateResponse, error)
}

// Rate limit this function to match DataFlow createJob Quota.
DATA_FLOW_RL.Take()
type DataflowAccessorImpl struct{}

fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", project, "-", dataflowCfg.Location)
func NewDataflowAccessor() DataflowAccessor {
return DataflowAccessorImpl{}

Check warning on line 593 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L592-L593

Added lines #L592 - L593 were not covered by tests
}

c, err := dataflow.NewFlexTemplatesClient(ctx)
func (dfA DataflowAccessorImpl) LaunchFlexTemplate(ctx context.Context, req *dataflowpb.LaunchFlexTemplateRequest, opts ...gax.CallOption) (*dataflowpb.LaunchFlexTemplateResponse, error) {
fmt.Println("Created flex template client...")
dfClient, err := dataflow.NewFlexTemplatesClient(ctx)
defer dfClient.Close()

Check warning on line 599 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L596-L599

Added lines #L596 - L599 were not covered by tests
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not create flex template client: %v", err)
return nil, fmt.Errorf("could not create flex template client: %v", err)

Check warning on line 601 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L601

Added line #L601 was not covered by tests
}
defer c.Close()
fmt.Println("Created flex template client...")
return dfClient.LaunchFlexTemplate(ctx, req)

Check warning on line 603 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L603

Added line #L603 was not covered by tests
}

//Creating datastream client to fetch the gcs bucket using target profile.
type DatastreamAccessor interface {
GetConnectionProfile(ctx context.Context, req *datastreampb.GetConnectionProfileRequest, opts ...gax.CallOption) (*datastreampb.ConnectionProfile, error)
}

type DatastreamAccessorImpl struct{}

func NewDatastreamAccessor() DatastreamAccessor {
return DatastreamAccessorImpl{}

Check warning on line 613 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L612-L613

Added lines #L612 - L613 were not covered by tests
}

func (dsA DatastreamAccessorImpl) GetConnectionProfile(ctx context.Context, req *datastreampb.GetConnectionProfileRequest, opts ...gax.CallOption) (*datastreampb.ConnectionProfile, error) {

Check warning on line 616 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L616

Added line #L616 was not covered by tests
dsClient, err := datastream.NewClient(ctx)
defer dsClient.Close()

Check warning on line 618 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L618

Added line #L618 was not covered by tests
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("datastream client can not be created: %v", err)
return nil, fmt.Errorf("datastream client can not be created: %v", err)

Check warning on line 620 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L620

Added line #L620 was not covered by tests
}
defer dsClient.Close()
return dsClient.GetConnectionProfile(ctx, req)

Check warning on line 622 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L622

Added line #L622 was not covered by tests
}

// LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job.
func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv, dataflowAccessor DataflowAccessor, datastreamAccessor DatastreamAccessor) (internal.DataflowOutput, error) {
project, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil)
dataflowCfg := streamingCfg.DataflowCfg
datastreamCfg := streamingCfg.DatastreamCfg

// Rate limit this function to match DataFlow createJob Quota.
DATA_FLOW_RL.Take()

fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", project, "-", dataflowCfg.Location)
// Fetch the GCS path from the destination connection profile.
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", project, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)
res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
res, err := datastreamAccessor.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not get connection profiles: %v", err)
}
Expand Down Expand Up @@ -716,8 +740,7 @@
Location: dataflowCfg.Location,
}
fmt.Println("Created flex template request body...")

respDf, err := c.LaunchFlexTemplate(ctx, req)
respDf, err := dataflowAccessor.LaunchFlexTemplate(ctx, req)
if err != nil {
fmt.Printf("flexTemplateRequest: %+v\n", req)
return internal.DataflowOutput{}, fmt.Errorf("unable to launch template: %v", err)
Expand Down Expand Up @@ -877,7 +900,7 @@
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error while writing to GCS: %v", err)
}
dfOutput, err := LaunchDataflowJob(ctx, targetProfile, streamingCfg, conv)
dfOutput, err := LaunchDataflowJob(ctx, targetProfile, streamingCfg, conv, NewDataflowAccessor(), NewDatastreamAccessor())

Check warning on line 903 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L903

Added line #L903 was not covered by tests
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error launching dataflow: %v", err)
}
Expand Down
58 changes: 58 additions & 0 deletions streaming/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,73 @@
package streaming

import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"path/filepath"
"sort"
"testing"

"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/googleapis/gax-go/v2"
"github.com/stretchr/testify/assert"
datastreampb "google.golang.org/genproto/googleapis/cloud/datastream/v1"
dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3"
)

type DataflowAccessorMock struct {}
var launchFlexTemplateMock func(ctx context.Context, req *dataflowpb.LaunchFlexTemplateRequest, opts ...gax.CallOption) (*dataflowpb.LaunchFlexTemplateResponse, error)
func (dataflowMock DataflowAccessorMock) LaunchFlexTemplate(ctx context.Context, req *dataflowpb.LaunchFlexTemplateRequest, opts ...gax.CallOption) (*dataflowpb.LaunchFlexTemplateResponse, error) {
return launchFlexTemplateMock(ctx, req)
}

type DatastreamAccessorMock struct {}
var getConnectionProfileMock func(ctx context.Context, req *datastreampb.GetConnectionProfileRequest, opts ...gax.CallOption) (*datastreampb.ConnectionProfile, error)
func (datastreamMocl DatastreamAccessorMock) GetConnectionProfile(ctx context.Context, req *datastreampb.GetConnectionProfileRequest, opts ...gax.CallOption) (*datastreampb.ConnectionProfile, error) {
return getConnectionProfileMock(ctx, req)
}

func TestLaunchDataflowjobWithError(t *testing.T) {
Sp := profiles.TargetProfileConnectionSpanner{
Endpoint: "zz",
Project: "aa",
Instance: "bb",
Dbname: "cc",
Dialect: "dd",
}
conn := profiles.TargetProfileConnection{Ty: profiles.TargetProfileConnectionTypeSpanner, Sp: Sp}
targetProfile := profiles.TargetProfile{
Ty: profiles.TargetProfileTypeConnection,
Conn: conn,
}
path := filepath.Join("..", "test_data", "streamingcfg.json")
cfgFile, _ := ioutil.ReadFile(path)
streamingCfg := StreamingCfg{}
json.Unmarshal(cfgFile, &streamingCfg)
dfMock := DataflowAccessorMock{}
dsMock := DatastreamAccessorMock{}
launchFlexTemplateMock = func(ctx context.Context, req *dataflowpb.LaunchFlexTemplateRequest, opts ...gax.CallOption) (*dataflowpb.LaunchFlexTemplateResponse, error) {
return &dataflowpb.LaunchFlexTemplateResponse{
Job: nil,
}, errors.New("this is a mocked error")
}
getConnectionProfileMock = func(ctx context.Context, req *datastreampb.GetConnectionProfileRequest, opts ...gax.CallOption) (*datastreampb.ConnectionProfile, error) {
gcsProfile := datastreampb.GcsProfile{
Bucket: "test",
RootPath: "test",
}
return &datastreampb.ConnectionProfile{
Name: "test",
Profile: &datastreampb.ConnectionProfile_GcsProfile{GcsProfile: &gcsProfile,
},
}, nil
}
_, err := LaunchDataflowJob(context.Background(), targetProfile, streamingCfg, nil, dfMock, dsMock)
assert.Equal(t, "unable to launch template: this is a mocked error", err.Error())
}

func TestGetPostgreSQLSourceStreamConfig(t *testing.T) {
testCases := []struct {
name string
Expand Down
48 changes: 48 additions & 0 deletions test_data/streamingcfg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"datastreamCfg": {
"streamId": "",
"streamLocation": "us-central1",
"streamDisplayName": "",
"sourceConnectionConfig": {
"Name": "stalltestxx",
"Location": "us-central1"
},
"destinationConnectionConfig": {
"Name": "stalltestyy3",
"Location": "us-central1",
"Prefix": ""
},
"properties": "",
"maxConcurrentBackfillTasks": "",
"maxConcurrentCdcTasks": ""
},
"gcsCfg": {
"ttlInDays": 0,
"ttlInDaysSet": false
},
"dataflowCfg": {
"projectId": "",
"jobName": "",
"location": "us-central1",
"hostProjectId": "",
"network": "",
"subnetwork": "",
"maxWorkers": "",
"numWorkers": "",
"serviceAccountEmail": "",
"machineType": "",
"additionalUserLabels": "",
"kmsKeyName": "",
"gcsTemplatePath": "",
"dbNameToShardIdMap": null
},
"tmpDir": "gs://smt-job-6a99-2bcc/",
"pubsubCfg": {
"TopicId": "",
"SubscriptionId": "",
"NotificationId": "",
"BucketName": "",
"Region": ""
},
"dataShardId": ""
}
Loading