Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
yyk808 committed Jul 23, 2024
1 parent 4f429b2 commit b8a9c5b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 39 deletions.
11 changes: 3 additions & 8 deletions agent/apps/ztm/tunnel/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ export default function ({ app, mesh, punch }) {
if (ep === app.endpoint.id) {
return getLocalConfig().then(
config => {
var inbound = config.inbound.find(
config.inbound.find(
i => i.protocol === protocol && i.name === name
) || null
var hole = punch.findHole(ep)
}
)
} else {
Expand Down Expand Up @@ -227,8 +226,6 @@ export default function ({ app, mesh, punch }) {
currentListens = []
currentTargets = {}

console.info(`Applying config: ${JSON.encode(config)}`)

config.inbound.forEach(i => {
var protocol = i.protocol
var name = i.name
Expand All @@ -245,10 +242,10 @@ export default function ({ app, mesh, punch }) {
punch.createInboundHole($selectedEP)
var hole = punch.findHole($selectedEP)
if(hole && hole.ready()) {
console.info("Using direct session")
app.log("Using direct session")
return hole.directSession()
}
console.info("Using hub forwarded session")
app.log("Using hub forwarded session")
return pipeline($=>$
.muxHTTP().to($=>$
.pipe(mesh.connect($selectedEP))
Expand Down Expand Up @@ -341,7 +338,6 @@ export default function ({ app, mesh, punch }) {
return new StreamEnd
})
.onEnd(() => {
console.info('Answers in api: ', $response)
return $response
})
).spawn()
Expand Down Expand Up @@ -397,7 +393,6 @@ export default function ({ app, mesh, punch }) {
var tunnelHole = null
var makeRespTunnel = pipeline($=>$
.onStart(ctx => {
console.info("Making resp tunnel: ", ctx)
var ep = ctx.peer.id
tunnelHole = punch.findHole(ep)
if(!tunnelHole) throw `Invalid Hole State for ${ep}`
Expand Down
5 changes: 2 additions & 3 deletions agent/apps/ztm/tunnel/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export default function ({ app, mesh, utils }) {
var ip = $ctx.peer.ip
var port = $ctx.peer.port

console.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
app.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
switch(action) {
case 'leave':
api.deleteHole(ep, true)
Expand All @@ -178,8 +178,7 @@ export default function ({ app, mesh, utils }) {
var ip = $ctx.peer.ip
var port = $ctx.peer.port

console.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
console.log("Punch req: ", obj)
app.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
switch(action) {
case 'request':
api.createHole(ep, 'server')
Expand Down
48 changes: 20 additions & 28 deletions agent/apps/ztm/tunnel/punch.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default function ({ app, mesh }) {


// Check if ep is self.
console.info(`Creating hole to peer ${ep}, bound ${bound}`)
app.log(`Creating hole to peer ${ep}, bound ${bound}`)
if (ep === app.endpoint.id) {
throw 'Must not create a hole to self'
}
Expand Down Expand Up @@ -56,7 +56,7 @@ export default function ({ app, mesh }) {
.connectTLS({
...tlsOptions,
onState: tls => {
console.info('TLS State: ', tls)
app.log(`TLS State: ${tls.state}`)
if($connection.state === 'connected' && tls.state === 'connected') {
app.log(`Connected TLS to peer ${destIP}:${destPort}`)
state = 'connected'
Expand All @@ -72,8 +72,7 @@ export default function ({ app, mesh }) {
.connect(() => `${destIP}:${destPort}`, {
bind: bound,
onState: function (conn) {
console.info("Conn Info: ", conn)

app.log(`Connection State: ${conn.state}`)
if (conn.state === 'open') {
conn.socket.setRawOption(1, 15, new Data([1, 0, 0, 0]))
} else if (conn.state === 'connected') {
Expand All @@ -88,13 +87,13 @@ export default function ({ app, mesh }) {

// Max Retry set to 10
if (retryTimes > 10 || state === 'fail') {
console.info(`Retry limit exceeded, punch failed.`)
app.log(`Retry limit exceeded, punch failed.`)
state = 'fail'
updateHoles()
}
},
})
.handleStreamEnd(evt => console.info('Hole connection end, retry: ', retryTimes + 1, ' reason: ', evt?.error))
.handleStreamEnd(evt => app.log(`Hole connection end, retry: ${retryTimes + 1}, reason: ${evt?.error}`))
)
)
)
Expand Down Expand Up @@ -127,17 +126,15 @@ export default function ({ app, mesh }) {
var listen = pipeline($ => $
.acceptTLS({
...tlsOptions,
onState: tls => console.info('TLS State: ', tls)
onState: tls => app.log(`TLS State: ${tls.state}`)
}).to($ => $
.handleMessage(msg => {
console.info('Server Received: ', msg)
$msg = msg
return new Data
}).pipe(() => svc(buildCtx())), () => $msg
)
)

console.info("Direct Server Listening...")
pipy.listen(bound, 'tcp', listen)

session = pipeline($ => $
Expand Down Expand Up @@ -168,7 +165,6 @@ export default function ({ app, mesh }) {
return new StreamEnd
})
.onEnd(() => {
console.info('Answers in hole: ', $response, store)
if (callback)
callback($response)
return $response
Expand All @@ -182,7 +178,7 @@ export default function ({ app, mesh }) {
state = 'handshake'
var start = Date.now()

console.info("Requesting punch")
app.log("Requesting punch")
request(new Message({
method: 'POST',
path: '/api/punch/request',
Expand All @@ -192,7 +188,7 @@ export default function ({ app, mesh }) {
})), (resp) => {
var end = Date.now()
rtt = (end - start) / 2000
console.info('Estimated RTT: ', rtt)
app.log(`Estimated RTT: ${2 * rtt}`)

if (resp.head.status != 200) {
app.log(`Failed on requesting`)
Expand All @@ -208,7 +204,7 @@ export default function ({ app, mesh }) {
state = 'handshake'
var start = Date.now()

console.info("Accepting punch")
app.log("Accepting punch")
request(new Message({
method: 'POST',
path: '/api/punch/accept',
Expand All @@ -218,7 +214,7 @@ export default function ({ app, mesh }) {
})), (resp) => {
var end = Date.now()
rtt = (end - start) / 2000
console.info('Estimated RTT: ', rtt)
app.log(`Estimated RTT: ${2 * rtt}`)

if (!resp || resp.head.status != 200) {
app.log(`Failed on accepting`)
Expand Down Expand Up @@ -255,12 +251,11 @@ export default function ({ app, mesh }) {

function addPeerCert(cert) {
var peerCert = new crypto.Certificate(cert)
console.info("TLS: ", tlsOptions)
tlsOptions['trusted'] = [peerCert]
}

function updateNatInfo(ip, port) {
console.info(`Peer NAT Info: ${ip}:${port}`)
app.log(`Peer NAT Info: ${ip}:${port}`)
destIP = ip
destPort = port
}
Expand All @@ -271,22 +266,22 @@ export default function ({ app, mesh }) {
function punch() {
state = 'punching'

console.info(`Punching to ${destIP}:${destPort} (${ep})`)
app.log(`Punching to ${destIP}:${destPort} (${ep})`)
if (role === 'server') {
makeFakeCall(destIP, destPort)
}
directSession()
}

function makeRespTunnel() {
console.info("Created Resp Tunnel")
app.log("Created Resp Tunnel")
state = 'connected'

return pipeline($ => $
.acceptHTTPTunnel(() => new Message({ status: 200 })).to($ => $
.onStart(new Data)
.swap(() => pHub)
.onEnd(() => console.info(`Direct Connection from ${ep} lost`))
.onEnd(() => app.log(`Direct Connection from ${ep} lost`))
)
)
}
Expand All @@ -297,7 +292,7 @@ export default function ({ app, mesh }) {
// The hole has been released.
return
} else if (state != 'connected') {
console.info(`Current state ${state}, made the hole failed`)
app.log(`Current state ${state}, force the hole failed`)
state = 'fail'
updateHoles()
}
Expand All @@ -306,7 +301,7 @@ export default function ({ app, mesh }) {
// send a SYN to dest, expect no return.
// this will cheat the firewall to allow inbound connection from peer.
function makeFakeCall(destIP, destPort) {
console.info("Making fake call")
app.log("Making fake call")
pipeline($ => $
.onStart(new Data).connect(`${destIP}:${destPort}`, {
bind: bound,
Expand All @@ -316,7 +311,6 @@ export default function ({ app, mesh }) {

// abort this connection.
if (conn.state === 'connecting') {
console.info('Performing early close')
conn.close()
}
}
Expand Down Expand Up @@ -366,7 +360,6 @@ export default function ({ app, mesh }) {
var round = 0
var cont = true

console.info('Pacemaking......')
pipeline($ => $
.onStart(new Data)
.repeat(() => {
Expand All @@ -382,7 +375,6 @@ export default function ({ app, mesh }) {
cont = false
heartbeat(false)
}
console.info('Pacemaker: ', resp)
return new StreamEnd
})
)
Expand Down Expand Up @@ -435,17 +427,17 @@ export default function ({ app, mesh }) {
fails[key] += 1
}
})
console.info(`Holes after updating: `, holes)
app.log(`Holes after updating: ${holes.size}`)
}

function createInboundHole(ep) {
updateHoles()
if (findHole(ep)) return
if (fails[ep] && fails[ep] >= 3) {
console.info(`Won't create hole to ${ep}, too many fails!`)
app.log(`Won't create hole to ${ep}, too many fails!`)
return
}
console.info(`Creating Inbound Hole to ${ep}`)
app.log(`Creating Inbound Hole to ${ep}`)
try {
var hole = Hole(ep)
hole.requestPunch()
Expand All @@ -461,7 +453,7 @@ export default function ({ app, mesh }) {
function createOutboundHole(ep, natIp, natPort) {
updateHoles()
if (findHole(ep)) return
console.info(`Creating Outbound Hole to ${ep}`)
app.log(`Creating Outbound Hole to ${ep}`)
try {
var hole = Hole(ep)
hole.acceptPunch()
Expand Down

0 comments on commit b8a9c5b

Please sign in to comment.