Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/cli-go/internal/db/diff/managed_schemas_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package diff

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/supabase/cli/internal/utils"
)

func TestManagedDiffSchemas(t *testing.T) {
t.Run("excludes base managed schemas when stripe sync disabled", func(t *testing.T) {
utils.Config.StripeSync.Enabled = false
schemas := managedDiffSchemas()
assert.Equal(t, managedSchemas, schemas)
assert.NotContains(t, schemas, "stripe")
})

t.Run("excludes stripe schema when stripe sync enabled", func(t *testing.T) {
utils.Config.StripeSync.Enabled = true
utils.Config.StripeSync.Schema = "stripe"
t.Cleanup(func() { utils.Config.StripeSync.Enabled = false })
schemas := managedDiffSchemas()
assert.Contains(t, schemas, "stripe")
// Base managed schemas are preserved and not mutated.
assert.Subset(t, schemas, managedSchemas)
assert.NotContains(t, managedSchemas, "stripe")
})

t.Run("respects a custom stripe schema name", func(t *testing.T) {
utils.Config.StripeSync.Enabled = true
utils.Config.StripeSync.Schema = "billing"
t.Cleanup(func() {
utils.Config.StripeSync.Enabled = false
utils.Config.StripeSync.Schema = "stripe"
})
schemas := managedDiffSchemas()
assert.Contains(t, schemas, "billing")
})
}
14 changes: 13 additions & 1 deletion apps/cli-go/internal/db/diff/migra.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
_ "embed"
"slices"
"strings"

"github.com/docker/docker/api/types/container"
Expand Down Expand Up @@ -56,6 +57,17 @@ var (
}
)

// managedDiffSchemas returns the schemas excluded from schema diffing. When the
// Stripe Sync Engine is enabled, its schema is owned by the engine (which
// recreates it via its own migrations), so it must be excluded too — otherwise
// db diff / db pull would try to manage tables the engine owns.
func managedDiffSchemas() []string {
if utils.Config.StripeSync.Enabled && len(utils.Config.StripeSync.Schema) > 0 {
return append(slices.Clone(managedSchemas), utils.Config.StripeSync.Schema)
}
return managedSchemas
}

// Diffs local database schema against shadow, dumps output to stdout.
func DiffSchemaMigraBash(ctx context.Context, source, target pgconn.Config, schema []string, options ...func(*pgx.ConnConfig)) (string, error) {
// Load all user defined schemas
Expand Down Expand Up @@ -134,7 +146,7 @@ func DiffSchemaMigra(ctx context.Context, source, target pgconn.Config, schema [
if len(schema) > 0 {
env = append(env, "INCLUDED_SCHEMAS="+strings.Join(schema, ","))
} else {
env = append(env, "EXCLUDED_SCHEMAS="+strings.Join(managedSchemas, ","))
env = append(env, "EXCLUDED_SCHEMAS="+strings.Join(managedDiffSchemas(), ","))
}
// Migra also executes via Edge Runtime because the TypeScript implementation
// shares the same containerized execution environment as other diff engines.
Expand Down
2 changes: 1 addition & 1 deletion apps/cli-go/internal/db/diff/pgschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func DiffPgSchema(ctx context.Context, source, target pgconn.Config, schema []st
opts = append(opts, pgschema.WithIncludeSchemas(schema...))
} else {
opts = append(opts,
pgschema.WithExcludeSchemas(managedSchemas...),
pgschema.WithExcludeSchemas(managedDiffSchemas()...),
pgschema.WithExcludeSchemas(
"topology", // unsupported due to views
"realtime", // unsupported due to partitioned table
Expand Down
4 changes: 4 additions & 0 deletions apps/cli-go/internal/db/reset/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func resetDatabase14(ctx context.Context, version string, fsys afero.Fs, options
if err := RestartDatabase(ctx, os.Stderr); err != nil {
return err
}
// Recreate the Stripe schema before applying migrations that may reference it.
if err := start.StartStripeSyncEngine(ctx, os.Stderr); err != nil {
return err
}
conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{}, options...)
if err != nil {
return err
Expand Down
83 changes: 83 additions & 0 deletions apps/cli-go/internal/db/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ func SetupLocalDatabase(ctx context.Context, version string, fsys afero.Fs, w io
if err := SetupDatabase(ctx, conn, utils.DbId, w, fsys); err != nil {
return err
}
// The Stripe Sync Engine owns the `stripe` schema and recreates it via its own
// migrations on startup. Bring it up before applying user migrations so that
// migrations referencing Stripe tables (e.g. foreign keys) can resolve them.
if err := StartStripeSyncEngine(ctx, w); err != nil {
return err
}
if err := apply.MigrateAndSeed(ctx, version, conn, fsys); err != nil {
return err
}
Expand All @@ -380,6 +386,83 @@ func SetupLocalDatabase(ctx context.Context, version string, fsys afero.Fs, w io
return nil
}

// stripeSyncEnginePort is the port the Stripe Sync Engine webhook server listens
// on inside the container. The host port is configurable via config.toml.
const stripeSyncEnginePort = 8080

// StartStripeSyncEngine (re)creates the Stripe Sync Engine container and waits for
// it to become healthy. The engine runs its own migrations on startup to
// (re)create the configured schema, so it must be started before applying user
// migrations that reference Stripe tables. Any existing container is removed
// first so that `db reset` re-runs those migrations against the freshly
// recreated database. It is a no-op when the service is disabled.
func StartStripeSyncEngine(ctx context.Context, w io.Writer) error {
if !utils.Config.StripeSync.Enabled {
return nil
}
if err := utils.Docker.ContainerRemove(ctx, utils.StripeSyncEngineId, container.RemoveOptions{Force: true}); err != nil && !errdefs.IsNotFound(err) {
return errors.Errorf("failed to remove stripe sync engine container: %w", err)
}
fmt.Fprintln(w, "Starting Stripe Sync Engine...")
cfg := utils.Config.StripeSync
env := []string{
fmt.Sprintf("DATABASE_URL=postgresql://postgres:%s@%s:%d/postgres", utils.Config.Db.Password, utils.DbId, 5432),
"SCHEMA=" + cfg.Schema,
fmt.Sprintf("PORT=%d", stripeSyncEnginePort),
fmt.Sprintf("AUTO_EXPAND_LISTS=%t", cfg.AutoExpandLists),
}
// Only forward credentials that have been resolved to a concrete value;
// unset env() references are left out so the engine falls back to its own
// defaults instead of receiving the literal "env(...)" string.
for _, s := range []struct {
name string
value string
}{
{"API_KEY", cfg.ApiKey.Value},
{"STRIPE_SECRET_KEY", cfg.StripeSecretKey.Value},
{"STRIPE_WEBHOOK_SECRET", cfg.StripeWebhookSecret.Value},
} {
if len(s.value) > 0 && !strings.HasPrefix(s.value, "env(") {
env = append(env, s.name+"="+s.value)
}
}
hostPort := nat.Port(strconv.Itoa(stripeSyncEnginePort) + "/tcp")
if _, err := utils.DockerStart(
ctx,
container.Config{
Image: cfg.Image,
Env: env,
Healthcheck: &container.HealthConfig{
Test: []string{"CMD", "node", "-e",
fmt.Sprintf(`fetch("http://127.0.0.1:%d/health").then(r=>process.exit(r.ok?0:1)).catch(()=>process.exit(1))`, stripeSyncEnginePort),
},
Interval: 10 * time.Second,
Timeout: 2 * time.Second,
Retries: 3,
StartPeriod: 10 * time.Second,
},
ExposedPorts: nat.PortSet{hostPort: {}},
},
container.HostConfig{
PortBindings: nat.PortMap{hostPort: []nat.PortBinding{{
HostPort: strconv.FormatUint(uint64(cfg.Port), 10),
}}},
RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped},
},
network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
utils.NetId: {
Aliases: utils.StripeSyncEngineAliases,
},
},
},
utils.StripeSyncEngineId,
); err != nil {
return err
}
return WaitForHealthyService(ctx, utils.Config.Db.HealthTimeout, utils.StripeSyncEngineId)
}

func SetupDatabase(ctx context.Context, conn *pgx.Conn, host string, w io.Writer, fsys afero.Fs) error {
if err := initSchema(ctx, conn, host, w); err != nil {
return err
Expand Down
64 changes: 64 additions & 0 deletions apps/cli-go/internal/db/start/stripe_sync_engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package start

import (
"context"
"io"
"net/http"
"testing"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/h2non/gock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/supabase/cli/internal/testing/apitest"
"github.com/supabase/cli/internal/utils"
)

func TestStartStripeSyncEngine(t *testing.T) {
t.Run("noop when disabled", func(t *testing.T) {
utils.Config.StripeSync.Enabled = false
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
// Run test
err := StartStripeSyncEngine(context.Background(), io.Discard)
// Check error
assert.NoError(t, err)
// No docker interaction expected when disabled
assert.False(t, gock.HasUnmatchedRequest())
})

t.Run("recreates container and waits for healthy", func(t *testing.T) {
utils.StripeSyncEngineId = "test-stripe"
utils.NetId = "test-network"
utils.DbId = "test-db"
utils.Config.StripeSync.Enabled = true
utils.Config.StripeSync.Image = "supabase/stripe-sync-engine:test"
utils.Config.StripeSync.Port = 54328
utils.Config.StripeSync.Schema = "stripe"
t.Cleanup(func() { utils.Config.StripeSync.Enabled = false })
require.NoError(t, apitest.MockDocker(utils.Docker))
defer gock.OffAll()
// Removes any container left over from a previous run so a reset re-runs
// the engine's migrations against the freshly recreated database.
gock.New(utils.Docker.DaemonHost()).
Delete("/v" + utils.Docker.ClientVersion() + "/containers/" + utils.StripeSyncEngineId).
Reply(http.StatusOK)
apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.StripeSync.Image), utils.StripeSyncEngineId)
// Reports healthy on the first probe
gock.New(utils.Docker.DaemonHost()).
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + utils.StripeSyncEngineId + "/json").
Reply(http.StatusOK).
JSON(container.InspectResponse{ContainerJSONBase: &container.ContainerJSONBase{
State: &container.State{
Running: true,
Health: &container.Health{Status: types.Healthy},
},
}})
// Run test
err := StartStripeSyncEngine(context.Background(), io.Discard)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
})
}
8 changes: 8 additions & 0 deletions apps/cli-go/internal/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,14 @@ func run(ctx context.Context, fsys afero.Fs, excludedContainers []string, dbConf
if err := start.StartDatabase(ctx, "", fsys, os.Stderr, options...); err != nil {
return err
}
// On a fresh database, SetupLocalDatabase already starts the Stripe Sync
// Engine before applying migrations. When the volume already exists, setup
// is skipped, so start it here to bring the service back up.
if !utils.NoBackupVolume && utils.Config.StripeSync.Enabled && !isContainerExcluded(utils.Config.StripeSync.Image, excluded) {
if err := start.StartStripeSyncEngine(ctx, os.Stderr); err != nil {
return err
}
}
}

var started []string
Expand Down
6 changes: 6 additions & 0 deletions apps/cli-go/internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type CustomName struct {
StorageS3AccessKeyId string `env:"storage.s3_access_key_id,default=S3_PROTOCOL_ACCESS_KEY_ID"`
StorageS3SecretAccessKey string `env:"storage.s3_secret_access_key,default=S3_PROTOCOL_ACCESS_KEY_SECRET"`
StorageS3Region string `env:"storage.s3_region,default=S3_PROTOCOL_REGION"`
StripeSyncEngineURL string `env:"stripe_sync_engine.url,default=STRIPE_SYNC_ENGINE_URL"`
}

func (c *CustomName) toValues(exclude ...string) map[string]string {
Expand Down Expand Up @@ -93,6 +94,10 @@ func (c *CustomName) toValues(exclude ...string) map[string]string {
values[c.StorageS3SecretAccessKey] = utils.Config.Storage.S3Credentials.SecretAccessKey
values[c.StorageS3Region] = utils.Config.Storage.S3Credentials.Region
}
stripeSyncEngineEnabled := utils.Config.StripeSync.Enabled && !slices.Contains(exclude, utils.StripeSyncEngineId) && !slices.Contains(exclude, utils.ShortContainerImageName(utils.Config.StripeSync.Image))
if stripeSyncEngineEnabled {
values[c.StripeSyncEngineURL] = fmt.Sprintf("http://%s:%d", utils.Config.Hostname, utils.Config.StripeSync.Port)
}
return values
}

Expand Down Expand Up @@ -245,6 +250,7 @@ func PrettyPrint(w io.Writer, exclude ...string) {
Items: []OutputItem{
{Label: "Studio", Value: values[names.StudioURL], Type: Link},
{Label: "Mailpit", Value: values[names.MailpitURL], Type: Link},
{Label: "Stripe Sync", Value: values[names.StripeSyncEngineURL], Type: Link},
{Label: "MCP", Value: values[names.McpURL], Type: Link},
},
},
Expand Down
71 changes: 41 additions & 30 deletions apps/cli-go/internal/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,39 @@ import (
)

var (
NetId string
DbId string
KongId string
GotrueId string
InbucketId string
RealtimeId string
RestId string
StorageId string
ImgProxyId string
DifferId string
PgmetaId string
StudioId string
EdgeRuntimeId string
LogflareId string
VectorId string
PoolerId string
NetId string
DbId string
KongId string
GotrueId string
InbucketId string
RealtimeId string
RestId string
StorageId string
ImgProxyId string
DifferId string
PgmetaId string
StudioId string
EdgeRuntimeId string
LogflareId string
VectorId string
PoolerId string
StripeSyncEngineId string

DbAliases = []string{"db", "db.supabase.internal"}
KongAliases = []string{"kong", "api.supabase.internal"}
GotrueAliases = []string{"auth"}
InbucketAliases = []string{"inbucket"}
RealtimeAliases = []string{"realtime", Config.Realtime.TenantId}
RestAliases = []string{"rest"}
StorageAliases = []string{"storage"}
ImgProxyAliases = []string{"imgproxy"}
PgmetaAliases = []string{"pg_meta"}
StudioAliases = []string{"studio"}
EdgeRuntimeAliases = []string{"edge_runtime"}
LogflareAliases = []string{"analytics"}
VectorAliases = []string{"vector"}
PoolerAliases = []string{"pooler"}
DbAliases = []string{"db", "db.supabase.internal"}
KongAliases = []string{"kong", "api.supabase.internal"}
GotrueAliases = []string{"auth"}
InbucketAliases = []string{"inbucket"}
RealtimeAliases = []string{"realtime", Config.Realtime.TenantId}
RestAliases = []string{"rest"}
StorageAliases = []string{"storage"}
ImgProxyAliases = []string{"imgproxy"}
PgmetaAliases = []string{"pg_meta"}
StudioAliases = []string{"studio"}
EdgeRuntimeAliases = []string{"edge_runtime"}
LogflareAliases = []string{"analytics"}
VectorAliases = []string{"vector"}
PoolerAliases = []string{"pooler"}
StripeSyncEngineAliases = []string{"stripe_sync_engine"}

//go:embed templates/initial_schemas/13.sql
InitialSchemaPg13Sql string
Expand Down Expand Up @@ -77,6 +79,7 @@ func UpdateDockerIds() {
LogflareId = GetId(LogflareAliases[0])
VectorId = GetId(VectorAliases[0])
PoolerId = GetId(PoolerAliases[0])
StripeSyncEngineId = GetId(StripeSyncEngineAliases[0])
}

func GetDockerIds() []string {
Expand All @@ -94,6 +97,7 @@ func GetDockerIds() []string {
LogflareId,
VectorId,
PoolerId,
StripeSyncEngineId,
}
}

Expand Down Expand Up @@ -190,6 +194,13 @@ func GetServices() types.Services {
PullPolicy: types.PullPolicyMissing,
}
}
if Config.StripeSync.Enabled {
services["stripeSyncEngine"] = types.ServiceConfig{
Name: ShortContainerImageName(Config.StripeSync.Image),
Image: GetRegistryImageUrl(Config.StripeSync.Image),
PullPolicy: types.PullPolicyMissing,
}
}
return services
}

Expand Down
Loading
Loading