Skip to content

Commit

Permalink
Merge branch 'main' into cloud_billing
Browse files Browse the repository at this point in the history
  • Loading branch information
pjain1 committed Jun 28, 2024
2 parents 4d27a70 + 2f3b78a commit 4f27c74
Show file tree
Hide file tree
Showing 141 changed files with 3,934 additions and 8,215 deletions.
5 changes: 4 additions & 1 deletion admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"cloud.google.com/go/storage"
"github.com/rilldata/rill/admin/ai"
"github.com/rilldata/rill/admin/billing"
"github.com/rilldata/rill/admin/database"
Expand Down Expand Up @@ -32,6 +33,7 @@ type Service struct {
Email *email.Client
Github Github
AI ai.Client
Assets *storage.BucketHandle
Used *usedFlusher
Logger *zap.Logger
opts *Options
Expand All @@ -43,7 +45,7 @@ type Service struct {
Biller billing.Biller
}

func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Issuer, emailClient *email.Client, github Github, aiClient ai.Client, biller billing.Biller) (*Service, error) {
func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Issuer, emailClient *email.Client, github Github, aiClient ai.Client, assets *storage.BucketHandle, biller billing.Biller) (*Service, error) {
// Init db
db, err := database.Open(opts.DatabaseDriver, opts.DatabaseDSN)
if err != nil {
Expand Down Expand Up @@ -97,6 +99,7 @@ func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Is
Email: emailClient,
Github: github,
AI: aiClient,
Assets: assets,
Used: newUsedFlusher(logger, db),
Logger: logger,
opts: opts,
Expand Down
2 changes: 2 additions & 0 deletions admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ type DB interface {
DeleteExpiredVirtualFiles(ctx context.Context, retention time.Duration) error

FindAsset(ctx context.Context, id string) (*Asset, error)
FindUnusedAssets(ctx context.Context, limit int) ([]*Asset, error)
InsertAsset(ctx context.Context, organizationID, path, ownerID string) (*Asset, error)
DeleteAssets(ctx context.Context, ids []string) error

FindOrganizationIDsWithBilling(ctx context.Context) ([]string, error)
// CountBillingProjectsForOrganization counts the projects which are not hibernated and created before the given time
Expand Down
22 changes: 22 additions & 0 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,28 @@ func (c *connection) InsertAsset(ctx context.Context, organizationID, path, owne
return res, nil
}

func (c *connection) FindUnusedAssets(ctx context.Context, limit int) ([]*database.Asset, error) {
var res []*database.Asset
// We skip unused assets created in last 6 hours to prevent race condition
// where somebody just created an asset but is yet to use it
err := c.getDB(ctx).SelectContext(ctx, &res, `
SELECT a.* FROM assets a
WHERE a.created_on < now() - INTERVAL '6 hours'
AND NOT EXISTS
(SELECT 1 FROM projects p WHERE p.archive_asset_id = a.id)
ORDER BY a.created_on DESC LIMIT $1
`, limit)
if err != nil {
return nil, parseErr("assets", err)
}
return res, nil
}

func (c *connection) DeleteAssets(ctx context.Context, ids []string) error {
_, err := c.getDB(ctx).ExecContext(ctx, "DELETE FROM assets WHERE id=ANY($1)", ids)
return parseErr("asset", err)
}

func (c *connection) FindOrganizationIDsWithBilling(ctx context.Context) ([]string, error) {
var res []string
err := c.getDB(ctx).SelectContext(ctx, &res, `SELECT id FROM orgs WHERE billing_customer_id <> ''`)
Expand Down
1 change: 1 addition & 0 deletions admin/server/admin_rbac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestAdmin_RBAC(t *testing.T) {
emailClient,
github,
ai.NewNoop(),
nil,
billing.NewNoop(),
)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion admin/server/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Server) CreateAsset(ctx context.Context, req *adminv1.CreateAssetReques
Headers: signingHeaders,
Expires: time.Now().Add(15 * time.Minute),
}
signedURL, err := s.assetsBucket.SignedURL(object, opts)
signedURL, err := s.admin.Assets.SignedURL(object, opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion admin/server/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Server) generateV4GetObjectSignedURL(objectpath string) (string, error)
Expires: time.Now().Add(15 * time.Minute),
}

signedURL, err := s.assetsBucket.SignedURL(strings.TrimPrefix(u.Path, "/"), opts)
signedURL, err := s.admin.Assets.SignedURL(strings.TrimPrefix(u.Path, "/"), opts)
if err != nil {
return "", err
}
Expand Down
5 changes: 1 addition & 4 deletions admin/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"
"time"

"cloud.google.com/go/storage"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
Expand Down Expand Up @@ -78,7 +77,6 @@ type Server struct {
urls *externalURLs
limiter ratelimit.Limiter
activity *activity.Client
assetsBucket *storage.BucketHandle
}

var _ adminv1.AdminServiceServer = (*Server)(nil)
Expand All @@ -87,7 +85,7 @@ var _ adminv1.AIServiceServer = (*Server)(nil)

var _ adminv1.TelemetryServiceServer = (*Server)(nil)

func New(logger *zap.Logger, adm *admin.Service, issuer *runtimeauth.Issuer, limiter ratelimit.Limiter, activityClient *activity.Client, assetsBucket *storage.BucketHandle, opts *Options) (*Server, error) {
func New(logger *zap.Logger, adm *admin.Service, issuer *runtimeauth.Issuer, limiter ratelimit.Limiter, activityClient *activity.Client, opts *Options) (*Server, error) {
externalURL, err := url.Parse(opts.ExternalURL)
if err != nil {
return nil, fmt.Errorf("failed to parse external URL: %w", err)
Expand Down Expand Up @@ -144,7 +142,6 @@ func New(logger *zap.Logger, adm *admin.Service, issuer *runtimeauth.Issuer, lim
urls: newURLRegistry(opts),
limiter: limiter,
activity: activityClient,
assetsBucket: assetsBucket,
}, nil
}

Expand Down
64 changes: 64 additions & 0 deletions admin/worker/delete_unsued_assets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package worker

import (
"context"
"errors"
"fmt"
"net/url"
"strings"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
)

const _unusedAssetsPageSize = 100

func (w *Worker) deleteUnusedAssets(ctx context.Context) error {
for {
// 1. Fetch unused assets
assets, err := w.admin.DB.FindUnusedAssets(ctx, _unusedAssetsPageSize)
if err != nil {
return err
}
if len(assets) == 0 {
return nil
}

// 2. Delete objects from cloud storage
// Limit the number of concurrent deletes to 8
// TODO: Use batch API once google-cloud-go supports it
group, cctx := errgroup.WithContext(ctx)
group.SetLimit(8)
var ids []string
for _, asset := range assets {
ids = append(ids, asset.ID)
group.Go(func() error {
parsed, err := url.Parse(asset.Path)
if err != nil {
return fmt.Errorf("failed to parse asset path %q: %w", asset.Path, err)
}
err = w.admin.Assets.Object(strings.TrimPrefix(parsed.Path, "/")).Delete(cctx)
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
return fmt.Errorf("failed to delete asset %q: %w", asset.Path, err)
}
return nil
})
}
err = group.Wait()
if err != nil {
return err
}

// 3. Delete the assets in the DB
err = w.admin.DB.DeleteAssets(ctx, ids)
if err != nil {
return err
}

if len(assets) < _unusedAssetsPageSize {
// no more assets to delete
return nil
}
// fetch again could be more unused assets
}
}
13 changes: 10 additions & 3 deletions admin/worker/run_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (w *Worker) runAutoscaler(ctx context.Context) error {

projectOrg, err := w.admin.DB.FindOrganization(ctx, targetProject.OrganizationID)
if err != nil {
w.logger.Error("failed to find org for the project", zap.String("project_name", targetProject.Name), zap.String("org_id", targetProject.OrganizationID), zap.Error(err))
w.logger.Error("failed to autoscale: unable to find org for the project", zap.String("project_name", targetProject.Name), zap.String("org_id", targetProject.OrganizationID), zap.Error(err))
continue
}

Expand Down Expand Up @@ -75,11 +75,18 @@ func (w *Worker) runAutoscaler(ctx context.Context) error {
Annotations: targetProject.Annotations,
})
if err != nil {
w.logger.Error("failed to autoscale", zap.String("project_name", targetProject.Name), zap.String("org_name", projectOrg.Name), zap.Error(err))
w.logger.Error("failed to autoscale: error updating the project", zap.String("project_name", targetProject.Name), zap.String("org_name", projectOrg.Name), zap.Error(err))
continue
}

w.logger.Info("succeeded in autoscaling",
scaleMsg := "succeeded in autoscaling "
if updatedProject.ProdSlots > targetProject.ProdSlots {
scaleMsg += "up"
} else {
scaleMsg += "down"
}

w.logger.Info(scaleMsg,
zap.String("project_name", updatedProject.Name),
zap.Int("updated_slots", updatedProject.ProdSlots),
zap.Int("prev_slots", targetProject.ProdSlots),
Expand Down
3 changes: 3 additions & 0 deletions admin/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (w *Worker) Run(ctx context.Context) error {
group.Go(func() error {
return w.scheduleCron(ctx, "run_autoscaler", w.runAutoscaler, w.admin.AutoscalerCron)
})
group.Go(func() error {
return w.schedule(ctx, "delete_unused_assets", w.deleteUnusedAssets, 6*time.Hour)
})

if w.admin.Biller.GetReportingWorkerCron() != "" {
group.Go(func() error {
Expand Down
24 changes: 13 additions & 11 deletions cli/cmd/admin/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,17 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
aiClient = ai.NewNoop()
}

// Init AssetsBucket handle
var clientOpts []option.ClientOption
if conf.AssetsBucketGoogleCredentialsJSON != "" {
clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(conf.AssetsBucketGoogleCredentialsJSON)))
}
storageClient, err := storage.NewClient(cmd.Context(), clientOpts...)
if err != nil {
logger.Fatal("failed to create assets bucket handle", zap.Error(err))
}
assetsBucket := storageClient.Bucket(conf.AssetsBucket)

// Parse metrics project name
var metricsProjectOrg, metricsProjectName string
if conf.MetricsProject != "" {
Expand Down Expand Up @@ -262,7 +273,7 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
MetricsProjectName: metricsProjectName,
AutoscalerCron: conf.AutoscalerCron,
}
adm, err := admin.New(cmd.Context(), admOpts, logger, issuer, emailClient, gh, aiClient, biller)
adm, err := admin.New(cmd.Context(), admOpts, logger, issuer, emailClient, gh, aiClient, assetsBucket, biller)
if err != nil {
logger.Fatal("error creating service", zap.Error(err))
}
Expand Down Expand Up @@ -301,16 +312,7 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
limiter = ratelimit.NewRedis(redis.NewClient(opts))
}

var clientOpts []option.ClientOption
if conf.AssetsBucketGoogleCredentialsJSON != "" {
clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(conf.AssetsBucketGoogleCredentialsJSON)))
}
storageClient, err := storage.NewClient(cmd.Context(), clientOpts...)
if err != nil {
logger.Fatal("failed to create assets bucket handle", zap.Error(err))
}

srv, err := server.New(logger, adm, issuer, limiter, activityClient, storageClient.Bucket(conf.AssetsBucket), &server.Options{
srv, err := server.New(logger, adm, issuer, limiter, activityClient, &server.Options{
HTTPPort: conf.HTTPPort,
GRPCPort: conf.GRPCPort,
ExternalURL: conf.ExternalURL,
Expand Down
47 changes: 13 additions & 34 deletions cli/pkg/local/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ import (
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"time"

"github.com/bmatcuk/doublestar/v4"
"github.com/c2h5oh/datasize"
"github.com/rilldata/rill/cli/pkg/browser"
"github.com/rilldata/rill/cli/pkg/cmdutil"
Expand Down Expand Up @@ -203,26 +201,22 @@ func NewApp(ctx context.Context, opts *AppOptions) (*App, error) {

// If the OLAP is the default OLAP (DuckDB in stage.db), we make it relative to the project directory (not the working directory)
defaultOLAP := false
olapDSN := opts.OlapDSN
olapCfg := make(map[string]string)
if opts.OlapDriver == DefaultOLAPDriver && olapDSN == DefaultOLAPDSN {
if opts.OlapDriver == DefaultOLAPDriver && opts.OlapDSN == DefaultOLAPDSN {
defaultOLAP = true
olapDSN = path.Join(dbDirPath, olapDSN)
// Set path which overrides the duckdb's default behaviour to store duckdb data in data_dir/<instance_id>/<connector> directory which is not backward compatible
olapCfg["path"] = olapDSN
val, err := isExternalStorageEnabled(dbDirPath, vars)
val, err := isExternalStorageEnabled(vars)
if err != nil {
return nil, err
}

olapCfg["external_table_storage"] = strconv.FormatBool(val)
}

// Set default DuckDB pool size to 4
olapCfg["dsn"] = olapDSN
if opts.OlapDriver == "duckdb" {
// Set default DuckDB pool size to 4
olapCfg["pool_size"] = "4"
if !defaultOLAP {
// dsn is automatically computed by duckdb driver so we set only when non default dsn is passed
olapCfg["dsn"] = opts.OlapDSN
olapCfg["error_on_incompatible_version"] = "true"
}
}
Expand Down Expand Up @@ -621,27 +615,12 @@ func (s skipFieldZapEncoder) AddString(key, val string) {
}

// isExternalStorageEnabled determines if external storage can be enabled.
// we can't always enable `external_table_storage` if the project dir already has a db file
// it could have been created with older logic where every source was a table in the main db
func isExternalStorageEnabled(dbPath string, variables map[string]string) (bool, error) {
_, err := os.Stat(filepath.Join(dbPath, DefaultOLAPDSN))
if err != nil {
// fresh project
// check if flag explicitly passed
val, ok := variables["connector.duckdb.external_table_storage"]
if !ok {
// mark enabled by default
return true, nil
}
return strconv.ParseBool(val)
}

fsRoot := os.DirFS(dbPath)
glob := path.Clean(path.Join("./", filepath.Join("*", "version.txt")))

matches, err := doublestar.Glob(fsRoot, glob)
if err != nil {
return false, err
}
return len(matches) > 0, nil
func isExternalStorageEnabled(variables map[string]string) (bool, error) {
// check if flag explicitly passed
val, ok := variables["connector.duckdb.external_table_storage"]
if !ok {
// mark enabled by default
return true, nil
}
return strconv.ParseBool(val)
}
10 changes: 2 additions & 8 deletions cli/pkg/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"time"

"cloud.google.com/go/storage"
"github.com/google/go-github/v50/github"
"github.com/rilldata/rill/admin"
"github.com/rilldata/rill/admin/ai"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/rilldata/rill/runtime/pkg/ratelimit"
runtimeauth "github.com/rilldata/rill/runtime/server/auth"
"go.uber.org/zap"
"google.golang.org/api/option"
)

func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (*admin.Service, error) {
Expand Down Expand Up @@ -50,7 +48,7 @@ func AdminService(ctx context.Context, logger *zap.Logger, databaseURL string) (
VersionCommit: "",
}

adm, err := admin.New(ctx, admOpts, logger, issuer, emailClient, gh, ai.NewNoop(), billing.NewNoop())
adm, err := admin.New(ctx, admOpts, logger, issuer, emailClient, gh, ai.NewNoop(), nil, billing.NewNoop())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -85,11 +83,7 @@ func AdminServer(ctx context.Context, logger *zap.Logger, adm *admin.Service) (*
}

limiter := ratelimit.NewNoop()
client, err := storage.NewClient(ctx, option.WithoutAuthentication())
if err != nil {
return nil, err
}
srv, err := server.New(logger, adm, issuer, limiter, activity.NewNoopClient(), client.Bucket("mock"), &server.Options{
srv, err := server.New(logger, adm, issuer, limiter, activity.NewNoopClient(), &server.Options{
HTTPPort: conf.HTTPPort,
GRPCPort: conf.GRPCPort,
ExternalURL: conf.ExternalURL,
Expand Down
Loading

0 comments on commit 4f27c74

Please sign in to comment.