Skip to content

Commit

Permalink
Merge pull request #440 from subutai-io/new-proxy-scheme
Browse files Browse the repository at this point in the history
New proxy scheme
  • Loading branch information
crioto authored Nov 16, 2017
2 parents 1ef20b3 + 8ce8547 commit da6392a
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 209 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Change Log

## [6.2.0-dev] 11/13/2017
## [6.2.1] 11/16/2017

* New scheme for traffic forwarders
* Modified DHT message
* Implemented new DHT message type - RequestProxy and ReportProxy

## [6.2.0] 11/13/2017

* Switched from UDP to TCP in communication with Bootstrap nodes
* Refactored bootstrap packets
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.2.0-dev
6.2.1
119 changes: 67 additions & 52 deletions lib/dht.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 38 additions & 24 deletions lib/dht2.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type DHTClient struct {
IP net.IP // IP of local interface received from DHCP or specified manually
Network *net.IPNet // Network information about current network. Used to inform p2p about mask for interface
StateChannel chan RemotePeerState // Channel to pass states to instance
ProxyChannel chan Forwarder // Channel to pass proxies to instance
ProxyChannel chan string // Channel to pass proxies to instance
isShutdown bool // Whether DHT shutting down or not
PeerData chan NetworkPeer // Channel to pass data about changes in peers
Connected bool // Whether connection with bootstrap nodes established or not
Expand Down Expand Up @@ -82,7 +82,7 @@ func (dht *DHTClient) TCPInit(hash, routers string) error {
// dht.RemovePeerChan = make(chan string)
//dht.PeerChannel = make(chan []PeerIP)
dht.StateChannel = make(chan RemotePeerState)
dht.ProxyChannel = make(chan Forwarder)
dht.ProxyChannel = make(chan string)
dht.PeerData = make(chan NetworkPeer)
dht.NetworkHash = hash
dht.Routers = routers
Expand All @@ -93,26 +93,6 @@ func (dht *DHTClient) TCPInit(hash, routers string) error {
return nil
}

func (dht *DHTClient) setupTCPCallbacks() {
dht.TCPCallbacks = make(map[DHTPacketType]dhtCallback)
dht.TCPCallbacks[DHTPacketType_BadProxy] = dht.packetBadProxy
dht.TCPCallbacks[DHTPacketType_Connect] = dht.packetConnect
dht.TCPCallbacks[DHTPacketType_DHCP] = dht.packetDHCP
dht.TCPCallbacks[DHTPacketType_Error] = dht.packetError
dht.TCPCallbacks[DHTPacketType_Find] = dht.packetFind
dht.TCPCallbacks[DHTPacketType_Forward] = dht.packetForward
dht.TCPCallbacks[DHTPacketType_Node] = dht.packetNode
dht.TCPCallbacks[DHTPacketType_Notify] = dht.packetNotify
dht.TCPCallbacks[DHTPacketType_Ping] = dht.packetPing
dht.TCPCallbacks[DHTPacketType_Proxy] = dht.packetProxy
dht.TCPCallbacks[DHTPacketType_RegisterProxy] = dht.packetRegisterProxy
dht.TCPCallbacks[DHTPacketType_ReportLoad] = dht.packetReportLoad
dht.TCPCallbacks[DHTPacketType_State] = dht.packetState
dht.TCPCallbacks[DHTPacketType_Stop] = dht.packetStop
dht.TCPCallbacks[DHTPacketType_Unknown] = dht.packetUnknown
dht.TCPCallbacks[DHTPacketType_Unsupported] = dht.packetUnsupported
}

func (dht *DHTClient) TCPConnect() error {
// Close every open connection
for _, con := range dht.TCPConnection {
Expand Down Expand Up @@ -303,9 +283,43 @@ func (dht *DHTClient) sendDHCP(ip net.IP, network *net.IPNet) error {
return dht.send(data)
}

func (dht *DHTClient) sendProxy(id string) error {
func (dht *DHTClient) sendProxy() error {
Log(Debug, "Requesting proxies")
packet := &DHTPacket{
Type: DHTPacketType_Proxy,
Id: dht.ID,
}
data, err := proto.Marshal(packet)
if err != nil {
return fmt.Errorf("Failed to marshal DHCP packet: %s", err)
}
return dht.send(data)
}

return nil
func (dht *DHTClient) sendRequestProxy(id string) error {
packet := &DHTPacket{
Type: DHTPacketType_RequestProxy,
Id: dht.ID,
Data: id,
}
data, err := proto.Marshal(packet)
if err != nil {
return fmt.Errorf("Failed to marshal DHCP packet: %s", err)
}
return dht.send(data)
}

func (dht *DHTClient) sendReportProxy(addr *net.UDPAddr) error {
packet := &DHTPacket{
Type: DHTPacketType_ReportProxy,
Id: dht.ID,
Data: addr.String(),
}
data, err := proto.Marshal(packet)
if err != nil {
return fmt.Errorf("Failed to marshal DHCP packet: %s", err)
}
return dht.send(data)
}

func (dht *DHTClient) Shutdown() {
Expand Down
52 changes: 51 additions & 1 deletion lib/dht_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@ import (
"strconv"
)

func (dht *DHTClient) setupTCPCallbacks() {
dht.TCPCallbacks = make(map[DHTPacketType]dhtCallback)
dht.TCPCallbacks[DHTPacketType_BadProxy] = dht.packetBadProxy
dht.TCPCallbacks[DHTPacketType_Connect] = dht.packetConnect
dht.TCPCallbacks[DHTPacketType_DHCP] = dht.packetDHCP
dht.TCPCallbacks[DHTPacketType_Error] = dht.packetError
dht.TCPCallbacks[DHTPacketType_Find] = dht.packetFind
dht.TCPCallbacks[DHTPacketType_Forward] = dht.packetForward
dht.TCPCallbacks[DHTPacketType_Node] = dht.packetNode
dht.TCPCallbacks[DHTPacketType_Notify] = dht.packetNotify
dht.TCPCallbacks[DHTPacketType_Ping] = dht.packetPing
dht.TCPCallbacks[DHTPacketType_Proxy] = dht.packetProxy
dht.TCPCallbacks[DHTPacketType_RequestProxy] = dht.packetRequestProxy
dht.TCPCallbacks[DHTPacketType_ReportProxy] = dht.packetReportProxy
dht.TCPCallbacks[DHTPacketType_RegisterProxy] = dht.packetRegisterProxy
dht.TCPCallbacks[DHTPacketType_ReportLoad] = dht.packetReportLoad
dht.TCPCallbacks[DHTPacketType_State] = dht.packetState
dht.TCPCallbacks[DHTPacketType_Stop] = dht.packetStop
dht.TCPCallbacks[DHTPacketType_Unknown] = dht.packetUnknown
dht.TCPCallbacks[DHTPacketType_Unsupported] = dht.packetUnsupported
}

func (dht *DHTClient) packetBadProxy(packet *DHTPacket) error {
return nil
}
Expand All @@ -17,7 +39,7 @@ func (dht *DHTClient) packetConnect(packet *DHTPacket) error {
}
dht.ID = packet.Id
Log(Info, "Received personal ID for this session: %s", dht.ID)

dht.sendProxy()
return dht.sendFind()
}

Expand Down Expand Up @@ -101,10 +123,38 @@ func (dht *DHTClient) packetPing(packet *DHTPacket) error {
}

func (dht *DHTClient) packetProxy(packet *DHTPacket) error {
Log(Info, "Received list of proxies")
for _, proxy := range packet.Arguments {
dht.ProxyChannel <- proxy
}
return nil
}

// packetRequestProxy received when we was requesting proxy to connect to some peer
func (dht *DHTClient) packetRequestProxy(packet *DHTPacket) error {
list := []*net.UDPAddr{}
for _, proxy := range packet.Proxies {
addr, err := net.ResolveUDPAddr("udp", proxy)
if err != nil {
Log(Error, "Can't parse proxy %s for peer %s", proxy, packet.Data)
continue
}
list = append(list, addr)
}
peer := NetworkPeer{ID: packet.Data, Proxies: list}
dht.PeerData <- peer
return nil
}

func (dht *DHTClient) packetReportProxy(packet *DHTPacket) error {
Log(Info, "DHT confirmed proxy registration")
return nil
}

func (dht *DHTClient) packetRegisterProxy(packet *DHTPacket) error {
if packet.Data == "OK" {
Log(Info, "Proxy registration confirmed")
}
return nil
}

Expand Down
Loading

0 comments on commit da6392a

Please sign in to comment.