Skip to content

Commit

Permalink
fix: store, retrieve asset.url (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudo-suhas authored Feb 22, 2023
1 parent c96e577 commit 495c666
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 18 deletions.
1 change: 1 addition & 0 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Asset struct {
Name string `json:"name" diff:"name"`
Description string `json:"description" diff:"description"`
Data map[string]interface{} `json:"data" diff:"data"`
URL string `json:"url" diff:"url"`
Labels map[string]string `json:"labels" diff:"labels"`
Owners []user.User `json:"owners,omitempty" diff:"owners"`
CreatedAt time.Time `json:"created_at" diff:"-"`
Expand Down
5 changes: 5 additions & 0 deletions core/asset/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func TestAssetPatch(t *testing.T) {
"service": "firehose",
"description": "new-description",
"name": "new-name",
"url": "https://sample-url.com",
"labels": map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand All @@ -307,6 +308,7 @@ func TestAssetPatch(t *testing.T) {
Service: "firehose",
Description: "new-description",
Name: "new-name",
URL: "https://sample-url.com",
Labels: map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand All @@ -325,6 +327,7 @@ func TestAssetPatch(t *testing.T) {
Service: "optimus",
Description: "sample-description",
Name: "old-name",
URL: "https://sample-url-old.com",
Labels: map[string]string{
"foo": "bar",
},
Expand All @@ -338,6 +341,7 @@ func TestAssetPatch(t *testing.T) {
"service": "firehose",
"description": "new-description",
"name": "new-name",
"url": "https://sample-url.com",
"labels": map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand All @@ -353,6 +357,7 @@ func TestAssetPatch(t *testing.T) {
Service: "firehose",
Description: "new-description",
Name: "new-name",
URL: "https://sample-url.com",
Labels: map[string]string{
"bar": "foo",
"bar2": "foo2",
Expand Down
1 change: 1 addition & 0 deletions core/asset/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func patchAsset(a *Asset, patchData map[string]interface{}) {
a.Service = patchString("service", patchData, a.Service)
a.Name = patchString("name", patchData, a.Name)
a.Description = patchString("description", patchData, a.Description)
a.URL = patchString("url", patchData, a.URL)

labels, exists := patchData["labels"]
if exists {
Expand Down
5 changes: 5 additions & 0 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest
Name: baseAsset.GetName(),
Description: baseAsset.GetDescription(),
Data: baseAsset.GetData().AsMap(),
URL: baseAsset.Url,
Labels: baseAsset.GetLabels(),
}

Expand Down Expand Up @@ -500,6 +501,9 @@ func decodePatchAssetToMap(pb *compassv1beta1.UpsertPatchAssetRequest_Asset) map
if pb.GetData() != nil {
m["data"] = pb.GetData().AsMap()
}
if len(pb.Url) > 0 {
m["url"] = pb.Url
}
if pb.GetLabels() != nil {
m["labels"] = pb.GetLabels()
}
Expand Down Expand Up @@ -578,6 +582,7 @@ func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.As
Name: a.Name,
Description: a.Description,
Data: data,
Url: a.URL,
Labels: a.Labels,
Owners: owners,
Version: a.Version,
Expand Down
7 changes: 7 additions & 0 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func TestUpsertAsset(t *testing.T) {
Name: "new-name",
Service: "kafka",
Data: &structpb.Struct{},
Url: "https://sample-url.com",
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
},
Upstreams: []*compassv1beta1.LineageNode{
Expand Down Expand Up @@ -450,6 +451,7 @@ func TestUpsertAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}
upstreams := []string{"upstream-1"}
Expand Down Expand Up @@ -520,6 +522,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Url: "https://sample-url.com",
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
},
Upstreams: []*compassv1beta1.LineageNode{
Expand Down Expand Up @@ -549,6 +552,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}
)
Expand Down Expand Up @@ -652,6 +656,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}
upstreams := []string{"upstream-1"}
Expand Down Expand Up @@ -688,6 +693,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}

Expand Down Expand Up @@ -733,6 +739,7 @@ func TestUpsertPatchAsset(t *testing.T) {
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
URL: "https://sample-url-old.com",
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}

Expand Down
2 changes: 2 additions & 0 deletions internal/store/postgres/asset_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type AssetModel struct {
Service string `db:"service"`
Description string `db:"description"`
Data JSONMap `db:"data"`
URL string `db:"url"`
Labels JSONMap `db:"labels"`
Version string `db:"version"`
UpdatedBy UserModel `db:"updated_by"`
Expand All @@ -41,6 +42,7 @@ func (a *AssetModel) toAsset(owners []user.User) asset.Asset {
Service: a.Service,
Description: a.Description,
Data: a.Data,
URL: a.URL,
Labels: a.buildLabels(),
Owners: owners,
Version: a.Version,
Expand Down
38 changes: 20 additions & 18 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ func (r *AssetRepository) deleteWithPredicate(ctx context.Context, pred sq.Eq) (
func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id string, err error) {
err = r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
query, args, err := sq.Insert("assets").
Columns("urn", "type", "service", "name", "description", "data", "labels", "updated_by", "version").
Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion).
Columns("urn", "type", "service", "name", "description", "data", "url", "labels", "updated_by", "version").
Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.URL, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion).
Suffix("RETURNING \"id\"").
PlaceholderFormat(sq.Dollar).
ToSql()
Expand Down Expand Up @@ -490,7 +490,6 @@ func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id stri
}

func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset *asset.Asset, oldAsset *asset.Asset, clog diff.Changelog) error {

if !isValidUUID(assetID) {
return asset.InvalidError{AssetID: assetID}
}
Expand All @@ -508,22 +507,24 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset *
newAsset.Version = newVersion
newAsset.ID = oldAsset.ID

err = r.execContext(ctx, tx,
`UPDATE assets
SET urn = $1,
type = $2,
service = $3,
name = $4,
description = $5,
data = $6,
labels = $7,
updated_at = $8,
updated_by = $9,
version = $10
WHERE id = $11;
`,
newAsset.URN, newAsset.Type, newAsset.Service, newAsset.Name, newAsset.Description, newAsset.Data, newAsset.Labels, time.Now(), newAsset.UpdatedBy.ID, newAsset.Version, assetID)
query, args, err := r.buildSQL(sq.Update("assets").
Set("urn", newAsset.URN).
Set("type", newAsset.Type).
Set("service", newAsset.Service).
Set("name", newAsset.Name).
Set("description", newAsset.Description).
Set("data", newAsset.Data).
Set("url", newAsset.URL).
Set("labels", newAsset.Labels).
Set("updated_at", time.Now()).
Set("updated_by", newAsset.UpdatedBy.ID).
Set("version", newAsset.Version).
Where(sq.Eq{"id": assetID}))
if err != nil {
return fmt.Errorf("build query: %w", err)
}

if err := r.execContext(ctx, tx, query, args...); err != nil {
return fmt.Errorf("error running update asset query: %w", err)
}

Expand Down Expand Up @@ -775,6 +776,7 @@ func (r *AssetRepository) getAssetSQL() sq.SelectBuilder {
a.service as service,
a.description as description,
a.data as data,
COALESCE(a.url, '') as url,
a.labels as labels,
a.version as version,
a.created_at as created_at,
Expand Down
31 changes: 31 additions & 0 deletions internal/store/postgres/asset_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
Type: "table",
Service: "bigquery",
Version: "0.1",
URL: "https://sample-url.com",
UpdatedBy: r.users[0],
}
id, err := r.repository.Upsert(r.ctx, &ast)
Expand Down Expand Up @@ -1019,6 +1020,36 @@ func (r *AssetRepositoryTestSuite) TestUpsert() {
r.Equal(ast.ID, identicalAsset.ID)
})

r.Run("should update the asset if asset is not identical", func() {
ast := asset.Asset{
URN: "urn-u-2",
Type: "table",
Service: "bigquery",
URL: "https://sample-url-old.com",
UpdatedBy: r.users[0],
}

id, err := r.repository.Upsert(r.ctx, &ast)
r.Require().NoError(err)
r.NotEmpty(id)
ast.ID = id

updated := ast
updated.URL = "https://sample-url.com"

id, err = r.repository.Upsert(r.ctx, &updated)
r.Require().NoError(err)
r.NotEmpty(id)
updated.ID = id

r.Equal(ast.ID, updated.ID)

actual, err := r.repository.GetByID(r.ctx, ast.ID)
r.NoError(err)

r.Equal(updated.URL, actual.URL)
})

r.Run("should delete old owners if it does not exist on new asset", func() {
ast := asset.Asset{
URN: "urn-u-4",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE assets DROP COLUMN IF EXISTS url;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE assets ADD COLUMN IF NOT EXISTS url TEXT;

0 comments on commit 495c666

Please sign in to comment.