Skip to content

Commit

Permalink
check remote service
Browse files Browse the repository at this point in the history
  • Loading branch information
TenderIronh committed Jul 26, 2024
1 parent 9dda148 commit 2dea3a7
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 58 deletions.
23 changes: 19 additions & 4 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package openp2p
import (
"encoding/json"
"flag"
"fmt"
"os"
"strconv"
"strings"
Expand All @@ -28,6 +29,7 @@ type AppConfig struct {
ForceRelay int // default:0 disable;1 enable
Enabled int // default:1
// runtime info
relayMode string // private|public
peerVersion string
peerToken uint64
peerNatType int
Expand Down Expand Up @@ -64,6 +66,13 @@ func (c *AppConfig) ID() uint64 {
return uint64(c.SrcPort)*10 + 1
}

func (c *AppConfig) LogPeerNode() string {
if c.relayMode == "public" { // memapp
return fmt.Sprintf("%d", NodeNameToID(c.PeerNode))
}
return c.PeerNode
}

type Config struct {
Network NetworkConfig `json:"network"`
Apps []*AppConfig `json:"apps"`
Expand Down Expand Up @@ -147,7 +156,7 @@ func (c *Config) retryApp(peerNode string) {
GNetwork.apps.Range(func(id, i interface{}) bool {
app := i.(*p2pApp)
if app.config.PeerNode == peerNode {
gLog.Println(LvDEBUG, "retry app ", peerNode)
gLog.Println(LvDEBUG, "retry app ", app.config.LogPeerNode())
app.config.retryNum = 0
app.config.nextRetryTime = time.Now()
app.retryRelayNum = 0
Expand All @@ -157,7 +166,7 @@ func (c *Config) retryApp(peerNode string) {
app.hbMtx.Unlock()
}
if app.config.RelayNode == peerNode {
gLog.Println(LvDEBUG, "retry app ", peerNode)
gLog.Println(LvDEBUG, "retry app ", app.config.LogPeerNode())
app.retryRelayNum = 0
app.nextRetryRelayTime = time.Now()
app.hbMtx.Lock()
Expand All @@ -171,7 +180,7 @@ func (c *Config) retryApp(peerNode string) {
func (c *Config) retryAllApp() {
GNetwork.apps.Range(func(id, i interface{}) bool {
app := i.(*p2pApp)
gLog.Println(LvDEBUG, "retry app ", app.config.PeerNode)
gLog.Println(LvDEBUG, "retry app ", app.config.LogPeerNode())
app.config.retryNum = 0
app.config.nextRetryTime = time.Now()
app.retryRelayNum = 0
Expand All @@ -189,7 +198,7 @@ func (c *Config) retryAllMemApp() {
if app.config.SrcPort != 0 {
return true
}
gLog.Println(LvDEBUG, "retry app ", app.config.PeerNode)
gLog.Println(LvDEBUG, "retry app ", app.config.LogPeerNode())
app.config.retryNum = 0
app.config.nextRetryTime = time.Now()
app.retryRelayNum = 0
Expand Down Expand Up @@ -246,6 +255,9 @@ func (c *Config) delete(app AppConfig) {
func (c *Config) save() {
// c.mtx.Lock()
// defer c.mtx.Unlock() // internal call
if c.Network.Token == 0 {
return
}
data, _ := json.MarshalIndent(c, "", " ")
err := os.WriteFile("config.json", data, 0644)
if err != nil {
Expand All @@ -256,6 +268,9 @@ func (c *Config) save() {
func (c *Config) saveCache() {
// c.mtx.Lock()
// defer c.mtx.Unlock() // internal call
if c.Network.Token == 0 {
return
}
data, _ := json.MarshalIndent(c, "", " ")
err := os.WriteFile("config.json0", data, 0644)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions core/errorcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ var (
ErrPeerConnectRelay = errors.New("peer connect relayNode error")
ErrBuildTunnelBusy = errors.New("build tunnel busy")
ErrMemAppTunnelNotFound = errors.New("memapp tunnel not found")
ErrRemoteServiceUnable = errors.New("remote service unable")
)
22 changes: 22 additions & 0 deletions core/handlepush.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -44,6 +45,7 @@ func handlePush(subType uint16, msg []byte) error {
config := AppConfig{}
config.PeerNode = req.RelayName
config.peerToken = req.RelayToken
config.relayMode = req.RelayMode
go func(r AddRelayTunnelReq) {
t, errDt := GNetwork.addDirectTunnel(config, 0)
if errDt == nil {
Expand Down Expand Up @@ -142,6 +144,8 @@ func handlePush(subType uint16, msg []byte) error {
err = handleLog(msg)
case MsgPushReportGoroutine:
err = handleReportGoroutine()
case MsgPushCheckRemoteService:
err = handleCheckRemoteService(msg)
case MsgPushEditApp:
err = handleEditApp(msg)
case MsgPushEditNode:
Expand Down Expand Up @@ -458,3 +462,21 @@ func handleReportGoroutine() (err error) {
stackLen := runtime.Stack(buf, true)
return GNetwork.write(MsgReport, MsgPushReportLog, string(buf[:stackLen]))
}

func handleCheckRemoteService(msg []byte) (err error) {
gLog.Println(LvDEBUG, "handleCheckRemoteService")
req := CheckRemoteService{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
rsp := PushRsp{Error: 0}
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.Host, req.Port), time.Second*3)
if err != nil {
rsp.Error = 1
rsp.Detail = ErrRemoteServiceUnable.Error()
} else {
conn.Close()
}
return GNetwork.write(MsgReport, MsgReportResponse, rsp)
}
28 changes: 14 additions & 14 deletions core/p2papp.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (app *p2pApp) checkDirectTunnel() error {
app.config.retryNum = 1
}
if app.config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s appid:%d disconnect, reconnecting the %d times...", app.config.PeerNode, app.id, app.config.retryNum)
gLog.Printf(LvINFO, "detect app %s appid:%d disconnect, reconnecting the %d times...", app.config.LogPeerNode(), app.id, app.config.retryNum)
}
app.config.retryNum++
app.config.retryTime = time.Now()
Expand All @@ -149,7 +149,7 @@ func (app *p2pApp) checkDirectTunnel() error {
app.config.errMsg = err.Error()
if err == ErrPeerOffline && app.config.retryNum > 2 { // stop retry, waiting for online
app.config.retryNum = retryLimit
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", app.config.PeerNode)
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", app.config.LogPeerNode())
}
if err == ErrBuildTunnelBusy {
app.config.retryNum--
Expand All @@ -174,7 +174,7 @@ func (app *p2pApp) buildDirectTunnel() error {
pn := GNetwork
initErr := pn.requestPeerInfo(&app.config)
if initErr != nil {
gLog.Printf(LvERROR, "%s init error:%s", app.config.PeerNode, initErr)
gLog.Printf(LvERROR, "%s init error:%s", app.config.LogPeerNode(), initErr)
return initErr
}
t, err = pn.addDirectTunnel(app.config, 0)
Expand Down Expand Up @@ -212,15 +212,15 @@ func (app *p2pApp) buildDirectTunnel() error {
AppID: app.id,
AppKey: app.key,
}
gLog.Printf(LvDEBUG, "sync appkey direct to %s", app.config.PeerNode)
gLog.Printf(LvDEBUG, "sync appkey direct to %s", app.config.LogPeerNode())
pn.push(app.config.PeerNode, MsgPushAPPKey, &syncKeyReq)
app.setDirectTunnel(t)

// if memapp notify peer addmemapp
if app.config.SrcPort == 0 {
req := ServerSideSaveMemApp{From: gConf.Network.Node, Node: gConf.Network.Node, TunnelID: t.id, RelayTunnelID: 0, AppID: app.id}
pn.push(app.config.PeerNode, MsgPushServerSideSaveMemApp, &req)
gLog.Printf(LvDEBUG, "push %s ServerSideSaveMemApp: %s", app.config.PeerNode, prettyJson(req))
gLog.Printf(LvDEBUG, "push %s ServerSideSaveMemApp: %s", app.config.LogPeerNode(), prettyJson(req))
}
gLog.Printf(LvDEBUG, "%s use tunnel %d", app.config.AppName, t.id)
return nil
Expand All @@ -244,7 +244,7 @@ func (app *p2pApp) checkRelayTunnel() error {
app.retryRelayNum = 1
}
if app.retryRelayNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s appid:%d relay disconnect, reconnecting the %d times...", app.config.PeerNode, app.id, app.retryRelayNum)
gLog.Printf(LvINFO, "detect app %s appid:%d relay disconnect, reconnecting the %d times...", app.config.LogPeerNode(), app.id, app.retryRelayNum)
}
app.setRelayTunnel(nil) // reset relayTunnel
app.retryRelayNum++
Expand All @@ -256,7 +256,7 @@ func (app *p2pApp) checkRelayTunnel() error {
app.errMsg = err.Error()
if err == ErrPeerOffline && app.retryRelayNum > 2 { // stop retry, waiting for online
app.retryRelayNum = retryLimit
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", app.config.PeerNode)
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", app.config.LogPeerNode())
}
}
if app.Tunnel() != nil {
Expand All @@ -282,7 +282,7 @@ func (app *p2pApp) buildRelayTunnel() error {
config := app.config
initErr := pn.requestPeerInfo(&config)
if initErr != nil {
gLog.Printf(LvERROR, "%s init error:%s", config.PeerNode, initErr)
gLog.Printf(LvERROR, "%s init error:%s", config.LogPeerNode(), initErr)
return initErr
}

Expand Down Expand Up @@ -318,7 +318,7 @@ func (app *p2pApp) buildRelayTunnel() error {
AppID: app.id,
AppKey: app.key,
}
gLog.Printf(LvDEBUG, "sync appkey relay to %s", config.PeerNode)
gLog.Printf(LvDEBUG, "sync appkey relay to %s", config.LogPeerNode())
pn.push(config.PeerNode, MsgPushAPPKey, &syncKeyReq)
app.setRelayTunnelID(rtid)
app.setRelayTunnel(t)
Expand All @@ -330,7 +330,7 @@ func (app *p2pApp) buildRelayTunnel() error {
if config.SrcPort == 0 {
req := ServerSideSaveMemApp{From: gConf.Network.Node, Node: relayNode, TunnelID: rtid, RelayTunnelID: t.id, AppID: app.id, RelayMode: relayMode}
pn.push(config.PeerNode, MsgPushServerSideSaveMemApp, &req)
gLog.Printf(LvDEBUG, "push %s relay ServerSideSaveMemApp: %s", config.PeerNode, prettyJson(req))
gLog.Printf(LvDEBUG, "push %s relay ServerSideSaveMemApp: %s", config.LogPeerNode(), prettyJson(req))
}
gLog.Printf(LvDEBUG, "%s use tunnel %d", app.config.AppName, t.id)
return nil
Expand Down Expand Up @@ -594,8 +594,8 @@ func (app *p2pApp) close() {
func (app *p2pApp) relayHeartbeatLoop() {
app.wg.Add(1)
defer app.wg.Done()
gLog.Printf(LvDEBUG, "%s appid:%d relayHeartbeat to rtid:%d start", app.config.PeerNode, app.id, app.rtid)
defer gLog.Printf(LvDEBUG, "%s appid:%d relayHeartbeat to rtid%d end", app.config.PeerNode, app.id, app.rtid)
gLog.Printf(LvDEBUG, "%s appid:%d relayHeartbeat to rtid:%d start", app.config.LogPeerNode(), app.id, app.rtid)
defer gLog.Printf(LvDEBUG, "%s appid:%d relayHeartbeat to rtid%d end", app.config.LogPeerNode(), app.id, app.rtid)

for app.running {
if app.RelayTunnel() == nil || !app.RelayTunnel().isRuning() {
Expand All @@ -606,11 +606,11 @@ func (app *p2pApp) relayHeartbeatLoop() {
AppID: app.id}
err := app.RelayTunnel().WriteMessage(app.rtid, MsgP2P, MsgRelayHeartbeat, &req)
if err != nil {
gLog.Printf(LvERROR, "%s appid:%d rtid:%d write relay tunnel heartbeat error %s", app.config.PeerNode, app.id, app.rtid, err)
gLog.Printf(LvERROR, "%s appid:%d rtid:%d write relay tunnel heartbeat error %s", app.config.LogPeerNode(), app.id, app.rtid, err)
return
}
// TODO: debug relay heartbeat
gLog.Printf(LvDEBUG, "%s appid:%d rtid:%d write relay tunnel heartbeat ok", app.config.PeerNode, app.id, app.rtid)
gLog.Printf(LvDEBUG, "%s appid:%d rtid:%d write relay tunnel heartbeat ok", app.config.LogPeerNode(), app.id, app.rtid)
time.Sleep(TunnelHeartbeatTime)
}
}
42 changes: 20 additions & 22 deletions core/p2pnetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (pn *P2PNetwork) run() {
}
gConf.retryAllApp()
case t := <-pn.tunnelCloseCh:
gLog.Printf(LvDEBUG, "got tunnelCloseCh %s", t.config.PeerNode)
gLog.Printf(LvDEBUG, "got tunnelCloseCh %s", t.config.LogPeerNode())
pn.apps.Range(func(id, i interface{}) bool {
app := i.(*p2pApp)
if app.DirectTunnel() == t {
Expand Down Expand Up @@ -195,12 +195,12 @@ func (pn *P2PNetwork) autorunApp() {
}

func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, string, error) {
gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode)
defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode)
gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.LogPeerNode())
defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.LogPeerNode())
relayConfig := AppConfig{
PeerNode: config.RelayNode,
peerToken: config.peerToken}
relayMode := "private"
peerToken: config.peerToken,
relayMode: "private"}
if relayConfig.PeerNode == "" {
// find existing relay tunnel
pn.apps.Range(func(id, i interface{}) bool {
Expand All @@ -212,7 +212,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
return true
}
relayConfig.PeerNode = app.RelayTunnel().config.PeerNode
gLog.Printf(LvDEBUG, "found existing relay tunnel %s", relayConfig.PeerNode)
gLog.Printf(LvDEBUG, "found existing relay tunnel %s", relayConfig.LogPeerNode())
return false
})
if relayConfig.PeerNode == "" { // request relay node
Expand All @@ -231,11 +231,11 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
gLog.Printf(LvERROR, "MsgRelayNodeReq error")
return nil, 0, "", errors.New("MsgRelayNodeReq error")
}
gLog.Printf(LvDEBUG, "got relay node:%s", rsp.RelayName)
gLog.Printf(LvDEBUG, "got relay node:%s", relayConfig.LogPeerNode())

relayConfig.PeerNode = rsp.RelayName
relayConfig.peerToken = rsp.RelayToken
relayMode = rsp.Mode
relayConfig.relayMode = rsp.Mode
}

}
Expand All @@ -250,10 +250,10 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
From: gConf.Network.Node,
RelayName: relayConfig.PeerNode,
RelayToken: relayConfig.peerToken,
RelayMode: relayMode,
RelayMode: relayConfig.relayMode,
RelayTunnelID: t.id,
}
gLog.Printf(LvDEBUG, "push %s the relay node(%s)", config.PeerNode, relayConfig.PeerNode)
gLog.Printf(LvDEBUG, "push %s the relay node(%s)", config.LogPeerNode(), relayConfig.LogPeerNode())
pn.push(config.PeerNode, MsgPushAddRelayTunnelReq, &req)

// wait relay ready
Expand All @@ -267,13 +267,13 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
gLog.Println(LvDEBUG, ErrPeerConnectRelay)
return nil, 0, "", ErrPeerConnectRelay
}
return t, rspID.ID, relayMode, err
return t, rspID.ID, relayConfig.relayMode, err
}

// use *AppConfig to save status
func (pn *P2PNetwork) AddApp(config AppConfig) error {
gLog.Printf(LvINFO, "addApp %s to %s:%s:%d start", config.AppName, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LvINFO, "addApp %s to %s:%s:%d end", config.AppName, config.PeerNode, config.DstHost, config.DstPort)
gLog.Printf(LvINFO, "addApp %s to %s:%s:%d start", config.AppName, config.LogPeerNode(), config.DstHost, config.DstPort)
defer gLog.Printf(LvINFO, "addApp %s to %s:%s:%d end", config.AppName, config.LogPeerNode(), config.DstHost, config.DstPort)
if !pn.online {
return errors.New("P2PNetwork offline")
}
Expand Down Expand Up @@ -304,8 +304,8 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
}

func (pn *P2PNetwork) DeleteApp(config AppConfig) {
gLog.Printf(LvINFO, "DeleteApp %s to %s:%s:%d start", config.AppName, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LvINFO, "DeleteApp %s to %s:%s:%d end", config.AppName, config.PeerNode, config.DstHost, config.DstPort)
gLog.Printf(LvINFO, "DeleteApp %s to %s:%s:%d start", config.AppName, config.LogPeerNode(), config.DstHost, config.DstPort)
defer gLog.Printf(LvINFO, "DeleteApp %s to %s:%s:%d end", config.AppName, config.LogPeerNode(), config.DstHost, config.DstPort)
// close the apps of this config
i, ok := pn.apps.Load(config.ID())
if ok {
Expand Down Expand Up @@ -339,8 +339,8 @@ func (pn *P2PNetwork) findTunnel(peerNode string) (t *P2PTunnel) {
}

func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) {
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort, tid)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort, tid)
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d start", config.Protocol, config.SrcPort, config.LogPeerNode(), config.DstHost, config.DstPort, tid)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d end", config.Protocol, config.SrcPort, config.LogPeerNode(), config.DstHost, config.DstPort, tid)
isClient := false
// client side tid=0, assign random uint64
if tid == 0 {
Expand All @@ -360,12 +360,12 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne
// peer info
initErr := pn.requestPeerInfo(&config)
if initErr != nil {
gLog.Printf(LvERROR, "%s init error:%s", config.PeerNode, initErr)
gLog.Printf(LvERROR, "%s init error:%s", config.LogPeerNode(), initErr)

return nil, initErr
}
gLog.Printf(LvDEBUG, "config.peerNode=%s,config.peerVersion=%s,config.peerIP=%s,config.peerLanIP=%s,gConf.Network.publicIP=%s,config.peerIPv6=%s,config.hasIPv4=%d,config.hasUPNPorNATPMP=%d,gConf.Network.hasIPv4=%d,gConf.Network.hasUPNPorNATPMP=%d,config.peerNatType=%d,gConf.Network.natType=%d,",
config.PeerNode, config.peerVersion, config.peerIP, config.peerLanIP, gConf.Network.publicIP, config.peerIPv6, config.hasIPv4, config.hasUPNPorNATPMP, gConf.Network.hasIPv4, gConf.Network.hasUPNPorNATPMP, config.peerNatType, gConf.Network.natType)
config.LogPeerNode(), config.peerVersion, config.peerIP, config.peerLanIP, gConf.Network.publicIP, config.peerIPv6, config.hasIPv4, config.hasUPNPorNATPMP, gConf.Network.hasIPv4, gConf.Network.hasUPNPorNATPMP, config.peerNatType, gConf.Network.natType)
// try Intranet
if config.peerIP == gConf.Network.publicIP && compareVersion(config.peerVersion, SupportIntranetVersion) >= 0 { // old version client has no peerLanIP
gLog.Println(LvINFO, "try Intranet")
Expand Down Expand Up @@ -624,8 +624,6 @@ func (pn *P2PNetwork) handleMessage(msg []byte) {
gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail)
pn.running = false
} else {
gConf.Network.Token = rsp.Token
gConf.Network.User = rsp.User
gConf.setToken(rsp.Token)
gConf.setUser(rsp.User)
if len(rsp.Node) >= MinNodeNameLen {
Expand Down Expand Up @@ -726,7 +724,7 @@ func (pn *P2PNetwork) relay(to uint64, body []byte) error {
}

func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error {
gLog.Printf(LvDEBUG, "push msgType %d to %s", subType, to)
// gLog.Printf(LvDEBUG, "push msgType %d to %s", subType, to)
if !pn.online {
return errors.New("client offline")
}
Expand Down
Loading

0 comments on commit 2dea3a7

Please sign in to comment.