forked from openp2p-cn/gateway-lite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
relayhandler.go
117 lines (111 loc) · 3.17 KB
/
relayhandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"time"
"github.com/openp2p-cn/totp"
)
// relayHandler serves relay-related messages. It carries no fields, so all
// state it works with comes from the message context and package globals.
type relayHandler struct{}
// handleMessage decodes the openP2P header from ctx.msg and dispatches on its
// SubType. Only MsgRelayNodeReq is acted on; every other sub type is ignored
// and nil is returned.
//
// For MsgRelayNodeReq the JSON body is parsed into a RelayNodeReq, a relay
// candidate is picked from gWSSessionMgr.allSessions, and a RelayNodeRsp is
// written back to the requesting session. If no candidate is found the
// response is still sent with an empty relay name and a zero token. If the
// requested peer is not in allSessions the function logs and returns nil
// without sending any response.
//
// An error is returned when the message is shorter than the header, the header
// cannot be decoded, ctx.sess is not a *wssSession, or the body is not valid
// JSON for RelayNodeReq.
func (h *relayHandler) handleMessage(ctx *msgContext) error {
	// Guard the header slice below: a truncated frame would otherwise panic
	// with a slice-bounds-out-of-range error instead of failing cleanly.
	if len(ctx.msg) < openP2PHeaderSize {
		gLog.Println(LvERROR, "relay message shorter than header")
		return errors.New("relay message shorter than header")
	}
	head := openP2PHeader{}
	err := binary.Read(bytes.NewReader(ctx.msg[:openP2PHeaderSize]), binary.LittleEndian, &head)
	if err != nil {
		return err
	}
	reqSess, ok := ctx.sess.(*wssSession)
	if !ok {
		gLog.Println(LvERROR, "interface conversion error")
		return errors.New("interface conversion error")
	}
	switch head.SubType {
	case MsgRelayNodeReq:
		// gLogger.Println(LvINFO, string(msg))
		// TODO: find one cone node NOT randomly. Group node, bandwidth, latency, online stable...
		req := RelayNodeReq{}
		err = json.Unmarshal(ctx.msg[openP2PHeaderSize:], &req)
		if err != nil {
			gLog.Printf(LvERROR, "wrong RelayNodeReq:%s", err)
			return err
		}
		var relayNodeName string
		var relayNodeToken uint64
		relayMode := "private"
		peerNodeID := nodeNameToID(req.PeerNode)

		// The read lock is held until the function returns, which includes the
		// response write at the end of this case.
		// NOTE(review): relaySess.relayTime is assigned below while only the
		// read lock is held — confirm whether concurrent requests can race on it.
		gWSSessionMgr.allSessionsMtx.RLock()
		defer gWSSessionMgr.allSessionsMtx.RUnlock()
		peerSess, ok := gWSSessionMgr.allSessions[peerNodeID]
		if !ok {
			gLog.Printf(LvERROR, "request relay node error: %s offline", req.PeerNode)
			return nil
		}
		peerIP := peerSess.IPv4
		var relaySess *wssSession
		gLog.Printf(LvINFO, "searching relay node: %s:%s---%s:%s", reqSess.node, reqSess.IPv4, req.PeerNode, peerIP)

		// Count failures recorded within the last hour; counting stops as soon
		// as the total exceeds MaxDirectTry. The count is only used for the
		// log line below.
		failNum := 0
		reqSess.failNodes.Range(func(k, v interface{}) bool {
			ts, isTime := v.(time.Time)
			if !isTime {
				// Skip unexpected values rather than panicking on the assertion.
				return true
			}
			if ts.After(time.Now().Add(-time.Hour)) { // count within 1h failure
				failNum++
				if failNum > MaxDirectTry {
					return false
				}
			}
			return true
		})
		if failNum > MaxDirectTry {
			gLog.Println(LvINFO, "force search PUBLIC IP NODE")
		}

		// Find a usable relay node among the sessions of the requesting user.
		for _, sess := range gWSSessionMgr.allSessions {
			if reqSess.majorVer != sess.majorVer {
				continue
			}
			// Keep only sessions of the same user whose NAT type is not symmetric.
			if sess.user != reqSess.user || sess.natType == NATSymmetric {
				continue
			}
			// A node sharing an IPv4 with the requester or the peer is filtered
			// out: these two networks could not connect directly. This also
			// drops the requester's own session, whose IPv4 equals reqSess.IPv4.
			if sess.IPv4 == reqSess.IPv4 || sess.IPv4 == peerIP {
				continue
			}
			// // filter failNodes
			// if _, ok := reqSess.failNodes.Load(sess.nodeID); ok {
			// 	continue
			// }
			if relaySess == nil {
				relaySess = sess
			} else if sess.relayTime.Before(relaySess.relayTime) { // find the most idle node
				relaySess = sess
			}
			// relayTime older than 30 minutes is idle long enough: stop searching.
			if sess.relayTime.Before(time.Now().Add(-time.Minute * 30)) {
				break
			}
		}
		if relaySess != nil {
			relayNodeName = relaySess.node
			t := totp.TOTP{Step: totp.RelayTOTPStep}
			relayNodeToken = t.Gen(relaySess.token, time.Now().Unix())
			// Stamp the chosen node so later searches see it as recently used.
			relaySess.relayTime = time.Now()
			// NOTE(review): the loop above only accepts sessions with
			// sess.user == reqSess.user, so this branch looks unreachable here.
			if relaySess.user != reqSess.user {
				relayMode = "public"
			}
			gLog.Printf(LvINFO, "got %s relay node %s:%d", relayMode, relayNodeName, relayNodeToken)
		} else {
			gLog.Printf(LvINFO, "no available relay node")
		}
		rsp := RelayNodeRsp{
			Mode:       relayMode,
			RelayName:  relayNodeName,
			RelayToken: relayNodeToken}
		reqSess.write(MsgRelay, MsgRelayNodeRsp, &rsp)
	default:
		return nil
	}
	return nil
}