Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/implement sse v2 #18

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/NorskHelsenett/ror-api/internal/utils/switchboard"
"github.com/NorskHelsenett/ror-api/internal/webserver/sse"
"github.com/NorskHelsenett/ror-api/pkg/servers/sseserver"

"github.com/NorskHelsenett/ror-api/internal/apiconfig"
"github.com/NorskHelsenett/ror-api/internal/apiconnections"
Expand Down Expand Up @@ -90,7 +91,7 @@ func main() {
<-sigs
done <- struct{}{}
}()

sseserver.StartEventServer()
rlog.Infoc(ctx, "Initializing health server")
_ = healthserver.Start(healthserver.ServerString(viper.GetString(configconsts.HEALTH_ENDPOINT)))

Expand Down
2 changes: 1 addition & 1 deletion internal/apiservices/clustersService/clusters_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func Create(ctx context.Context, input *apicontracts.Cluster) (string, error) {
rlog.Errorc(ctx, "could not send cluster created event", err, rlog.String("clusterId", clusterId))
}

sse.Server.BroadcastMessage(ssemodels.SseMessage{SSEBase: ssemodels.SSEBase{Event: ssemodels.SseType_Cluster_Created}, Data: event})
sse.Server.BroadcastMessage(ssemodels.SseMessage{Event: ssemodels.SseType_Cluster_Created, Data: event})
return clusterId, nil
}

Expand Down
12 changes: 5 additions & 7 deletions internal/models/ssemodels/sse_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ const (
SseType_ClusterOrder_Updated SseType = "clusterOrder.updated"
)

type SSEBase struct {
Event SseType `json:"event"`
}

// Deprecated: Use SseMessage instead, this is not a valid format
type Time struct {
SSEBase
Event SseType `json:"event"`
CurrentTime time.Time `json:"currentTime"`
}

type SseMessage struct {
SSEBase
Data interface{} `json:"data"`
Id string `json:"id omitempty"`
Event SseType `json:"event"`
Data interface{} `json:"data"`
}
6 changes: 2 additions & 4 deletions internal/rabbitmq/apirabbitmqhandler/cluster_order_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ func HandleClusterOrderResource(ctx context.Context, message amqp091.Delivery) e
}

payload := ssemodels.SseMessage{
SSEBase: ssemodels.SSEBase{
Event: ssemodels.SseType_ClusterOrder_Updated,
},
Data: resourceUpdateModel,
Event: ssemodels.SseType_ClusterOrder_Updated,
Data: resourceUpdateModel,
}

sse.Server.BroadcastMessage(payload)
Expand Down
15 changes: 12 additions & 3 deletions internal/routes/route_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
v2resourcescontroller "github.com/NorskHelsenett/ror-api/internal/controllers/v2/resourcescontroller"
ctrlWorkspaces "github.com/NorskHelsenett/ror-api/internal/controllers/workspacescontroller"

"github.com/NorskHelsenett/ror-api/pkg/handlers/ssehandler"
"github.com/NorskHelsenett/ror-api/pkg/middelware/ssemiddleware"

"github.com/NorskHelsenett/ror-api/internal/controllers/v2/handlerv2selfcontroller"
"github.com/NorskHelsenett/ror-api/internal/models"
"github.com/NorskHelsenett/ror-api/internal/webserver/middlewares"
Expand Down Expand Up @@ -313,8 +316,16 @@ func SetupRoutes(router *gin.Engine) {

v2 := router.Group("/v2")

v2.Use(middlewares.TimeoutMiddleware(timeoutduration))
eventsRoute := v2.Group("events", auth.AuthenticationMiddleware)
{
eventstimeout := 60 * time.Second
eventsRoute.GET("listen", ssemiddleware.SSEHeadersMiddleware(), ssehandler.HandleSSE())
eventsRoute.POST("send", middlewares.TimeoutMiddleware(eventstimeout), ssehandler.Send())
}

v2.Use(auth.AuthenticationMiddleware)
v2.Use(middlewares.TimeoutMiddleware(timeoutduration))
// Self
selfv2Route := v2.Group("self")
selfv2Route.GET("", handlerv2selfcontroller.GetSelf())
selfv2Route.POST("/apikeys", handlerv2selfcontroller.CreateOrRenewApikey())
Expand All @@ -327,8 +338,6 @@ func SetupRoutes(router *gin.Engine) {
docs.SwaggerInfo.Version = apiconfig.RorVersion.GetVersion()
router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler))

v2.Use(auth.AuthenticationMiddleware)

resourceRoute := v2.Group("resources")

resourceRoute.GET("", v2resourcescontroller.GetResources())
Expand Down
8 changes: 3 additions & 5 deletions internal/webserver/sse/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (sse *SSE) Send() gin.HandlerFunc {
return
}

message := ssemodels.SseMessage{SSEBase: ssemodels.SSEBase{Event: ssemodels.SseType(input.Event)}, Data: input.Data}
message := ssemodels.SseMessage{Event: ssemodels.SseType(input.Event), Data: input.Data}
err = apiconnections.RabbitMQConnection.SendMessage(ctx, message, messagebuscontracts.Event_Broadcast, nil)
if err != nil {
rlog.Errorc(ctx, "could not send sse broadcast event", err)
Expand All @@ -267,10 +267,8 @@ func KeepAlive() {
for {
now := time.Now()
payload := ssemodels.SseMessage{
SSEBase: ssemodels.SSEBase{
Event: ssemodels.SseType_Time,
},
Data: now,
Event: ssemodels.SseType_Time,
Data: now,
}
Server.BroadcastMessage(payload)
time.Sleep(time.Second * 15)
Expand Down
127 changes: 127 additions & 0 deletions pkg/handlers/ssehandler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ssehandler

import (
"io"
"net/http"
"sync"
"time"

aclservice "github.com/NorskHelsenett/ror-api/internal/acl/services"
"github.com/NorskHelsenett/ror-api/pkg/servers/sseserver"

aclmodels "github.com/NorskHelsenett/ror/pkg/models/aclmodels"

"github.com/NorskHelsenett/ror/pkg/helpers/rorerror"

"github.com/NorskHelsenett/ror/pkg/context/gincontext"
"github.com/NorskHelsenett/ror/pkg/context/rorcontext"

"github.com/gin-gonic/gin"
)

// @Summary Server sent events
// @Schemes
// @Description Listen to server sent events
// @Tags events
// @Accept text/event-stream
// @Produce text/event-stream
// @Success 200 {string} string "ok"
// @Failure 403 {object} rorerror.RorError
// @Failure 400 {object} rorerror.RorError
// @Failure 401 {object} rorerror.RorError
// @Failure 500 {object} rorerror.RorError
// @Router /v2/events/listen [get]
// @Security ApiKey || AccessToken
func HandleSSE() gin.HandlerFunc {
return func(c *gin.Context) {
stopChan := make(chan bool)
var writeLock sync.Mutex

ctx, cancel := gincontext.GetRorContextFromGinContext(c)
defer cancel()
identity := rorcontext.GetIdentityFromRorContext(ctx)
client := &sseserver.EventClient{
Id: sseserver.NewEventClientId(),
Identity: identity,
Connection: make(sseserver.EventClientChan),
}
sseserver.Server.NewClients <- client
// Send new connection to event server

defer func() {
stopChan <- true
}()
go func() {
for {
select {
case <-stopChan:
go func() {
for range client.Connection {
}
}()
// Send closed connection to event server
sseserver.Server.ClosedClients <- client.Id
return
default:
time.Sleep(time.Second * 1)
writeLock.Lock()
c.Writer.Write([]byte(":keepalive\n"))
c.Writer.Flush()
writeLock.Unlock()
}
}
}()

c.Stream(func(w io.Writer) bool {
select {
case msg, ok := <-client.Connection:
if ok {
writeLock.Lock()
c.SSEvent(msg.Event, msg.Data)
writeLock.Unlock()
return true
}
return false
case <-c.Request.Context().Done():
stopChan <- true
return false
}
})
}
}

func Send() gin.HandlerFunc {
return func(c *gin.Context) {
ctx, cancel := gincontext.GetRorContextFromGinContext(c)
defer cancel()
// Access check
// Scope: ror
// Subject: global
// Access: create
// TODO: check if this is the right way to do it
accessQuery := aclmodels.NewAclV2QueryAccessScopeSubject(aclmodels.Acl2ScopeRor, aclmodels.Acl2RorSubjectGlobal)
accessObject := aclservice.CheckAccessByContextAclQuery(ctx, accessQuery)
if !accessObject.Create {
c.JSON(http.StatusForbidden, "403: No access")
return
}

var input sseserver.SseEvent
err := c.BindJSON(&input)
if err != nil {
rerr := rorerror.NewRorError(http.StatusBadRequest, "Object is not valid", err)
rerr.GinLogErrorAbort(c)

Check failure on line 113 in pkg/handlers/ssehandler/handler.go

View workflow job for this annotation

GitHub Actions / build-app

rerr.GinLogErrorAbort undefined (type rorerror.RorError has no field or method GinLogErrorAbort)
return
}

sseserver.Server.Message <- sseserver.EventMessage{
Clients: sseserver.Server.Clients.GetBroadcast(),
SseEvent: sseserver.SseEvent{
Event: input.Event,
Data: input.Data,
},
}

c.JSON(http.StatusOK, nil)
}
}
13 changes: 13 additions & 0 deletions pkg/middelware/ssemiddleware/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ssemiddleware

import "github.com/gin-gonic/gin"

func SSEHeadersMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Next()
}
}
78 changes: 78 additions & 0 deletions pkg/servers/sseserver/eventclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package sseserver

import (
"sync"

identitymodels "github.com/NorskHelsenett/ror/pkg/models/identity"
)

type SseEvent struct {
Event string `json:"event"`
Data string `json:"data" validate:"required"`
}

type EventClientId string

type EventClientChan chan SseEvent

type EventClient struct {
Id EventClientId
Connection EventClientChan
Identity identitymodels.Identity
}

type EventClients struct {
clients []*EventClient
lock sync.RWMutex
}

func NewEventClients() EventClients {
return EventClients{
clients: make([]*EventClient, 0),
}
}

func (e *EventClients) Len() int {
e.lock.RLock()
defer e.lock.RUnlock()
return len(e.clients)
}

func (e *EventClients) Get(id EventClientId) *EventClient {
e.lock.RLock()
defer e.lock.RUnlock()
for _, client := range e.clients {
if client.Id == id {
return client
}
}
return nil
}

func (e *EventClients) Remove(id EventClientId) {
e.lock.Lock()
defer e.lock.Unlock()
for i, client := range e.clients {
if client.Id == id {
e.clients[i] = e.clients[len(e.clients)-1]
e.clients = e.clients[:len(e.clients)-1]
break
}
}
}

func (e *EventClients) Add(client *EventClient) {
e.lock.Lock()
defer e.lock.Unlock()
e.clients = append(e.clients, client)
}

func (e *EventClients) GetBroadcast() []EventClientId {
e.lock.RLock()
defer e.lock.RUnlock()
var clients []EventClientId
for _, client := range e.clients {
clients = append(clients, client.Id)
}
return clients
}
Loading
Loading