Skip to content

Commit

Permalink
Merge pull request #902 from subutai-io/sysnet
Browse files Browse the repository at this point in the history
Sysnet
  • Loading branch information
crioto authored Apr 28, 2018
2 parents a2e3863 + f8cac31 commit b6d8ca1
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions lib/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type NetworkPeer struct {
Running bool // Whether peer is running or not
EndpointsHeap []*PeerEndpoint // List of all endpoints
// EndpointsActive []PeerEndpoint // List of active endpoints
EndpointsLock sync.RWMutex // Mutex for endpoints operations
Lock sync.RWMutex // Mutex for endpoints operations
punchingInProgress bool // Whether or not UDP hole punching is running
LastFind time.Time // Moment when we got this peer from DHT
LastPunch time.Time // Last time we run hole punch
Expand Down Expand Up @@ -77,6 +77,10 @@ type NetworkPeerState struct {

// Run is main loop for a peer
func (np *NetworkPeer) Run(ptpc *PeerToPeer) {
np.Lock.Lock()
if np.Running {
return
}
np.Running = true
np.ConnectionAttempts = 0

Expand All @@ -91,6 +95,7 @@ func (np *NetworkPeer) Run(ptpc *PeerToPeer) {
np.handlers[PeerStateWaitingForProxy] = np.stateWaitingForProxy
np.handlers[PeerStateWaitingToConnect] = np.stateWaitingToConnect
np.handlers[PeerStateCooldown] = np.stateCooldown
np.Lock.Unlock()

for {
if np.State == PeerStateStop {
Expand Down Expand Up @@ -318,7 +323,7 @@ func (np *NetworkPeer) stateWaitingToConnect(ptpc *PeerToPeer) error {
}

func (np *NetworkPeer) sortEndpoints(ptpc *PeerToPeer) ([]*PeerEndpoint, []*PeerEndpoint, []*PeerEndpoint) {
np.EndpointsLock.RLock()
np.Lock.RLock()
locals := []*PeerEndpoint{}
internet := []*PeerEndpoint{}
proxies := []*PeerEndpoint{}
Expand Down Expand Up @@ -372,7 +377,7 @@ func (np *NetworkPeer) sortEndpoints(ptpc *PeerToPeer) ([]*PeerEndpoint, []*Peer
internet = append(internet, ep)
}
}
np.EndpointsLock.RUnlock()
np.Lock.RUnlock()
return locals, internet, proxies
}

Expand All @@ -384,12 +389,12 @@ func (np *NetworkPeer) route(ptpc *PeerToPeer) error {
stat := PeerStats{}
locals, internet, proxies := np.sortEndpoints(ptpc)

np.EndpointsLock.Lock()
np.Lock.Lock()
np.EndpointsHeap = np.EndpointsHeap[:0]
np.EndpointsHeap = append(np.EndpointsHeap, locals...)
np.EndpointsHeap = append(np.EndpointsHeap, internet...)
np.EndpointsHeap = append(np.EndpointsHeap, proxies...)
np.EndpointsLock.Unlock()
np.Lock.Unlock()

stat.localNum = len(locals)
stat.internetNum = len(internet)
Expand Down Expand Up @@ -444,8 +449,8 @@ func (np *NetworkPeer) stateCooldown(ptpc *PeerToPeer) error {
// This method will append new endpoint to the end of endpoints slice
// without any checks
func (np *NetworkPeer) addEndpoint(addr *net.UDPAddr) error {
np.EndpointsLock.Lock()
defer np.EndpointsLock.Unlock()
np.Lock.Lock()
defer np.Lock.Unlock()
for _, ep := range np.EndpointsHeap {
if ep.Addr.String() == addr.String() {
return fmt.Errorf("Endpoint already exists")
Expand All @@ -458,7 +463,7 @@ func (np *NetworkPeer) addEndpoint(addr *net.UDPAddr) error {
// This method will send xpeer ping message to endpoints
// if ping timeout has been passed
func (np *NetworkPeer) pingEndpoints(ptpc *PeerToPeer) {
np.EndpointsLock.RLock()
np.Lock.RLock()
for _, ep := range np.EndpointsHeap {
if time.Since(ep.LastPing) > time.Duration(time.Millisecond*3000) {
ep.LastPing = time.Now()
Expand All @@ -470,7 +475,7 @@ func (np *NetworkPeer) pingEndpoints(ptpc *PeerToPeer) {
ptpc.UDPSocket.SendMessage(msg, ep.Addr)
}
}
np.EndpointsLock.RUnlock()
np.Lock.RUnlock()
}

// This method will check if remote state requires local
Expand All @@ -495,8 +500,8 @@ func (np *NetworkPeer) syncWithRemoteState(ptpc *PeerToPeer) {
}

func (np *NetworkPeer) BumpEndpoint(epAddr string) {
np.EndpointsLock.Lock()
defer np.EndpointsLock.Unlock()
np.Lock.Lock()
defer np.Lock.Unlock()
for _, ep := range np.EndpointsHeap {
if ep.Addr.String() == epAddr {
ep.LastContact = time.Now()
Expand Down

0 comments on commit b6d8ca1

Please sign in to comment.