Skip to content

[management] Remove store locks #4038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 0 additions & 41 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,6 @@ func (am *DefaultAccountManager) GetIdpManager() idp.Manager {
// User that performs the update has to belong to the account.
// Returns an updated Settings
func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, accountID, userID string, newSettings *types.Settings) (*types.Settings, error) {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Settings, operations.Update)
if err != nil {
return nil, fmt.Errorf("failed to validate user permissions: %w", err)
Expand Down Expand Up @@ -460,8 +457,6 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(ctx context.Context, acc
ctx := context.WithValue(ctx, nbcontext.AccountIDKey, accountID)
//nolint
ctx = context.WithValue(ctx, hook.ExecutionContextKey, fmt.Sprintf("%s-PEER-EXPIRATION", hook.SystemSource))
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

expiredPeers, err := am.getExpiredPeers(ctx, accountID)
if err != nil {
Expand Down Expand Up @@ -497,9 +492,6 @@ func (am *DefaultAccountManager) schedulePeerLoginExpiration(ctx context.Context
// peerInactivityExpirationJob marks login expired for all inactive peers and returns the minimum duration in which the next peer of the account will expire by inactivity if found
func (am *DefaultAccountManager) peerInactivityExpirationJob(ctx context.Context, accountID string) func() (time.Duration, bool) {
return func() (time.Duration, bool) {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

inactivePeers, err := am.getInactivePeers(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed getting inactive peers for account %s", accountID)
Expand Down Expand Up @@ -640,8 +632,6 @@ func (am *DefaultAccountManager) isCacheCold(ctx context.Context, store cacheSto

// DeleteAccount deletes an account and all its users from local store and from the remote IDP if the requester is an admin and account owner
func (am *DefaultAccountManager) DeleteAccount(ctx context.Context, accountID, userID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()
account, err := am.Store.GetAccount(ctx, accountID)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that here we can have possible race. somewhere between user are being removed and actual removal of an account, still possible that user could be created

return err
Expand Down Expand Up @@ -1007,9 +997,6 @@ func (am *DefaultAccountManager) updateAccountDomainAttributesIfNotUpToDate(ctx
return nil
}

unlockAccount := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlockAccount()

Comment on lines -1010 to -1012
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that here better to add a transaction

accountDomain, domainCategory, err := am.Store.GetAccountDomainAndCategory(ctx, store.LockingStrengthShare, accountID)
if err != nil {
log.WithContext(ctx).Errorf("error getting account domain and category: %v", err)
Expand Down Expand Up @@ -1102,9 +1089,6 @@ func (am *DefaultAccountManager) addNewPrivateAccount(ctx context.Context, domai
}

func (am *DefaultAccountManager) addNewUserToDomainAccount(ctx context.Context, domainAccountID string, userAuth nbcontext.UserAuth) (string, error) {
unlockAccount := am.Store.AcquireWriteLockByUID(ctx, domainAccountID)
defer unlockAccount()

newUser := types.NewRegularUser(userAuth.UserId)
newUser.AccountID = domainAccountID
err := am.Store.SaveUser(ctx, store.LockingStrengthUpdate, newUser)
Expand Down Expand Up @@ -1251,13 +1235,6 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
return nil
}

unlockAccount := am.Store.AcquireWriteLockByUID(ctx, userAuth.AccountId)
defer func() {
if unlockAccount != nil {
unlockAccount()
}
}()

var addNewGroups []string
var removeOldGroups []string
var hasChanges bool
Expand Down Expand Up @@ -1326,8 +1303,6 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
return fmt.Errorf("error incrementing network serial: %w", err)
}
}
unlockAccount()
unlockAccount = nil

return nil
})
Expand Down Expand Up @@ -1542,11 +1517,6 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
log.WithContext(ctx).Debugf("SyncAndMarkPeer: took %v", time.Since(start))
}()

accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer accountUnlock()
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
defer peerUnlock()

peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
if err != nil {
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
Expand All @@ -1561,11 +1531,6 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
}

func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer accountUnlock()
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
defer peerUnlock()

err := am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID)
if err != nil {
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
Expand All @@ -1581,12 +1546,6 @@ func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey st
return err
}

unlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer unlock()

unlockPeer := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
defer unlockPeer()

_, _, _, err = am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, UpdateAccountPeers: true}, accountID)
if err != nil {
return mapError(ctx, err)
Expand Down
16 changes: 0 additions & 16 deletions management/server/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ func (am *DefaultAccountManager) GetGroupByName(ctx context.Context, groupName,

// SaveGroup object of the peers
func (am *DefaultAccountManager) SaveGroup(ctx context.Context, accountID, userID string, newGroup *types.Group, create bool) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()
return am.SaveGroups(ctx, accountID, userID, []*types.Group{newGroup}, create)
}

Expand Down Expand Up @@ -202,8 +200,6 @@ func (am *DefaultAccountManager) prepareGroupEvents(ctx context.Context, transac

// DeleteGroup object of the peers.
func (am *DefaultAccountManager) DeleteGroup(ctx context.Context, accountID, userID, groupID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()
return am.DeleteGroups(ctx, accountID, userID, []string{groupID})
}

Expand Down Expand Up @@ -262,9 +258,6 @@ func (am *DefaultAccountManager) DeleteGroups(ctx context.Context, accountID, us

// GroupAddPeer appends peer to the group
func (am *DefaultAccountManager) GroupAddPeer(ctx context.Context, accountID, groupID, peerID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var group *types.Group
var updateAccountPeers bool
var err error
Expand Down Expand Up @@ -303,9 +296,6 @@ func (am *DefaultAccountManager) GroupAddPeer(ctx context.Context, accountID, gr

// GroupAddResource appends resource to the group
func (am *DefaultAccountManager) GroupAddResource(ctx context.Context, accountID, groupID string, resource types.Resource) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var group *types.Group
var updateAccountPeers bool
var err error
Expand Down Expand Up @@ -344,9 +334,6 @@ func (am *DefaultAccountManager) GroupAddResource(ctx context.Context, accountID

// GroupDeletePeer removes peer from the group
func (am *DefaultAccountManager) GroupDeletePeer(ctx context.Context, accountID, groupID, peerID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var group *types.Group
var updateAccountPeers bool
var err error
Expand Down Expand Up @@ -385,9 +372,6 @@ func (am *DefaultAccountManager) GroupDeletePeer(ctx context.Context, accountID,

// GroupDeleteResource removes resource from the group
func (am *DefaultAccountManager) GroupDeleteResource(ctx context.Context, accountID, groupID string, resource types.Resource) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var group *types.Group
var updateAccountPeers bool
var err error
Expand Down
3 changes: 0 additions & 3 deletions management/server/integrated_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
return errors.New("invalid groups")
}

unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

return am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
a, err := transaction.GetAccountByUser(ctx, userID)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions management/server/nameserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ func (am *DefaultAccountManager) GetNameServerGroup(ctx context.Context, account

// CreateNameServerGroup creates and saves a new nameserver group
func (am *DefaultAccountManager) CreateNameServerGroup(ctx context.Context, accountID string, name, description string, nameServerList []nbdns.NameServer, groups []string, primary bool, domains []string, enabled bool, userID string, searchDomainEnabled bool) (*nbdns.NameServerGroup, error) {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Nameservers, operations.Create)
if err != nil {
return nil, status.NewPermissionValidationError(err)
Expand Down Expand Up @@ -94,9 +91,6 @@ func (am *DefaultAccountManager) CreateNameServerGroup(ctx context.Context, acco

// SaveNameServerGroup saves nameserver group
func (am *DefaultAccountManager) SaveNameServerGroup(ctx context.Context, accountID, userID string, nsGroupToSave *nbdns.NameServerGroup) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

if nsGroupToSave == nil {
return status.Errorf(status.InvalidArgument, "nameserver group provided is nil")
}
Expand Down Expand Up @@ -148,9 +142,6 @@ func (am *DefaultAccountManager) SaveNameServerGroup(ctx context.Context, accoun

// DeleteNameServerGroup deletes nameserver group with nsGroupID
func (am *DefaultAccountManager) DeleteNameServerGroup(ctx context.Context, accountID, nsGroupID, userID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Nameservers, operations.Delete)
if err != nil {
return status.NewPermissionValidationError(err)
Expand Down
9 changes: 0 additions & 9 deletions management/server/networks/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ func (m *managerImpl) CreateNetwork(ctx context.Context, userID string, network

network.ID = xid.New().String()

unlock := m.store.AcquireWriteLockByUID(ctx, network.AccountID)
defer unlock()

err = m.store.SaveNetwork(ctx, store.LockingStrengthUpdate, network)
if err != nil {
return nil, fmt.Errorf("failed to save network: %w", err)
Expand Down Expand Up @@ -104,9 +101,6 @@ func (m *managerImpl) UpdateNetwork(ctx context.Context, userID string, network
return nil, status.NewPermissionDeniedError()
}

unlock := m.store.AcquireWriteLockByUID(ctx, network.AccountID)
defer unlock()

Comment on lines -107 to -109
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we need to add a transaction here, as some changes could be made between select and actual update

_, err = m.store.GetNetworkByID(ctx, store.LockingStrengthUpdate, network.AccountID, network.ID)
if err != nil {
return nil, fmt.Errorf("failed to get network: %w", err)
Expand All @@ -131,9 +125,6 @@ func (m *managerImpl) DeleteNetwork(ctx context.Context, accountID, userID, netw
return fmt.Errorf("failed to get network: %w", err)
}

unlock := m.store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var eventsToStore []func()
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
resources, err := transaction.GetNetworkResourcesByNetID(ctx, store.LockingStrengthUpdate, accountID, networkID)
Expand Down
9 changes: 0 additions & 9 deletions management/server/networks/resources/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ func (m *managerImpl) CreateResource(ctx context.Context, userID string, resourc
return nil, fmt.Errorf("failed to create new network resource: %w", err)
}

unlock := m.store.AcquireWriteLockByUID(ctx, resource.AccountID)
defer unlock()

var eventsToStore []func()
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
_, err = transaction.GetNetworkResourceByName(ctx, store.LockingStrengthShare, resource.AccountID, resource.Name)
Expand Down Expand Up @@ -204,9 +201,6 @@ func (m *managerImpl) UpdateResource(ctx context.Context, userID string, resourc
resource.Domain = domain
resource.Prefix = prefix

unlock := m.store.AcquireWriteLockByUID(ctx, resource.AccountID)
defer unlock()

var eventsToStore []func()
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
network, err := transaction.GetNetworkByID(ctx, store.LockingStrengthUpdate, resource.AccountID, resource.NetworkID)
Expand Down Expand Up @@ -315,9 +309,6 @@ func (m *managerImpl) DeleteResource(ctx context.Context, accountID, userID, net
return status.NewPermissionDeniedError()
}

unlock := m.store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var events []func()
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
events, err = m.DeleteResourceInTransaction(ctx, transaction, accountID, userID, networkID, resourceID)
Expand Down
9 changes: 0 additions & 9 deletions management/server/networks/routers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ func (m *managerImpl) CreateRouter(ctx context.Context, userID string, router *t
return nil, status.NewPermissionDeniedError()
}

unlock := m.store.AcquireWriteLockByUID(ctx, router.AccountID)
defer unlock()

var network *networkTypes.Network
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
network, err = transaction.GetNetworkByID(ctx, store.LockingStrengthShare, router.AccountID, router.NetworkID)
Expand Down Expand Up @@ -157,9 +154,6 @@ func (m *managerImpl) UpdateRouter(ctx context.Context, userID string, router *t
return nil, status.NewPermissionDeniedError()
}

unlock := m.store.AcquireWriteLockByUID(ctx, router.AccountID)
defer unlock()

var network *networkTypes.Network
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
network, err = transaction.GetNetworkByID(ctx, store.LockingStrengthShare, router.AccountID, router.NetworkID)
Expand Down Expand Up @@ -203,9 +197,6 @@ func (m *managerImpl) DeleteRouter(ctx context.Context, accountID, userID, netwo
return status.NewPermissionDeniedError()
}

unlock := m.store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

var event func()
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
event, err = m.DeleteRouterInTransaction(ctx, transaction, accountID, userID, networkID, routerID)
Expand Down
28 changes: 0 additions & 28 deletions management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ func updatePeerStatusAndLocation(ctx context.Context, geo geolocation.Geolocatio

// UpdatePeer updates peer. Only Peer.Name, Peer.SSHEnabled, Peer.LoginExpirationEnabled and Peer.InactivityExpirationEnabled can be updated.
func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, userID string, update *nbpeer.Peer) (*nbpeer.Peer, error) {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Update)
if err != nil {
return nil, status.NewPermissionValidationError(err)
Expand Down Expand Up @@ -324,9 +321,6 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user

// DeletePeer removes peer from the account by its IP
func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return status.NewPermissionValidationError(err)
Expand Down Expand Up @@ -476,13 +470,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
return nil, nil, nil, status.Errorf(status.NotFound, "failed adding new peer: account not found")
}

unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer func() {
if unlock != nil {
unlock()
}
}()

// This is a handling for the case when the same machine (with the same WireGuard pub key) tries to register twice.
// Such case is possible when AddPeer function takes long time to finish after AcquireWriteLockByUID (e.g., database is slow)
// and the peer disconnects with a timeout and tries to register again.
Expand Down Expand Up @@ -663,9 +650,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s

am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)

unlock()
unlock = nil

if updateAccountPeers {
am.BufferUpdateAccountPeers(ctx, accountID)
}
Expand Down Expand Up @@ -805,15 +789,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
}
}

unlockAccount := am.Store.AcquireReadLockByUID(ctx, accountID)
defer unlockAccount()
unlockPeer := am.Store.AcquireWriteLockByUID(ctx, login.WireGuardPubKey)
defer func() {
if unlockPeer != nil {
unlockPeer()
}
}()

var peer *nbpeer.Peer
var updateRemotePeers bool
var isRequiresApproval bool
Expand Down Expand Up @@ -894,9 +869,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
return nil, nil, nil, err
}

unlockPeer()
unlockPeer = nil

if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
am.BufferUpdateAccountPeers(ctx, accountID)
}
Expand Down
6 changes: 0 additions & 6 deletions management/server/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ func (am *DefaultAccountManager) GetPolicy(ctx context.Context, accountID, polic

// SavePolicy in the store
func (am *DefaultAccountManager) SavePolicy(ctx context.Context, accountID, userID string, policy *types.Policy, create bool) (*types.Policy, error) {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

operation := operations.Create
if !create {
operation = operations.Update
Expand Down Expand Up @@ -88,9 +85,6 @@ func (am *DefaultAccountManager) SavePolicy(ctx context.Context, accountID, user

// DeletePolicy from the store
func (am *DefaultAccountManager) DeletePolicy(ctx context.Context, accountID, policyID, userID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()

allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Policies, operations.Delete)
if err != nil {
return status.NewPermissionValidationError(err)
Expand Down
Loading
Loading