Skip to content

Commit

Permalink
Extract DocEvent into events Package (#1137)
Browse files Browse the repository at this point in the history
Moved DocEvent from pubsub to events package to enable broader
integration possibilities. This change prepares the system for future
webhook and message broker integrations while maintaining existing
pubsub functionality.
  • Loading branch information
hackerwins authored Feb 4, 2025
1 parent eded115 commit a6fd4f0
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 107 deletions.
11 changes: 6 additions & 5 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/crdt"
Expand Down Expand Up @@ -200,16 +201,16 @@ func FromDocumentID(pbID string) (types.ID, error) {
}

// FromEventType converts the given Protobuf formats to model format.
func FromEventType(pbDocEventType api.DocEventType) (types.DocEventType, error) {
func FromEventType(pbDocEventType api.DocEventType) (events.DocEventType, error) {
switch pbDocEventType {
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_CHANGED:
return types.DocumentChangedEvent, nil
return events.DocChangedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED:
return types.DocumentWatchedEvent, nil
return events.DocWatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED:
return types.DocumentUnwatchedEvent, nil
return events.DocUnwatchedEvent, nil
case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST:
return types.DocumentBroadcastEvent, nil
return events.DocBroadcastEvent, nil
}
return "", fmt.Errorf("%v: %w", pbDocEventType, ErrUnsupportedEventType)
}
Expand Down
11 changes: 6 additions & 5 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/crdt"
Expand Down Expand Up @@ -193,15 +194,15 @@ func ToVersionVector(vector time.VersionVector) (*api.VersionVector, error) {
}

// ToDocEventType converts the given model format to Protobuf format.
func ToDocEventType(eventType types.DocEventType) (api.DocEventType, error) {
func ToDocEventType(eventType events.DocEventType) (api.DocEventType, error) {
switch eventType {
case types.DocumentChangedEvent:
case events.DocChangedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_CHANGED, nil
case types.DocumentWatchedEvent:
case events.DocWatchedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED, nil
case types.DocumentUnwatchedEvent:
case events.DocUnwatchedEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED, nil
case types.DocumentBroadcastEvent:
case events.DocBroadcastEvent:
return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST, nil
default:
return 0, fmt.Errorf("%s: %w", eventType, ErrUnsupportedEventType)
Expand Down
33 changes: 0 additions & 33 deletions api/types/event.go

This file was deleted.

70 changes: 70 additions & 0 deletions api/types/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package events defines the events that occur in the document and the client.
package events

import (
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document/time"
)

// DocEventType represents the type of the DocEvent.
type DocEventType string

const (
// DocChangedEvent is an event indicating that document is being
// modified by a change.
DocChangedEvent DocEventType = "document-changed"

// DocWatchedEvent is an event that occurs when document is watched
// by other clients.
DocWatchedEvent DocEventType = "document-watched"

// DocUnwatchedEvent is an event that occurs when document is
// unwatched by other clients.
DocUnwatchedEvent DocEventType = "document-unwatched"

// DocBroadcastEvent is an event that occurs when a payload is broadcasted
// on a specific topic.
DocBroadcastEvent DocEventType = "document-broadcast"
)

// DocEventBody includes additional data specific to the DocEvent.
type DocEventBody struct {
Topic string
Payload []byte
}

// PayloadLen returns the size of the payload.
func (b *DocEventBody) PayloadLen() int {
return len(b.Payload)
}

// DocEvent represents an event that occurs in the document.
type DocEvent struct {
// Type is the type of the event.
Type DocEventType

// Publisher is the actor who published the event.
Publisher *time.ActorID

// DocRefKey is the key of the document that the event occurred.
DocRefKey types.DocRefKey

// Body includes additional data specific to the DocEvent.
Body DocEventBody
}
9 changes: 5 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect"
"github.com/yorkie-team/yorkie/pkg/document"
Expand Down Expand Up @@ -619,9 +620,9 @@ func handleResponse(
}

switch eventType {
case types.DocumentChangedEvent:
case events.DocChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
case events.DocWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
Expand All @@ -633,7 +634,7 @@ func handleResponse(
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
case events.DocUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
Expand All @@ -646,7 +647,7 @@ func handleResponse(
cli.String(): p,
},
}, nil
case types.DocumentBroadcastEvent:
case events.DocBroadcastEvent:
eventBody := resp.Event.Body
// If the handler exists, it means that the broadcast topic has been subscribed to.
if handler, ok := doc.BroadcastEventHandlers()[eventBody.Topic]; ok && handler != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/webhook/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (c *Client[Req, Res]) Send(ctx context.Context, req Req) (*Res, int, error)
}
defer func() {
if err := resp.Body.Close(); err != nil {
// TODO(hackerwins): Consider to remove the dependency of logging.
logging.From(ctx).Error(err)
}
}()
Expand Down
5 changes: 3 additions & 2 deletions server/backend/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.uber.org/zap"

"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/server/logging"
)

Expand All @@ -41,7 +42,7 @@ func (c *loggerID) next() string {
type BatchPublisher struct {
logger *zap.SugaredLogger
mutex gosync.Mutex
events []DocEvent
events []events.DocEvent

window time.Duration
closeChan chan struct{}
Expand All @@ -63,7 +64,7 @@ func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublishe

// Publish adds the given event to the batch. If the batch is full, it publishes
// the batch.
func (bp *BatchPublisher) Publish(event DocEvent) {
func (bp *BatchPublisher) Publish(event events.DocEvent) {
bp.mutex.Lock()
defer bp.mutex.Unlock()

Expand Down
15 changes: 4 additions & 11 deletions server/backend/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.uber.org/zap"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/cmap"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/logging"
Expand All @@ -33,14 +34,6 @@ const (
publishTimeout = 100 * gotime.Millisecond
)

// DocEvent represents events that occur related to the document.
type DocEvent struct {
Type types.DocEventType
Publisher *time.ActorID
DocumentRefKey types.DocRefKey
Body types.DocEventBody
}

// Subscriptions is a map of Subscriptions.
type Subscriptions struct {
docKey types.DocRefKey
Expand Down Expand Up @@ -68,7 +61,7 @@ func (s *Subscriptions) Values() []*Subscription {
}

// Publish publishes the given event.
func (s *Subscriptions) Publish(event DocEvent) {
func (s *Subscriptions) Publish(event events.DocEvent) {
s.publisher.Publish(event)
}

Expand Down Expand Up @@ -182,13 +175,13 @@ func (m *PubSub) Unsubscribe(
func (m *PubSub) Publish(
ctx context.Context,
publisherID *time.ActorID,
event DocEvent,
event events.DocEvent,
) {
// NOTE(hackerwins): String() triggers the cache of ActorID to avoid
// race condition of concurrent access to the cache.
_ = event.Publisher.String()

docKey := event.DocumentRefKey
docKey := event.DocRefKey

if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(`Publish(%s,%s) Start`,
Expand Down
9 changes: 5 additions & 4 deletions server/backend/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend/pubsub"
)
Expand All @@ -40,10 +41,10 @@ func TestPubSub(t *testing.T) {
ProjectID: types.ID("000000000000000000000000"),
DocID: types.ID("000000000000000000000000"),
}
docEvent := pubsub.DocEvent{
Type: types.DocumentWatchedEvent,
Publisher: idB,
DocumentRefKey: refKey,
docEvent := events.DocEvent{
Type: events.DocWatchedEvent,
Publisher: idB,
DocRefKey: refKey,
}

ctx := context.Background()
Expand Down
9 changes: 5 additions & 4 deletions server/backend/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/rs/xid"

"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/document/time"
)

Expand All @@ -31,15 +32,15 @@ type Subscription struct {
subscriber *time.ActorID
mu sync.Mutex
closed bool
events chan DocEvent
events chan events.DocEvent
}

// NewSubscription creates a new instance of Subscription.
func NewSubscription(subscriber *time.ActorID) *Subscription {
return &Subscription{
id: xid.New().String(),
subscriber: subscriber,
events: make(chan DocEvent, 1),
events: make(chan events.DocEvent, 1),
closed: false,
}
}
Expand All @@ -50,7 +51,7 @@ func (s *Subscription) ID() string {
}

// Events returns the DocEvent channel of this subscription.
func (s *Subscription) Events() chan DocEvent {
func (s *Subscription) Events() chan events.DocEvent {
return s.events
}

Expand All @@ -71,7 +72,7 @@ func (s *Subscription) Close() {
}

// Publish publishes the given event to the subscriber.
func (s *Subscription) Publish(event DocEvent) bool {
func (s *Subscription) Publish(event events.DocEvent) bool {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down
10 changes: 5 additions & 5 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
"go.uber.org/zap"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/api/types/events"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/pkg/units"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/pubsub"
"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/logging"
)
Expand Down Expand Up @@ -188,10 +188,10 @@ func PushPull(
be.PubSub.Publish(
ctx,
publisherID,
pubsub.DocEvent{
Type: types.DocumentChangedEvent,
Publisher: publisherID,
DocumentRefKey: docRefKey,
events.DocEvent{
Type: events.DocChangedEvent,
Publisher: publisherID,
DocRefKey: docRefKey,
},
)

Expand Down
Loading

0 comments on commit a6fd4f0

Please sign in to comment.