Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Jun 27, 2024
1 parent 03fb00b commit cbabea4
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 87 deletions.
19 changes: 12 additions & 7 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ type Options struct {
}

type Service struct {
DB database.DB
ProvisionerSet map[string]provisioner.Provisioner
Email *email.Client
Github Github
AI ai.Client
AssetsBucket *storage.BucketHandle
DB database.DB
ProvisionerSet map[string]provisioner.Provisioner
Email *email.Client
Github Github
AI ai.Client
// Assets as reduced surface of storage.BucketHandle
// enables us to use a mock in tests.
Assets interface {
SignedURL(object string, opts *storage.SignedURLOptions) (string, error)
Object(path string) *storage.ObjectHandle
}
Used *usedFlusher
Logger *zap.Logger
opts *Options
Expand Down Expand Up @@ -105,7 +110,7 @@ func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Is
VersionCommit: opts.VersionCommit,
metricsProjectID: metricsProjectID,
AutoscalerCron: opts.AutoscalerCron,
AssetsBucket: assetsBucket,
Assets: assetsBucket,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type DB interface {
DeleteExpiredVirtualFiles(ctx context.Context, retention time.Duration) error

FindAsset(ctx context.Context, id string) (*Asset, error)
FindUnusedAssets(ctx context.Context, createdBefore time.Time, limit int) ([]*Asset, error)
FindUnusedAssets(ctx context.Context, limit int) ([]*Asset, error)
InsertAsset(ctx context.Context, organizationID, path, ownerID string) (*Asset, error)
DeleteAssets(ctx context.Context, ids []string) error
}
Expand Down
1 change: 0 additions & 1 deletion admin/database/postgres/migrations/0031.sql

This file was deleted.

40 changes: 11 additions & 29 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,38 +1580,20 @@ func (c *connection) InsertAsset(ctx context.Context, organizationID, path, owne
return res, nil
}

func (c *connection) FindUnusedAssets(ctx context.Context, createdBefore time.Time, limit int) ([]*database.Asset, error) {
query := `SELECT a.* FROM assets a
LEFT JOIN projects p
ON a.id = p.archive_asset_id
WHERE p.archive_asset_id IS NULL`
var args []any
if createdBefore.IsZero() {
query += " ORDER BY a.created_on DESC LIMIT $1"
args = []any{limit}
} else {
query += " AND a.created_on < $1 ORDER BY a.created_on DESC LIMIT $2"
args = []any{createdBefore, limit}
}

rows, err := c.getDB(ctx).QueryxContext(ctx, query, args...)
func (c *connection) FindUnusedAssets(ctx context.Context, limit int) ([]*database.Asset, error) {
var res []*database.Asset
// We skip unused assets created in last 6 hours to prevent race condition
// where somebody just created an asset but is yet to use it
err := c.getDB(ctx).SelectContext(ctx, &res, `
SELECT a.* FROM assets a
WHERE a.created_on < now() - INTERVAL '6 hours'
AND NOT EXISTS
(SELECT 1 FROM projects p WHERE p.archive_asset_id = a.id)
ORDER BY a.created_on DESC LIMIT $1
`, limit)
if err != nil {
return nil, parseErr("assets", err)
}
defer rows.Close()

var res []*database.Asset
for rows.Next() {
var asset database.Asset
err = rows.StructScan(&asset)
if err != nil {
return nil, parseErr("assets", err)
}
res = append(res, &asset)
}
if rows.Err() != nil {
return nil, parseErr("assets", rows.Err())
}
return res, nil
}

Expand Down
7 changes: 1 addition & 6 deletions admin/server/admin_rbac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"
"testing"

"cloud.google.com/go/storage"
"github.com/google/go-github/v50/github"
"github.com/rilldata/rill/admin"
"github.com/rilldata/rill/admin/ai"
Expand All @@ -19,7 +18,6 @@ import (
runtimeauth "github.com/rilldata/rill/runtime/server/auth"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -48,9 +46,6 @@ func TestAdmin_RBAC(t *testing.T) {

provisionerSetJSON := "{\"static\":{\"type\":\"static\",\"spec\":{\"runtimes\":[{\"host\":\"http://localhost:9091\",\"slots\":50,\"data_dir\":\"\",\"audience_url\":\"http://localhost:8081\"}]}}}"

client, err := storage.NewClient(ctx, option.WithoutAuthentication())
require.NoError(t, err)

service, err := admin.New(context.Background(),
&admin.Options{
DatabaseDriver: "postgres",
Expand All @@ -64,7 +59,7 @@ func TestAdmin_RBAC(t *testing.T) {
emailClient,
github,
ai.NewNoop(),
client.Bucket("mock"),
nil,
)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion admin/server/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Server) CreateAsset(ctx context.Context, req *adminv1.CreateAssetReques
Headers: signingHeaders,
Expires: time.Now().Add(15 * time.Minute),
}
signedURL, err := s.admin.AssetsBucket.SignedURL(object, opts)
signedURL, err := s.admin.Assets.SignedURL(object, opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion admin/server/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Server) generateV4GetObjectSignedURL(objectpath string) (string, error)
Expires: time.Now().Add(15 * time.Minute),
}

signedURL, err := s.admin.AssetsBucket.SignedURL(strings.TrimPrefix(u.Path, "/"), opts)
signedURL, err := s.admin.Assets.SignedURL(strings.TrimPrefix(u.Path, "/"), opts)
if err != nil {
return "", err
}
Expand Down
46 changes: 14 additions & 32 deletions admin/worker/delete_unsued_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,78 +3,60 @@ package worker
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"time"

"cloud.google.com/go/storage"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const _defaultPageSize = 20
const _unusedAssetsPageSize = 100

func (w *Worker) deleteUnusedAssets(ctx context.Context) error {
// We skip unused assets created in last 15 minutes to prevent race condition
// where somebody just created an asset but is yet to use it
createdBefore := time.Now().Add(-15 * time.Minute)
for {
// 1. Fetch unused assets
assets, err := w.admin.DB.FindUnusedAssets(ctx, createdBefore, _defaultPageSize)
assets, err := w.admin.DB.FindUnusedAssets(ctx, _unusedAssetsPageSize)
if err != nil {
return err
}
if len(assets) == 0 {
return nil
}
createdBefore = assets[len(assets)-1].CreatedOn
ids := make([]string, len(assets))

// 2. Delete objects from cloud storage
// Limit the number of concurrent deletes to 8
// TODO: Use batch API once google-cloud-go supports it
group, cctx := errgroup.WithContext(ctx)
group.SetLimit(8)
for j := 0; j < len(assets); j++ {
i := j
var ids []string
for _, asset := range assets {
asset := asset
ids = append(ids, asset.ID)
group.Go(func() error {
parsed, err := url.Parse(assets[i].Path)
parsed, err := url.Parse(asset.Path)
if err != nil {
w.logger.Warn("failed to parse asset path", zap.String("path", assets[i].Path), zap.Error(err))
return nil
return fmt.Errorf("failed to parse asset path %q: %w", asset.Path, err)
}
err = w.admin.AssetsBucket.Object(strings.TrimPrefix(parsed.Path, "/")).Delete(cctx)
err = w.admin.Assets.Object(strings.TrimPrefix(parsed.Path, "/")).Delete(cctx)
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
w.logger.Warn("failed to delete asset", zap.String("path", assets[i].Path), zap.Error(err))
return nil
return fmt.Errorf("failed to delete asset %q: %w", asset.Path, err)
}
// collect ids for which delete was successful or object was not found
ids[i] = assets[i].ID
return nil
})
}
_ = group.Wait()

// 3. Delete the assets in the DB
var finalIDs []string
for _, id := range ids {
if id != "" {
finalIDs = append(finalIDs, id)
}
}
if len(finalIDs) == 0 {
// No assets were safely deleted so could be an issue with google cloud storage,network etc
// we return and execute again in the next run of this job
return nil
}
err = w.admin.DB.DeleteAssets(ctx, finalIDs)
err = w.admin.DB.DeleteAssets(ctx, ids)
if err != nil {
return err
}

if len(assets) < _defaultPageSize {
if len(assets) < _unusedAssetsPageSize {
// no more assets to delete
return nil
}
// fetch again could be more unused assets
}
}
10 changes: 1 addition & 9 deletions cli/pkg/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"time"

"cloud.google.com/go/storage"
"github.com/google/go-github/v50/github"
"github.com/rilldata/rill/admin"
"github.com/rilldata/rill/admin/ai"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/rilldata/rill/runtime/pkg/ratelimit"
runtimeauth "github.com/rilldata/rill/runtime/server/auth"
"go.uber.org/zap"
"google.golang.org/api/option"
)

func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (*admin.Service, error) {
Expand All @@ -37,12 +35,6 @@ func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (
}

provisionerSetJSON := "{\"static\":{\"type\":\"static\",\"spec\":{\"runtimes\":[{\"host\":\"http://localhost:9091\",\"slots\":50,\"data_dir\":\"\",\"audience_url\":\"http://localhost:8081\"}]}}}"

client, err := storage.NewClient(ctx, option.WithoutAuthentication())
if err != nil {
return nil, err
}

// Init admin service
admOpts := &admin.Options{
DatabaseDriver: "postgres",
Expand All @@ -54,7 +46,7 @@ func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (
VersionCommit: "",
}

adm, err := admin.New(ctx, admOpts, logger, issuer, emailClient, gh, ai.NewNoop(), client.Bucket("mock"))
adm, err := admin.New(ctx, admOpts, logger, issuer, emailClient, gh, ai.NewNoop(), nil)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit cbabea4

Please sign in to comment.