Skip to content

Commit

Permalink
Simplify Coordinator by Refactoring to Locker and PubSub (#1136)
Browse files Browse the repository at this point in the history
Refactored the Coordinator to focus solely on in-memory locking and PubSub
functionality, aligning with our shift to shard-based cluster architecture.
This simplification merges sync with memory package, removes server info
dependencies, and isolates PubSub into its own package, resulting in a more
streamlined and maintainable system.
  • Loading branch information
hackerwins authored Feb 4, 2025
1 parent 778d34f commit eded115
Show file tree
Hide file tree
Showing 31 changed files with 226 additions and 478 deletions.
12 changes: 6 additions & 6 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

authWebhookMaxWaitInterval time.Duration
authWebhookCacheTTL time.Duration
projectInfoCacheTTL time.Duration
projectCacheTTL time.Duration

conf = server.NewConfig()
)
Expand All @@ -65,7 +65,7 @@ func newServerCmd() *cobra.Command {

conf.Backend.AuthWebhookMaxWaitInterval = authWebhookMaxWaitInterval.String()
conf.Backend.AuthWebhookCacheTTL = authWebhookCacheTTL.String()
conf.Backend.ProjectInfoCacheTTL = projectInfoCacheTTL.String()
conf.Backend.ProjectCacheTTL = projectCacheTTL.String()

conf.Housekeeping.Interval = housekeepingInterval.String()

Expand Down Expand Up @@ -320,15 +320,15 @@ func init() {
"TTL value to set when caching authorization webhook response.",
)
cmd.Flags().IntVar(
&conf.Backend.ProjectInfoCacheSize,
&conf.Backend.ProjectCacheSize,
"project-info-cache-size",
server.DefaultProjectInfoCacheSize,
server.DefaultProjectCacheSize,
"The cache size of the project info.",
)
cmd.Flags().DurationVar(
&projectInfoCacheTTL,
&projectCacheTTL,
"project-info-cache-ttl",
server.DefaultProjectInfoCacheTTL,
server.DefaultProjectCacheTTL,
"TTL value to set when caching project info.",
)
cmd.Flags().StringVar(
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ type LRUExpireCache[K comparable, V any] struct {
}

// NewLRUExpireCache creates an expiring cache with the given size
func NewLRUExpireCache[K comparable, V any](maxSize int) (*LRUExpireCache[K, V], error) {
func NewLRUExpireCache[K comparable, V any](maxSize int) *LRUExpireCache[K, V] {
if maxSize <= 0 {
return nil, ErrInvalidMaxSize
panic(ErrInvalidMaxSize)
}

return &LRUExpireCache[K, V]{
maxSize: maxSize,
entries: map[K]*list.Element{},
}, nil
}
}

type cacheEntry[K comparable, V any] struct {
Expand Down
21 changes: 7 additions & 14 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,16 @@ import (

func TestCache(t *testing.T) {
t.Run("create lru expire cache test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)
lruCache := cache.NewLRUExpireCache[string, string](1)
assert.NotNil(t, lruCache)

lruCache, err = cache.NewLRUExpireCache[string, string](0)
assert.ErrorIs(t, err, cache.ErrInvalidMaxSize)
assert.Nil(t, lruCache)
assert.PanicsWithError(t, cache.ErrInvalidMaxSize.Error(), func() {
cache.NewLRUExpireCache[string, string](0)
})
})

t.Run("add test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)

lruCache := cache.NewLRUExpireCache[string, string](1)
lruCache.Add("request1", "response1", time.Second)
response1, ok := lruCache.Get("request1")
assert.True(t, ok)
Expand All @@ -41,9 +38,7 @@ func TestCache(t *testing.T) {
})

t.Run("get expired cache test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)

lruCache := cache.NewLRUExpireCache[string, string](1)
ttl := time.Millisecond
lruCache.Add("request", "response", ttl)

Expand All @@ -54,9 +49,7 @@ func TestCache(t *testing.T) {
})

t.Run("update expired cache test", func(t *testing.T) {
lruCache, err := cache.NewLRUExpireCache[string, string](1)
assert.NoError(t, err)

lruCache := cache.NewLRUExpireCache[string, string](1)
var ttl time.Duration
ttl = time.Minute

Expand Down
24 changes: 6 additions & 18 deletions pkg/webhook/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,9 @@ func TestHMAC(t *testing.T) {
defer testServer.Close()

t.Run("webhook client with valid HMAC key test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand All @@ -95,12 +92,9 @@ func TestHMAC(t *testing.T) {
})

t.Run("webhook client with invalid HMAC key test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand All @@ -118,12 +112,9 @@ func TestHMAC(t *testing.T) {
})

t.Run("webhook client without HMAC key test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
client := webhook.NewClient[testRequest](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand All @@ -140,12 +131,9 @@ func TestHMAC(t *testing.T) {
})

t.Run("webhook client with empty body test", func(t *testing.T) {
testCache, err := cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100)
assert.NoError(t, err)

client := webhook.NewClient[testRequest, testResponse](
client := webhook.NewClient[testRequest](
testServer.URL,
testCache,
cache.NewLRUExpireCache[string, types.Pair[int, *testResponse]](100),
webhook.Options{
CacheKeyPrefix: "testPrefix-hmac",
CacheTTL: 5 * time.Second,
Expand Down
78 changes: 30 additions & 48 deletions server/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/rs/xid"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/cache"
Expand All @@ -35,26 +32,35 @@ import (
memdb "github.com/yorkie-team/yorkie/server/backend/database/memory"
"github.com/yorkie-team/yorkie/server/backend/database/mongo"
"github.com/yorkie-team/yorkie/server/backend/housekeeping"
"github.com/yorkie-team/yorkie/server/backend/pubsub"
"github.com/yorkie-team/yorkie/server/backend/sync"
memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory"
"github.com/yorkie-team/yorkie/server/logging"
"github.com/yorkie-team/yorkie/server/profiling/prometheus"
)

// Backend manages Yorkie's backend such as Database and Coordinator. And it
// has the server status such as the information of this Server.
// Backend manages Yorkie's backend such as Database and Coordinator. It also
// provides in-memory cache, pubsub, and locker.
type Backend struct {
Config *Config
serverInfo *sync.ServerInfo
Config *Config

// AuthWebhookCache is used to cache the response of the auth webhook.
WebhookCache *cache.LRUExpireCache[string, pkgtypes.Pair[
int,
*types.AuthWebhookResponse,
]]

Metrics *prometheus.Metrics
DB database.Database
Coordinator sync.Coordinator
Background *background.Background
// PubSub is used to publish/subscribe events to/from clients.
PubSub *pubsub.PubSub
// Locker is used to lock/unlock resources.
Locker *sync.LockerManager

// Metrics is used to expose metrics.
Metrics *prometheus.Metrics
// DB is the database instance.
DB database.Database

// Background is used to manage background tasks.
Background *background.Background
// Housekeeping is used to manage background batch tasks.
Housekeeping *housekeeping.Housekeeping
}

Expand All @@ -76,27 +82,20 @@ func New(
conf.Hostname = hostname
}

serverInfo := &sync.ServerInfo{
ID: xid.New().String(),
Hostname: hostname,
UpdatedAt: time.Now(),
}

// 02. Create the auth webhook cache. The auth webhook cache is used to
// cache the response of the auth webhook.
webhookCache, err := cache.NewLRUExpireCache[string, pkgtypes.Pair[int, *types.AuthWebhookResponse]](
// 02. Create in-memory cache, pubsub, and locker.
cache := cache.NewLRUExpireCache[string, pkgtypes.Pair[int, *types.AuthWebhookResponse]](
conf.AuthWebhookCacheSize,
)
if err != nil {
return nil, err
}
locker := sync.New()
pubsub := pubsub.New()

// 03. Create the background instance. The background instance is used to
// manage background tasks.
bg := background.New(metrics)

// 04. Create the database instance. If the MongoDB configuration is given,
// create a MongoDB instance. Otherwise, create a memory database instance.
var err error
var db database.Database
if mongoConf != nil {
db, err = mongo.Dial(mongoConf)
Expand All @@ -110,13 +109,6 @@ func New(
}
}

// 05. Create the coordinator instance. The coordinator is used to manage
// the synchronization between the Yorkie servers.
// TODO(hackerwins): Implement the coordinator for a shard. For now, we
// distribute workloads to all shards per document. In the future, we
// will need to distribute workloads of a document.
coordinator := memsync.NewCoordinator(serverInfo)

// 06. Create the housekeeping instance. The housekeeping is used
// to manage keeping tasks such as deactivating inactive clients.
keeping, err := housekeeping.New(housekeepingConf)
Expand Down Expand Up @@ -144,19 +136,18 @@ func New(
}

logging.DefaultLogger().Infof(
"backend created: id: %s, db: %s",
serverInfo.ID,
"backend created: db: %s",
dbInfo,
)

return &Backend{
Config: conf,
serverInfo: serverInfo,
WebhookCache: webhookCache,
WebhookCache: cache,
Locker: locker,
PubSub: pubsub,

Metrics: metrics,
DB: db,
Coordinator: coordinator,
Background: bg,
Housekeeping: keeping,
}, nil
Expand All @@ -168,7 +159,7 @@ func (b *Backend) Start() error {
return err
}

logging.DefaultLogger().Infof("backend started: id: %s", b.serverInfo.ID)
logging.DefaultLogger().Infof("backend started")
return nil
}

Expand All @@ -180,19 +171,10 @@ func (b *Backend) Shutdown() error {

b.Background.Close()

if err := b.Coordinator.Close(); err != nil {
logging.DefaultLogger().Error(err)
}

if err := b.DB.Close(); err != nil {
logging.DefaultLogger().Error(err)
}

logging.DefaultLogger().Infof("backend stopped: id: %s", b.serverInfo.ID)
logging.DefaultLogger().Infof("backend stopped")
return nil
}

// Members returns the members of this cluster.
func (b *Backend) Members() map[string]*sync.ServerInfo {
return b.Coordinator.Members()
}
20 changes: 10 additions & 10 deletions server/backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ type Config struct {
// AuthWebhookCacheTTL is the TTL value to set when caching the authorized result.
AuthWebhookCacheTTL string `yaml:"AuthWebhookCacheTTL"`

// ProjectInfoCacheSize is the cache size of the project info.
ProjectInfoCacheSize int `yaml:"ProjectInfoCacheSize"`
// ProjectCacheSize is the cache size of the project metadata.
ProjectCacheSize int `yaml:"ProjectCacheSize"`

// ProjectInfoCacheTTL is the TTL value to set when caching the project info.
ProjectInfoCacheTTL string `yaml:"ProjectInfoCacheTTL"`
// ProjectCacheTTL is the TTL value to set when caching the project metadata.
ProjectCacheTTL string `yaml:"ProjectCacheTTL"`

// Hostname is yorkie server hostname. hostname is used by metrics.
Hostname string `yaml:"Hostname"`
Expand Down Expand Up @@ -109,10 +109,10 @@ func (c *Config) Validate() error {
)
}

if _, err := time.ParseDuration(c.ProjectInfoCacheTTL); err != nil {
if _, err := time.ParseDuration(c.ProjectCacheTTL); err != nil {
return fmt.Errorf(
`invalid argument "%s" for "--project-info-cache-ttl" flag: %w`,
c.ProjectInfoCacheTTL,
c.ProjectCacheTTL,
err,
)
}
Expand Down Expand Up @@ -153,11 +153,11 @@ func (c *Config) ParseAuthWebhookCacheTTL() time.Duration {
return result
}

// ParseProjectInfoCacheTTL returns TTL for project info cache.
func (c *Config) ParseProjectInfoCacheTTL() time.Duration {
result, err := time.ParseDuration(c.ProjectInfoCacheTTL)
// ParseProjectCacheTTL returns TTL for project metadata cache.
func (c *Config) ParseProjectCacheTTL() time.Duration {
result, err := time.ParseDuration(c.ProjectCacheTTL)
if err != nil {
fmt.Fprintln(os.Stderr, "parse project info cache ttl: %w", err)
fmt.Fprintln(os.Stderr, "parse project metadata cache ttl: %w", err)
os.Exit(1)
}

Expand Down
4 changes: 2 additions & 2 deletions server/backend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestConfig(t *testing.T) {
ClientDeactivateThreshold: "1h",
AuthWebhookMaxWaitInterval: "0ms",
AuthWebhookCacheTTL: "10s",
ProjectInfoCacheTTL: "10m",
ProjectCacheTTL: "10m",
}
assert.NoError(t, validConf.Validate())

Expand All @@ -47,7 +47,7 @@ func TestConfig(t *testing.T) {
assert.Error(t, conf3.Validate())

conf4 := validConf
conf4.ProjectInfoCacheTTL = "10 minutes"
conf4.ProjectCacheTTL = "10 minutes"
assert.Error(t, conf4.Validate())
})
}
Loading

0 comments on commit eded115

Please sign in to comment.