Skip to content

Commit

Permalink
Merge pull request #237 from subutai-io/dev
Browse files Browse the repository at this point in the history
4.0.6 Release
  • Loading branch information
crioto authored Nov 9, 2016
2 parents 17d59bc + eedc378 commit 9e279a7
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 49 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Change Log

## [4.0.6] 11/09/2016

* Modified timeout intervals to speed up disconnect procedure
* Modified DHT reconnect procedures to exclude uneccessary reconnects
* Channels has been moved to DHT to improve code readability
* Proper handling of "Unknown" command to force reconnect to DHT
* STOP procedure modified to force peer disconnect
* Fixed network comparison to exclude possible connection over existing p2p interface
* Fixed several issues with deadlocks while dealing with instances

## [4.0.5] 11/02/2016

* Fixed an issue when p2p was not able to connect to LAN peers
Expand Down
16 changes: 14 additions & 2 deletions lib/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func (dht *DHTClient) HandleFind(data DHTMessage, conn *net.UDPConn) {
dht.Peers = append(dht.Peers[:i], dht.Peers[i+1:]...)
}
}
if dht.PeerChannel == nil {
dht.PeerChannel = make(chan []PeerIP)
}
dht.PeerChannel <- dht.Peers
Log(Debug, "Received peers from %s: %s", conn.RemoteAddr().String(), data.Arguments)
dht.UpdateLastCatch(data.Arguments)
Expand Down Expand Up @@ -441,6 +444,9 @@ func (dht *DHTClient) HandleCp(data DHTMessage, conn *net.UDPConn) {
var fwd Forwarder
fwd.Addr = addr
fwd.DestinationID = data.Arguments
if dht.ProxyChannel == nil {
dht.ProxyChannel = make(chan Forwarder)
}
dht.ProxyChannel <- fwd
found := false
for _, f := range dht.Forwarders {
Expand Down Expand Up @@ -468,6 +474,9 @@ func (dht *DHTClient) HandleStop(data DHTMessage, conn *net.UDPConn) {
// We need to stop particular peer by changing it's state to
// P_DISCONNECT
Log(Info, "Stop command for %s", data.Arguments)
if dht.RemovePeerChan == nil {
dht.RemovePeerChan = make(chan string)
}
dht.RemovePeerChan <- data.Arguments
} else {
conn.Close()
Expand Down Expand Up @@ -496,6 +505,7 @@ func (dht *DHTClient) HandleDHCP(data DHTMessage, conn *net.UDPConn) {
// handshaked clients
func (dht *DHTClient) HandleUnknown(data DHTMessage, conn *net.UDPConn) {
Log(Warning, "DHT server refuses our identity")
dht.ID = ""
if dht.State == DHTStateConnecting || dht.State == DHTStateReconnecting {
time.Sleep(3 * time.Second)
}
Expand All @@ -520,9 +530,11 @@ func (dht *DHTClient) HandleError(data DHTMessage, conn *net.UDPConn) {
// Initialize - This method initializes DHT by splitting list of routers and connect to each one
func (dht *DHTClient) Initialize(config *DHTClient, ips []net.IP, peerChan chan []PeerIP, proxyChan chan Forwarder) *DHTClient {
dht.RemovePeerChan = make(chan string)
dht.PeerChannel = make(chan []PeerIP)
dht.ProxyChannel = make(chan Forwarder)
dht = config
dht.PeerChannel = peerChan
dht.ProxyChannel = proxyChan
//dht.PeerChannel = peerChan
//dht.ProxyChannel = proxyChan
routers := strings.Split(dht.Routers, ",")
dht.FailedRouters = make([]string, len(routers))
dht.ResponseHandlers = make(map[string]DHTResponseCallback)
Expand Down
141 changes: 95 additions & 46 deletions lib/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"gopkg.in/yaml.v2"
)

var GlobalIPBlacklist []string

// MessageHandler is a messages callback
type MessageHandler func(message *P2PMessage, srcAddr *net.UDPAddr)

Expand All @@ -38,8 +40,8 @@ type PeerToPeer struct {
MACIDTable map[string]string // Mapping for MAC->ID
MessageHandlers map[uint16]MessageHandler // Callbacks
PacketHandlers map[PacketType]PacketHandlerCallback // Callbacks for network packet handlers
DHTPeerChannel chan []PeerIP
ProxyChannel chan Forwarder
//DHTPeerChannel chan []PeerIP
//ProxyChannel chan Forwarder
RemovePeer chan string
MessageBuffer map[string]map[uint16]map[uint16][]byte
MessageLifetime map[string]map[uint16]time.Time
Expand All @@ -52,6 +54,14 @@ type PeerToPeer struct {
func (p *PeerToPeer) AssignInterface(ip, mac, mask, device string) error {
var err error

for _, i := range GlobalIPBlacklist {
if i == ip {
Log(Error, "Can't assign IP Address: IP %s is already in use", ip)
return fmt.Errorf("Can't assign IP Address: IP %s is already in use", ip)
}
}
GlobalIPBlacklist = append(GlobalIPBlacklist, ip)

p.IP = ip
p.Mac = mac
p.Mask = mask
Expand Down Expand Up @@ -191,6 +201,11 @@ func (p *PeerToPeer) FindNetworkAddresses() {
if !p.IsIPv4(ip.String()) {
decision = "No IPv4"
}
for _, i := range GlobalIPBlacklist {
if i == ip.String() {
decision = "Ignoring"
}
}
Log(Info, "Interface %s: %s. Type: %s. %s", i.Name, addr.String(), ipType, decision)
if decision == "Saving" {
p.LocalIPs = append(p.LocalIPs, ip)
Expand Down Expand Up @@ -310,11 +325,11 @@ func StartP2PInstance(argIP, argMac, argDev, argDirect, argHash, argDht, argKeyf
}
*/
// TODO: Move channels inside DHT
p.DHTPeerChannel = make(chan []PeerIP)
p.ProxyChannel = make(chan Forwarder)
//p.DHTPeerChannel = make(chan []PeerIP)
//p.ProxyChannel = make(chan Forwarder)
p.StartDHT(argHash, argDht)
/*
p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel)
p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel)
for p.Dht == nil {
Log(Warning, "Failed to connect to DHT. Retrying in 5 seconds")
time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -388,13 +403,13 @@ func (p *PeerToPeer) StartDHT(hash, routers string) {
if routers != "" {
config.Routers = routers
}
p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel)
p.Dht = dhtClient.Initialize(config, p.LocalIPs, nil, nil)
for p.Dht == nil {
Log(Warning, "Failed to connect to DHT. Retrying in 5 seconds")
time.Sleep(5 * time.Second)
p.LocalIPs = p.LocalIPs[:0]
p.FindNetworkAddresses()
p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel)
p.Dht = dhtClient.Initialize(config, p.LocalIPs, nil, nil)
}
Log(Info, "ID assigned. Continue")
}
Expand All @@ -408,24 +423,33 @@ func (p *PeerToPeer) Run() {
if p.Shutdown {
break
}
rm := <-p.Dht.RemovePeerChan
if rm == "DUMMY" {
continue
}
p.PeersLock.Lock()
peer, exists := p.NetworkPeers[rm]
p.PeersLock.Unlock()
runtime.Gosched()
if exists {
Log(Info, "Stopping %s after STOP command", rm)
peer.State = PeerStateDisconnect
p.PeersLock.Lock()
p.NetworkPeers[rm] = peer
p.PeersLock.Unlock()
runtime.Gosched()
} else {
Log(Info, "Can't stop peer. ID not found")
select {
case rm, r := <-p.Dht.RemovePeerChan:
if r {
if rm == "DUMMY" || rm == "" {
continue
}
p.PeersLock.Lock()
peer, exists := p.NetworkPeers[rm]
p.PeersLock.Unlock()
runtime.Gosched()
if exists {
Log(Info, "Stopping %s after STOP command", rm)
peer.State = PeerStateDisconnect
p.PeersLock.Lock()
p.NetworkPeers[rm] = peer
p.PeersLock.Unlock()
runtime.Gosched()
} else {
Log(Info, "Can't stop peer. ID not found")
}
} else {
Log(Trace, "Channel was closed")
}
default:
time.Sleep(100 * time.Microsecond)
}
//rm := <-p.Dht.RemovePeerChan
}
Log(Info, "Stopping peer state listener")
}()
Expand All @@ -444,17 +468,24 @@ func (p *PeerToPeer) Run() {
if peer.State == PeerStateStop {
Log(Info, "Removing peer %s", i)
time.Sleep(100 * time.Microsecond)
delete(p.IPIDTable, peer.PeerLocalIP.String())
lip := peer.PeerLocalIP.String()
delete(p.IPIDTable, lip)
delete(p.MACIDTable, peer.PeerHW.String())

for k, i := range GlobalIPBlacklist {
if i == lip {
GlobalIPBlacklist = append(GlobalIPBlacklist[:k], GlobalIPBlacklist[k+1:]...)
}
}

p.PeersLock.Lock()
delete(p.NetworkPeers, i)
p.PeersLock.Unlock()
runtime.Gosched()
}
}
passed := time.Since(p.Dht.LastDHTPing)
interval := time.Duration(time.Second * 50)
interval := time.Duration(time.Second * 45)
if passed > interval {
Log(Error, "Lost connection to DHT")
p.Dht.Shutdown = true
Expand Down Expand Up @@ -683,6 +714,7 @@ func (p *PeerToPeer) HandleIntroMessage(msg *P2PMessage, srcAddr *net.UDPAddr) {
}
peer.PeerHW = mac
peer.PeerLocalIP = ip
GlobalIPBlacklist = append(GlobalIPBlacklist, ip.String())
peer.State = PeerStateConnected
peer.LastContact = time.Now()
p.PeersLock.Lock()
Expand Down Expand Up @@ -804,8 +836,8 @@ func (p *PeerToPeer) StopInstance() {
p.Shutdown = true
var peers []PeerIP
var proxy Forwarder
p.DHTPeerChannel <- peers
p.ProxyChannel <- proxy
p.Dht.PeerChannel <- peers
p.Dht.ProxyChannel <- proxy
Log(Info, "Stopping P2P Message handler")
// Tricky part: we need to send a message to ourselves to quit blocking operation
msg := CreateTestP2PMessage(p.Crypter, "STOP", 1)
Expand Down Expand Up @@ -837,8 +869,16 @@ func (p *PeerToPeer) ReadDHTPeers() {
if p.Shutdown {
break
}
peers := <-p.DHTPeerChannel
p.UpdatePeers(peers)
select {
case peers, hasData := <-p.Dht.PeerChannel:
if hasData {
p.UpdatePeers(peers)
} else {
Log(Trace, "Clossed channel")
}
default:
time.Sleep(100 * time.Microsecond)
}
}
Log(Info, "Stopped DHT reader channel")
}
Expand All @@ -849,23 +889,32 @@ func (p *PeerToPeer) ReadProxies() {
if p.Shutdown {
break
}
proxy := <-p.ProxyChannel
exists := false
for i, peer := range p.NetworkPeers {
if i == proxy.DestinationID {
peer.State = PeerStateHandshakingForwarder
peer.Forwarder = proxy.Addr
peer.Endpoint = proxy.Addr
p.PeersLock.Lock()
p.NetworkPeers[i] = peer
p.PeersLock.Unlock()
runtime.Gosched()
exists = true
select {
case proxy, hasData := <-p.Dht.ProxyChannel:
if hasData {
exists := false
for i, peer := range p.NetworkPeers {
if i == proxy.DestinationID {
peer.State = PeerStateHandshakingForwarder
peer.Forwarder = proxy.Addr
peer.Endpoint = proxy.Addr
p.PeersLock.Lock()
p.NetworkPeers[i] = peer
p.PeersLock.Unlock()
runtime.Gosched()
exists = true
}
}
if !exists {
Log(Info, "Received forwarder for unknown peer")
p.Dht.SendUpdateRequest()
}

} else {
Log(Trace, "Clossed channel")
}
}
if !exists {
Log(Info, "Received forwarder for unknown peer")
p.Dht.SendUpdateRequest()
default:
time.Sleep(100 * time.Microsecond)
}
}
Log(Info, "Stopped Proxy reader channel")
Expand Down
12 changes: 11 additions & 1 deletion lib/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"time"
)

Expand Down Expand Up @@ -161,6 +162,7 @@ func (np *NetworkPeer) StateConnected(ptpc *PeerToPeer) error {
np.PeerAddr = nil
np.Endpoint = nil
np.PingCount = 0
time.Sleep(30 * time.Second)
return fmt.Errorf("Peer %s has been timed out", np.ID)
}
if np.Endpoint == nil {
Expand All @@ -172,7 +174,7 @@ func (np *NetworkPeer) StateConnected(ptpc *PeerToPeer) error {
passed := time.Since(np.LastContact)
if passed > PeerPingTimeout {
np.LastError = ""
Log(Debug, "Sending ping")
Log(Trace, "Sending ping")
msg := CreateXpeerPingMessage(PingReq, ptpc.HardwareAddr.String())
ptpc.SendTo(np.PeerHW, msg)
np.PingCount++
Expand Down Expand Up @@ -399,6 +401,14 @@ func (np *NetworkPeer) ProbeLocalConnection(ptpc *PeerToPeer) bool {
Log(Debug, "Probing new IP %s against network %s", kip.IP.String(), network.String())

if network.Contains(kip.IP) {

for _, i := range GlobalIPBlacklist {
str := kip.String()
parts := strings.Split(str, ":")
if len(parts) > 1 && i == parts[0] {
continue
}
}
if np.TestConnection(ptpc, kip) {
np.Endpoint = kip
Log(Info, "Setting endpoint for %s to %s", np.ID, kip.String())
Expand Down

0 comments on commit 9e279a7

Please sign in to comment.