diff --git a/api/converter/from_pb.go b/api/converter/from_pb.go index 21d0ef887..aaaa9fce9 100644 --- a/api/converter/from_pb.go +++ b/api/converter/from_pb.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/pkg/document/change" "github.com/yorkie-team/yorkie/pkg/document/crdt" @@ -200,16 +201,16 @@ func FromDocumentID(pbID string) (types.ID, error) { } // FromEventType converts the given Protobuf formats to model format. -func FromEventType(pbDocEventType api.DocEventType) (types.DocEventType, error) { +func FromEventType(pbDocEventType api.DocEventType) (events.DocEventType, error) { switch pbDocEventType { case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_CHANGED: - return types.DocumentChangedEvent, nil + return events.DocChangedEvent, nil case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED: - return types.DocumentWatchedEvent, nil + return events.DocWatchedEvent, nil case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED: - return types.DocumentUnwatchedEvent, nil + return events.DocUnwatchedEvent, nil case api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST: - return types.DocumentBroadcastEvent, nil + return events.DocBroadcastEvent, nil } return "", fmt.Errorf("%v: %w", pbDocEventType, ErrUnsupportedEventType) } diff --git a/api/converter/to_pb.go b/api/converter/to_pb.go index 0057a9e94..9b4a2c8c5 100644 --- a/api/converter/to_pb.go +++ b/api/converter/to_pb.go @@ -24,6 +24,7 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/pkg/document/change" "github.com/yorkie-team/yorkie/pkg/document/crdt" @@ -193,15 +194,15 @@ func ToVersionVector(vector time.VersionVector) (*api.VersionVector, error) { } // ToDocEventType converts the given model format to Protobuf format. -func ToDocEventType(eventType types.DocEventType) (api.DocEventType, error) { +func ToDocEventType(eventType events.DocEventType) (api.DocEventType, error) { switch eventType { - case types.DocumentChangedEvent: + case events.DocChangedEvent: return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_CHANGED, nil - case types.DocumentWatchedEvent: + case events.DocWatchedEvent: return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_WATCHED, nil - case types.DocumentUnwatchedEvent: + case events.DocUnwatchedEvent: return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_UNWATCHED, nil - case types.DocumentBroadcastEvent: + case events.DocBroadcastEvent: return api.DocEventType_DOC_EVENT_TYPE_DOCUMENT_BROADCAST, nil default: return 0, fmt.Errorf("%s: %w", eventType, ErrUnsupportedEventType) diff --git a/api/types/event.go b/api/types/event.go deleted file mode 100644 index 3d0909d87..000000000 --- a/api/types/event.go +++ /dev/null @@ -1,33 +0,0 @@ -package types - -// DocEventType represents the event that the Server delivers to the client. -type DocEventType string - -const ( - // DocumentChangedEvent is an event indicating that document is being - // modified by a change. - DocumentChangedEvent DocEventType = "document-changed" - - // DocumentWatchedEvent is an event that occurs when document is watched - // by other clients. - DocumentWatchedEvent DocEventType = "document-watched" - - // DocumentUnwatchedEvent is an event that occurs when document is - // unwatched by other clients. - DocumentUnwatchedEvent DocEventType = "document-unwatched" - - // DocumentBroadcastEvent is an event that occurs when a payload is broadcasted - // on a specific topic. - DocumentBroadcastEvent DocEventType = "document-broadcast" -) - -// DocEventBody includes additional data specific to the DocEvent. -type DocEventBody struct { - Topic string - Payload []byte -} - -// PayloadLen returns the size of the payload. -func (b *DocEventBody) PayloadLen() int { - return len(b.Payload) -} diff --git a/api/types/events/events.go b/api/types/events/events.go new file mode 100644 index 000000000..8b9562c7f --- /dev/null +++ b/api/types/events/events.go @@ -0,0 +1,70 @@ +/* + * Copyright 2025 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package events defines the events that occur in the document and the client. +package events + +import ( + "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/pkg/document/time" +) + +// DocEventType represents the type of the DocEvent. +type DocEventType string + +const ( + // DocChangedEvent is an event indicating that document is being + // modified by a change. + DocChangedEvent DocEventType = "document-changed" + + // DocWatchedEvent is an event that occurs when document is watched + // by other clients. + DocWatchedEvent DocEventType = "document-watched" + + // DocUnwatchedEvent is an event that occurs when document is + // unwatched by other clients. + DocUnwatchedEvent DocEventType = "document-unwatched" + + // DocBroadcastEvent is an event that occurs when a payload is broadcasted + // on a specific topic. + DocBroadcastEvent DocEventType = "document-broadcast" +) + +// DocEventBody includes additional data specific to the DocEvent. +type DocEventBody struct { + Topic string + Payload []byte +} + +// PayloadLen returns the size of the payload. +func (b *DocEventBody) PayloadLen() int { + return len(b.Payload) +} + +// DocEvent represents an event that occurs in the document. +type DocEvent struct { + // Type is the type of the event. + Type DocEventType + + // Publisher is the actor who published the event. + Publisher *time.ActorID + + // DocRefKey is the key of the document that the event occurred. + DocRefKey types.DocRefKey + + // Body includes additional data specific to the DocEvent. + Body DocEventBody +} diff --git a/client/client.go b/client/client.go index 254c1c32a..e36ba8e3c 100644 --- a/client/client.go +++ b/client/client.go @@ -37,6 +37,7 @@ import ( "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" "github.com/yorkie-team/yorkie/pkg/document" @@ -619,9 +620,9 @@ func handleResponse( } switch eventType { - case types.DocumentChangedEvent: + case events.DocChangedEvent: return &WatchResponse{Type: DocumentChanged}, nil - case types.DocumentWatchedEvent: + case events.DocWatchedEvent: doc.AddOnlineClient(cli.String()) if doc.Presence(cli.String()) == nil { return nil, nil @@ -633,7 +634,7 @@ func handleResponse( cli.String(): doc.Presence(cli.String()), }, }, nil - case types.DocumentUnwatchedEvent: + case events.DocUnwatchedEvent: p := doc.Presence(cli.String()) doc.RemoveOnlineClient(cli.String()) if p == nil { @@ -646,7 +647,7 @@ func handleResponse( cli.String(): p, }, }, nil - case types.DocumentBroadcastEvent: + case events.DocBroadcastEvent: eventBody := resp.Event.Body // If the handler exists, it means that the broadcast topic has been subscribed to. if handler, ok := doc.BroadcastEventHandlers()[eventBody.Topic]; ok && handler != nil { diff --git a/pkg/webhook/client.go b/pkg/webhook/client.go index 5d4e7dcb3..9fb76bde4 100644 --- a/pkg/webhook/client.go +++ b/pkg/webhook/client.go @@ -98,6 +98,7 @@ func (c *Client[Req, Res]) Send(ctx context.Context, req Req) (*Res, int, error) } defer func() { if err := resp.Body.Close(); err != nil { + // TODO(hackerwins): Consider to remove the dependency of logging. logging.From(ctx).Error(err) } }() diff --git a/server/backend/pubsub/publisher.go b/server/backend/pubsub/publisher.go index c158b2660..233c7039f 100644 --- a/server/backend/pubsub/publisher.go +++ b/server/backend/pubsub/publisher.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" + "github.com/yorkie-team/yorkie/api/types/events" "github.com/yorkie-team/yorkie/server/logging" ) @@ -41,7 +42,7 @@ func (c *loggerID) next() string { type BatchPublisher struct { logger *zap.SugaredLogger mutex gosync.Mutex - events []DocEvent + events []events.DocEvent window time.Duration closeChan chan struct{} @@ -63,7 +64,7 @@ func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublishe // Publish adds the given event to the batch. If the batch is full, it publishes // the batch. -func (bp *BatchPublisher) Publish(event DocEvent) { +func (bp *BatchPublisher) Publish(event events.DocEvent) { bp.mutex.Lock() defer bp.mutex.Unlock() diff --git a/server/backend/pubsub/pubsub.go b/server/backend/pubsub/pubsub.go index 6483f1783..f9a1b5e9c 100644 --- a/server/backend/pubsub/pubsub.go +++ b/server/backend/pubsub/pubsub.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" "github.com/yorkie-team/yorkie/pkg/cmap" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/logging" @@ -33,14 +34,6 @@ const ( publishTimeout = 100 * gotime.Millisecond ) -// DocEvent represents events that occur related to the document. -type DocEvent struct { - Type types.DocEventType - Publisher *time.ActorID - DocumentRefKey types.DocRefKey - Body types.DocEventBody -} - // Subscriptions is a map of Subscriptions. type Subscriptions struct { docKey types.DocRefKey @@ -68,7 +61,7 @@ func (s *Subscriptions) Values() []*Subscription { } // Publish publishes the given event. -func (s *Subscriptions) Publish(event DocEvent) { +func (s *Subscriptions) Publish(event events.DocEvent) { s.publisher.Publish(event) } @@ -182,13 +175,13 @@ func (m *PubSub) Unsubscribe( func (m *PubSub) Publish( ctx context.Context, publisherID *time.ActorID, - event DocEvent, + event events.DocEvent, ) { // NOTE(hackerwins): String() triggers the cache of ActorID to avoid // race condition of concurrent access to the cache. _ = event.Publisher.String() - docKey := event.DocumentRefKey + docKey := event.DocRefKey if logging.Enabled(zap.DebugLevel) { logging.From(ctx).Debugf(`Publish(%s,%s) Start`, diff --git a/server/backend/pubsub/pubsub_test.go b/server/backend/pubsub/pubsub_test.go index a3022605d..329b1d342 100644 --- a/server/backend/pubsub/pubsub_test.go +++ b/server/backend/pubsub/pubsub_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend/pubsub" ) @@ -40,10 +41,10 @@ func TestPubSub(t *testing.T) { ProjectID: types.ID("000000000000000000000000"), DocID: types.ID("000000000000000000000000"), } - docEvent := pubsub.DocEvent{ - Type: types.DocumentWatchedEvent, - Publisher: idB, - DocumentRefKey: refKey, + docEvent := events.DocEvent{ + Type: events.DocWatchedEvent, + Publisher: idB, + DocRefKey: refKey, } ctx := context.Background() diff --git a/server/backend/pubsub/subscription.go b/server/backend/pubsub/subscription.go index cfeff9bf2..7d2c53e5e 100644 --- a/server/backend/pubsub/subscription.go +++ b/server/backend/pubsub/subscription.go @@ -22,6 +22,7 @@ import ( "github.com/rs/xid" + "github.com/yorkie-team/yorkie/api/types/events" "github.com/yorkie-team/yorkie/pkg/document/time" ) @@ -31,7 +32,7 @@ type Subscription struct { subscriber *time.ActorID mu sync.Mutex closed bool - events chan DocEvent + events chan events.DocEvent } // NewSubscription creates a new instance of Subscription. @@ -39,7 +40,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription { return &Subscription{ id: xid.New().String(), subscriber: subscriber, - events: make(chan DocEvent, 1), + events: make(chan events.DocEvent, 1), closed: false, } } @@ -50,7 +51,7 @@ func (s *Subscription) ID() string { } // Events returns the DocEvent channel of this subscription. -func (s *Subscription) Events() chan DocEvent { +func (s *Subscription) Events() chan events.DocEvent { return s.events } @@ -71,7 +72,7 @@ func (s *Subscription) Close() { } // Publish publishes the given event to the subscriber. -func (s *Subscription) Publish(event DocEvent) bool { +func (s *Subscription) Publish(event events.DocEvent) bool { s.mu.Lock() defer s.mu.Unlock() diff --git a/server/packs/packs.go b/server/packs/packs.go index dcc6c11fe..7a80130ab 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -27,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" "github.com/yorkie-team/yorkie/pkg/document" "github.com/yorkie-team/yorkie/pkg/document/change" "github.com/yorkie-team/yorkie/pkg/document/key" @@ -34,7 +35,6 @@ import ( "github.com/yorkie-team/yorkie/pkg/units" "github.com/yorkie-team/yorkie/server/backend" "github.com/yorkie-team/yorkie/server/backend/database" - "github.com/yorkie-team/yorkie/server/backend/pubsub" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/logging" ) @@ -188,10 +188,10 @@ func PushPull( be.PubSub.Publish( ctx, publisherID, - pubsub.DocEvent{ - Type: types.DocumentChangedEvent, - Publisher: publisherID, - DocumentRefKey: docRefKey, + events.DocEvent{ + Type: events.DocChangedEvent, + Publisher: publisherID, + DocRefKey: docRefKey, }, ) diff --git a/server/profiling/prometheus/metrics.go b/server/profiling/prometheus/metrics.go index 87740f483..adbc1bd78 100644 --- a/server/profiling/prometheus/metrics.go +++ b/server/profiling/prometheus/metrics.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" "github.com/yorkie-team/yorkie/internal/version" ) @@ -350,7 +351,7 @@ func (m *Metrics) RemoveWatchDocumentConnections(hostname string, project *types } // AddWatchDocumentEvents adds the number of events in document watch stream connections. -func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project, docEventType types.DocEventType) { +func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project, docEventType events.DocEventType) { m.watchDocumentEventsTotal.With(prometheus.Labels{ projectIDLabel: project.ID.String(), projectNameLabel: project.Name, @@ -361,7 +362,7 @@ func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project // AddWatchDocumentEventPayloadBytes adds the bytes of event payload in document watch stream connections. func (m *Metrics) AddWatchDocumentEventPayloadBytes(hostname string, project *types.Project, - docEventType types.DocEventType, bytes int) { + docEventType events.DocEventType, bytes int) { m.watchDocumentEventsTotal.With(prometheus.Labels{ projectIDLabel: project.ID.String(), projectNameLabel: project.Name, diff --git a/server/rpc/admin_server.go b/server/rpc/admin_server.go index bff582210..8be057443 100644 --- a/server/rpc/admin_server.go +++ b/server/rpc/admin_server.go @@ -25,12 +25,12 @@ import ( "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/internal/version" "github.com/yorkie-team/yorkie/pkg/document/key" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend" - "github.com/yorkie-team/yorkie/server/backend/pubsub" "github.com/yorkie-team/yorkie/server/documents" "github.com/yorkie-team/yorkie/server/logging" "github.com/yorkie-team/yorkie/server/packs" @@ -431,10 +431,10 @@ func (s *adminServer) RemoveDocumentByAdmin( s.backend.PubSub.Publish( ctx, publisherID, - pubsub.DocEvent{ - Type: types.DocumentChangedEvent, - Publisher: publisherID, - DocumentRefKey: docInfo.RefKey(), + events.DocEvent{ + Type: events.DocChangedEvent, + Publisher: publisherID, + DocRefKey: docInfo.RefKey(), }, ) diff --git a/server/rpc/server.go b/server/rpc/server.go index 8f8719b31..c14068e17 100644 --- a/server/rpc/server.go +++ b/server/rpc/server.go @@ -45,7 +45,6 @@ type Server struct { conf *Config httpServer *http.Server yorkieServiceCancel context.CancelFunc - tokenManager *auth.TokenManager } // NewServer creates a new instance of Server. @@ -130,7 +129,6 @@ func (s *Server) listenAndServe() error { if err := s.httpServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { logging.DefaultLogger().Errorf("HTTP server ListenAndServe: %v", err) } - return }() return nil } diff --git a/server/rpc/yorkie_server.go b/server/rpc/yorkie_server.go index ab1adbfb2..7625ca4b6 100644 --- a/server/rpc/yorkie_server.go +++ b/server/rpc/yorkie_server.go @@ -24,6 +24,7 @@ import ( "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/api/types/events" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/pkg/document" "github.com/yorkie-team/yorkie/pkg/document/key" @@ -572,9 +573,9 @@ func (s *yorkieServer) RemoveDocument( func (s *yorkieServer) watchDoc( ctx context.Context, clientID *time.ActorID, - documentRefKey types.DocRefKey, + docKey types.DocRefKey, ) (*pubsub.Subscription, []*time.ActorID, error) { - subscription, clientIDs, err := s.backend.PubSub.Subscribe(ctx, clientID, documentRefKey) + subscription, clientIDs, err := s.backend.PubSub.Subscribe(ctx, clientID, docKey) if err != nil { logging.From(ctx).Error(err) return nil, nil, err @@ -583,16 +584,16 @@ func (s *yorkieServer) watchDoc( s.backend.PubSub.Publish( ctx, subscription.Subscriber(), - pubsub.DocEvent{ - Type: types.DocumentWatchedEvent, - Publisher: subscription.Subscriber(), - DocumentRefKey: documentRefKey, + events.DocEvent{ + Type: events.DocWatchedEvent, + Publisher: subscription.Subscriber(), + DocRefKey: docKey, }, ) s.backend.Metrics.AddWatchDocumentEventPayloadBytes( s.backend.Config.Hostname, projects.From(ctx), - types.DocumentWatchedEvent, + events.DocWatchedEvent, 0, ) @@ -602,23 +603,22 @@ func (s *yorkieServer) watchDoc( func (s *yorkieServer) unwatchDoc( ctx context.Context, subscription *pubsub.Subscription, - documentRefKey types.DocRefKey, + docKey types.DocRefKey, ) error { - s.backend.PubSub.Unsubscribe(ctx, documentRefKey, subscription) - + s.backend.PubSub.Unsubscribe(ctx, docKey, subscription) s.backend.PubSub.Publish( ctx, subscription.Subscriber(), - pubsub.DocEvent{ - Type: types.DocumentUnwatchedEvent, - Publisher: subscription.Subscriber(), - DocumentRefKey: documentRefKey, + events.DocEvent{ + Type: events.DocUnwatchedEvent, + Publisher: subscription.Subscriber(), + DocRefKey: docKey, }, ) s.backend.Metrics.AddWatchDocumentEventPayloadBytes( s.backend.Config.Hostname, projects.From(ctx), - types.DocumentUnwatchedEvent, + events.DocUnwatchedEvent, 0, ) @@ -639,7 +639,7 @@ func (s *yorkieServer) Broadcast( if err != nil { return nil, err } - docRefKey := types.DocRefKey{ + docKey := types.DocRefKey{ ProjectID: project.ID, DocID: docID, } @@ -647,7 +647,7 @@ func (s *yorkieServer) Broadcast( docInfo, err := documents.FindDocInfoByRefKey( ctx, s.backend, - docRefKey, + docKey, ) if err != nil { return nil, err @@ -668,22 +668,25 @@ func (s *yorkieServer) Broadcast( return nil, err } - docEventType := types.DocumentBroadcastEvent s.backend.PubSub.Publish( ctx, clientID, - pubsub.DocEvent{ - Type: docEventType, - Publisher: clientID, - DocumentRefKey: docRefKey, - Body: types.DocEventBody{ + events.DocEvent{ + Type: events.DocBroadcastEvent, + Publisher: clientID, + DocRefKey: docKey, + Body: events.DocEventBody{ Topic: req.Msg.Topic, Payload: req.Msg.Payload, }, }, ) - s.backend.Metrics.AddWatchDocumentEventPayloadBytes(s.backend.Config.Hostname, project, docEventType, - len(req.Msg.Payload)) + s.backend.Metrics.AddWatchDocumentEventPayloadBytes( + s.backend.Config.Hostname, + project, + events.DocBroadcastEvent, + len(req.Msg.Payload), + ) return connect.NewResponse(&api.BroadcastResponse{}), nil }