diff --git a/CHANGELOG b/CHANGELOG index b96f0464..b7997911 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,15 @@ # Change Log +## [4.0.6] 11/09/2016 + +* Modified timeout intervals to speed up disconnect procedure +* Modified DHT reconnect procedures to exclude uneccessary reconnects +* Channels has been moved to DHT to improve code readability +* Proper handling of "Unknown" command to force reconnect to DHT +* STOP procedure modified to force peer disconnect +* Fixed network comparison to exclude possible connection over existing p2p interface +* Fixed several issues with deadlocks while dealing with instances + ## [4.0.5] 11/02/2016 * Fixed an issue when p2p was not able to connect to LAN peers diff --git a/lib/dht.go b/lib/dht.go index 4d2dd6c0..ffca2f5d 100644 --- a/lib/dht.go +++ b/lib/dht.go @@ -381,6 +381,9 @@ func (dht *DHTClient) HandleFind(data DHTMessage, conn *net.UDPConn) { dht.Peers = append(dht.Peers[:i], dht.Peers[i+1:]...) } } + if dht.PeerChannel == nil { + dht.PeerChannel = make(chan []PeerIP) + } dht.PeerChannel <- dht.Peers Log(Debug, "Received peers from %s: %s", conn.RemoteAddr().String(), data.Arguments) dht.UpdateLastCatch(data.Arguments) @@ -441,6 +444,9 @@ func (dht *DHTClient) HandleCp(data DHTMessage, conn *net.UDPConn) { var fwd Forwarder fwd.Addr = addr fwd.DestinationID = data.Arguments + if dht.ProxyChannel == nil { + dht.ProxyChannel = make(chan Forwarder) + } dht.ProxyChannel <- fwd found := false for _, f := range dht.Forwarders { @@ -468,6 +474,9 @@ func (dht *DHTClient) HandleStop(data DHTMessage, conn *net.UDPConn) { // We need to stop particular peer by changing it's state to // P_DISCONNECT Log(Info, "Stop command for %s", data.Arguments) + if dht.RemovePeerChan == nil { + dht.RemovePeerChan = make(chan string) + } dht.RemovePeerChan <- data.Arguments } else { conn.Close() @@ -496,6 +505,7 @@ func (dht *DHTClient) HandleDHCP(data DHTMessage, conn *net.UDPConn) { // handshaked clients func (dht *DHTClient) HandleUnknown(data DHTMessage, conn *net.UDPConn) { Log(Warning, "DHT server refuses our identity") + dht.ID = "" if dht.State == DHTStateConnecting || dht.State == DHTStateReconnecting { time.Sleep(3 * time.Second) } @@ -520,9 +530,11 @@ func (dht *DHTClient) HandleError(data DHTMessage, conn *net.UDPConn) { // Initialize - This method initializes DHT by splitting list of routers and connect to each one func (dht *DHTClient) Initialize(config *DHTClient, ips []net.IP, peerChan chan []PeerIP, proxyChan chan Forwarder) *DHTClient { dht.RemovePeerChan = make(chan string) + dht.PeerChannel = make(chan []PeerIP) + dht.ProxyChannel = make(chan Forwarder) dht = config - dht.PeerChannel = peerChan - dht.ProxyChannel = proxyChan + //dht.PeerChannel = peerChan + //dht.ProxyChannel = proxyChan routers := strings.Split(dht.Routers, ",") dht.FailedRouters = make([]string, len(routers)) dht.ResponseHandlers = make(map[string]DHTResponseCallback) diff --git a/lib/p2p.go b/lib/p2p.go index 5c1bda70..8ac69487 100644 --- a/lib/p2p.go +++ b/lib/p2p.go @@ -13,6 +13,8 @@ import ( "gopkg.in/yaml.v2" ) +var GlobalIPBlacklist []string + // MessageHandler is a messages callback type MessageHandler func(message *P2PMessage, srcAddr *net.UDPAddr) @@ -38,8 +40,8 @@ type PeerToPeer struct { MACIDTable map[string]string // Mapping for MAC->ID MessageHandlers map[uint16]MessageHandler // Callbacks PacketHandlers map[PacketType]PacketHandlerCallback // Callbacks for network packet handlers - DHTPeerChannel chan []PeerIP - ProxyChannel chan Forwarder + //DHTPeerChannel chan []PeerIP + //ProxyChannel chan Forwarder RemovePeer chan string MessageBuffer map[string]map[uint16]map[uint16][]byte MessageLifetime map[string]map[uint16]time.Time @@ -52,6 +54,14 @@ type PeerToPeer struct { func (p *PeerToPeer) AssignInterface(ip, mac, mask, device string) error { var err error + for _, i := range GlobalIPBlacklist { + if i == ip { + Log(Error, "Can't assign IP Address: IP %s is already in use", ip) + return fmt.Errorf("Can't assign IP Address: IP %s is already in use", ip) + } + } + GlobalIPBlacklist = append(GlobalIPBlacklist, ip) + p.IP = ip p.Mac = mac p.Mask = mask @@ -191,6 +201,11 @@ func (p *PeerToPeer) FindNetworkAddresses() { if !p.IsIPv4(ip.String()) { decision = "No IPv4" } + for _, i := range GlobalIPBlacklist { + if i == ip.String() { + decision = "Ignoring" + } + } Log(Info, "Interface %s: %s. Type: %s. %s", i.Name, addr.String(), ipType, decision) if decision == "Saving" { p.LocalIPs = append(p.LocalIPs, ip) @@ -310,11 +325,11 @@ func StartP2PInstance(argIP, argMac, argDev, argDirect, argHash, argDht, argKeyf } */ // TODO: Move channels inside DHT - p.DHTPeerChannel = make(chan []PeerIP) - p.ProxyChannel = make(chan Forwarder) + //p.DHTPeerChannel = make(chan []PeerIP) + //p.ProxyChannel = make(chan Forwarder) p.StartDHT(argHash, argDht) /* - p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel) + p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel) for p.Dht == nil { Log(Warning, "Failed to connect to DHT. Retrying in 5 seconds") time.Sleep(5 * time.Second) @@ -388,13 +403,13 @@ func (p *PeerToPeer) StartDHT(hash, routers string) { if routers != "" { config.Routers = routers } - p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel) + p.Dht = dhtClient.Initialize(config, p.LocalIPs, nil, nil) for p.Dht == nil { Log(Warning, "Failed to connect to DHT. Retrying in 5 seconds") time.Sleep(5 * time.Second) p.LocalIPs = p.LocalIPs[:0] p.FindNetworkAddresses() - p.Dht = dhtClient.Initialize(config, p.LocalIPs, p.DHTPeerChannel, p.ProxyChannel) + p.Dht = dhtClient.Initialize(config, p.LocalIPs, nil, nil) } Log(Info, "ID assigned. Continue") } @@ -408,24 +423,33 @@ func (p *PeerToPeer) Run() { if p.Shutdown { break } - rm := <-p.Dht.RemovePeerChan - if rm == "DUMMY" { - continue - } - p.PeersLock.Lock() - peer, exists := p.NetworkPeers[rm] - p.PeersLock.Unlock() - runtime.Gosched() - if exists { - Log(Info, "Stopping %s after STOP command", rm) - peer.State = PeerStateDisconnect - p.PeersLock.Lock() - p.NetworkPeers[rm] = peer - p.PeersLock.Unlock() - runtime.Gosched() - } else { - Log(Info, "Can't stop peer. ID not found") + select { + case rm, r := <-p.Dht.RemovePeerChan: + if r { + if rm == "DUMMY" || rm == "" { + continue + } + p.PeersLock.Lock() + peer, exists := p.NetworkPeers[rm] + p.PeersLock.Unlock() + runtime.Gosched() + if exists { + Log(Info, "Stopping %s after STOP command", rm) + peer.State = PeerStateDisconnect + p.PeersLock.Lock() + p.NetworkPeers[rm] = peer + p.PeersLock.Unlock() + runtime.Gosched() + } else { + Log(Info, "Can't stop peer. ID not found") + } + } else { + Log(Trace, "Channel was closed") + } + default: + time.Sleep(100 * time.Microsecond) } + //rm := <-p.Dht.RemovePeerChan } Log(Info, "Stopping peer state listener") }() @@ -444,9 +468,16 @@ func (p *PeerToPeer) Run() { if peer.State == PeerStateStop { Log(Info, "Removing peer %s", i) time.Sleep(100 * time.Microsecond) - delete(p.IPIDTable, peer.PeerLocalIP.String()) + lip := peer.PeerLocalIP.String() + delete(p.IPIDTable, lip) delete(p.MACIDTable, peer.PeerHW.String()) + for k, i := range GlobalIPBlacklist { + if i == lip { + GlobalIPBlacklist = append(GlobalIPBlacklist[:k], GlobalIPBlacklist[k+1:]...) + } + } + p.PeersLock.Lock() delete(p.NetworkPeers, i) p.PeersLock.Unlock() @@ -454,7 +485,7 @@ func (p *PeerToPeer) Run() { } } passed := time.Since(p.Dht.LastDHTPing) - interval := time.Duration(time.Second * 50) + interval := time.Duration(time.Second * 45) if passed > interval { Log(Error, "Lost connection to DHT") p.Dht.Shutdown = true @@ -683,6 +714,7 @@ func (p *PeerToPeer) HandleIntroMessage(msg *P2PMessage, srcAddr *net.UDPAddr) { } peer.PeerHW = mac peer.PeerLocalIP = ip + GlobalIPBlacklist = append(GlobalIPBlacklist, ip.String()) peer.State = PeerStateConnected peer.LastContact = time.Now() p.PeersLock.Lock() @@ -804,8 +836,8 @@ func (p *PeerToPeer) StopInstance() { p.Shutdown = true var peers []PeerIP var proxy Forwarder - p.DHTPeerChannel <- peers - p.ProxyChannel <- proxy + p.Dht.PeerChannel <- peers + p.Dht.ProxyChannel <- proxy Log(Info, "Stopping P2P Message handler") // Tricky part: we need to send a message to ourselves to quit blocking operation msg := CreateTestP2PMessage(p.Crypter, "STOP", 1) @@ -837,8 +869,16 @@ func (p *PeerToPeer) ReadDHTPeers() { if p.Shutdown { break } - peers := <-p.DHTPeerChannel - p.UpdatePeers(peers) + select { + case peers, hasData := <-p.Dht.PeerChannel: + if hasData { + p.UpdatePeers(peers) + } else { + Log(Trace, "Clossed channel") + } + default: + time.Sleep(100 * time.Microsecond) + } } Log(Info, "Stopped DHT reader channel") } @@ -849,23 +889,32 @@ func (p *PeerToPeer) ReadProxies() { if p.Shutdown { break } - proxy := <-p.ProxyChannel - exists := false - for i, peer := range p.NetworkPeers { - if i == proxy.DestinationID { - peer.State = PeerStateHandshakingForwarder - peer.Forwarder = proxy.Addr - peer.Endpoint = proxy.Addr - p.PeersLock.Lock() - p.NetworkPeers[i] = peer - p.PeersLock.Unlock() - runtime.Gosched() - exists = true + select { + case proxy, hasData := <-p.Dht.ProxyChannel: + if hasData { + exists := false + for i, peer := range p.NetworkPeers { + if i == proxy.DestinationID { + peer.State = PeerStateHandshakingForwarder + peer.Forwarder = proxy.Addr + peer.Endpoint = proxy.Addr + p.PeersLock.Lock() + p.NetworkPeers[i] = peer + p.PeersLock.Unlock() + runtime.Gosched() + exists = true + } + } + if !exists { + Log(Info, "Received forwarder for unknown peer") + p.Dht.SendUpdateRequest() + } + + } else { + Log(Trace, "Clossed channel") } - } - if !exists { - Log(Info, "Received forwarder for unknown peer") - p.Dht.SendUpdateRequest() + default: + time.Sleep(100 * time.Microsecond) } } Log(Info, "Stopped Proxy reader channel") diff --git a/lib/peer.go b/lib/peer.go index f051e5b0..be765b45 100644 --- a/lib/peer.go +++ b/lib/peer.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net" + "strings" "time" ) @@ -161,6 +162,7 @@ func (np *NetworkPeer) StateConnected(ptpc *PeerToPeer) error { np.PeerAddr = nil np.Endpoint = nil np.PingCount = 0 + time.Sleep(30 * time.Second) return fmt.Errorf("Peer %s has been timed out", np.ID) } if np.Endpoint == nil { @@ -172,7 +174,7 @@ func (np *NetworkPeer) StateConnected(ptpc *PeerToPeer) error { passed := time.Since(np.LastContact) if passed > PeerPingTimeout { np.LastError = "" - Log(Debug, "Sending ping") + Log(Trace, "Sending ping") msg := CreateXpeerPingMessage(PingReq, ptpc.HardwareAddr.String()) ptpc.SendTo(np.PeerHW, msg) np.PingCount++ @@ -399,6 +401,14 @@ func (np *NetworkPeer) ProbeLocalConnection(ptpc *PeerToPeer) bool { Log(Debug, "Probing new IP %s against network %s", kip.IP.String(), network.String()) if network.Contains(kip.IP) { + + for _, i := range GlobalIPBlacklist { + str := kip.String() + parts := strings.Split(str, ":") + if len(parts) > 1 && i == parts[0] { + continue + } + } if np.TestConnection(ptpc, kip) { np.Endpoint = kip Log(Info, "Setting endpoint for %s to %s", np.ID, kip.String())