Skip to content

Commit

Permalink
Merge pull request #467 from subutai-io/fix-handshake
Browse files Browse the repository at this point in the history
Fix handshake
  • Loading branch information
crioto authored Nov 30, 2017
2 parents d44afbe + 1ac1d47 commit 30f5c03
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 98 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Change Log

## [6.3.0] TBD
## [6.2.4] 11/30/2017

* Draft next version
* Refactored communication between peers over TURN server
* Fixed high CPU consumption
* Improved stability

## [6.2.3] 11/23/2017

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.2.3
6.2.4
7 changes: 7 additions & 0 deletions instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,13 @@ func (p *Daemon) Debug(args *Args, resp *Response) error {
resp.Output += fmt.Sprintf("ID: %s\n", inst.PTP.Dht.ID)
resp.Output += fmt.Sprintf("UDP Port: %d\n", inst.PTP.UDPSocket.GetPort())
resp.Output += fmt.Sprintf("Interface %s, HW Addr: %s, IP: %s\n", inst.PTP.Interface.Name, inst.PTP.Interface.Mac.String(), inst.PTP.Interface.IP.String())
resp.Output += fmt.Sprintf("Proxies:\n")
if len(inst.PTP.Proxies) == 0 {
resp.Output += fmt.Sprintf("\tNo proxies in use\n")
}
for _, proxy := range inst.PTP.Proxies {
resp.Output += fmt.Sprintf("\tProxy address: %s\n", proxy.Addr.String())
}
resp.Output += fmt.Sprintf("Peers:\n")

peers := inst.PTP.Peers.Get()
Expand Down
25 changes: 14 additions & 11 deletions lib/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ func (dht *DHTClient) Listen(conn *net.TCPConn) {
dht.Connected = false
break
}
packet := &DHTPacket{}
err = proto.Unmarshal(data[:n], packet)
if err != nil {
Log(Warning, "Corrupted data: %s", err)
continue
}
go func() {
packet := &DHTPacket{}
err = proto.Unmarshal(data[:n], packet)
if err != nil {
Log(Warning, "Corrupted data: %s", err)
return
}
callback, exists := dht.TCPCallbacks[packet.Type]
if !exists {
Log(Error, "Unknown packet type from BSN")
Expand All @@ -187,12 +187,14 @@ func (dht *DHTClient) Listen(conn *net.TCPConn) {

// Sends bytes to all connected bootstrap nodes
func (dht *DHTClient) send(data []byte) error {
for _, conn := range dht.Connections {
_, err := conn.Write(data)
if err != nil {
return err
go func() {
for _, conn := range dht.Connections {
_, err := conn.Write(data)
if err != nil {
continue
}
}
}
}()
return nil
}

Expand All @@ -202,6 +204,7 @@ func (dht *DHTClient) sendFind() error {
if dht.NetworkHash == "" {
return fmt.Errorf("Failed to find peers: Infohash is not set")
}
Log(Info, "Requesting swarm updates")
packet := &DHTPacket{
Type: DHTPacketType_Find,
Id: dht.ID,
Expand Down
6 changes: 3 additions & 3 deletions lib/dht_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (dht *DHTClient) packetDHCP(packet *DHTPacket) error {
dht.Network = network
Log(Info, "Received network information: %s", network.String())
}
dht.sendProxy()
dht.sendFind()
return nil
}
Expand All @@ -73,6 +72,7 @@ func (dht *DHTClient) packetError(packet *DHTPacket) error {
}

func (dht *DHTClient) packetFind(packet *DHTPacket) error {
dht.sendProxy()
if len(packet.Arguments) == 0 {
Log(Warning, "Received empty peer list")
return nil
Expand All @@ -99,7 +99,7 @@ func (dht *DHTClient) packetNode(packet *DHTPacket) error {
if addr == "" {
continue
}
ip, err := net.ResolveUDPAddr("udp", addr)
ip, err := net.ResolveUDPAddr("udp4", addr)
if err != nil {
Log(Error, "Failed to resolve one of peer addresses: %s", err)
continue
Expand Down Expand Up @@ -135,7 +135,7 @@ func (dht *DHTClient) packetProxy(packet *DHTPacket) error {
func (dht *DHTClient) packetRequestProxy(packet *DHTPacket) error {
list := []*net.UDPAddr{}
for _, proxy := range packet.Proxies {
addr, err := net.ResolveUDPAddr("udp", proxy)
addr, err := net.ResolveUDPAddr("udp4", proxy)
if err != nil {
Log(Error, "Can't parse proxy %s for peer %s", proxy, packet.Data)
continue
Expand Down
7 changes: 4 additions & 3 deletions lib/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,11 @@ func (uc *Network) Init(host string, port int) error {
uc.disposed = true

//todo check if we need Host and Port
uc.addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", port))
uc.addr, err = net.ResolveUDPAddr("udp4", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
uc.conn, err = net.ListenUDP("udp", uc.addr)
uc.conn, err = net.ListenUDP("udp4", uc.addr)
if err != nil {
return err
}
Expand All @@ -353,12 +353,13 @@ func (uc *Network) KeepAlive(addr *net.UDPAddr) {
keepAlive = time.Now()
uc.SendRawBytes(data, addr)
}
time.Sleep(100 * time.Millisecond)
}
}

// GetPort return a port assigned
func (uc *Network) GetPort() int {
addr, _ := net.ResolveUDPAddr("udp", uc.conn.LocalAddr().String())
addr, _ := net.ResolveUDPAddr("udp4", uc.conn.LocalAddr().String())
return addr.Port
}

Expand Down
97 changes: 40 additions & 57 deletions lib/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (p *PeerToPeer) retrieveFirstDHTRouter() *net.UDPAddr {
if len(router) != 2 {
return nil
}
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", router[0], 6882))
addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", router[0], 6882))
if err != nil {
return nil
}
Expand Down Expand Up @@ -541,7 +541,7 @@ func (p *PeerToPeer) Run() {
p.removeStoppedPeers()
p.checkBootstrapNodes()
p.checkLastDHTUpdate()
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
}
Log(Info, "Shutting down instance %s completed", p.Dht.NetworkHash)
Expand Down Expand Up @@ -763,23 +763,15 @@ func (p *PeerToPeer) HandleIntroMessage(msg *P2PMessage, srcAddr *net.UDPAddr) {
id, mac, ip := p.ParseIntroString(string(msg.Data))
peer := p.Peers.GetPeer(id)
// Do nothing when handshaking already done
if peer.State != PeerStateHandshaking {
if peer.State != PeerStateHandshaking && peer.State != PeerStateHandshakingForwarder {
return
}
if peer == nil {
Log(Debug, "Received introduction confirmation from unknown peer: %s", id)
p.Dht.sendFind()
return
}
if msg.Header.ProxyID > 0 && peer.ProxyID == 0 {
peer.ForceProxy = true
peer.PeerAddr = nil
peer.Endpoint = nil
peer.SetState(PeerStateInit, p)
peer.KnownIPs = peer.KnownIPs[:0]
p.Peers.Update(id, peer)
return
}

if mac == nil {
Log(Error, "Received empty MAC address. Skipping")
return
Expand All @@ -805,8 +797,32 @@ func (p *PeerToPeer) HandleIntroRequestMessage(msg *P2PMessage, srcAddr *net.UDP
p.Dht.sendFind()
return
}
proxy := false
if msg.Header.ProxyID > 0 {
proxy = true
Log(Info, "Received introduction request via proxy")
if len(peer.Proxies) == 0 {
Log(Warning, "Peer %s has no proxies attached", id)
p.Dht.sendRequestProxy(id)
return
}
} else {
Log(Info, "Received introduction request directly")
}

response := p.PrepareIntroductionMessage(p.Dht.ID)
response.Header.ProxyID = uint16(peer.ProxyID)
if proxy {
response.Header.ProxyID = 1
for _, peerProxy := range peer.Proxies {
Log(Info, "Sending handshake response over proxy %s", peerProxy.String())
_, err := p.UDPSocket.SendMessage(response, peerProxy)
if err != nil {
Log(Error, "Failed to respond to introduction request over proxy: %v", err)
}
}
return
}
Log(Info, "Sending handshake response")
_, err := p.UDPSocket.SendMessage(response, srcAddr)
if err != nil {
Log(Error, "Failed to respond to introduction request: %v", err)
Expand All @@ -818,10 +834,17 @@ func (p *PeerToPeer) HandleIntroRequestMessage(msg *P2PMessage, srcAddr *net.UDP
func (p *PeerToPeer) HandleProxyMessage(msg *P2PMessage, srcAddr *net.UDPAddr) {
Log(Info, "New proxy message from %s", srcAddr)
for i, proxy := range p.Proxies {
Log(Info, "Proxy addr: %s", proxy.addr.String())
if proxy.addr.String() == srcAddr.String() {
p.Proxies[i].status = proxyActive
Log(Info, "Proxy addr: %s", proxy.Addr.String())
if proxy.Addr.String() == srcAddr.String() && proxy.Status == proxyConnecting {
p.Proxies[i].Status = proxyActive
Log(Info, "Connected with %s proxy", srcAddr.String())
addr, err := net.ResolveUDPAddr("udp4", string(msg.Data))
if err != nil {
Log(Error, "Failed to resolve proxy addr: %s", err)
return
}
Log(Info, "This peer is now available over %s", addr.String())
p.Dht.sendReportProxy(addr)
}
}
}
Expand Down Expand Up @@ -911,7 +934,7 @@ func (p *PeerToPeer) StopInstance() {
Log(Info, "Stopping P2P Message handler")
// Tricky part: we need to send a message to ourselves to quit blocking operation
msg := CreateTestP2PMessage(p.Crypter, "STOP", 1)
addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("127.0.0.1:%d", p.Dht.P2PPort))
addr, _ := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", p.Dht.P2PPort))
p.UDPSocket.SendMessage(msg, addr)
var ipIt = 200
if ip != nil {
Expand Down Expand Up @@ -976,43 +999,3 @@ func (p *PeerToPeer) handlePeerData(peerData NetworkPeer) {
p.Peers.Update(peer.ID, peer)
}
}

// ReadProxies - reads a list of proxies received by DHT client
/*
func (p *PeerToPeer) ReadProxies() {
for {
if p.Shutdown {
break
}
if p.Dht == nil {
time.Sleep(10 * time.Millisecond)
continue
}
select {
case proxy, hasData := <-p.Dht.ProxyChannel:
if hasData {
exists := false
peers := p.Peers.Get()
for i, peer := range peers {
if i == proxy.DestinationID {
peer.SetState(PeerStateHandshakingForwarder, p)
peer.Forwarder = proxy.Addr
peer.Endpoint = proxy.Addr
p.Peers.Update(i, peer)
}
}
if !exists {
Log(Info, "Received forwarder for unknown peer")
p.Dht.sendFind()
}
} else {
Log(Trace, "Closed channel")
}
default:
time.Sleep(100 * time.Millisecond)
}
}
Log(Info, "Stopped Proxy reader channel")
}
*/
16 changes: 9 additions & 7 deletions lib/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (np *NetworkPeer) stateHandshaking(ptpc *PeerToPeer) error {
np.SetState(PeerStateHandshakingFailed, ptpc)
return fmt.Errorf("Failed to handshake with peer %s", np.ID)
}
np.sendHandshake(ptpc)
np.sendHandshake(ptpc, false)
time.Sleep(time.Millisecond * 500)
}
return nil
Expand Down Expand Up @@ -392,7 +392,7 @@ func (np *NetworkPeer) stateHandshakingForwarder(ptpc *PeerToPeer) error {
np.SetState(PeerStateHandshakingFailed, ptpc)
return fmt.Errorf("Failed to handshake with peer %s", np.ID)
}
np.sendHandshake(ptpc)
np.sendHandshake(ptpc, true)
time.Sleep(time.Millisecond * 500)
}
}
Expand Down Expand Up @@ -458,21 +458,23 @@ func (np *NetworkPeer) ProbeLocalConnection(ptpc *PeerToPeer) bool {
return false
}

func (np *NetworkPeer) sendHandshake(ptpc *PeerToPeer) {
func (np *NetworkPeer) sendHandshake(ptpc *PeerToPeer, proxy bool) {
Log(Debug, "Preparing introduction message for %s", np.ID)
if ptpc.Dht.ID == "" {
np.LastError = "DHT Disconnected"
return
}
msg := CreateIntroRequest(ptpc.Crypter, ptpc.Dht.ID)
//msg.Header.ProxyID = uint16(np.ProxyID)
if proxy {
msg.Header.ProxyID = 1
}
_, err := ptpc.UDPSocket.SendMessage(msg, np.Endpoint)
if err != nil {
np.LastError = "Failed to send intoduction message"
Log(Error, "Failed to send introduction to %s", np.Endpoint.String())
} else {
Log(Debug, "Sent introduction handshake to %s [%s %d]", np.ID, np.Endpoint.String(), np.ProxyID)
return
}
Log(Info, "Sent introduction handshake to %s [%s %d]", np.ID, np.Endpoint.String(), np.ProxyID)
}

// SendProxyHandshake sends a handshake packet to a proxy
Expand Down Expand Up @@ -535,7 +537,7 @@ func (np *NetworkPeer) holePunch(endpoint *net.UDPAddr, ptpc *PeerToPeer) bool {
Log(Warning, "Stopping UDP hole punching to %s after timeout", endpoint.String())
break
}
time.Sleep(10 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
}
return false
}
Expand Down
Loading

0 comments on commit 30f5c03

Please sign in to comment.