Skip to content

Commit

Permalink
feat: Asset cleanup (#5152)
Browse files Browse the repository at this point in the history
* assets cleanup

* assets cleanup - prevent recent assets from getting deleted

* use pagination and fail safe approach

* lint fix

* review comments

* check for error

* small change

* remove inline interface
  • Loading branch information
k-anshul committed Jun 27, 2024
1 parent 208ede5 commit edec171
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 26 deletions.
5 changes: 4 additions & 1 deletion admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"cloud.google.com/go/storage"
"github.com/rilldata/rill/admin/ai"
"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/admin/provisioner"
Expand Down Expand Up @@ -31,6 +32,7 @@ type Service struct {
Email *email.Client
Github Github
AI ai.Client
Assets *storage.BucketHandle
Used *usedFlusher
Logger *zap.Logger
opts *Options
Expand All @@ -41,7 +43,7 @@ type Service struct {
AutoscalerCron string
}

func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Issuer, emailClient *email.Client, github Github, aiClient ai.Client) (*Service, error) {
func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Issuer, emailClient *email.Client, github Github, aiClient ai.Client, assets *storage.BucketHandle) (*Service, error) {
// Init db
db, err := database.Open(opts.DatabaseDriver, opts.DatabaseDSN)
if err != nil {
Expand Down Expand Up @@ -95,6 +97,7 @@ func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Is
Email: emailClient,
Github: github,
AI: aiClient,
Assets: assets,
Used: newUsedFlusher(logger, db),
Logger: logger,
opts: opts,
Expand Down
2 changes: 2 additions & 0 deletions admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ type DB interface {
DeleteExpiredVirtualFiles(ctx context.Context, retention time.Duration) error

FindAsset(ctx context.Context, id string) (*Asset, error)
FindUnusedAssets(ctx context.Context, limit int) ([]*Asset, error)
InsertAsset(ctx context.Context, organizationID, path, ownerID string) (*Asset, error)
DeleteAssets(ctx context.Context, ids []string) error
}

// Tx represents a database transaction. It can only be used to commit and rollback transactions.
Expand Down
22 changes: 22 additions & 0 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,28 @@ func (c *connection) InsertAsset(ctx context.Context, organizationID, path, owne
return res, nil
}

func (c *connection) FindUnusedAssets(ctx context.Context, limit int) ([]*database.Asset, error) {
var res []*database.Asset
// We skip unused assets created in last 6 hours to prevent race condition
// where somebody just created an asset but is yet to use it
err := c.getDB(ctx).SelectContext(ctx, &res, `
SELECT a.* FROM assets a
WHERE a.created_on < now() - INTERVAL '6 hours'
AND NOT EXISTS
(SELECT 1 FROM projects p WHERE p.archive_asset_id = a.id)
ORDER BY a.created_on DESC LIMIT $1
`, limit)
if err != nil {
return nil, parseErr("assets", err)
}
return res, nil
}

func (c *connection) DeleteAssets(ctx context.Context, ids []string) error {
_, err := c.getDB(ctx).ExecContext(ctx, "DELETE FROM assets WHERE id=ANY($1)", ids)
return parseErr("asset", err)
}

// projectDTO wraps database.Project, using the pgtype package to handle types that pgx can't read directly into their native Go types.
type projectDTO struct {
*database.Project
Expand Down
1 change: 1 addition & 0 deletions admin/server/admin_rbac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestAdmin_RBAC(t *testing.T) {
emailClient,
github,
ai.NewNoop(),
nil,
)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion admin/server/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Server) CreateAsset(ctx context.Context, req *adminv1.CreateAssetReques
Headers: signingHeaders,
Expires: time.Now().Add(15 * time.Minute),
}
signedURL, err := s.assetsBucket.SignedURL(object, opts)
signedURL, err := s.admin.Assets.SignedURL(object, opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion admin/server/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Server) generateV4GetObjectSignedURL(objectpath string) (string, error)
Expires: time.Now().Add(15 * time.Minute),
}

signedURL, err := s.assetsBucket.SignedURL(strings.TrimPrefix(u.Path, "/"), opts)
signedURL, err := s.admin.Assets.SignedURL(strings.TrimPrefix(u.Path, "/"), opts)
if err != nil {
return "", err
}
Expand Down
5 changes: 1 addition & 4 deletions admin/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"
"time"

"cloud.google.com/go/storage"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
Expand Down Expand Up @@ -78,7 +77,6 @@ type Server struct {
urls *externalURLs
limiter ratelimit.Limiter
activity *activity.Client
assetsBucket *storage.BucketHandle
}

var _ adminv1.AdminServiceServer = (*Server)(nil)
Expand All @@ -87,7 +85,7 @@ var _ adminv1.AIServiceServer = (*Server)(nil)

var _ adminv1.TelemetryServiceServer = (*Server)(nil)

func New(logger *zap.Logger, adm *admin.Service, issuer *runtimeauth.Issuer, limiter ratelimit.Limiter, activityClient *activity.Client, assetsBucket *storage.BucketHandle, opts *Options) (*Server, error) {
func New(logger *zap.Logger, adm *admin.Service, issuer *runtimeauth.Issuer, limiter ratelimit.Limiter, activityClient *activity.Client, opts *Options) (*Server, error) {
externalURL, err := url.Parse(opts.ExternalURL)
if err != nil {
return nil, fmt.Errorf("failed to parse external URL: %w", err)
Expand Down Expand Up @@ -144,7 +142,6 @@ func New(logger *zap.Logger, adm *admin.Service, issuer *runtimeauth.Issuer, lim
urls: newURLRegistry(opts),
limiter: limiter,
activity: activityClient,
assetsBucket: assetsBucket,
}, nil
}

Expand Down
64 changes: 64 additions & 0 deletions admin/worker/delete_unsued_assets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package worker

import (
"context"
"errors"
"fmt"
"net/url"
"strings"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
)

const _unusedAssetsPageSize = 100

func (w *Worker) deleteUnusedAssets(ctx context.Context) error {
for {
// 1. Fetch unused assets
assets, err := w.admin.DB.FindUnusedAssets(ctx, _unusedAssetsPageSize)
if err != nil {
return err
}
if len(assets) == 0 {
return nil
}

// 2. Delete objects from cloud storage
// Limit the number of concurrent deletes to 8
// TODO: Use batch API once google-cloud-go supports it
group, cctx := errgroup.WithContext(ctx)
group.SetLimit(8)
var ids []string
for _, asset := range assets {
ids = append(ids, asset.ID)
group.Go(func() error {
parsed, err := url.Parse(asset.Path)
if err != nil {
return fmt.Errorf("failed to parse asset path %q: %w", asset.Path, err)
}
err = w.admin.Assets.Object(strings.TrimPrefix(parsed.Path, "/")).Delete(cctx)
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
return fmt.Errorf("failed to delete asset %q: %w", asset.Path, err)
}
return nil
})
}
err = group.Wait()
if err != nil {
return err
}

// 3. Delete the assets in the DB
err = w.admin.DB.DeleteAssets(ctx, ids)
if err != nil {
return err
}

if len(assets) < _unusedAssetsPageSize {
// no more assets to delete
return nil
}
// fetch again could be more unused assets
}
}
3 changes: 3 additions & 0 deletions admin/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (w *Worker) Run(ctx context.Context) error {
group.Go(func() error {
return w.scheduleCron(ctx, "run_autoscaler", w.runAutoscaler, w.admin.AutoscalerCron)
})
group.Go(func() error {
return w.schedule(ctx, "delete_unused_assets", w.deleteUnusedAssets, 6*time.Hour)
})

// NOTE: Add new scheduled jobs here

Expand Down
24 changes: 13 additions & 11 deletions cli/cmd/admin/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,17 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
aiClient = ai.NewNoop()
}

// Init AssetsBucket handle
var clientOpts []option.ClientOption
if conf.AssetsBucketGoogleCredentialsJSON != "" {
clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(conf.AssetsBucketGoogleCredentialsJSON)))
}
storageClient, err := storage.NewClient(cmd.Context(), clientOpts...)
if err != nil {
logger.Fatal("failed to create assets bucket handle", zap.Error(err))
}
assetsBucket := storageClient.Bucket(conf.AssetsBucket)

// Parse metrics project name
var metricsProjectOrg, metricsProjectName string
if conf.MetricsProject != "" {
Expand All @@ -253,7 +264,7 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
MetricsProjectName: metricsProjectName,
AutoscalerCron: conf.AutoscalerCron,
}
adm, err := admin.New(cmd.Context(), admOpts, logger, issuer, emailClient, gh, aiClient)
adm, err := admin.New(cmd.Context(), admOpts, logger, issuer, emailClient, gh, aiClient, assetsBucket)
if err != nil {
logger.Fatal("error creating service", zap.Error(err))
}
Expand Down Expand Up @@ -292,16 +303,7 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
limiter = ratelimit.NewRedis(redis.NewClient(opts))
}

var clientOpts []option.ClientOption
if conf.AssetsBucketGoogleCredentialsJSON != "" {
clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(conf.AssetsBucketGoogleCredentialsJSON)))
}
storageClient, err := storage.NewClient(cmd.Context(), clientOpts...)
if err != nil {
logger.Fatal("failed to create assets bucket handle", zap.Error(err))
}

srv, err := server.New(logger, adm, issuer, limiter, activityClient, storageClient.Bucket(conf.AssetsBucket), &server.Options{
srv, err := server.New(logger, adm, issuer, limiter, activityClient, &server.Options{
HTTPPort: conf.HTTPPort,
GRPCPort: conf.GRPCPort,
ExternalURL: conf.ExternalURL,
Expand Down
10 changes: 2 additions & 8 deletions cli/pkg/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"time"

"cloud.google.com/go/storage"
"github.com/google/go-github/v50/github"
"github.com/rilldata/rill/admin"
"github.com/rilldata/rill/admin/ai"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/rilldata/rill/runtime/pkg/ratelimit"
runtimeauth "github.com/rilldata/rill/runtime/server/auth"
"go.uber.org/zap"
"google.golang.org/api/option"
)

func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (*admin.Service, error) {
Expand Down Expand Up @@ -49,7 +47,7 @@ func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (
VersionCommit: "",
}

adm, err := admin.New(ctx, admOpts, logger, issuer, emailClient, gh, ai.NewNoop())
adm, err := admin.New(ctx, admOpts, logger, issuer, emailClient, gh, ai.NewNoop(), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -84,11 +82,7 @@ func AdminServer(ctx context.Context, logger *zap.Logger, adm *admin.Service) (*
}

limiter := ratelimit.NewNoop()
client, err := storage.NewClient(ctx, option.WithoutAuthentication())
if err != nil {
return nil, err
}
srv, err := server.New(logger, adm, issuer, limiter, activity.NewNoopClient(), client.Bucket("mock"), &server.Options{
srv, err := server.New(logger, adm, issuer, limiter, activity.NewNoopClient(), &server.Options{
HTTPPort: conf.HTTPPort,
GRPCPort: conf.GRPCPort,
ExternalURL: conf.ExternalURL,
Expand Down

0 comments on commit edec171

Please sign in to comment.