Skip to content

Commit

Permalink
chore: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Dec 30, 2024
1 parent 5ab3717 commit 6ac93e5
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 17 deletions.
3 changes: 1 addition & 2 deletions warehouse/integrations/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/types"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -22,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand Down
5 changes: 2 additions & 3 deletions warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -204,7 +203,7 @@ func (r *Router) Start(ctx context.Context) error {
return r.CronTracker(gCtx)
}))
g.Go(crash.NotifyWarehouse(func() error {
return r.syncRemoteSchema(gCtx)
return r.sync(gCtx)
}))
return g.Wait()
}
Expand Down
22 changes: 12 additions & 10 deletions warehouse/router/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package router

import (
"context"
"encoding/json"
"fmt"
"time"

warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func (r *Router) syncRemoteSchema(ctx context.Context) error {
type syncSchemaRepo interface {
GetLocalSchema(ctx context.Context) (model.Schema, error)
UpdateLocalSchemaWithWarehouse(ctx context.Context, schema model.Schema) error
HasSchemaChanged(schema model.Schema) bool
FetchSchemaFromWarehouse(ctx context.Context, m schema.FetchSchemaRepo) (model.Schema, error)
}

func (r *Router) sync(ctx context.Context) error {
for {
r.configSubscriberLock.RLock()
warehouses := append([]model.Warehouse{}, r.warehouses...)
Expand All @@ -37,7 +42,7 @@ func (r *Router) syncRemoteSchema(ctx context.Context) error {
r.logger.Child("syncer"),
r.statsFactory,
)
if err := r.SyncRemoteSchema(ctx, whManager, sh); err != nil {
if err := r.syncRemoteSchema(ctx, whManager, sh); err != nil {
r.logger.Errorn("failed to sync schema", obskit.Error(err))
continue
}
Expand All @@ -52,8 +57,8 @@ func (r *Router) syncRemoteSchema(ctx context.Context) error {
}
}

func (r *Router) SyncRemoteSchema(ctx context.Context, m manager.Manager, sh *schema.Schema) error {
localSchema, err := sh.GetLocalSchema(ctx)
func (r *Router) syncRemoteSchema(ctx context.Context, m schema.FetchSchemaRepo, sh syncSchemaRepo) error {
_, err := sh.GetLocalSchema(ctx)
if err != nil {
return fmt.Errorf("fetching schema from local: %w", err)
}
Expand All @@ -63,9 +68,6 @@ func (r *Router) SyncRemoteSchema(ctx context.Context, m manager.Manager, sh *sc
return fmt.Errorf("fetching schema from warehouse: %w", err)
}

res, _ := json.Marshal(schemaFromWarehouse)
res2, _ := json.Marshal(localSchema)
r.logger.Infof("schema from warehouse %v with local schema %v", string(res), string(res2))
if sh.HasSchemaChanged(schemaFromWarehouse) {
err := sh.UpdateLocalSchemaWithWarehouse(ctx, schemaFromWarehouse)
if err != nil {
Expand Down
125 changes: 125 additions & 0 deletions warehouse/router/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package router

import (
"context"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
)

type mockFetchSchemaRepo struct {
schemaInWarehouse model.Schema
err error
}

func (m *mockFetchSchemaRepo) FetchSchema(_ context.Context) (model.Schema, error) {
if m.err != nil {
return model.Schema{}, m.err
}
return m.schemaInWarehouse, nil
}

type mockSyncSchemaRepo struct {
getLocalSchemaerr error
updateLocalSchemaErr error
fetchSchemaErr error
hasChanged bool
schemaMap map[string]model.Schema
}

func schemaKey(sourceID, destinationID, namespace string) string {
return fmt.Sprintf("%s_%s_%s", sourceID, destinationID, namespace)
}

func (m *mockSyncSchemaRepo) GetLocalSchema(_ context.Context) (model.Schema, error) {
if m.getLocalSchemaerr != nil {
return model.Schema{}, m.getLocalSchemaerr
}

return model.Schema{}, nil
}

func (m *mockSyncSchemaRepo) UpdateLocalSchemaWithWarehouse(_ context.Context, _ model.Schema) error {
if m.updateLocalSchemaErr != nil && m.hasChanged {
return m.updateLocalSchemaErr
}
return nil
}

func (m *mockSyncSchemaRepo) HasSchemaChanged(_ model.Schema) bool {
return m.hasChanged
}

func (m *mockSyncSchemaRepo) FetchSchemaFromWarehouse(_ context.Context, _ schema.FetchSchemaRepo) (model.Schema, error) {
if m.fetchSchemaErr != nil {
return nil, m.fetchSchemaErr
}

return m.schemaMap[schemaKey("test-sourceID", "test-destinationID", "test-namespace")], nil
}

func TestSync_SyncRemoteSchema(t *testing.T) {
t.Run("fetching schema from local fails", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
getLocalSchemaerr: fmt.Errorf("error fetching schema from local"),
}
mockFetchSchema := &mockFetchSchemaRepo{}
r := &Router{
logger: logger.NOP,
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.Error(t, err)
require.True(t, errors.Is(err, mock.getLocalSchemaerr))
})
t.Run("fetching schema from warehouse fails", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
fetchSchemaErr: fmt.Errorf("error fetching schema from warehouse"),
}
mockFetchSchema := &mockFetchSchemaRepo{}
r := &Router{
logger: logger.NOP,
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.Error(t, err)
require.True(t, errors.Is(err, mock.fetchSchemaErr))
})
t.Run("schema has changed and updating errors", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
hasChanged: true,
updateLocalSchemaErr: fmt.Errorf("error updating local schema"),
}
mockFetchSchema := &mockFetchSchemaRepo{}
r := &Router{
logger: logger.NOP,
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.Error(t, err)
})
t.Run("schema has changed and updating errors", func(t *testing.T) {
mock := &mockSyncSchemaRepo{
hasChanged: false,
updateLocalSchemaErr: fmt.Errorf("error updating local schema"),
}
mockFetchSchema := &mockFetchSchemaRepo{}
r := &Router{
logger: logger.NOP,
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.NoError(t, err)
})
t.Run("fetching schema succeeds with no error", func(t *testing.T) {
mock := &mockSyncSchemaRepo{}
mockFetchSchema := &mockFetchSchemaRepo{}
r := &Router{
logger: logger.NOP,
}
err := r.syncRemoteSchema(context.Background(), mockFetchSchema, mock)
require.NoError(t, err)
})
}
4 changes: 2 additions & 2 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type stagingFileRepo interface {
GetSchemasByIDs(ctx context.Context, ids []int64) ([]model.Schema, error)
}

type fetchSchemaRepo interface {
type FetchSchemaRepo interface {
FetchSchema(ctx context.Context) (model.Schema, error)
}

Expand Down Expand Up @@ -309,7 +309,7 @@ func (sh *Schema) GetLocalSchema(ctx context.Context) (model.Schema, error) {
// 1. Fetches schema from warehouse
// 2. Removes deprecated columns from schema
// 3. Updates local warehouse schema and unrecognized schema instance
func (sh *Schema) FetchSchemaFromWarehouse(ctx context.Context, repo fetchSchemaRepo) (model.Schema, error) {
func (sh *Schema) FetchSchemaFromWarehouse(ctx context.Context, repo FetchSchemaRepo) (model.Schema, error) {
warehouseSchema, err := repo.FetchSchema(ctx)
if err != nil {
return nil, fmt.Errorf("fetching schema: %w", err)
Expand Down

0 comments on commit 6ac93e5

Please sign in to comment.