Skip to content

Commit

Permalink
feat: sync team_members with casbin_rules
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jan 3, 2025
1 parent 0cdcb7e commit b32bae4
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
68 changes: 63 additions & 5 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
Expand Down Expand Up @@ -170,27 +171,69 @@ func tableUpdatesHandler(ctx context.Context) {
playbooksActionUpdateChan := notifyRouter.GetOrCreateChannel("playbook_run_actions")
permissionUpdateChan := notifyRouter.GetOrCreateChannel("permissions")
permissionGroupUpdateChan := notifyRouter.GetOrCreateChannel("permission_groups")
teamMembersUpdateChan := notifyRouter.GetOrCreateChannel("team_members")

// use a single job instance to maintain retention
pushPlaybookActionsJob := jobs.PushPlaybookActions(ctx)
pushPlaybookActionsJob.Schedule = "" // to disable jitter

for {
select {
case id := <-notificationUpdateCh:
case v := <-notificationUpdateCh:
_, id := tableActivityPayload(v)
notification.PurgeCache(id)

case id := <-playbooksUpdateChan:
case v := <-playbooksUpdateChan:
_, id := tableActivityPayload(v)
query.InvalidateCacheByID[models.Playbook](id)

case <-playbooksActionUpdateChan:
if api.UpstreamConf.Valid() {
pushPlaybookActionsJob.Run()
}

case id := <-teamsUpdateChan:
responder.PurgeCache(id)
teams.PurgeCache(id)
case v := <-teamsUpdateChan:
tgOperation, id := tableActivityPayload(v)

if tgOperation != TGOPInsert {
responder.PurgeCache(id)
teams.PurgeCache(id)
}

if tgOperation == TGOPDelete {
if ok, err := rbac.DeleteRole(id); err != nil {
ctx.Errorf("failed to delete rbac policy for team(%s): %v", id, err)
} else if ok {
if err := rbac.ReloadPolicy(); err != nil {
ctx.Errorf("failed to reload rbac policy: %v", err)
}
}
}

case v := <-teamMembersUpdateChan:
tgOperation, payload := tableActivityPayload(v)
fields := strings.Fields(payload)
if len(fields) != 2 {
ctx.Errorf("bad payload for team_members update: %s. expected (team_id person_id)", payload)
continue
}
teamID, personID := fields[0], fields[1]

switch tgOperation {
case TGOPDelete:
if err := rbac.DeleteRoleForUser(personID, teamID); err != nil {
ctx.Errorf("failed to delete team(%s)->user(%s) rbac policy: %v", teamID, personID, err)
} else if err := rbac.ReloadPolicy(); err != nil {
ctx.Errorf("failed to reload rbac policy: %v", err)
}

case TGOPInsert, TGOPUpdate:
if err := rbac.AddRoleForUser(personID, teamID); err != nil {
ctx.Errorf("failed to add team(%s)->user(%s) rbac policy: %v", teamID, personID, err)
} else if err := rbac.ReloadPolicy(); err != nil {
ctx.Errorf("failed to reload rbac policy: %v", err)
}
}

case <-permissionUpdateChan:
if err := rbac.ReloadPolicy(); err != nil {
Expand All @@ -208,3 +251,18 @@ func tableUpdatesHandler(ctx context.Context) {
}
}
}

func tableActivityPayload(payload string) (TGOP, string) {
fields := strings.Fields(payload)
derivedPayload := strings.Join(fields[1:], " ")
return TGOP(fields[0]), derivedPayload
}

// TG_OP from SQL trigger functions
type TGOP string

const (
TGOPDelete TGOP = "DELETE"
TGOPInsert TGOP = "INSERT"
TGOPUpdate TGOP = "UPDATE"
)
4 changes: 4 additions & 0 deletions rbac/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func Stop() {
}
}

func DeleteRole(role string) (bool, error) {
return enforcer.DeleteRole(role)
}

func DeleteRoleForUser(user string, role string) error {
_, err := enforcer.DeleteRoleForUser(user, role)
return err
Expand Down

0 comments on commit b32bae4

Please sign in to comment.