Skip to content

Commit

Permalink
Merge pull request #892 from subutai-io/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
crioto authored Apr 25, 2018
2 parents 75baf33 + 36d9d9a commit d28985a
Show file tree
Hide file tree
Showing 9 changed files with 410 additions and 133 deletions.
7 changes: 0 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ BRANCH=$(shell git rev-parse --abbrev-ref HEAD)
NAME_PREFIX=p2p
NAME_BASE=p2p
SOURCES=instance.go main.go rest.go start.go stop.go show.go set.go status.go debug.go daemon.go dht_connection.go dht_router.go
#DHT=mdht.subut.ai:6881
#ifeq ($(BRANCH),HEAD)
# DHT=mdht.subut.ai:6881
# SCHEME=
#else
# SCHEME=-$(BRANCH)
#endif


sinclude config.make
Expand Down
30 changes: 28 additions & 2 deletions daemon.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package main

import (
"errors"
"fmt"
"net"
"os"
"os/signal"
"runtime/pprof"
"strings"
"time"

ptp "github.com/subutai-io/p2p/lib"
)

var (
errEmptyDHTEndpoint = errors.New("DHT endpoint wasn't specified")
errBadDHTEndpoint = errors.New("Endpoint have wrong format")
)

type DaemonArgs struct {
IP string `json:"ip"`
Mac string `json:"mac"`
Expand All @@ -33,7 +40,10 @@ type DaemonArgs struct {
var bootstrap DHTConnection

// ExecDaemon starts P2P daemon
func ExecDaemon(port int, sFile, profiling, syslog string) {
func ExecDaemon(port int, dht, sFile, profiling, syslog string) {
if validateDHT(dht) != nil {
os.Exit(213)
}
if syslog != "" {
ptp.SetSyslogSocket(syslog)
}
Expand All @@ -59,7 +69,7 @@ func ExecDaemon(port int, sFile, profiling, syslog string) {

ReadyToServe = false

err := bootstrap.init(DefaultDHT)
err := bootstrap.init(dht)
if err != nil {
ptp.Log(ptp.Error, "Failed to initialize bootstrap node connection")
os.Exit(152)
Expand Down Expand Up @@ -140,3 +150,19 @@ func ExecDaemon(port int, sFile, profiling, syslog string) {
}()
select {}
}

func validateDHT(dht string) error {
if dht == "" {
ptp.Log(ptp.Error, "Empty bootstrap list")
return errEmptyDHTEndpoint
}
eps := strings.Split(dht, ",")
for _, ep := range eps {
_, err := net.ResolveTCPAddr("tcp4", ep)
if err != nil {
ptp.Log(ptp.Error, "Bootstrap %s have bad format or wrong address: %s", ep, err)
return errBadDHTEndpoint
}
}
return nil
}
23 changes: 23 additions & 0 deletions daemon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package main

import (
"testing"
)

func TestValidateDHT(t *testing.T) {
if validateDHT("") != errEmptyDHTEndpoint {
t.Fatalf("DHT Validate: providing empty list doesn't generate proper error")
}
if validateDHT("google.com") != errBadDHTEndpoint {
t.Fatalf("Providing URL without port doesn't generate expected error")
}
if validateDHT("iamnotexist.atall:80") != errBadDHTEndpoint {
t.Fatalf("Providing non existing URL doesn't generate expected error")
}
if validateDHT("google.com:80") != nil {
t.Fatalf("Providing correct endpoint generates error")
}
if validateDHT("google.com:80,yandex.ru:80") != nil {
t.Fatalf("Providing correct endpoints generates error")
}
}
32 changes: 20 additions & 12 deletions lib/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type DHTClient struct {
IP net.IP // IP of local interface received from DHCP or specified manually
Network *net.IPNet // Network information about current network. Used to inform p2p about mask for interface
Connected bool // Whether connection with bootstrap nodes established or not
//isShutdown bool // Whether DHT shutting down or not
//isShutdown bool // Whether DHT shutting down or not
LastUpdate time.Time // When last `find` packet was sent
OutboundIP net.IP // Outbound IP
ListenerIsRunning bool // True if listener is runnning
Expand Down Expand Up @@ -80,6 +80,9 @@ func (dht *DHTClient) Connect(ipList []net.IP, proxyList []*proxyServer) error {
ips := []string{}
proxies := []string{}
for _, ip := range ipList {
if ip == nil {
continue
}
skip := false
for _, a := range ActiveInterfaces {
if a.Equal(ip) {
Expand Down Expand Up @@ -111,7 +114,7 @@ func (dht *DHTClient) Connect(ipList []net.IP, proxyList []*proxyServer) error {
}
// Waiting for 3 seconds to get connection confirmation
sent := time.Now()
for time.Since(sent) < time.Duration(5000*time.Millisecond) {
for time.Since(sent) < time.Duration(5000 * time.Millisecond) {
if dht.Connected {
return nil
}
Expand All @@ -122,14 +125,17 @@ func (dht *DHTClient) Connect(ipList []net.IP, proxyList []*proxyServer) error {
}

func (dht *DHTClient) read() (*DHTPacket, error) {
if dht.IncomingData == nil {
return nil, fmt.Errorf("Trying to receive DHTPacket from closed channel")
}
packet := <-dht.IncomingData
if packet == nil {
return nil, fmt.Errorf("Received nil packet: channel is closed")
}
return packet, nil
}

// Sends bytes to all connected bootstrap nodes
// send sends bytes to all connected bootstrap nodes
func (dht *DHTClient) send(packet *DHTPacket) error {
// if dht.OutgoingData != nil && !dht.isShutdown {
if dht.OutgoingData != nil {
Expand Down Expand Up @@ -166,8 +172,7 @@ func (dht *DHTClient) send(packet *DHTPacket) error {
return nil
}

// This method will send request for network peers known to BSN
// As a response BSN will send array of IDs of peers in this swarm
// sendFind will send request for network peers known to BSN. As a response BSN will send array of IDs of peers in this swarm
func (dht *DHTClient) sendFind() error {
dht.LastUpdate = time.Now()
if dht.NetworkHash == "" {
Expand All @@ -183,7 +188,7 @@ func (dht *DHTClient) sendFind() error {
return dht.send(packet)
}

// This method will send request of IPs of particular peer known to BSN
// sendNode will send request of IPs of particular peer known to BSN
func (dht *DHTClient) sendNode(id string, ipList []net.IP) error {
if len(id) != 36 {
return fmt.Errorf("Failed to send node: Malformed ID %s", id)
Expand Down Expand Up @@ -263,6 +268,9 @@ func (dht *DHTClient) sendProxy() error {
}

func (dht *DHTClient) sendRequestProxy(id string) error {
if len(id) != 36 {
return fmt.Errorf("Failed to send requst proxy: Malformed ID")
}
packet := &DHTPacket{
Type: DHTPacketType_RequestProxy,
Id: dht.ID,
Expand All @@ -288,8 +296,7 @@ func (dht *DHTClient) sendReportProxy(addr []*net.UDPAddr) error {
return dht.send(packet)
}

// Close will close all connections and switch DHT object to
// shutdown mode, which will terminate every loop/goroutine
// Close will close all connections and switch DHT object to shutdown mode, which will terminate every loop/goroutine
func (dht *DHTClient) Close() error {
if dht.IncomingData != nil {
close(dht.IncomingData)
Expand All @@ -303,8 +310,8 @@ func (dht *DHTClient) Close() error {
return nil
}

// WaitID will block DHT until valid instance ID is received from Bootstrap node
// or specified timeout passes.
// TODO: Refactor
// WaitID will block DHT until valid instance ID is received from Bootstrap node or specified timeout passes.
func (dht *DHTClient) WaitID() error {
started := time.Now()
period := time.Duration(time.Second * 10)
Expand All @@ -321,8 +328,8 @@ func (dht *DHTClient) WaitID() error {
return nil
}

// RegisterProxy will register current node as a proxy on
// bootstrap node
// TODO: Refactor
// RegisterProxy will register current node as a proxy on bootstrap node
func (dht *DHTClient) RegisterProxy(ip net.IP, port int) error {
id, err := uuid.NewTimeBased()
if err != nil {
Expand All @@ -339,6 +346,7 @@ func (dht *DHTClient) RegisterProxy(ip net.IP, port int) error {
return dht.send(packet)
}

// TODO: Refactor
// ReportLoad will send amount of tunnels created on particular proxy
func (dht *DHTClient) ReportLoad(clientsNum int) error {
return nil
Expand Down
Loading

0 comments on commit d28985a

Please sign in to comment.