Skip to content

Commit

Permalink
feat(subscription): write first version of workflow service
Browse files Browse the repository at this point in the history
  • Loading branch information
GAlexIHU committed Nov 19, 2024
1 parent 1b2ac2d commit 0e7a9ea
Show file tree
Hide file tree
Showing 25 changed files with 1,159 additions and 479 deletions.
109 changes: 109 additions & 0 deletions openmeter/entitlement/entitlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package entitlement

import (
"fmt"
"reflect"
"slices"
"time"

Expand Down Expand Up @@ -38,6 +39,10 @@ type MeasureUsageFromInput struct {
ts time.Time
}

func (m MeasureUsageFromInput) Equal(other MeasureUsageFromInput) bool {
return m.ts.Equal(other.Get())
}

func (m MeasureUsageFromInput) Get() time.Time {
return m.ts
}
Expand Down Expand Up @@ -96,6 +101,98 @@ type CreateEntitlementInputs struct {
SubscriptionManaged bool `json:"subscriptionManaged,omitempty"`
}

func (c CreateEntitlementInputs) Equal(other CreateEntitlementInputs) bool {
if c.Namespace != other.Namespace {
return false
}

if !reflect.DeepEqual(c.FeatureID, other.FeatureID) {
return false
}

if !reflect.DeepEqual(c.FeatureKey, other.FeatureKey) {
return false
}

if !reflect.DeepEqual(c.SubjectKey, other.SubjectKey) {
return false
}

if c.EntitlementType != other.EntitlementType {
return false
}

if !reflect.DeepEqual(c.Metadata, other.Metadata) {
return false
}

if (c.ActiveFrom == nil) != (other.ActiveFrom == nil) {
return false
}

if (c.ActiveFrom != nil && other.ActiveFrom != nil) && !c.ActiveFrom.Equal(*other.ActiveFrom) {
return false
}

if (c.ActiveTo == nil) != (other.ActiveTo == nil) {
return false
}

if (c.ActiveTo != nil && other.ActiveTo != nil) && !c.ActiveTo.Equal(*other.ActiveTo) {
return false
}

if (c.MeasureUsageFrom == nil) != (other.MeasureUsageFrom == nil) {
return false
}

if (c.MeasureUsageFrom != nil && other.MeasureUsageFrom != nil) && !c.MeasureUsageFrom.Equal(*other.MeasureUsageFrom) {
return false
}

if !reflect.DeepEqual(c.IssueAfterReset, other.IssueAfterReset) {
return false
}

if !reflect.DeepEqual(c.IssueAfterResetPriority, other.IssueAfterResetPriority) {
return false
}

if !reflect.DeepEqual(c.IsSoftLimit, other.IsSoftLimit) {
return false
}

if !reflect.DeepEqual(c.Config, other.Config) {
return false
}

if (c.UsagePeriod == nil) != (other.UsagePeriod == nil) {
return false
}

if (c.UsagePeriod != nil && other.UsagePeriod != nil) && !c.UsagePeriod.Equal(*other.UsagePeriod) {
return false
}

if !reflect.DeepEqual(c.PreserveOverageAtReset, other.PreserveOverageAtReset) {
return false
}

if c.SubscriptionManaged != other.SubscriptionManaged {
return false
}

return true
}

func (c CreateEntitlementInputs) Validate() error {
if c.FeatureID == nil && c.FeatureKey == nil {
return fmt.Errorf("feature id or key must be set")
}

return nil
}

func (c CreateEntitlementInputs) GetType() EntitlementType {
return c.EntitlementType
}
Expand Down Expand Up @@ -233,6 +330,18 @@ type GenericProperties struct {

type UsagePeriod recurrence.Recurrence

func (u UsagePeriod) Equal(other UsagePeriod) bool {
if u.Interval != other.Interval {
return false
}

if !u.Anchor.Equal(other.Anchor) {
return false
}

return true
}

// The returned period is exclusive at the end end inclusive in the start
func (u UsagePeriod) GetCurrentPeriodAt(at time.Time) (recurrence.Period, error) {
rec := recurrence.Recurrence{
Expand Down
16 changes: 14 additions & 2 deletions openmeter/subscription/entitlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package subscription
import (
"context"
"fmt"
"reflect"
"time"

"github.com/openmeterio/openmeter/openmeter/entitlement"
Expand Down Expand Up @@ -56,7 +55,20 @@ type ScheduleSubscriptionEntitlementInput struct {
}

func (s ScheduleSubscriptionEntitlementInput) Equal(other ScheduleSubscriptionEntitlementInput) bool {
return reflect.DeepEqual(s, other)
if s.SubscriptionID != other.SubscriptionID {
return false
}
if s.SubscriptionItemID != other.SubscriptionItemID {
return false
}
if !s.Cadence.Equal(other.Cadence) {
return false
}
if !s.EntitlementInputs.Equal(other.EntitlementInputs) {
return false
}

return true
}

func (s ScheduleSubscriptionEntitlementInput) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions openmeter/subscription/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func NewItemPath(phaseKey, itemKey string) PatchPath {
type Patch interface {
json.Marshaler
Applies
Validate() error
Op() PatchOperation
Path() PatchPath
}
Expand Down
16 changes: 16 additions & 0 deletions openmeter/subscription/patch/additem.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ func (a PatchAddItem) Value() subscription.SubscriptionItemSpec {
return a.CreateInput
}

func (a PatchAddItem) Validate() error {
if err := a.Path().Validate(); err != nil {
return err
}

if err := a.Op().Validate(); err != nil {
return err
}

if err := a.CreateInput.Validate(); err != nil {
return err
}

return nil
}

func (a PatchAddItem) ValueAsAny() any {
return a.CreateInput
}
Expand Down
16 changes: 16 additions & 0 deletions openmeter/subscription/patch/addphase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ func (a PatchAddPhase) ValueAsAny() any {
return a.CreateInput
}

func (a PatchAddPhase) Validate() error {
if err := a.Path().Validate(); err != nil {
return err
}

if err := a.Op().Validate(); err != nil {
return err
}

if err := a.CreateInput.Validate(); err != nil {
return err
}

return nil
}

var _ subscription.ValuePatch[subscription.CreateSubscriptionPhaseInput] = PatchAddPhase{}

func (a PatchAddPhase) ApplyTo(spec *subscription.SubscriptionSpec, actx subscription.ApplyContext) error {
Expand Down
16 changes: 16 additions & 0 deletions openmeter/subscription/patch/extendphase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ func (e PatchExtendPhase) ValueAsAny() any {
return e.Duration
}

func (e PatchExtendPhase) Validate() error {
if err := e.Path().Validate(); err != nil {
return err
}

if err := e.Op().Validate(); err != nil {
return err
}

if e.Duration.IsZero() {
return fmt.Errorf("duration cannot be zero")
}

return nil
}

var _ subscription.ValuePatch[datex.Period] = PatchExtendPhase{}

func (e PatchExtendPhase) ApplyTo(spec *subscription.SubscriptionSpec, actx subscription.ApplyContext) error {
Expand Down
12 changes: 12 additions & 0 deletions openmeter/subscription/patch/removeitem.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ func (r PatchRemoveItem) Path() subscription.PatchPath {
return subscription.NewItemPath(r.PhaseKey, r.ItemKey)
}

func (r PatchRemoveItem) Validate() error {
if err := r.Path().Validate(); err != nil {
return err
}

if err := r.Op().Validate(); err != nil {
return err
}

return nil
}

var _ subscription.Patch = PatchRemoveItem{}

func (r PatchRemoveItem) ApplyTo(spec *subscription.SubscriptionSpec, actx subscription.ApplyContext) error {
Expand Down
12 changes: 12 additions & 0 deletions openmeter/subscription/patch/removephase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ func (r PatchRemovePhase) ValueAsAny() any {
return r.RemoveInput
}

func (r PatchRemovePhase) Validate() error {
if err := r.Path().Validate(); err != nil {
return err
}

if err := r.Op().Validate(); err != nil {
return err
}

return nil
}

var _ subscription.ValuePatch[subscription.RemoveSubscriptionPhaseInput] = PatchRemovePhase{}

func (r PatchRemovePhase) ApplyTo(spec *subscription.SubscriptionSpec, actx subscription.ApplyContext) error {
Expand Down
2 changes: 1 addition & 1 deletion openmeter/subscription/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ type PlanNotFoundError struct {
Version int
}

func (e *PlanNotFoundError) Error() string {
func (e PlanNotFoundError) Error() string {
return fmt.Sprintf("plan %s@%d not found", e.Key, e.Version)
}
5 changes: 3 additions & 2 deletions openmeter/subscription/repo/subscriptionitemrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,15 @@ func (r *subscriptionItemRepo) Create(ctx context.Context, input subscription.Cr

func (r *subscriptionItemRepo) Delete(ctx context.Context, input models.NamespacedID) error {
_, err := entutils.TransactingRepo(ctx, r, func(ctx context.Context, repo *subscriptionItemRepo) (any, error) {
at := clock.Now()
err := repo.db.SubscriptionItem.UpdateOneID(input.ID).
Where(
dbsubscriptionitem.Namespace(input.Namespace),
dbsubscriptionitem.Or(
dbsubscriptionitem.DeletedAtIsNil(),
dbsubscriptionitem.DeletedAtGT(clock.Now()),
dbsubscriptionitem.DeletedAtGT(at),
),
).Exec(ctx)
).SetDeletedAt(at).Exec(ctx)

if db.IsNotFound(err) {
return nil, &subscription.ItemNotFoundError{ID: input.ID}
Expand Down
5 changes: 3 additions & 2 deletions openmeter/subscription/repo/subscriptionphaserepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ func (r *subscriptionPhaseRepo) Create(ctx context.Context, phase subscription.C

func (r *subscriptionPhaseRepo) Delete(ctx context.Context, id models.NamespacedID) error {
_, err := entutils.TransactingRepo(ctx, r, func(ctx context.Context, repo *subscriptionPhaseRepo) (any, error) {
at := clock.Now()
err := repo.db.SubscriptionPhase.UpdateOneID(id.ID).
Where(
dbsubscriptionphase.Namespace(id.Namespace),
dbsubscriptionphase.Or(
dbsubscriptionphase.DeletedAtIsNil(),
dbsubscriptionphase.DeletedAtGT(clock.Now()),
dbsubscriptionphase.DeletedAtGT(at),
),
).Exec(ctx)
).SetDeletedAt(at).Exec(ctx)
if db.IsNotFound(err) {
return nil, &subscription.PhaseNotFoundError{ID: id.ID}
}
Expand Down
8 changes: 6 additions & 2 deletions openmeter/subscription/service/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (s *service) createPhase(
return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.SubscriptionPhaseView, error) {
res := subscription.SubscriptionPhaseView{
Spec: phaseSpec,
Items: make([]subscription.SubscriptionItemView, 0, len(phaseSpec.Items)),
Items: make(map[string]subscription.SubscriptionItemView, len(phaseSpec.Items)),
}

// First, let's create the phase itself
Expand All @@ -43,7 +43,11 @@ func (s *service) createPhase(
return res, fmt.Errorf("failed to create item: %w", err)
}

res.Items = append(res.Items, item)
if _, exists := res.Items[item.SubscriptionItem.Key]; exists {
return res, fmt.Errorf("item %s already exists", item.SubscriptionItem.Key)
}

res.Items[item.SubscriptionItem.Key] = item
}

return res, nil
Expand Down
8 changes: 0 additions & 8 deletions openmeter/subscription/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type ServiceConfig struct {
// connectors
CustomerService customer.Service
// adapters
PlanAdapter subscription.PlanAdapter
EntitlementAdapter subscription.EntitlementAdapter
DiscountAdapter subscription.DiscountAdapter
// framework
Expand Down Expand Up @@ -76,13 +75,6 @@ func (s *service) Create(ctx context.Context, namespace string, spec subscriptio
return def, &models.GenericConflictError{Message: "customer already has a subscription"}
}

// Fetch the plan, check if it exists
// We don't actually use the plan for anything, we don't validate the spec against it, but we expect it to be a valid reference.
_, err = s.PlanAdapter.GetVersion(ctx, spec.Plan.Key, spec.Plan.Version)
if err != nil {
return def, err
}

return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.Subscription, error) {
// Create subscription entity
sub, err := s.SubscriptionRepo.Create(ctx, spec.ToCreateSubscriptionEntityInput(namespace))
Expand Down
Loading

0 comments on commit 0e7a9ea

Please sign in to comment.