diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d59a9f1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +__debug_bin +__debug_bin.exe +# .vscode +openp2p +openp2p.exe* +*.log +go.sum +*.tar.gz +*.zip +*.exe \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0e58450 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 OpenP2P.cn + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README-ZH.md b/README-ZH.md new file mode 100644 index 0000000..54a48d6 --- /dev/null +++ b/README-ZH.md @@ -0,0 +1,119 @@ +[English](/README.md)|中文 +## OpenP2P是什么 +它是一个开源、免费、轻量级的P2P共享网络。任何设备接入OpenP2P,就可以随时随地访问它们。 + +## 为什么选择OpenP2P +### 免费 +完全免费,满足大部分用户的核心白票需求。不像其它类似的产品,我们不需要有公网IP的服务器,不需要花钱买服务。 +### 安全 +代码开源,接受各位大佬检验。下面详细展开 +### 轻量 +文件大小2MB+,运行内存2MB+;全部在应用层实现,没有虚拟网卡,没有内核程序 +### 跨平台 +因为轻量,所以很容易支持各个平台。支持主流的操作系统:Windows,Linux,MacOS;和主流的cpu架构:386、amd64、arm、arm64、mipsle、mipsle64、mips、mips64 +### 高效 +P2P直连可以让你的设备跑满带宽。不论你的设备在任何网络环境,无论NAT1-4(Cone或Symmetric),都支持。依靠Quic协议优秀的拥塞算法,能在糟糕的网络环境获得高带宽低延时。 + +### 二次开发 +基于OpenP2P只需数行代码,就能让原来只能局域网通信的程序,变成任何内网都能通信 + +## 快速入门 +以一个最常见的例子说明OpenP2P如何使用:远程办公,在家里连入办公室Windows电脑。 +相信很多人在疫情下远程办公是刚需。 +1. 先确认办公室电脑已开启远程桌面功能(如何开启参考官方说明https://docs.microsoft.com/zh-cn/windows-server/remote/remote-desktop-services/clients/remote-desktop-allow-access) +2. 在办公室下载最新的OpenP2P(补上URL),解压出来,在命令行执行 + ``` + openp2p.exe -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1 + ``` + + `切记将标记大写的参数改成自己的` + + ![image](/doc/images/officelisten.png) +3. 在家里下载最新的OpenP2P(补上URL),解压出来,在命令行执行 + ``` + openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 --peernode OFFICEPC1 --dstip 127.0.0.1 --dstport 3389 --srcport 23389 --protocol tcp + ``` + `切记将标记大写的参数改成自己的` + ![image](/doc/images/homeconnect.png) + ![image](/doc/images/mem.png) + `LISTEN ON PORT 23389 START` 看到这行日志表示P2PApp建立成功,监听23389端口。只需连接本机的127.0.0.1:23389就相当于连接公司Windows电脑的3389端口。 + +4. 在家里Windows电脑,按Win+R输入mstsc打开远程桌面,输入127.0.0.1:23389 /admin + ![image](/doc/images/mstscconnect.png) + + ![image](/doc/images/afterconnect.png) + +## [详细使用说明](/USAGE-ZH.md) +## 典型应用场景 +特别适合大流量的内网访问 +### 远程办公 +Windows MSTSC、VNC等远程桌面,SSH,内网各种ERP系统 +### 远程访问NAS +管理大量视频、图片 +### 远程监控摄像头 +### 远程刷机 +### 远程数据备份 +--- +## 概要设计 +### 原型 +![image](/doc/images/prototype.png) +### 客户端架构 +![image](/doc/images/architecture.png) +### P2PApp +它是项目里最重要的概念,一个P2PApp就是把远程的一个服务(mstsc/ssh等)通过P2P网络映射到本地监听。二次开发或者我们提供的Restful API,主要工作就是管理P2PApp +![image](/doc/images/appdetail.png) +## 共享 +默认会开启共享限速10mbps,只有你用户下提供了共享节点才能使用别人的共享节点。这非常公平,也是这个项目的初衷。 +我们建议你在带宽足够的地方(比如办公室,家里的百兆光纤)加入共享网络。 +如果你仍然不想共享任何节点,请查看运行参数 +## 安全性 +加入OpenP2P共享网络的节点,只能凭授权访问。共享节点只会中转数据,别人无法访问内网任何资源。 +### TLS1.3+AES +两个节点间通信数据走业界最安全的TLS1.3通道。通信内容还会使用AES加密,双重安全,密钥是通过服务端作换。有效阻止中间人攻击 +### 共享的中转节点是否会获得我的数据 +没错,中转节点天然就是一个中间人,所以才加上AES加密通信内容保证安全。中转节点是无法获取明文的 + +### 中转节点是如何校验权限的 +服务端有个调度模型,根据带宽、ping值、稳定性、服务时长,尽可能地使共享节点均匀地提供服务。连接共享节点使用TOTP密码,hmac-sha256算法校验,它是一次性密码,和我们平时使用的手机验证码或银行密码器一样的原理。 + +## 编译 +cd到代码根目录,执行 +``` +export GOPROXY=https://goproxy.io,direct +go mod tidy +go build +``` + +## TODO +近期计划: +1. 支持IPv6 +2. 支持随系统自动启动,安装成系统服务 +3. 提供一些免费服务器给特别差的网络,如广电网络 +4. 建立网站,用户可以在网站管理所有P2PApp和设备。查看设备在线状态,升级,增删查改重启P2PApp等 +5. 建立公众号,用户可在微信公众号管理所有P2PApp和设备 +6. 客户端提供WebUI +7. 支持自有服务器高并发连接 +8. 共享节点调度模型优化,对不同的运营商优化 +9. 方便二次开发,提供API和lib +10. 应用层支持UDP协议,实现很简单,但UDP应用较少暂不急 +11. 底层通信支持KCP协议,目前仅支持Quic;KCP专门对延时优化,被游戏加速器广泛使用,可以牺牲一定的带宽降低延时 +12. 支持Android系统,让旧手机焕发青春变成移动网关 +13. 支持Windows网上邻居共享文件 +14. 内网直连优化,用处不大,估计就用户测试时用到 + +远期计划: +1. 彻底地分布式去中心化设计 +2. 企业级支持,可以更好地管理大量设备,和更安全更细的权限控制 + +## 参与贡献 +TODO或ISSUE里如果有你擅长的领域,或者你有特别好的主意,可以加入OpenP2P项目,贡献你的代码。待项目茁壮成长后,你们就是知名开源项目的主要代码贡献者,岂不快哉。 +## 商业合作 +它是一个中国人发起的项目,更懂国内网络环境,更懂用户需求,更好的企业级支持 +## 技术交流 +QQ群:16947733 +邮箱:openp2p.cn@gmail.com 271357901@qq.com +第一时间获得最新版本消息,以及一些最新IT业界动态 +微信公众号:openp2p +微博:https://weibo.com/openp2p +## 免责声明 +本项目开源供大家学习和免费使用,禁止用于非法用途,任何不当使用本项目或意外造成的损失,本项目及相关人员不会承担任何责任。 diff --git a/README.md b/README.md new file mode 100644 index 0000000..11035ef --- /dev/null +++ b/README.md @@ -0,0 +1,144 @@ +English|[中文](/README-ZH.md) +## What is OpenP2P +It is an open source, free, and lightweight P2P sharing network. As long as any device joins in, you can access them anywhere +## Why OpenP2P +### Free +Totaly free, fullfills most of users(especially free-rider). Unlike other similar products, we don't need a server with public IP, and don't need to pay for services. + +### Safe +Open source, trustable(see details below) + +### Lightweight +2MB+ filesize, 2MB+ memory. It runs at appllication layer, no vitrual NIC, no kernel driver. + +### Cross-platform +Benefit from lightweight, it easily supports most of major OS, like Windows, Linux, MacOS, also most of CPU architecture, like 386、amd64、arm、arm64、mipsle、mipsle64、mips、mips64. + +### Efficient +P2P direct connection lets your devices make good use of bandwidth. Your device can be connected in any network environments, even supports NAT1-4 (Cone or Symmetric). Relying on the excellent congestion algorithm of the Quic protocol, high bandwidth and low latency can be obtained in a bad network environment. + +### Integration +Your applicaiton can call OpenP2P with a few code to make any internal networks communicate with each other. + +## Get Started +A common scenario to introduce OpenP2P: remote work. At home connects to office's Linux PC . +Under the outbreak of covid-19 pandemic, surely remote work becomes a fundamental demand. + +1. Make sure your office device(Linux) has opened the access of ssh. + ``` + netstat -nl | grep 22 + ``` + Output sample + ![image](/doc/images/officelisten_linux.png) + +2. Download the latest version of OpenP2P(TBC),unzip the downloaded package, and execute below command line. + ``` + tar xvf openp2p0.95.3.linux-amd64.tar.gz + openp2p -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1 + ``` + `Must change the parameters marked in uppercase to your own` + + Output sample + ![image](/doc/images/officeexecute_linux.png) + +3. Download the same package of OpenP2P(TBC) on your home device,unzip and execute below command line. + ``` + openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 --peernode OFFICEPC1 --dstip 127.0.0.1 --dstport 22 --srcport 22022 --protocol tcp + ``` + `Must change the parameters marked in uppercase to your own` + Output sample + ![image](/doc/images/homeconnect_windows.png) + The log of `LISTEN ON PORT 22022 START` indicates P2PApp runs successfully on your home device, listing port is 22022. Once connects to local ip:port,127.0.0.1:22022, it means the home device has conneccted to the office device's port, 22. + ![image](/doc/images/officelisten_2_linux.png) + + +4. Test the connection between office device and home device.In your home deivce, run SSH to login the office device. + ``` + ssh -p22022 root@127.0.0.1:22022 + ``` + ![image](/doc/images/sshconnect.png) + + +## [Usage](/USAGE.md) + +## Scenarios +Especially suitable for large traffic intranet access. +### Remote work +Windows MSTSC, VNC and other remote desktops, SSH, various ERP systems in the intranet + +### Remote Access NAS +Manage a large number of videos and pictures +### Remote Access Camera +### Remote Flashing Phone +### Remotely Data Backup +--- +## Overview Design +### Prototype +![image](/doc/images/prototype.png) +### Client architecture +![image](/doc/images/architecture.png) +### P2PApp +P2PAPP is the most import concept in this project, one P2PApp is able to map the remote service(mstsc/ssh) to the local listening. The main job of re-development or restful API we provide is to manage P2PApp. + +![image](/doc/images/appdetail.png) +## Share +10mbps is its default setting of share speed limit. Only when your users have shared their nodes, they are allowed to use others' shared nodes. This is very fair, and it is also the original intention of this project. +We recommend that you join a shared network in a place with sufficient bandwidth (such as an office or home with 100M optical fiber). +If you are still not willing to contribute any node to the OpenP2P share network, please refer to the operating parameters for your own setting. +## Safety +The nodes which have joined the OpenP2P share network can vist each other by authentications. Shared nodes will only relay data, and others cannot access any resources in the intranet. + +### TLS1.3+AES +The communication data between the two nodes uses the industry's most secure TLS1.3 channel. The communication content will also use AES encryption, double security, the key is exchanged through the server. Effectively prevent man-in-the-middle attacks. + +### Will the shared node capture my data? +That's right, the relay node is naturally an man-in-middle, so AES encryption is added to ensure the security of the communication content. The relay node cannot obtain the plaintext. +### How does the shared relay node verify the authority? +The server side has a scheduling model, which calculate bandwith, ping value,stability and service duration to provide a well-proportioned service to every share node. It uses TOTP(Time-based One-time Password) with hmac-sha256 algorithem, its theory as same as the cellphone validation code or bank cipher coder. + +## Build +cd root directory of the socure code and execute +``` +export GOPROXY=https://goproxy.io,direct +go mod tidy +go build +``` + +## TODO +Short-Term: +1. Support IPv6. +2. Support auto run when system boot, setup system service. +3. Provide free servers to some low-performance network. +4. Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp . +5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website. +6. Provide WebUI on client side. +7. Support high concurrency on server side. +8. Optimize our share scheduling model for different network operators. +9. Provide REST APIs and libary for secondary development. +10. Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol. +11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag. +12. Support Android platform, let the phones to be mobile gateway . +13. Support SMB Windows neighborhood. +14. Direct connection on intranet, for testing. + + +Long-Term: +1. Decentration and distribution. +2. Enterprise-level product can well manage large scale equipment and ACL. + + +## Contribute +If the items in TODO or ISSUE is your domain, or you have sepical good idea, welcome to join this OpenP2P project and contribute your code. When this project grows stronger, you will be the major outstanding contributors. That's cool. + +## Contact +QQ Group: 16947733 + +Email: openp2p.cn@gmail.com tenderiron@139.com + +Get the latest version news, and some of the latest IT industry trends + +WeChat public account: openp2p + +## Disclaimer +This project is open source for everyone to learn and use for free. It is forbidden to be used for illegal purposes. Any loss caused by improper use of this project or accident, this project and related personnel will not bear any responsibility. + diff --git a/USAGE-ZH.md b/USAGE-ZH.md new file mode 100644 index 0000000..f3accf0 --- /dev/null +++ b/USAGE-ZH.md @@ -0,0 +1,41 @@ +# 详细运行参数说明 + +## 监听 +``` +openp2p.exe -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1 +``` +>* -d daemon模式,推荐使用。发现worker进程意外退出就会自动启动新的worker进程 +>* -node 独一无二的节点名字,唯一标识 +>* -user 独一无二的用户名字,该节点属于这个user +>* -password 密码 +>* -sharebandwidth 作为共享节点时提供带宽,默认10mbps. 如果是光纤大带宽,设置越大效果越好 +>* -loglevel 需要查看更多调试日志,设置0;默认是1 +>* -noshare 不共享,该节点只在私有的P2P网络使用。不加入共享的P2P网络,这样也意味着无法使用别人的共享节点 + +## 连接 +``` +openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 -peernode OFFICEPC1 -dstip 127.0.0.1 -dstport 3389 -srcport 23389 -protocol tcp +``` +>* -peernode 目标节点名字 +>* -dstip 目标服务地址,默认本机127.0.0.1 +>* -dstport 目标服务端口,常见的如windows远程桌面3389,Linux ssh 22 +>* -protocol 目标服务协议 tcp、udp +>* -peeruser 目标用户,如果是同一个用户下的节点,则无需设置 +>* -peerpassword 目标密码,如果是同一个用户下的节点,则无需设置 +>* -f 配置文件,如果希望配置多个P2PApp参考[config.json](/config.json) + +## 升级客户端 +``` +# update local client +openp2p update +# update remote client +curl --insecure 'https://openp2p.cn:27182/api/v1/device/YOUR-NODE-NAME/update?user=&password=' +``` + +Windows系统需要设置防火墙放行本程序,程序会自动设置,如果设置失败会影响连接功能。 +Linux系统(Ubuntu和CentOS7)的防火墙默认配置均不会有影响,如果不行可尝试关闭防火墙 +``` +systemctl stop firewalld.service +systemctl start firewalld.service +firewall-cmd --state +``` \ No newline at end of file diff --git a/USAGE.md b/USAGE.md new file mode 100644 index 0000000..e31c597 --- /dev/null +++ b/USAGE.md @@ -0,0 +1,41 @@ +# Parameters details + +## Listen +``` +openp2p.exe -d -node OFFICEPC1 -user USERNAME1 -password PASSWORD1 +``` +>* -d daemon mode is recommand. When the worker process is found to exit unexpectedly, a new worker process will be automatically started +>* -node Unique node name, unique identification +>* -user Unique user name, the node belongs to this user +>* -password Password +>* -sharebandwidth Provides bandwidth when used as a shared node, the default is 10mbps. If it is a large bandwidth of optical fiber, the larger the setting, the better the effect +>* -loglevel Need to view more debug logs, set 0; the default is 1 +>* -noshare Not shared, the node is only used in a private P2P network. Do not join the shared P2P network, which also means that you CAN NOT use other people’s shared nodes + +## Connect +``` +openp2p.exe -d -node HOMEPC123 -user USERNAME1 -password PASSWORD1 -peernode OFFICEPC1 -dstip 127.0.0.1 -dstport 3389 -srcport 23389 -protocol tcp +``` +>* -peernode Target node name +>* -dstip Target service address, default local 127.0.0.1 +>* -dstport Target service port, such as windows remote desktop 3389, Linux ssh 22 +>* -protocol Target service protocol tcp, udp +>* -peeruser The target user, if it is a node under the same user, no need to set +>* -peerpassword The target password, if it is a node under the same user, no need to set +>* -f Configuration file, if you want to configure multiple P2PApp refer to [config.json](/config.json) + +## Client update +``` +# update local client +openp2p update +# update remote client +curl --insecure 'https://openp2p.cn:27182/api/v1/device/YOUR-NODE-NAME/update?user=&password=' +``` + +Windows system needs to set up firewall for this program, the program will automatically set the firewall, if the setting fails, the UDP punching will be affected. +The default firewall configuration of Linux system (Ubuntu and CentOS7) will not have any effect, if not, you can try to turn off the firewall +``` +systemctl stop firewalld.service +systemctl start firewalld.service +firewall-cmd --state +``` \ No newline at end of file diff --git a/bandwidthLimit.go b/bandwidthLimit.go new file mode 100644 index 0000000..a6c800c --- /dev/null +++ b/bandwidthLimit.go @@ -0,0 +1,45 @@ +package main + +import ( + "sync" + "time" +) + +// BandwidthLimiter ... +type BandwidthLimiter struct { + freeFlowTime time.Time + bandwidth int // mbps + freeFlow int // bytes + maxFreeFlow int // bytes + freeFlowMtx sync.Mutex +} + +// mbps +func newBandwidthLimiter(bw int) *BandwidthLimiter { + return &BandwidthLimiter{ + bandwidth: bw, + freeFlowTime: time.Now(), + maxFreeFlow: bw * 1024 * 1024 / 8, + freeFlow: bw * 1024 * 1024 / 8, + } +} + +// Add ... +func (bl *BandwidthLimiter) Add(bytes int) { + if bl.bandwidth <= 0 { + return + } + bl.freeFlowMtx.Lock() + defer bl.freeFlowMtx.Unlock() + // calc free flow 1000*1000/1024/1024=0.954; 1024*1024/1000/1000=1.048 + bl.freeFlow += int(time.Now().Sub(bl.freeFlowTime) * time.Duration(bl.bandwidth) / 8 / 954) + if bl.freeFlow > bl.maxFreeFlow { + bl.freeFlow = bl.maxFreeFlow + } + bl.freeFlow -= bytes + bl.freeFlowTime = time.Now() + if bl.freeFlow < 0 { + // sleep for the overflow + time.Sleep(time.Millisecond * time.Duration(-bl.freeFlow/(bl.bandwidth*1048/8))) + } +} diff --git a/common.go b/common.go new file mode 100644 index 0000000..8568eb6 --- /dev/null +++ b/common.go @@ -0,0 +1,101 @@ +package main + +import ( + "crypto/aes" + "crypto/cipher" + "fmt" + "net" +) + +func getmac(ip string) string { + //get mac relative to the ip address which connected to the mq. + ifaces, err := net.Interfaces() + if err != nil { + return "" + } + firstMac := "" + for _, iface := range ifaces { + addrs, _ := iface.Addrs() + for _, addr := range addrs { + if firstMac == "" { + firstMac = iface.HardwareAddr.String() + } + if ipNet, ok := addr.(*net.IPNet); ok && ipNet.IP.String() == ip { + if iface.HardwareAddr.String() != "" { + return iface.HardwareAddr.String() + } + return firstMac + } + } + } + return firstMac +} + +var cbcIVBlock = []byte("UHNJUSBACIJFYSQN") + +var paddingArray = [][]byte{ + {0}, + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}, + {3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}, + {4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4}, + {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5}, + {6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6}, + {7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7}, + {8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8}, + {9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}, + {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}, + {11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11}, + {12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12}, + {13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13}, + {14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14}, + {15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}, + {16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16}, +} + +func pkcs7Padding(plainData []byte, dataLen, blockSize int) int { + padLen := blockSize - dataLen%blockSize + pPadding := plainData[dataLen : dataLen+padLen] + + copy(pPadding, paddingArray[padLen][:padLen]) + return padLen +} + +func pkcs7UnPadding(origData []byte, dataLen int) ([]byte, error) { + unPadLen := int(origData[dataLen-1]) + if unPadLen <= 0 || unPadLen > 16 { + return nil, fmt.Errorf("wrong pkcs7 padding head size:%d", unPadLen) + } + return origData[:(dataLen - unPadLen)], nil +} + +func encryptBytes(key []byte, out, in []byte, plainLen int) ([]byte, error) { + if len(key) == 0 { + return in[:plainLen], nil + } + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + //iv := out[:aes.BlockSize] + //if _, err := io.ReadFull(rand.Reader, iv); err != nil { + // return nil, err + //} + mode := cipher.NewCBCEncrypter(block, cbcIVBlock) + total := pkcs7Padding(in, plainLen, aes.BlockSize) + plainLen + mode.CryptBlocks(out[:total], in[:total]) + return out[:total], nil +} + +func decryptBytes(key []byte, out, in []byte, dataLen int) ([]byte, error) { + if len(key) == 0 { + return in[:dataLen], nil + } + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + mode := cipher.NewCBCDecrypter(block, cbcIVBlock) + mode.CryptBlocks(out[:dataLen], in[:dataLen]) + return pkcs7UnPadding(out, dataLen) +} diff --git a/common_test.go b/common_test.go new file mode 100644 index 0000000..c6444b7 --- /dev/null +++ b/common_test.go @@ -0,0 +1,41 @@ +package main + +import ( + "log" + "testing" +) + +func TestAESCBC(t *testing.T) { + for packetSize := 1; packetSize <= 8192; packetSize++ { + log.Println("test packetSize=", packetSize) + data := make([]byte, packetSize) + for i := 0; i < packetSize; i++ { + data[i] = byte('0' + i%10) + } + p2pEncryptBuf := make([]byte, len(data)+PaddingSize) + inBuf := make([]byte, len(data)+PaddingSize) + copy(inBuf, data) + cryptKey := []byte("0123456789ABCDEF") + sendBuf, err := encryptBytes(cryptKey, p2pEncryptBuf, inBuf, len(data)) + if err != nil { + t.Errorf("encrypt packet failed:%s", err) + } + log.Printf("encrypt data len=%d\n", len(sendBuf)) + + decryptBuf := make([]byte, len(sendBuf)) + outBuf, err := decryptBytes(cryptKey, decryptBuf, sendBuf, len(sendBuf)) + if err != nil { + t.Errorf("decrypt packet failed:%s", err) + } + // log.Printf("len=%d,content=%s\n", len(outBuf), outBuf) + log.Printf("decrypt data len=%d\n", len(outBuf)) + log.Println("validate") + for i := 0; i < len(outBuf); i++ { + if outBuf[i] != byte('0'+i%10) { + t.Error("validate failed") + } + } + log.Println("validate ok") + } + +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..3f220ec --- /dev/null +++ b/config.go @@ -0,0 +1,85 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "time" +) + +var gConf Config + +type AppConfig struct { + // required + Protocol string + SrcPort int + PeerNode string + DstPort int + DstHost string + PeerUser string + PeerPassword string + // runtime info + peerToken uint64 + peerNatType int + peerIP string + peerConeNatPort int + retryNum int + retryTime time.Time + shareBandwidth int +} + +type Config struct { + Network NetworkConfig `json:"network"` + Apps []AppConfig `json:"apps"` + daemonMode bool +} + +func (c *Config) add(app AppConfig) { + if app.SrcPort == 0 || app.DstPort == 0 { + return + } + for i := 0; i < len(c.Apps); i++ { + if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort { + return + } + } + c.Apps = append(c.Apps, app) +} + +// func (c *Config) save() { +// data, _ := json.MarshalIndent(c, "", "") +// ioutil.WriteFile("config.json", data, 0644) +// } + +func (c *Config) load() error { + data, err := ioutil.ReadFile("config.json") + if err != nil { + gLog.Println(LevelERROR, "read config.json error:", err) + return err + } + err = json.Unmarshal(data, &c) + if err != nil { + gLog.Println(LevelERROR, "parse config.json error:", err) + } + return err +} + +type NetworkConfig struct { + // local info + Node string + User string + Password string + NoShare bool + localIP string + ipv6 string + hostName string + mac string + os string + publicIP string + natType int + shareBandwidth int + // server info + ServerHost string + ServerPort int + UDPPort1 int + UDPPort2 int +} diff --git a/config.json b/config.json new file mode 100644 index 0000000..71a764b --- /dev/null +++ b/config.json @@ -0,0 +1,31 @@ +{ + "network": { + "Node": "hhd1207-222", + "User": "tenderiron", + "Password": "13760636579", + "ServerHost": "openp2p.cn", + "ServerPort": 27182, + "UDPPort1": 27182, + "UDPPort2": 27183 + }, + "apps": [ + { + "Protocol": "tcp", + "SrcPort": 53389, + "PeerNode": "dell720-902", + "DstPort": 3389, + "DstHost": "10.1.6.36", + "PeerUser": "", + "PeerPassword": "" + }, + { + "Protocol": "tcp", + "SrcPort": 22, + "PeerNode": "dell720-902", + "DstPort": 22, + "DstHost": "127.0.0.1", + "PeerUser": "", + "PeerPassword": "" + } + ] +} \ No newline at end of file diff --git a/daemon.go b/daemon.go new file mode 100644 index 0000000..f767af9 --- /dev/null +++ b/daemon.go @@ -0,0 +1,36 @@ +package main + +import ( + "os" + "time" +) + +type daemon struct { +} + +func (d *daemon) run() { + gLog.Println(LevelINFO, "daemon start") + defer gLog.Println(LevelINFO, "daemon end") + var args []string + // rm -d parameter + for i := 0; i < len(os.Args); i++ { + if os.Args[i] == "-d" { + args = append(os.Args[0:i], os.Args[i+1:]...) + break + } + } + args = append(args, "-bydaemon") + for { + // start worker + gLog.Println(LevelINFO, "start worker process") + execSpec := &os.ProcAttr{Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}} + p, err := os.StartProcess(os.Args[0], args, execSpec) + if err != nil { + gLog.Printf(LevelERROR, "start worker error:%s", err) + return + } + _, _ = p.Wait() + gLog.Printf(LevelERROR, "worker stop, restart it after 10s") + time.Sleep(time.Second * 10) + } +} diff --git a/doc/images/afterconnect.png b/doc/images/afterconnect.png new file mode 100644 index 0000000..190a19e Binary files /dev/null and b/doc/images/afterconnect.png differ diff --git a/doc/images/afterconnect_linux.PNG b/doc/images/afterconnect_linux.PNG new file mode 100644 index 0000000..d9c640d Binary files /dev/null and b/doc/images/afterconnect_linux.PNG differ diff --git a/doc/images/appdetail.png b/doc/images/appdetail.png new file mode 100644 index 0000000..fda68fa Binary files /dev/null and b/doc/images/appdetail.png differ diff --git a/doc/images/architecture.png b/doc/images/architecture.png new file mode 100644 index 0000000..4996623 Binary files /dev/null and b/doc/images/architecture.png differ diff --git a/doc/images/homeconnect.png b/doc/images/homeconnect.png new file mode 100644 index 0000000..fb4f784 Binary files /dev/null and b/doc/images/homeconnect.png differ diff --git a/doc/images/homeconnect_windows.PNG b/doc/images/homeconnect_windows.PNG new file mode 100644 index 0000000..11c2933 Binary files /dev/null and b/doc/images/homeconnect_windows.PNG differ diff --git a/doc/images/mem.png b/doc/images/mem.png new file mode 100644 index 0000000..350f29d Binary files /dev/null and b/doc/images/mem.png differ diff --git a/doc/images/mstscconnect.png b/doc/images/mstscconnect.png new file mode 100644 index 0000000..3208b1f Binary files /dev/null and b/doc/images/mstscconnect.png differ diff --git a/doc/images/officeexecute_linux.PNG b/doc/images/officeexecute_linux.PNG new file mode 100644 index 0000000..dc8c7b0 Binary files /dev/null and b/doc/images/officeexecute_linux.PNG differ diff --git a/doc/images/officelisten.png b/doc/images/officelisten.png new file mode 100644 index 0000000..6b333d4 Binary files /dev/null and b/doc/images/officelisten.png differ diff --git a/doc/images/officelisten_2_linux.PNG b/doc/images/officelisten_2_linux.PNG new file mode 100644 index 0000000..ac61db1 Binary files /dev/null and b/doc/images/officelisten_2_linux.PNG differ diff --git a/doc/images/officelisten_linux.PNG b/doc/images/officelisten_linux.PNG new file mode 100644 index 0000000..7ca3643 Binary files /dev/null and b/doc/images/officelisten_linux.PNG differ diff --git a/doc/images/prototype.png b/doc/images/prototype.png new file mode 100644 index 0000000..6243829 Binary files /dev/null and b/doc/images/prototype.png differ diff --git a/doc/images/sshconnect.PNG b/doc/images/sshconnect.PNG new file mode 100644 index 0000000..a5b8bd7 Binary files /dev/null and b/doc/images/sshconnect.PNG differ diff --git a/doc/images/winscpconnect.PNG b/doc/images/winscpconnect.PNG new file mode 100644 index 0000000..f0b77f0 Binary files /dev/null and b/doc/images/winscpconnect.PNG differ diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fc5c46d --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module openp2p + +go 1.16 + +require ( + github.com/gorilla/websocket v1.4.2 + github.com/lucas-clemente/quic-go v0.24.0 + golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 +) diff --git a/holepunch.go b/holepunch.go new file mode 100644 index 0000000..b14fbbd --- /dev/null +++ b/holepunch.go @@ -0,0 +1,180 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "math/rand" + "net" + "sync" + "time" +) + +func handshakeC2C(t *P2PTunnel) (err error) { + gLog.Printf(LevelDEBUG, "handshakeC2C %s:%d:%d to %s:%d", t.pn.config.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort) + defer gLog.Printf(LevelDEBUG, "handshakeC2C ok") + conn, err := net.ListenUDP("udp", t.la) + if err != nil { + return err + } + defer conn.Close() + _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) + if err != nil { + gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) + return err + } + ra, head, _, _, err := UDPRead(conn, 5000) + if err != nil { + time.Sleep(time.Millisecond * 200) + gLog.Println(LevelDEBUG, err, ", return this error when ip was not reachable, retry read") + ra, head, _, _, err = UDPRead(conn, 5000) + if err != nil { + gLog.Println(LevelDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) + return err + } + } + t.ra, _ = net.ResolveUDPAddr("udp", ra.String()) + // cone server side + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { + gLog.Printf(LevelDEBUG, "read %d handshake ", t.id) + UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + _, head, _, _, err = UDPRead(conn, 5000) + if err != nil { + gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) + return err + } + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + gLog.Printf(LevelDEBUG, "read %d handshake ack ", t.id) + return nil + } + } + // cone client side will only read handshake ack + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + gLog.Printf(LevelDEBUG, "read %d handshake ack ", t.id) + _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + if err != nil { + gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) + } + return err + } + return nil +} + +func handshakeC2S(t *P2PTunnel) error { + gLog.Printf(LevelDEBUG, "handshakeC2S start") + defer gLog.Printf(LevelDEBUG, "handshakeC2S end") + // even if read timeout, continue handshake + t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + randPorts := r.Perm(65532) + conn, err := net.ListenUDP("udp", t.la) + if err != nil { + return err + } + defer conn.Close() + go func() error { + gLog.Printf(LevelDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort) + for i := 0; i < SymmetricHandshakeNum; i++ { + // TODO: auto calc cost time + time.Sleep(SymmetricHandshakeInterval) + dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2)) + if err != nil { + return err + } + _, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) + if err != nil { + gLog.Println(LevelDEBUG, "handshakeC2S write MsgPunchHandshake error:", err) + return err + } + } + gLog.Println(LevelDEBUG, "send symmetric handshake end") + return nil + }() + deadline := time.Now().Add(SymmetricHandshakeAckTimeout) + err = conn.SetReadDeadline(deadline) + if err != nil { + gLog.Println(LevelERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error") + return err + } + // read response of the punching hole ok port + result := make([]byte, 1024) + _, dst, err := conn.ReadFrom(result) + if err != nil { + gLog.Println(LevelERROR, "handshakeC2S wait timeout") + return err + } + head := &openP2PHeader{} + err = binary.Read(bytes.NewReader(result[:openP2PHeaderSize]), binary.LittleEndian, head) + if err != nil { + gLog.Println(LevelERROR, "parse p2pheader error:", err) + return err + } + t.ra, _ = net.ResolveUDPAddr("udp", dst.String()) + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + gLog.Printf(LevelDEBUG, "handshakeC2S read %d handshake ack %s", t.id, dst.String()) + _, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + return err + } + return nil +} + +func handshakeS2C(t *P2PTunnel) error { + gLog.Printf(LevelDEBUG, "handshakeS2C start") + defer gLog.Printf(LevelDEBUG, "handshakeS2C end") + gotCh := make(chan *net.UDPAddr, 5) + // sequencely udp send handshake, do not parallel send + gLog.Printf(LevelDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort) + gotIt := false + gotMtx := sync.Mutex{} + for i := 0; i < SymmetricHandshakeNum; i++ { + // TODO: auto calc cost time + time.Sleep(SymmetricHandshakeInterval) + go func(t *P2PTunnel) error { + conn, err := net.ListenUDP("udp", nil) + if err != nil { + gLog.Printf(LevelDEBUG, "listen error") + return err + } + defer conn.Close() + UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) + _, head, _, _, err := UDPRead(conn, 10000) + if err != nil { + // gLog.Println(LevelDEBUG, "one of the handshake error:", err) + return err + } + gotMtx.Lock() + defer gotMtx.Unlock() + if gotIt { + return nil + } + gotIt = true + t.la, _ = net.ResolveUDPAddr("udp", conn.LocalAddr().String()) + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { + gLog.Printf(LevelDEBUG, "handshakeS2C read %d handshake ", t.id) + UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + _, head, _, _, err = UDPRead(conn, 5000) + if err != nil { + gLog.Println(LevelDEBUG, "handshakeS2C handshake error") + return err + } + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + gLog.Printf(LevelDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String()) + gotCh <- t.la + return nil + } + } + return nil + }(t) + } + gLog.Printf(LevelDEBUG, "send symmetric handshake end") + gLog.Println(LevelDEBUG, "handshakeS2C ready, notify peer connect") + t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) + + select { + case <-time.After(SymmetricHandshakeAckTimeout): + return fmt.Errorf("wait handshake failed") + case la := <-gotCh: + gLog.Println(LevelDEBUG, "symmetric handshake ok", la) + } + return nil +} diff --git a/log.go b/log.go new file mode 100644 index 0000000..ba2a1e9 --- /dev/null +++ b/log.go @@ -0,0 +1,198 @@ +package main + +import ( + "fmt" + "log" + "os" + "runtime" + "sync" + "time" +) + +// LogLevel ... +type LogLevel int + +var gLog *V8log + +// LevelDEBUG ... +const ( + LevelDEBUG LogLevel = iota + LevelINFO + LevelWARN + LevelERROR +) + +var ( + logFileNames map[LogLevel]string + loglevel map[LogLevel]string +) + +func init() { + logFileNames = make(map[LogLevel]string) + loglevel = make(map[LogLevel]string) + logFileNames[0] = ".log" + loglevel[LevelDEBUG] = "DEBUG" + loglevel[LevelINFO] = "INFO" + loglevel[LevelWARN] = "WARN" + loglevel[LevelERROR] = "ERROR" + +} + +const ( + LogFile = iota + LogConsole + LogFileAndConsole +) + +// V8log ... +type V8log struct { + loggers map[LogLevel]*log.Logger + files map[LogLevel]*os.File + llevel LogLevel + stopSig chan bool + logDir string + mtx *sync.Mutex + stoped bool + lineEnding string + pid int + lastError string + maxLogSize int64 + mode int +} + +// InitLogger ... +func InitLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *V8log { + logger := make(map[LogLevel]*log.Logger) + openedfile := make(map[LogLevel]*os.File) + var ( + logdir string + ) + if path == "" { + logdir = "log/" + } else { + logdir = path + "/log/" + } + os.MkdirAll(logdir, 0777) + for l := range logFileNames { + logFilePath := logdir + filePrefix + logFileNames[l] + f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatal(err) + } + os.Chmod(logFilePath, 0666) + openedfile[l] = f + logger[l] = log.New(f, "", log.LstdFlags) + } + var le string + if runtime.GOOS == "windows" { + le = "\r\n" + } else { + le = "\n" + } + pLog := &V8log{logger, openedfile, level, make(chan bool, 10), logdir, &sync.Mutex{}, false, le, os.Getpid(), "", maxLogSize, mode} + go pLog.checkFile() + return pLog +} + +// UninitLogger ... +func (vl *V8log) UninitLogger() { + if !vl.stoped { + vl.stoped = true + close(vl.stopSig) + for l := range logFileNames { + if l >= vl.llevel { + vl.files[l].Close() + } + } + } +} + +func (vl *V8log) checkFile() { + if vl.maxLogSize <= 0 { + return + } + ticker := time.NewTicker(time.Minute) + for { + select { + case <-ticker.C: + vl.mtx.Lock() + for l, logFile := range vl.files { + f, e := logFile.Stat() + if e != nil { + break + } + if f.Size() <= vl.maxLogSize { + break + } + logFile.Close() + fname := f.Name() + backupPath := vl.logDir + fname + ".0" + os.Remove(backupPath) + os.Rename(vl.logDir+fname, backupPath) + newFile, e := os.OpenFile(vl.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if e == nil { + vl.loggers[l].SetOutput(newFile) + vl.files[l] = newFile + } + + } + vl.mtx.Unlock() + case <-vl.stopSig: + } + if vl.stoped { + break + } + } +} + +// Printf Warning: report error log depends on this Print format. +func (vl *V8log) Printf(level LogLevel, format string, params ...interface{}) { + vl.mtx.Lock() + defer vl.mtx.Unlock() + if vl.stoped { + return + } + if level < vl.llevel { + return + } + if level == LevelERROR { + vl.lastError = fmt.Sprintf(format, params...) + } + pidAndLevel := []interface{}{vl.pid, loglevel[level]} + params = append(pidAndLevel, params...) + if vl.mode == LogFile || vl.mode == LogFileAndConsole { + vl.loggers[0].Printf("%d %s "+format+vl.lineEnding, params...) + } + + if vl.mode == LogConsole || vl.mode == LogFileAndConsole { + log.Printf("%d %s "+format+vl.lineEnding, params...) + } +} + +// Println ... +func (vl *V8log) Println(level LogLevel, params ...interface{}) { + vl.mtx.Lock() + defer vl.mtx.Unlock() + if vl.stoped { + return + } + if level < vl.llevel { + return + } + if level == LevelERROR { + vl.lastError = fmt.Sprint(params...) + } + pidAndLevel := []interface{}{vl.pid, " ", loglevel[level], " "} + params = append(pidAndLevel, params...) + params = append(params, vl.lineEnding) + vl.loggers[0].Print(params...) + if vl.mode == LogConsole || vl.mode == LogFileAndConsole { + log.Print(params...) + } +} + +func (vl *V8log) getLastError() string { + vl.mtx.Lock() + defer vl.mtx.Unlock() + return vl.lastError +} diff --git a/nat.go b/nat.go new file mode 100644 index 0000000..a00a69d --- /dev/null +++ b/nat.go @@ -0,0 +1,94 @@ +package main + +import ( + "encoding/json" + "fmt" + "math/rand" + "net" + "time" +) + +func natTest(serverHost string, serverPort int, localPort int) (publicIP string, isPublicIP int, publicPort int, err error) { + conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", localPort)) + if err != nil { + return "", 0, 0, err + } + defer conn.Close() + + dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", serverHost, serverPort)) + if err != nil { + return "", 0, 0, err + } + + // The connection can write data to the desired address. + msg, err := newMessage(MsgNATDetect, 0, &NatDetectReq{SrcPort: localPort, EchoPort: EchoPort}) + _, err = conn.WriteTo(msg, dst) + if err != nil { + return "", 0, 0, err + } + deadline := time.Now().Add(NatTestTimeout) + err = conn.SetReadDeadline(deadline) + if err != nil { + return "", 0, 0, err + } + buffer := make([]byte, 1024) + nRead, _, err := conn.ReadFrom(buffer) + if err != nil { + gLog.Println(LevelERROR, "NAT detect error:", err) + return "", 0, 0, err + } + natRsp := NatDetectRsp{} + err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp) + return natRsp.IP, natRsp.IsPublicIP, natRsp.Port, nil +} + +func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, err error) { + // the random local port may be used by other. + go echo() + localPort := int(rand.Uint32()%10000 + 50000) + ip1, isPublicIP, port1, err := natTest(host, udp1, localPort) + gLog.Printf(LevelDEBUG, "local port:%d nat port:%d", localPort, port1) + if err != nil { + return "", 0, err + } + if isPublicIP == 1 { + return ip1, NATNone, nil + } + ip2, _, port2, err := natTest(host, udp2, localPort) + gLog.Printf(LevelDEBUG, "local port:%d nat port:%d", localPort, port2) + if err != nil { + return "", 0, err + } + if ip1 != ip2 { + return "", 0, fmt.Errorf("ip have changed, please retry again") + } + natType := NATSymmetric + if port1 == port2 { + natType = NATCone + } + //TODO: NATNone + return ip1, natType, nil +} + +const ( + UDPPort1 = 27182 + UDPPort2 = 27183 + EchoPort = 31415 +) + +func echo() { + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: EchoPort}) + if err != nil { + gLog.Println(LevelERROR, "echo server listen error:", err) + return + } + buf := make([]byte, 1600) + defer conn.Close() + // wait 5s for echo testing + conn.SetReadDeadline(time.Now().Add(time.Second * 5)) + n, addr, err := conn.ReadFromUDP(buf) + if err != nil { + return + } + conn.WriteToUDP(buf[0:n], addr) +} diff --git a/openp2p.go b/openp2p.go new file mode 100644 index 0000000..3e14bc8 --- /dev/null +++ b/openp2p.go @@ -0,0 +1,158 @@ +package main + +import ( + "flag" + "fmt" + "math/rand" + "os" + "path/filepath" + "time" +) + +func main() { + rand.Seed(time.Now().UnixNano()) + // TODO: install sub command, deamon process + // groups := flag.String("groups", "", "you could join in several groups. like: GroupName1:Password1;GroupName2:Password2; group name 8-31 characters") + if len(os.Args) > 1 { + switch os.Args[1] { + case "version", "-v", "--version": + fmt.Println(OpenP2PVersion) + return + case "update": + gLog = InitLogger(filepath.Dir(os.Args[0]), "openp2p", LevelDEBUG, 1024*1024, LogConsole) + update() + return + } + } + + user := flag.String("user", "", "user name. 8-31 characters") + node := flag.String("node", "", "node name. 8-31 characters") + password := flag.String("password", "", "user password. 8-31 characters") + peerNode := flag.String("peernode", "", "peer node name that you want to connect") + peerUser := flag.String("peeruser", "", "peer node user (default peeruser=user)") + peerPassword := flag.String("peerpassword", "", "peer node password (default peerpassword=password)") + dstIP := flag.String("dstip", "127.0.0.1", "destination ip ") + serverHost := flag.String("serverhost", "openp2p.cn", "server host ") + // serverHost := flag.String("serverhost", "127.0.0.1", "server host ") // for debug + dstPort := flag.Int("dstport", 0, "destination port ") + srcPort := flag.Int("srcport", 0, "source port ") + protocol := flag.String("protocol", "tcp", "tcp or udp") + noShare := flag.Bool("noshare", false, "disable using the huge numbers of shared nodes in OpenP2P network, your connectivity will be weak. also this node will not shared with others") + shareBandwidth := flag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private node no limit") + configFile := flag.Bool("f", false, "config file") + daemonMode := flag.Bool("d", false, "daemonMode") + byDaemon := flag.Bool("bydaemon", false, "start by daemon") + logLevel := flag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error") + flag.Parse() + gLog = InitLogger(filepath.Dir(os.Args[0]), "openp2p", LogLevel(*logLevel), 1024*1024, LogConsole) + gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion) + if *daemonMode { + d := daemon{} + d.run() + return + } + if !*configFile { + // validate cmd params + if *node == "" { + gLog.Println(LevelERROR, "node name not set", os.Args, len(os.Args), os.Args[0]) + return + } + if *user == "" { + gLog.Println(LevelERROR, "user name not set") + return + } + if *password == "" { + gLog.Println(LevelERROR, "password not set") + return + } + if *peerNode != "" { + if *dstPort == 0 { + gLog.Println(LevelERROR, "dstPort not set") + return + } + if *srcPort == 0 { + gLog.Println(LevelERROR, "srcPort not set") + return + } + } + } + + config := AppConfig{} + config.PeerNode = *peerNode + config.PeerUser = *peerUser + config.PeerPassword = *peerPassword + config.DstHost = *dstIP + config.DstPort = *dstPort + config.SrcPort = *srcPort + config.Protocol = *protocol + gLog.Println(LevelINFO, config) + if *configFile { + if err := gConf.load(); err != nil { + gLog.Println(LevelERROR, "load config error. exit.") + return + } + } else { + gConf.add(config) + gConf.Network = NetworkConfig{ + Node: *node, + User: *user, + Password: *password, + NoShare: *noShare, + ServerHost: *serverHost, + ServerPort: 27182, + UDPPort1: 27182, + UDPPort2: 27183, + ipv6: "240e:3b7:621:def0:fda4:dd7f:36a1:2803", // TODO: detect real ipv6 + shareBandwidth: *shareBandwidth, + } + } + // gConf.save() // not change config file + gConf.daemonMode = *byDaemon + + gLog.Println(LevelINFO, gConf) + setFirewall() + network := P2PNetworkInstance(&gConf.Network) + if ok := network.Connect(30000); !ok { + gLog.Println(LevelERROR, "P2PNetwork login error") + return + } + for _, app := range gConf.Apps { + // set default peer user password + if app.PeerPassword == "" { + app.PeerPassword = gConf.Network.Password + } + if app.PeerUser == "" { + app.PeerUser = gConf.Network.User + } + err := network.AddApp(app) + if err != nil { + gLog.Println(LevelERROR, "addTunnel error") + } + } + + // test + // go func() { + + // time.Sleep(time.Second * 30) + // config := AppConfig{} + // config.PeerNode = *peerNode + // config.PeerUser = *peerUser + // config.PeerPassword = *peerPassword + // config.DstHost = *dstIP + // config.DstPort = *dstPort + // config.SrcPort = 32 + // config.Protocol = *protocol + // network.AddApp(config) + // // time.Sleep(time.Second * 30) + // // network.DeleteTunnel(config) + // // time.Sleep(time.Second * 30) + // // network.DeleteTunnel(config) + // }() + + // // TODO: http api + // api := ClientAPI{} + // go api.run() + gLog.Println(LevelINFO, "waiting for connection...") + forever := make(chan bool) + <-forever +} diff --git a/overlaytcp.go b/overlaytcp.go new file mode 100644 index 0000000..88880e3 --- /dev/null +++ b/overlaytcp.go @@ -0,0 +1,85 @@ +package main + +import ( + "bytes" + "encoding/binary" + "net" + "time" +) + +// implement io.Writer +type overlayTCP struct { + tunnel *P2PTunnel + conn net.Conn + id uint64 + rtid uint64 + running bool + isClient bool + appID uint64 + appKey uint64 + appKeyBytes []byte +} + +func (otcp *overlayTCP) run() { + gLog.Printf(LevelINFO, "%d overlayTCP run start", otcp.id) + defer gLog.Printf(LevelINFO, "%d overlayTCP run end", otcp.id) + otcp.running = true + buffer := make([]byte, ReadBuffLen+PaddingSize) + readBuf := buffer[:ReadBuffLen] + encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding + tunnelHead := new(bytes.Buffer) + relayHead := new(bytes.Buffer) + binary.Write(relayHead, binary.LittleEndian, otcp.rtid) + binary.Write(tunnelHead, binary.LittleEndian, otcp.id) + for otcp.running && otcp.tunnel.isRuning() { + otcp.conn.SetReadDeadline(time.Now().Add(time.Second * 5)) + dataLen, err := otcp.conn.Read(readBuf) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + continue + } + // overlay tcp connection normal close, debug log + gLog.Printf(LevelDEBUG, "overlayTCP %d read error:%s,close it", otcp.id, err) + break + } else { + payload := readBuf[:dataLen] + if otcp.appKey != 0 { + payload, _ = encryptBytes(otcp.appKeyBytes, encryptData, buffer[:dataLen], dataLen) + } + writeBytes := append(tunnelHead.Bytes(), payload...) + if otcp.rtid == 0 { + otcp.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes) + } else { + // write raley data + all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...) + all = append(all, writeBytes...) + otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all) + gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", otcp.rtid, otcp.id, len(writeBytes)) + } + } + } + otcp.conn.Close() + otcp.tunnel.overlayConns.Delete(otcp.id) + // notify peer disconnect + if otcp.isClient { + req := OverlayDisconnectReq{ID: otcp.id} + if otcp.rtid == 0 { + otcp.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req) + } else { + // write relay data + msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + } + } +} + +// calling by p2pTunnel +func (otcp *overlayTCP) Write(buff []byte) (n int, err error) { + // add mutex when multi-thread calling + n, err = otcp.conn.Write(buff) + if err != nil { + otcp.tunnel.overlayConns.Delete(otcp.id) + } + return +} diff --git a/p2papp.go b/p2papp.go new file mode 100644 index 0000000..c25e110 --- /dev/null +++ b/p2papp.go @@ -0,0 +1,142 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "math/rand" + "net" + "sync" + "time" +) + +type p2pApp struct { + config AppConfig + listener net.Listener + tunnel *P2PTunnel + rtid uint64 + hbTime time.Time + hbMtx sync.Mutex + running bool + id uint64 + key uint64 + wg sync.WaitGroup +} + +func (app *p2pApp) isActive() bool { + if app.rtid == 0 { // direct mode app heartbeat equals to tunnel heartbeat + return app.tunnel.isActive() + } + // relay mode calc app heartbeat + app.hbMtx.Lock() + defer app.hbMtx.Unlock() + return time.Now().Before(app.hbTime.Add(TunnelIdleTimeout)) +} + +func (app *p2pApp) updateHeartbeat() { + app.hbMtx.Lock() + defer app.hbMtx.Unlock() + app.hbTime = time.Now() +} + +func (app *p2pApp) listenTCP() error { + var err error + app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) + if err != nil { + gLog.Printf(LevelERROR, "listen error:%s", err) + return err + } + for { + conn, err := app.listener.Accept() + if err != nil { + gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err) + break + } + otcp := overlayTCP{ + tunnel: app.tunnel, + conn: conn, + id: rand.Uint64(), + isClient: true, + rtid: app.rtid, + appID: app.id, + appKey: app.key, + } + // calc key bytes for encrypt + if otcp.appKey != 0 { + encryptKey := make([]byte, AESKeySize) + binary.LittleEndian.PutUint64(encryptKey, otcp.appKey) + binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey) + otcp.appKeyBytes = encryptKey + } + app.tunnel.overlayConns.Store(otcp.id, &otcp) + gLog.Printf(LevelINFO, "Accept overlayID:%d", otcp.id) + // tell peer connect + req := OverlayConnectReq{ID: otcp.id, + User: app.config.PeerUser, + Password: app.config.PeerPassword, + DstIP: app.config.DstHost, + DstPort: app.config.DstPort, + Protocol: app.config.Protocol, + AppID: app.id, + } + if app.rtid == 0 { + app.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayConnectReq, &req) + } else { + req.RelayTunnelID = app.tunnel.id + relayHead := new(bytes.Buffer) + binary.Write(relayHead, binary.LittleEndian, app.rtid) + msg, _ := newMessage(MsgP2P, MsgOverlayConnectReq, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + } + + go otcp.run() + } + return nil +} + +func (app *p2pApp) listen() error { + gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort) + defer gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort) + app.wg.Add(1) + defer app.wg.Done() + app.running = true + if app.rtid != 0 { + go app.relayHeartbeatLoop() + } + for app.running { + if app.config.Protocol == "tcp" { + app.listenTCP() + } + time.Sleep(time.Second * 5) + // TODO: listen UDP + } + return nil +} + +func (app *p2pApp) close() { + app.running = false + if app.listener != nil { + app.listener.Close() + } + app.tunnel.closeOverlayConns(app.id) + app.wg.Wait() +} + +// TODO: many relay app on the same P2PTunnel will send a lot of relay heartbeat +func (app *p2pApp) relayHeartbeatLoop() { + app.wg.Add(1) + defer app.wg.Done() + gLog.Printf(LevelDEBUG, "relayHeartbeat to %d start", app.rtid) + defer gLog.Printf(LevelDEBUG, "relayHeartbeat to %d end", app.rtid) + relayHead := new(bytes.Buffer) + binary.Write(relayHead, binary.LittleEndian, app.rtid) + req := RelayHeartbeat{RelayTunnelID: app.tunnel.id, + AppID: app.id} + msg, _ := newMessage(MsgP2P, MsgRelayHeartbeat, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + for app.tunnel.isRuning() && app.running { + app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + time.Sleep(TunnelHeartbeatTime) + } +} diff --git a/p2pappkeys.go b/p2pappkeys.go new file mode 100644 index 0000000..3660eec --- /dev/null +++ b/p2pappkeys.go @@ -0,0 +1,19 @@ +package main + +import ( + "sync" +) + +var p2pAppKeys sync.Map + +func GetKey(appID uint64) uint64 { + i, ok := p2pAppKeys.Load(appID) + if !ok { + return 0 + } + return i.(uint64) +} + +func SaveKey(appID uint64, appKey uint64) { + p2pAppKeys.Store(appID, appKey) +} diff --git a/p2pconn.go b/p2pconn.go new file mode 100644 index 0000000..55bf87f --- /dev/null +++ b/p2pconn.go @@ -0,0 +1,17 @@ +package main + +import ( + "time" +) + +type p2pConn interface { + ReadMessage() (*openP2PHeader, []byte, error) + WriteBytes(uint16, uint16, []byte) error + WriteBuffer([]byte) error + WriteMessage(uint16, uint16, interface{}) error + Close() error + Accept() error + CloseListener() + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} diff --git a/p2pnetwork.go b/p2pnetwork.go new file mode 100644 index 0000000..d757aca --- /dev/null +++ b/p2pnetwork.go @@ -0,0 +1,691 @@ +package main + +import ( + "bytes" + "crypto/tls" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math" + "math/rand" + "net/url" + "os" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +var ( + instance *P2PNetwork + once sync.Once +) + +type P2PNetwork struct { + conn *websocket.Conn + online bool + running bool + restartCh chan bool + wg sync.WaitGroup + writeMtx sync.Mutex + serverTs uint64 + // msgMap sync.Map + msgMap map[uint64]chan []byte //key: nodeID + msgMapMtx sync.Mutex + config NetworkConfig + allTunnels sync.Map + apps sync.Map + limiter *BandwidthLimiter +} + +func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { + if instance == nil { + once.Do(func() { + instance = &P2PNetwork{ + restartCh: make(chan bool, 2), + online: false, + running: true, + msgMap: make(map[uint64]chan []byte), + limiter: newBandwidthLimiter(config.shareBandwidth), + } + instance.msgMap[0] = make(chan []byte) // for gateway + if config != nil { + instance.config = *config + } + instance.init() + go instance.run() + }) + } + return instance +} + +func (pn *P2PNetwork) run() { + go pn.autoReconnectApp() + heartbeatTimer := time.NewTicker(NetworkHeartbeatTime) + for pn.running { + select { + case <-heartbeatTimer.C: // TODO: deal with connect failed, no send hb + pn.write(MsgHeartbeat, 0, "") + + case <-pn.restartCh: + pn.online = false + pn.wg.Wait() // wait read/write goroutine exited + time.Sleep(NetworkHeartbeatTime) + err := pn.init() + if err != nil { + gLog.Println(LevelERROR, "P2PNetwork init error:", err) + } + } + } +} + +func (pn *P2PNetwork) Connect(timeout int) bool { + // waiting for login response + for i := 0; i < (timeout / 1000); i++ { + if pn.serverTs != 0 { + return true + } + time.Sleep(time.Second) + } + return false +} + +func (pn *P2PNetwork) autoReconnectApp() { + gLog.Println(LevelINFO, "autoReconnectApp start") + retryApps := make([]AppConfig, 0) + for pn.running { + time.Sleep(time.Second) + if !pn.online { + continue + } + if len(retryApps) > 0 { + gLog.Printf(LevelINFO, "retryApps len=%d", len(retryApps)) + thisRound := make([]AppConfig, 0) + for i := 0; i < len(retryApps); i++ { + // reset retryNum when running 15min continuously + delay := math.Exp(float64(retryApps[i].retryNum+1)/2) * 5 + if delay > 1800 { // max delay 30min + delay = 1800 + } + if retryApps[i].retryTime.Add(time.Minute * 15).Before(time.Now()) { + retryApps[i].retryNum = 0 + } + retryApps[i].retryNum++ + retryApps[i].retryTime = time.Now() + if retryApps[i].retryNum > MaxRetry { + gLog.Printf(LevelERROR, "app %s%d retry more than %d times, exit.", retryApps[i].Protocol, retryApps[i].SrcPort, MaxRetry) + continue + } + pn.DeleteApp(retryApps[i]) + if err := pn.AddApp(retryApps[i]); err != nil { + gLog.Printf(LevelERROR, "AddApp %s%d error:%s", retryApps[i].Protocol, retryApps[i].SrcPort, err) + thisRound = append(thisRound, retryApps[i]) + time.Sleep(RetryInterval) + } + } + retryApps = thisRound + } + pn.apps.Range(func(_, i interface{}) bool { + app := i.(*p2pApp) + if app.isActive() { + return true + } + gLog.Printf(LevelINFO, "detect app %s%d disconnect,last hb %s reconnecting...", app.config.Protocol, app.config.SrcPort, app.hbTime) + config := app.config + // clear peerinfo + config.peerConeNatPort = 0 + config.peerIP = "" + config.peerNatType = 0 + config.peerToken = 0 + pn.DeleteApp(config) + retryApps = append(retryApps, config) + return true + }) + } + gLog.Println(LevelINFO, "autoReconnectApp end") +} + +func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint64) (*P2PTunnel, uint64, error) { + gLog.Printf(LevelINFO, "addRelayTunnel to %s start", config.PeerNode) + defer gLog.Printf(LevelINFO, "addRelayTunnel to %s end", config.PeerNode) + pn.write(MsgRelay, MsgRelayNodeReq, nil) + head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, time.Second*10) + if head == nil { + return nil, 0, errors.New("read MsgRelayNodeRsp error") + } + rsp := RelayNodeRsp{} + err := json.Unmarshal(body, &rsp) + if err != nil { + gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err) + return nil, 0, errors.New("unmarshal MsgRelayNodeRsp error") + } + if rsp.RelayName == "" || rsp.RelayToken == 0 { + gLog.Printf(LevelERROR, "MsgRelayNodeReq error") + return nil, 0, errors.New("MsgRelayNodeReq error") + } + gLog.Printf(LevelINFO, "got relay node:%s", rsp.RelayName) + relayConfig := config + relayConfig.PeerNode = rsp.RelayName + relayConfig.peerToken = rsp.RelayToken + t, err := pn.addDirectTunnel(relayConfig, 0) + if err != nil { + gLog.Println(LevelERROR, "direct connect error:", err) + return nil, 0, err + } + // notify peer addRelayTunnel + req := AddRelayTunnelReq{ + From: pn.config.Node, + RelayName: rsp.RelayName, + RelayToken: rsp.RelayToken, + AppID: appid, + AppKey: appkey, + } + gLog.Printf(LevelINFO, "push relay %s---------%s", config.PeerNode, rsp.RelayName) + pn.push(config.PeerNode, MsgPushAddRelayTunnelReq, &req) + + // wait relay ready + head, body = pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) // TODO: const value + if head == nil { + gLog.Printf(LevelERROR, "read MsgPushAddRelayTunnelRsp error") + return nil, 0, errors.New("read MsgPushAddRelayTunnelRsp error") + } + rspID := TunnelMsg{} + err = json.Unmarshal(body, &rspID) + if err != nil { + gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err) + return nil, 0, errors.New("unmarshal MsgRelayNodeRsp error") + } + return t, rspID.ID, err +} + +func (pn *P2PNetwork) AddApp(config AppConfig) error { + gLog.Printf(LevelINFO, "addApp %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + defer gLog.Printf(LevelINFO, "addApp %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + if !pn.online { + return errors.New("P2PNetwork offline") + } + // check if app already exist? + appExist := false + pn.apps.Range(func(_, i interface{}) bool { + app := i.(*p2pApp) + if app.config.Protocol == config.Protocol && app.config.SrcPort == config.SrcPort { + appExist = true + return false + } + return true + }) + if appExist { + return errors.New("P2PApp already exist") + } + appID := rand.Uint64() + appKey := uint64(0) + t, err := pn.addDirectTunnel(config, 0) + var rtid uint64 + relayNode := "" + peerNatType := 100 + peerIP := "" + errMsg := "" + if err != nil && err == ErrorHandshake { + gLog.Println(LevelERROR, "direct connect failed, try to relay") + appKey = rand.Uint64() + t, rtid, err = pn.addRelayTunnel(config, appID, appKey) + if t != nil { + relayNode = t.config.PeerNode + } + } + if t != nil { + peerNatType = t.config.peerNatType + peerIP = t.config.peerIP + } + if err != nil { + errMsg = err.Error() + } + req := ReportConnect{ + Error: errMsg, + Protocol: config.Protocol, + SrcPort: config.SrcPort, + NatType: pn.config.natType, + PeerNode: config.PeerNode, + DstPort: config.DstPort, + DstHost: config.DstHost, + PeerUser: config.PeerUser, + PeerNatType: peerNatType, + PeerIP: peerIP, + ShareBandwidth: pn.config.shareBandwidth, + RelayNode: relayNode, + Version: OpenP2PVersion, + } + pn.write(MsgReport, MsgReportConnect, &req) + if err != nil { + return err + } + + app := p2pApp{ + id: appID, + key: appKey, + tunnel: t, + config: config, + rtid: rtid, + hbTime: time.Now()} + pn.apps.Store(appID, &app) + go app.listen() + return err +} + +func (pn *P2PNetwork) DeleteApp(config AppConfig) { + gLog.Printf(LevelINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort) + defer gLog.Printf(LevelINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort) + // close the apps of this config + pn.apps.Range(func(_, i interface{}) bool { + app := i.(*p2pApp) + if app.config.Protocol == config.Protocol && app.config.SrcPort == config.SrcPort { + gLog.Printf(LevelINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) + app := i.(*p2pApp) + app.close() + pn.apps.Delete(app.id) + return false + } + return true + }) +} + +func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) { + gLog.Printf(LevelINFO, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + defer gLog.Printf(LevelINFO, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + isClient := false + // client side tid=0, assign random uint64 + if tid == 0 { + tid = rand.Uint64() + isClient = true + } + exist := false + // find existing tunnel to peer + var t *P2PTunnel + pn.allTunnels.Range(func(id, i interface{}) bool { + t = i.(*P2PTunnel) + if t.config.PeerNode == config.PeerNode { + // server side force close existing tunnel + if !isClient { + t.close() + return false + } + + // client side checking + gLog.Println(LevelINFO, "tunnel already exist ", config.PeerNode) + isActive := t.checkActive() + // inactive, close it + if !isActive { + gLog.Println(LevelINFO, "but it's not active, close it ", config.PeerNode) + t.close() + } else { + // active + exist = true + } + return false + } + return true + }) + // create tunnel if not exist + if !exist { + t = &P2PTunnel{pn: pn, + config: config, + id: tid, + } + pn.msgMapMtx.Lock() + pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50) + pn.msgMapMtx.Unlock() + t.init() + if isClient { + if err := t.connect(); err != nil { + gLog.Println(LevelERROR, "p2pTunnel connect error:", err) + return t, err + } + } else { + rsp := PushConnectRsp{ + Error: 0, + Detail: "connect ok", + To: t.config.PeerNode, + From: pn.config.Node, + NatType: pn.config.natType, + FromIP: pn.config.publicIP, + ConeNatPort: t.coneNatPort, + ID: t.id} + t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp) + if err := t.listen(); err != nil { + gLog.Println(LevelERROR, "p2pTunnel listen error:", err) + return t, err + } + } + } + // store it when success + gLog.Printf(LevelDEBUG, "store tunnel %d", tid) + pn.allTunnels.Store(tid, t) + return t, nil +} + +func (pn *P2PNetwork) init() error { + gLog.Println(LevelINFO, "init start") + var err error + for { + pn.config.hostName, err = os.Hostname() + if err != nil { + break + } + + // detect nat type + pn.config.publicIP, pn.config.natType, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2) + // TODO rm test s2s + if pn.config.Node == "hhd1207-222S2S" { + pn.config.natType = NATSymmetric + } + if err != nil { + gLog.Println(LevelINFO, "detect NAT type error:", err) + break + } + gLog.Println(LevelINFO, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) + gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) + forwardPath := "/openp2p/v1/login" + config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert + websocket.DefaultDialer.TLSClientConfig = &config + u := url.URL{Scheme: "wss", Host: gatewayURL, Path: forwardPath} + q := u.Query() + q.Add("node", pn.config.Node) + q.Add("user", pn.config.User) + q.Add("password", pn.config.Password) + q.Add("version", OpenP2PVersion) + q.Add("nattype", fmt.Sprintf("%d", pn.config.natType)) + q.Add("timestamp", fmt.Sprintf("%d", time.Now().Unix())) + + noShareStr := "false" + if pn.config.NoShare { + noShareStr = "true" + } + q.Add("noshare", noShareStr) + u.RawQuery = q.Encode() + var ws *websocket.Conn + ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + break + } + pn.online = true + pn.conn = ws + localAddr := strings.Split(ws.LocalAddr().String(), ":") + if len(localAddr) == 2 { + pn.config.localIP = localAddr[0] + } else { + err = errors.New("get local ip failed") + break + } + go pn.readLoop() + + pn.config.mac = getmac(pn.config.localIP) + pn.config.os = getOsName() + req := ReportBasic{ + Mac: pn.config.mac, + LanIP: pn.config.localIP, + OS: pn.config.os, + IPv6: pn.config.ipv6, + Version: OpenP2PVersion, + } + pn.write(MsgReport, MsgReportBasic, &req) + gLog.Println(LevelINFO, "P2PNetwork init ok") + break + } + if err != nil { + // init failed, retry + pn.restartCh <- true + gLog.Println(LevelERROR, "P2PNetwork init error:", err) + } + return err +} + +func (pn *P2PNetwork) handleMessage(t int, msg []byte) { + head := openP2PHeader{} + err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, &head) + if err != nil { + gLog.Println(LevelERROR, "handleMessage error:", err) + return + } + switch head.MainType { + case MsgLogin: + // gLog.Println(LevelINFO,string(msg)) + rsp := LoginRsp{} + err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp) + if err != nil { + gLog.Printf(LevelERROR, "wrong login response:%s", err) + return + } + if rsp.Error != 0 { + gLog.Printf(LevelERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail) + pn.running = false + } else { + gLog.Printf(LevelINFO, "login ok. Server ts=%d, local ts=%d", rsp.Ts, time.Now().Unix()) + pn.serverTs = rsp.Ts + } + case MsgHeartbeat: + gLog.Printf(LevelDEBUG, "P2PNetwork heartbeat ok") + case MsgPush: + pn.handlePush(head.SubType, msg) + default: + pn.msgMapMtx.Lock() + ch := pn.msgMap[0] + pn.msgMapMtx.Unlock() + ch <- msg + return + } +} + +func (pn *P2PNetwork) readLoop() { + gLog.Printf(LevelINFO, "P2PNetwork readLoop start") + pn.wg.Add(1) + defer pn.wg.Done() + for pn.running { + pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second)) + t, msg, err := pn.conn.ReadMessage() + if err != nil { + gLog.Printf(LevelERROR, "P2PNetwork read error:%s", err) + pn.conn.Close() + pn.restartCh <- true + break + } + pn.handleMessage(t, msg) + } + gLog.Printf(LevelINFO, "P2PNetwork readLoop end") +} + +func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) error { + if !pn.online { + return errors.New("P2P network offline") + } + msg, err := newMessage(mainType, subType, packet) + if err != nil { + return err + } + pn.writeMtx.Lock() + defer pn.writeMtx.Unlock() + if err = pn.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { + gLog.Printf(LevelERROR, "write msgType %d,%d error:%s", mainType, subType, err) + pn.conn.Close() + } + return err +} + +func (pn *P2PNetwork) relay(to uint64, body []byte) error { + gLog.Printf(LevelDEBUG, "relay data to %d", to) + i, ok := pn.allTunnels.Load(to) + if !ok { + return nil + } + tunnel := i.(*P2PTunnel) + if tunnel.config.shareBandwidth > 0 { + pn.limiter.Add(len(body)) + } + tunnel.conn.WriteBuffer(body) + return nil +} + +func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error { + gLog.Printf(LevelDEBUG, "push msgType %d to %s", subType, to) + if !pn.online { + return errors.New("client offline") + } + pushHead := PushHeader{} + pushHead.From = nodeNameToID(pn.config.Node) + pushHead.To = nodeNameToID(to) + pushHeadBuf := new(bytes.Buffer) + err := binary.Write(pushHeadBuf, binary.LittleEndian, pushHead) + if err != nil { + return err + } + data, err := json.Marshal(packet) + if err != nil { + return err + } + // gLog.Println(LevelINFO,"write packet:", string(data)) + pushMsg := append(encodeHeader(MsgPush, subType, uint32(len(data)+PushHeaderSize)), pushHeadBuf.Bytes()...) + pushMsg = append(pushMsg, data...) + pn.writeMtx.Lock() + defer pn.writeMtx.Unlock() + if err = pn.conn.WriteMessage(websocket.BinaryMessage, pushMsg); err != nil { + gLog.Printf(LevelERROR, "push to %s error:%s", to, err) + pn.conn.Close() + } + return err +} + +func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout time.Duration) (head *openP2PHeader, body []byte) { + var nodeID uint64 + if node == "" { + nodeID = 0 + } else { + nodeID = nodeNameToID(node) + } + for { + select { + case <-time.After(timeout): + gLog.Printf(LevelERROR, "wait msg%d:%d timeout", mainType, subType) + return + case msg := <-pn.msgMap[nodeID]: + head = &openP2PHeader{} + err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, head) + if err != nil { + gLog.Println(LevelERROR, "read msg error:", err) + break + } + if head.MainType != mainType || head.SubType != subType { + continue + } + if mainType == MsgPush { + body = msg[openP2PHeaderSize+PushHeaderSize:] + } else { + body = msg[openP2PHeaderSize:] + } + return + } + } +} + +func (pn *P2PNetwork) handlePush(subType uint16, msg []byte) error { + pushHead := PushHeader{} + err := binary.Read(bytes.NewReader(msg[openP2PHeaderSize:openP2PHeaderSize+PushHeaderSize]), binary.LittleEndian, &pushHead) + if err != nil { + return err + } + gLog.Printf(LevelDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) + switch subType { + case MsgPushConnectReq: + req := PushConnectReq{} + err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) + if err != nil { + gLog.Printf(LevelERROR, "wrong MsgPushConnectReq:%s", err) + return err + } + gLog.Printf(LevelINFO, "%s is connecting...", req.From) + gLog.Println(LevelDEBUG, "push connect response to ", req.From) + // verify token or name&password + if VerifyTOTP(req.Token, pn.config.User, pn.config.Password, time.Now().Unix()) || (req.User == pn.config.User && req.Password == pn.config.Password) { + gLog.Printf(LevelINFO, "Access Granted\n") + config := AppConfig{} + config.peerNatType = req.NatType + config.peerConeNatPort = req.ConeNatPort + config.peerIP = req.FromIP + config.PeerNode = req.From + // share relay node will limit bandwidth + if req.User != pn.config.User || req.Password != pn.config.Password { + gLog.Printf(LevelINFO, "set share bandwidth %d mbps", pn.config.shareBandwidth) + config.shareBandwidth = pn.config.shareBandwidth + } + // go pn.AddTunnel(config, req.ID) + go pn.addDirectTunnel(config, req.ID) + break + } + gLog.Println(LevelERROR, "Access Denied:", req.From) + rsp := PushConnectRsp{ + Error: 1, + Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node), + To: req.From, + From: pn.config.Node, + } + pn.push(req.From, MsgPushConnectRsp, rsp) + case MsgPushRsp: + rsp := PushRsp{} + err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp) + if err != nil { + gLog.Printf(LevelERROR, "wrong pushRsp:%s", err) + return err + } + if rsp.Error == 0 { + gLog.Printf(LevelDEBUG, "push ok, detail:%s", rsp.Detail) + } else { + gLog.Printf(LevelERROR, "push error:%d, detail:%s", rsp.Error, rsp.Detail) + } + case MsgPushAddRelayTunnelReq: + req := AddRelayTunnelReq{} + err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) + if err != nil { + gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err) + return err + } + config := AppConfig{} + config.PeerNode = req.RelayName + config.peerToken = req.RelayToken + // set user password, maybe the relay node is your private node + config.PeerUser = pn.config.User + config.PeerPassword = pn.config.Password + go func(r AddRelayTunnelReq) { + t, errDt := pn.addDirectTunnel(config, 0) + if errDt == nil { + // notify peer relay ready + msg := TunnelMsg{ID: t.id} + pn.push(r.From, MsgPushAddRelayTunnelRsp, msg) + SaveKey(req.AppID, req.AppKey) + } + + }(req) + case MsgPushUpdate: + update() + if gConf.daemonMode { + os.Exit(0) + } + default: + pn.msgMapMtx.Lock() + ch := pn.msgMap[pushHead.From] + pn.msgMapMtx.Unlock() + ch <- msg + } + return nil +} + +func (pn *P2PNetwork) updateAppHeartbeat(appID uint64) { + pn.apps.Range(func(id, i interface{}) bool { + key := id.(uint64) + if key != appID { + return true + } + app := i.(*p2pApp) + app.updateHeartbeat() + return false + }) +} diff --git a/p2ptunnel.go b/p2ptunnel.go new file mode 100644 index 0000000..18fca56 --- /dev/null +++ b/p2ptunnel.go @@ -0,0 +1,418 @@ +package main + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math/rand" + "net" + "sync" + "time" +) + +type P2PTunnel struct { + pn *P2PNetwork + conn p2pConn + hbTime time.Time + hbMtx sync.Mutex + hbTimeRelay time.Time + config AppConfig + la *net.UDPAddr // local hole address + ra *net.UDPAddr // remote hole address + overlayConns sync.Map // both TCP and UDP + id uint64 + running bool + runMtx sync.Mutex + isServer bool // 0:server 1:client + coneLocalPort int + coneNatPort int +} + +func (t *P2PTunnel) init() { + t.running = true + t.hbMtx.Lock() + t.hbTime = time.Now() + t.hbMtx.Unlock() + t.hbTimeRelay = time.Now().Add(time.Second * 600) // TODO: test fake time + localPort := int(rand.Uint32()%10000 + 50000) + if t.pn.config.natType == NATCone { + // prepare one random cone hole + _, _, port1, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort) + t.coneLocalPort = localPort + t.coneNatPort = port1 + t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort} + } else { + t.coneLocalPort = localPort + t.coneNatPort = localPort // NATNONE or symmetric doesn't need coneNatPort + t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort} + } + gLog.Printf(LevelDEBUG, "prepare punching port %d:%d", t.coneLocalPort, t.coneNatPort) +} + +func (t *P2PTunnel) connect() error { + gLog.Printf(LevelINFO, "start p2pTunnel to %s ", t.config.PeerNode) + t.isServer = false + req := PushConnectReq{ + User: t.config.PeerUser, + Password: t.config.PeerPassword, + Token: t.config.peerToken, + From: t.pn.config.Node, + FromIP: t.pn.config.publicIP, + ConeNatPort: t.coneNatPort, + NatType: t.pn.config.natType, + ID: t.id} + t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) + head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10) + if head == nil { + return errors.New("connect error") + } + rsp := PushConnectRsp{} + err := json.Unmarshal(body, &rsp) + if err != nil { + gLog.Printf(LevelERROR, "wrong MsgPushConnectRsp:%s", err) + return err + } + // gLog.Println(LevelINFO, rsp) + if rsp.Error != 0 { + return errors.New(rsp.Detail) + } + t.config.peerNatType = int(rsp.NatType) + t.config.peerConeNatPort = rsp.ConeNatPort + t.config.peerIP = rsp.FromIP + err = t.handshake() + if err != nil { + gLog.Println(LevelERROR, "handshake error:", err) + err = ErrorHandshake + } + return err +} + +func (t *P2PTunnel) isRuning() bool { + t.runMtx.Lock() + defer t.runMtx.Unlock() + return t.running +} + +func (t *P2PTunnel) setRun(running bool) { + t.runMtx.Lock() + defer t.runMtx.Unlock() + t.running = running +} + +func (t *P2PTunnel) isActive() bool { + t.hbMtx.Lock() + defer t.hbMtx.Unlock() + return time.Now().Before(t.hbTime.Add(TunnelIdleTimeout)) +} + +func (t *P2PTunnel) checkActive() bool { + hbt := time.Now() + t.hbMtx.Lock() + if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) { + t.hbMtx.Unlock() + return false + } + t.hbMtx.Unlock() + // hbtime within TunnelHeartbeatTime, check it now + t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil) + isActive := false + // wait at most 5s + for i := 0; i < 50 && !isActive; i++ { + t.hbMtx.Lock() + if t.hbTime.After(hbt) { + isActive = true + } + t.hbMtx.Unlock() + time.Sleep(time.Millisecond * 100) + } + return isActive +} + +// call when user delete tunnel +func (t *P2PTunnel) close() { + t.setRun(false) + t.pn.allTunnels.Delete(t.id) +} + +func (t *P2PTunnel) handshake() error { + if t.config.peerConeNatPort > 0 { + var err error + t.ra, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort)) + if err != nil { + return err + } + } + gLog.Println(LevelINFO, "handshake to ", t.config.PeerNode) + var err error + // TODO: handle NATNone, nodes with public ip has no punching + if (t.pn.config.natType == NATCone && t.config.peerNatType == NATCone) || (t.pn.config.natType == NATNone || t.config.peerNatType == NATNone) { + err = handshakeC2C(t) + } else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATSymmetric { + err = ErrorS2S + t.close() + } else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATCone { + err = handshakeC2S(t) + } else if t.config.peerNatType == NATCone && t.pn.config.natType == NATSymmetric { + err = handshakeS2C(t) + } else { + return errors.New("unknown error") + } + if err != nil { + gLog.Println(LevelERROR, "punch handshake error:", err) + return err + } + gLog.Printf(LevelINFO, "handshake to %s ok", t.config.PeerNode) + err = t.run() + if err != nil { + gLog.Println(LevelERROR, err) + return err + } + return nil +} + +func (t *P2PTunnel) run() error { + if t.isServer { + qConn, e := listenQuic(t.la.String(), TunnelIdleTimeout) + if e != nil { + gLog.Println(LevelINFO, "listen quic error:", e, ", retry...") + time.Sleep(time.Millisecond * 10) + qConn, e = listenQuic(t.la.String(), TunnelIdleTimeout) + if e != nil { + return fmt.Errorf("listen quic error:%s", e) + } + } + t.pn.push(t.config.PeerNode, MsgPushQuicConnect, nil) + e = qConn.Accept() + if e != nil { + qConn.CloseListener() + return fmt.Errorf("accept quic error:%s", e) + } + _, buff, err := qConn.ReadMessage() + if e != nil { + qConn.listener.Close() + return fmt.Errorf("read start msg error:%s", err) + } + if buff != nil { + gLog.Println(LevelDEBUG, string(buff)) + } + qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) + gLog.Println(LevelINFO, "quic connection ok") + t.conn = qConn + t.setRun(true) + go t.readLoop() + go t.writeLoop() + return nil + } + + //else + conn, e := net.ListenUDP("udp", t.la) + if e != nil { + time.Sleep(time.Millisecond * 10) + conn, e = net.ListenUDP("udp", t.la) + if e != nil { + return fmt.Errorf("quic listen error:%s", e) + } + } + t.pn.read(t.config.PeerNode, MsgPush, MsgPushQuicConnect, time.Second*5) + gLog.Println(LevelINFO, "quic dial to ", t.ra.String()) + qConn, e := dialQuic(conn, t.ra, TunnelIdleTimeout) + if e != nil { + return fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e) + } + handshakeBegin := time.Now() + qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) + _, buff, err := qConn.ReadMessage() + if e != nil { + qConn.listener.Close() + return fmt.Errorf("read MsgTunnelHandshake error:%s", err) + } + if buff != nil { + gLog.Println(LevelDEBUG, string(buff)) + } + + gLog.Println(LevelINFO, "rtt=", time.Since(handshakeBegin)) + gLog.Println(LevelINFO, "quic connection ok") + t.conn = qConn + t.setRun(true) + go t.readLoop() + go t.writeLoop() + return nil +} + +func (t *P2PTunnel) readLoop() { + decryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding + gLog.Printf(LevelINFO, "%d tunnel readloop start", t.id) + for t.isRuning() { + t.conn.SetReadDeadline(time.Now().Add(TunnelIdleTimeout)) + head, body, err := t.conn.ReadMessage() + if err != nil { + if t.isRuning() { + gLog.Printf(LevelERROR, "%d tunnel read error:%s", t.id, err) + } + break + } + if head.MainType != MsgP2P { + continue + } + switch head.SubType { + case MsgTunnelHeartbeat: + t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil) + gLog.Printf(LevelDEBUG, "%d read tunnel heartbeat", t.id) + case MsgTunnelHeartbeatAck: + t.hbMtx.Lock() + t.hbTime = time.Now() + t.hbMtx.Unlock() + gLog.Printf(LevelDEBUG, "%d read tunnel heartbeat ack", t.id) + case MsgOverlayData: + if len(body) < overlayHeaderSize { + continue + } + overlayID := binary.LittleEndian.Uint64(body[:8]) + gLog.Printf(LevelDEBUG, "%d tunnel read overlay data %d", t.id, overlayID) + s, ok := t.overlayConns.Load(overlayID) + if !ok { + // debug level, when overlay connection closed, always has some packet not found tunnel + gLog.Printf(LevelDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID) + continue + } + overlayConn, ok := s.(*overlayTCP) + if !ok { + continue + } + payload := body[overlayHeaderSize:] + var err error + if overlayConn.appKey != 0 { + payload, _ = decryptBytes(overlayConn.appKeyBytes, decryptData, body[overlayHeaderSize:], int(head.DataLen-uint32(overlayHeaderSize))) + } + _, err = overlayConn.Write(payload) + if err != nil { + gLog.Println(LevelERROR, "overlay write error:", err) + } + case MsgRelayData: + gLog.Printf(LevelDEBUG, "got relay data datalen=%d", head.DataLen) + if len(body) < 8 { + continue + } + tunnelID := binary.LittleEndian.Uint64(body[:8]) + t.pn.relay(tunnelID, body[8:]) + case MsgRelayHeartbeat: + req := RelayHeartbeat{} + err := json.Unmarshal(body, &req) + if err != nil { + gLog.Printf(LevelERROR, "wrong RelayHeartbeat:%s", err) + continue + } + gLog.Printf(LevelDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) + relayHead := new(bytes.Buffer) + binary.Write(relayHead, binary.LittleEndian, req.RelayTunnelID) + msg, _ := newMessage(MsgP2P, MsgRelayHeartbeatAck, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + t.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + case MsgRelayHeartbeatAck: + req := RelayHeartbeat{} + err := json.Unmarshal(body, &req) + if err != nil { + gLog.Printf(LevelERROR, "wrong RelayHeartbeat:%s", err) + continue + } + gLog.Printf(LevelDEBUG, "got MsgRelayHeartbeatAck to %d", req.AppID) + t.pn.updateAppHeartbeat(req.AppID) + case MsgOverlayConnectReq: + req := OverlayConnectReq{} + err := json.Unmarshal(body, &req) + if err != nil { + gLog.Printf(LevelERROR, "wrong MsgOverlayConnectReq:%s", err) + continue + } + // app connect only accept user/password, avoid someone using the share relay node's token + if req.User != t.pn.config.User || req.Password != t.pn.config.Password { + gLog.Println(LevelERROR, "Access Denied:", req.User) + continue + } + + overlayID := req.ID + gLog.Printf(LevelINFO, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req) + if req.Protocol == "tcp" { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) + if err != nil { + gLog.Println(LevelERROR, err) + continue + } + otcp := overlayTCP{ + tunnel: t, + conn: conn, + id: overlayID, + isClient: false, + rtid: req.RelayTunnelID, + appID: req.AppID, + appKey: GetKey(req.AppID), + } + // calc key bytes for encrypt + if otcp.appKey != 0 { + encryptKey := make([]byte, 16) + binary.LittleEndian.PutUint64(encryptKey, otcp.appKey) + binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey) + otcp.appKeyBytes = encryptKey + } + + t.overlayConns.Store(otcp.id, &otcp) + go otcp.run() + } + case MsgOverlayDisconnectReq: + req := OverlayDisconnectReq{} + err := json.Unmarshal(body, &req) + if err != nil { + gLog.Printf(LevelERROR, "wrong OverlayDisconnectRequest:%s", err) + continue + } + overlayID := req.ID + gLog.Printf(LevelINFO, "%d disconnect overlay connection %d", t.id, overlayID) + i, ok := t.overlayConns.Load(overlayID) + if ok { + otcp := i.(*overlayTCP) + otcp.running = false + } + default: + } + } + t.setRun(false) + t.conn.Close() + gLog.Printf(LevelINFO, "%d tunnel readloop end", t.id) +} + +func (t *P2PTunnel) writeLoop() { + tc := time.NewTicker(TunnelHeartbeatTime) + defer tc.Stop() + defer gLog.Printf(LevelINFO, "%d tunnel writeloop end", t.id) + for t.isRuning() { + select { + case <-tc.C: + // tunnel send + err := t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil) + if err != nil { + gLog.Printf(LevelERROR, "%d write tunnel heartbeat error %s", t.id, err) + t.setRun(false) + return + } + gLog.Printf(LevelDEBUG, "%d write tunnel heartbeat ok", t.id) + } + } +} + +func (t *P2PTunnel) listen() error { + gLog.Printf(LevelINFO, "p2ptunnel wait for connecting") + t.isServer = true + return t.handshake() +} + +func (t *P2PTunnel) closeOverlayConns(appID uint64) { + t.overlayConns.Range(func(_, i interface{}) bool { + otcp := i.(*overlayTCP) + if otcp.appID == appID { + otcp.conn.Close() + } + return true + }) +} diff --git a/protocol.go b/protocol.go new file mode 100644 index 0000000..c69e336 --- /dev/null +++ b/protocol.go @@ -0,0 +1,280 @@ +package main + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "hash/crc64" + "time" +) + +const OpenP2PVersion = "0.95.5" +const ProducnName string = "openp2p" + +type openP2PHeader struct { + DataLen uint32 + MainType uint16 + SubType uint16 +} + +var openP2PHeaderSize = binary.Size(openP2PHeader{}) + +type PushHeader struct { + From uint64 + To uint64 +} + +var PushHeaderSize = binary.Size(PushHeader{}) + +type overlayHeader struct { + id uint64 +} + +var overlayHeaderSize = binary.Size(overlayHeader{}) + +func decodeHeader(data []byte) (*openP2PHeader, error) { + head := openP2PHeader{} + rd := bytes.NewReader(data) + err := binary.Read(rd, binary.LittleEndian, &head) + if err != nil { + return nil, err + } + return &head, nil +} + +func encodeHeader(mainType uint16, subType uint16, len uint32) []byte { + head := openP2PHeader{ + len, + mainType, + subType, + } + headBuf := new(bytes.Buffer) + err := binary.Write(headBuf, binary.LittleEndian, head) + if err != nil { + return []byte("") + } + return headBuf.Bytes() +} + +// Message type +const ( + MsgLogin = 0 + MsgHeartbeat = 1 + MsgNATDetect = 2 + MsgPush = 3 + MsgP2P = 4 + MsgRelay = 5 + MsgReport = 6 +) + +const ( + MsgPushRsp = 0 + MsgPushConnectReq = 1 + MsgPushConnectRsp = 2 + MsgPushHandshakeStart = 3 + MsgPushAddRelayTunnelReq = 4 + MsgPushAddRelayTunnelRsp = 5 + MsgPushUpdate = 6 + MsgPushReportApps = 7 + MsgPushQuicConnect = 8 +) + +// MsgP2P sub type message +const ( + MsgPunchHandshake = iota + MsgPunchHandshakeAck + MsgTunnelHandshake + MsgTunnelHandshakeAck + MsgTunnelHeartbeat + MsgTunnelHeartbeatAck + MsgOverlayConnectReq + MsgOverlayConnectRsp + MsgOverlayDisconnectReq + MsgOverlayData + MsgRelayData + MsgRelayHeartbeat + MsgRelayHeartbeatAck +) + +// MsgRelay sub type message +const ( + MsgRelayNodeReq = iota + MsgRelayNodeRsp +) + +// MsgReport sub type message +const ( + MsgReportBasic = iota + MsgReportQuery + MsgReportConnect +) + +const ( + ReadBuffLen = 1024 + NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow + TunnelHeartbeatTime = time.Second * 15 + TunnelIdleTimeout = time.Minute + SymmetricHandshakeNum = 800 // 0.992379 + // SymmetricHandshakeNum = 1000 // 0.999510 + SymmetricHandshakeInterval = time.Millisecond + SymmetricHandshakeAckTimeout = time.Second * 11 + PeerAddRelayTimeount = time.Second * 20 + CheckActiveTimeout = time.Second * 5 + PaddingSize = 16 + AESKeySize = 16 + MaxRetry = 10 + RetryInterval = time.Second * 30 + PublicIPEchoTimeout = time.Second * 5 + NatTestTimeout = time.Second * 10 +) + +// error message +var ( + // ErrorS2S string = "s2s is not supported" + // ErrorHandshake string = "handshake error" + ErrorS2S = errors.New("s2s is not supported") + ErrorHandshake = errors.New("handshake error") +) + +// NATNone has public ip +const ( + NATNone = 0 + NATCone = 1 + NATSymmetric = 2 +) + +func newMessage(mainType uint16, subType uint16, packet interface{}) ([]byte, error) { + data, err := json.Marshal(packet) + if err != nil { + return nil, err + } + // gLog.Println(LevelINFO,"write packet:", string(data)) + head := openP2PHeader{ + uint32(len(data)), + mainType, + subType, + } + headBuf := new(bytes.Buffer) + err = binary.Write(headBuf, binary.LittleEndian, head) + if err != nil { + return nil, err + } + writeBytes := append(headBuf.Bytes(), data...) + return writeBytes, nil +} + +func nodeNameToID(name string) uint64 { + return crc64.Checksum([]byte(name), crc64.MakeTable(crc64.ISO)) +} + +type PushConnectReq struct { + From string `json:"from,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Token uint64 `json:"token,omitempty"` + ConeNatPort int `json:"coneNatPort,omitempty"` + NatType int `json:"natType,omitempty"` + FromIP string `json:"fromIP,omitempty"` + ID uint64 `json:"id,omitempty"` +} +type PushConnectRsp struct { + Error int `json:"error,omitempty"` + From string `json:"from,omitempty"` + To string `json:"to,omitempty"` + Detail string `json:"detail,omitempty"` + NatType int `json:"natType,omitempty"` + ConeNatPort int `json:"coneNatPort,omitempty"` + FromIP string `json:"fromIP,omitempty"` + ID uint64 `json:"id,omitempty"` +} +type PushRsp struct { + Error int `json:"error,omitempty"` + Detail string `json:"detail,omitempty"` +} + +type LoginRsp struct { + Error int `json:"error,omitempty"` + Detail string `json:"detail,omitempty"` + Ts uint64 `json:"ts,omitempty"` +} + +type NatDetectReq struct { + SrcPort int `json:"srcPort,omitempty"` + EchoPort int `json:"echoPort,omitempty"` +} + +type NatDetectRsp struct { + IP string `json:"IP,omitempty"` + Port int `json:"port,omitempty"` + IsPublicIP int `json:"isPublicIP,omitempty"` +} + +type P2PHandshakeReq struct { + ID uint64 `json:"id,omitempty"` +} + +type OverlayConnectReq struct { + ID uint64 `json:"id,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + DstIP string `json:"dstIP,omitempty"` + DstPort int `json:"dstPort,omitempty"` + Protocol string `json:"protocol,omitempty"` + RelayTunnelID uint64 `json:"relayTunnelID,omitempty"` // if not 0 relay + AppID uint64 `json:"appID,omitempty"` +} +type OverlayDisconnectReq struct { + ID uint64 `json:"id,omitempty"` +} +type TunnelMsg struct { + ID uint64 `json:"id,omitempty"` +} + +type RelayNodeRsp struct { + RelayName string `json:"relayName,omitempty"` + RelayToken uint64 `json:"relayToken,omitempty"` +} + +type AddRelayTunnelReq struct { + From string `json:"from,omitempty"` + RelayName string `json:"relayName,omitempty"` + RelayToken uint64 `json:"relayToken,omitempty"` + AppID uint64 `json:"appID,omitempty"` + AppKey uint64 `json:"appKey,omitempty"` +} + +type RelayHeartbeat struct { + RelayTunnelID uint64 `json:"relayTunnelID,omitempty"` + AppID uint64 `json:"appID,omitempty"` +} + +type ReportBasic struct { + OS string `json:"os,omitempty"` + Mac string `json:"mac,omitempty"` + LanIP string `json:"lanIP,omitempty"` + IPv6 string `json:"IPv6,omitempty"` + Version string `json:"version,omitempty"` +} + +type ReportConnect struct { + Error string `json:"error,omitempty"` + Protocol string `json:"protocol,omitempty"` + SrcPort int `json:"srcPort,omitempty"` + NatType int `json:"natType,omitempty"` + PeerNode string `json:"peerNode,omitempty"` + DstPort int `json:"dstPort,omitempty"` + DstHost string `json:"dsdtHost,omitempty"` + PeerUser string `json:"peerUser,omitempty"` + PeerNatType int `json:"peerNatType,omitempty"` + PeerIP string `json:"peerIP,omitempty"` + ShareBandwidth int `json:"shareBandWidth,omitempty"` + RelayNode string `json:"relayNode,omitempty"` + Version string `json:"version,omitempty"` +} + +type UpdateInfo struct { + Error int `json:"error,omitempty"` + ErrorDetail string `json:"errorDetail,omitempty"` + Url string `json:"url,omitempty"` +} diff --git a/quic.go b/quic.go new file mode 100644 index 0000000..7a722ad --- /dev/null +++ b/quic.go @@ -0,0 +1,151 @@ +package main + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/json" + "encoding/pem" + "fmt" + "io" + "math/big" + "net" + "sync" + "time" + + "github.com/lucas-clemente/quic-go" +) + +//quic.DialContext do not support version 44,disable it +var quicVersion []quic.VersionNumber + +type quicConn struct { + listener quic.Listener + writeMtx *sync.Mutex + quic.Stream + quic.Session +} + +func (conn *quicConn) ReadMessage() (*openP2PHeader, []byte, error) { + headBuf := make([]byte, openP2PHeaderSize) + _, err := io.ReadFull(conn, headBuf) + if err != nil { + return nil, nil, err + } + head, err := decodeHeader(headBuf) + if err != nil { + return nil, nil, err + } + dataBuf := make([]byte, head.DataLen) + _, err = io.ReadFull(conn, dataBuf) + return head, dataBuf, err +} + +func (conn *quicConn) WriteBytes(mainType uint16, subType uint16, data []byte) error { + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err := conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *quicConn) WriteBuffer(data []byte) error { + conn.writeMtx.Lock() + _, err := conn.Write(data) + conn.writeMtx.Unlock() + return err +} + +func (conn *quicConn) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { + // TODO: call newMessage + data, err := json.Marshal(packet) + if err != nil { + return err + } + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err = conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *quicConn) Close() error { + conn.Stream.CancelRead(1) + conn.Session.CloseWithError(0, "") + return nil +} +func (conn *quicConn) CloseListener() { + if conn.listener != nil { + conn.listener.Close() + } +} + +func (conn *quicConn) Accept() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + sess, err := conn.listener.Accept(ctx) + if err != nil { + return err + } + stream, err := sess.AcceptStream(context.Background()) + if err != nil { + return err + } + conn.Stream = stream + conn.Session = sess + return nil +} + +func listenQuic(addr string, idleTimeout time.Duration) (*quicConn, error) { + gLog.Println(LevelINFO, "quic listen on ", addr) + listener, err := quic.ListenAddr(addr, generateTLSConfig(), + &quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true}) + if err != nil { + return nil, fmt.Errorf("quic.ListenAddr error:%s", err) + } + return &quicConn{listener: listener, writeMtx: &sync.Mutex{}}, nil +} + +func dialQuic(conn *net.UDPConn, remoteAddr *net.UDPAddr, idleTimeout time.Duration) (*quicConn, error) { + tlsConf := &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{"openp2pv1"}, + } + session, err := quic.DialContext(context.Background(), conn, remoteAddr, conn.LocalAddr().String(), tlsConf, + &quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true}) + if err != nil { + return nil, fmt.Errorf("quic.DialContext error:%s", err) + } + stream, err := session.OpenStreamSync(context.Background()) + if err != nil { + return nil, fmt.Errorf("OpenStreamSync error:%s", err) + } + qConn := &quicConn{nil, &sync.Mutex{}, stream, session} + return qConn, nil +} + +// Setup a bare-bones TLS config for the server +func generateTLSConfig() *tls.Config { + key, err := rsa.GenerateKey(rand.Reader, 1024) + if err != nil { + panic(err) + } + template := x509.Certificate{SerialNumber: big.NewInt(1)} + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) + if err != nil { + panic(err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) + + tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + panic(err) + } + return &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + NextProtos: []string{"openp2pv1"}, + } +} diff --git a/sysinfodarwin.go b/sysinfodarwin.go new file mode 100644 index 0000000..5a11b6e --- /dev/null +++ b/sysinfodarwin.go @@ -0,0 +1,30 @@ +//go:build darwin +// +build darwin + +package main + +import ( + "strings" + "syscall" +) + +func getOsName() (osName string) { + output := execOutput("sw_vers", "-productVersion") + osName = "Mac OS X " + strings.TrimSpace(output) + return +} + +func setRLimit() error { + var limit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + limit.Cur = 10240 + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + return nil +} + +func setFirewall() { +} diff --git a/sysinfolinux.go b/sysinfolinux.go new file mode 100644 index 0000000..3a6e8c6 --- /dev/null +++ b/sysinfolinux.go @@ -0,0 +1,70 @@ +//go:build linux +// +build linux + +package main + +import ( + "bufio" + "bytes" + "io/ioutil" + "os" + "strings" + "syscall" +) + +func getOsName() (osName string) { + var sysnamePath string + sysnamePath = "/etc/redhat-release" + _, err := os.Stat(sysnamePath) + if err != nil && os.IsNotExist(err) { + str := "PRETTY_NAME=" + f, err := os.Open("/etc/os-release") + if err != nil && os.IsNotExist(err) { + str = "DISTRIB_ID=" + f, err = os.Open("/etc/openwrt_release") + } + if err == nil { + buf := bufio.NewReader(f) + for { + line, err := buf.ReadString('\n') + if err == nil { + line = strings.TrimSpace(line) + pos := strings.Count(line, str) + if pos > 0 { + len1 := len([]rune(str)) + 1 + rs := []rune(line) + osName = string(rs[len1 : (len(rs))-1]) + break + } + } else { + break + } + } + } + } else { + buff, err := ioutil.ReadFile(sysnamePath) + if err == nil { + osName = string(bytes.TrimSpace(buff)) + } + } + if osName == "" { + osName = "Linux" + } + return +} + +func setRLimit() error { + var limit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + limit.Max = 1024 * 1024 + limit.Cur = limit.Max + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + return nil +} + +func setFirewall() { +} diff --git a/sysinfowin.go b/sysinfowin.go new file mode 100644 index 0000000..1779fa8 --- /dev/null +++ b/sysinfowin.go @@ -0,0 +1,51 @@ +//go:build windows +// +build windows + +package main + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + + "golang.org/x/sys/windows/registry" +) + +func getOsName() (osName string) { + k, err := registry.OpenKey(registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Windows NT\CurrentVersion`, registry.QUERY_VALUE|registry.WOW64_64KEY) + if err != nil { + return + } + defer k.Close() + pn, _, err := k.GetStringValue("ProductName") + if err == nil { + osName = pn + } + return +} + +func setRLimit() error { + return nil +} + +func setFirewall() { + fullPath, err := filepath.Abs(os.Args[0]) + if err != nil { + gLog.Println(LevelERROR, "add firewall error:", err) + return + } + isXP := false + osName := getOsName() + if strings.Contains(osName, "XP") || strings.Contains(osName, "2003") { + isXP = true + } + if isXP { + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall del allowedprogram "%s"`, fullPath)).Run() + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProducnName, fullPath)).Run() + } else { // win7 or later + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProducnName)).Run() + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProducnName, fullPath)).Run() + } +} diff --git a/totp.go b/totp.go new file mode 100644 index 0000000..cbf29cd --- /dev/null +++ b/totp.go @@ -0,0 +1,30 @@ +// Time-based One-time Password +package main + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/binary" +) + +const TOTPStep = 30 // 30s +func GenTOTP(user string, password string, ts int64) uint64 { + step := ts / TOTPStep + mac := hmac.New(sha256.New, []byte(user+password)) + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(step)) + mac.Write(b) + num := binary.LittleEndian.Uint64(mac.Sum(nil)[:8]) + // fmt.Printf("%x\n", mac.Sum(nil)) + return num +} + +func VerifyTOTP(code uint64, user string, password string, ts int64) bool { + if code == 0 { + return false + } + if code == GenTOTP(user, password, ts) || code == GenTOTP(user, password, ts-TOTPStep) || code == GenTOTP(user, password, ts+TOTPStep) { + return true + } + return false +} diff --git a/totp_test.go b/totp_test.go new file mode 100644 index 0000000..f641637 --- /dev/null +++ b/totp_test.go @@ -0,0 +1,36 @@ +// Time-based One-time Password +package main + +import ( + "testing" + "time" +) + +func TestTOTP(t *testing.T) { + for i := 0; i < 20; i++ { + ts := time.Now().Unix() + code := GenTOTP("testuser1", "testpassword1", ts) + t.Log(code) + if !VerifyTOTP(code, "testuser1", "testpassword1", ts) { + t.Error("TOTP error") + } + if !VerifyTOTP(code, "testuser1", "testpassword1", ts-10) { + t.Error("TOTP error") + } + if !VerifyTOTP(code, "testuser1", "testpassword1", ts+10) { + t.Error("TOTP error") + } + if VerifyTOTP(code, "testuser1", "testpassword1", ts+60) { + t.Error("TOTP error") + } + if VerifyTOTP(code, "testuser2", "testpassword1", ts+1) { + t.Error("TOTP error") + } + if VerifyTOTP(code, "testuser1", "testpassword2", ts+1) { + t.Error("TOTP error") + } + time.Sleep(time.Second) + t.Log("round", i, " ", ts, " test ok") + } + +} diff --git a/udp.go b/udp.go new file mode 100644 index 0000000..3ec9a26 --- /dev/null +++ b/udp.go @@ -0,0 +1,44 @@ +package main + +import ( + "bytes" + "encoding/binary" + "net" + "time" +) + +func UDPWrite(conn *net.UDPConn, dst net.Addr, mainType uint16, subType uint16, packet interface{}) (len int, err error) { + msg, err := newMessage(mainType, subType, packet) + if err != nil { + return 0, err + } + if dst == nil { + return conn.Write(msg) + } + return conn.WriteTo(msg, dst) +} + +func UDPRead(conn *net.UDPConn, timeout int) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) { + if timeout > 0 { + deadline := time.Now().Add(time.Millisecond * time.Duration(timeout)) + err = conn.SetReadDeadline(deadline) + if err != nil { + gLog.Println(LevelERROR, "SetReadDeadline error") + return nil, nil, nil, 0, err + } + } + + result = make([]byte, 1024) + len, ra, err = conn.ReadFrom(result) + if err != nil { + // gLog.Println(LevelDEBUG, "ReadFrom error") + return nil, nil, nil, 0, err + } + head = &openP2PHeader{} + err = binary.Read(bytes.NewReader(result[:openP2PHeaderSize]), binary.LittleEndian, head) + if err != nil { + gLog.Println(LevelERROR, "parse p2pheader error:", err) + return nil, nil, nil, 0, err + } + return +} diff --git a/update.go b/update.go new file mode 100644 index 0000000..1650f5a --- /dev/null +++ b/update.go @@ -0,0 +1,207 @@ +package main + +import ( + "archive/tar" + "archive/zip" + "compress/gzip" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "runtime" + "time" +) + +// type updateFileInfo struct { +// Name string `json:"name,omitempty"` +// RelativePath string `json:"relativePath,omitempty"` +// Length int64 `json:"length,omitempty"` +// URL string `json:"url,omitempty"` +// Hash string `json:"hash,omitempty"` +// } + +func update() { + gLog.Println(LevelINFO, "update start") + defer gLog.Println(LevelINFO, "update end") + // TODO: download from gitee. save flow + c := http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: time.Second * 30, + } + goos := runtime.GOOS + goarch := runtime.GOARCH + rsp, err := c.Get(fmt.Sprintf("https://openp2p.cn:27182/api/v1/update?fromver=%s&os=%s&arch=%s", OpenP2PVersion, goos, goarch)) + if err != nil { + gLog.Println(LevelERROR, "update:query update list failed:", err) + return + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + gLog.Println(LevelERROR, "get update info error:", rsp.Status) + return + } + rspBuf, err := ioutil.ReadAll(rsp.Body) + if err != nil { + gLog.Println(LevelERROR, "update:read update list failed:", err) + return + } + updateInfo := UpdateInfo{} + err = json.Unmarshal(rspBuf, &updateInfo) + if err != nil { + gLog.Println(LevelERROR, rspBuf, " update info decode error:", err) + return + } + if updateInfo.Error != 0 { + gLog.Println(LevelERROR, "update error:", updateInfo.Error, updateInfo.ErrorDetail) + return + } + os.MkdirAll("download", 0666) + err = updateFile(updateInfo.Url, "", "openp2p") + if err != nil { + gLog.Println(LevelERROR, "update: download failed:", err) + return + } +} + +// todo rollback on error +func updateFile(url string, checksum string, dst string) error { + gLog.Println(LevelINFO, "download ", url) + tmpFile := filepath.Dir(os.Args[0]) + "/openp2p.tmp" + output, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0776) + if err != nil { + gLog.Printf(LevelERROR, "OpenFile %s error:%s", tmpFile, err) + return err + } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + response, err := client.Get(url) + if err != nil { + gLog.Printf(LevelERROR, "download url %s error:%s", url, err) + output.Close() + return err + } + defer response.Body.Close() + n, err := io.Copy(output, response.Body) + if err != nil { + gLog.Printf(LevelERROR, "io.Copy error:%s", err) + output.Close() + return err + } + output.Sync() + output.Close() + gLog.Println(LevelINFO, "download ", url, " ok") + gLog.Printf(LevelINFO, "size: %d bytes", n) + + err = os.Rename(os.Args[0], os.Args[0]+"0") + if err != nil && os.IsExist(err) { + gLog.Printf(LevelINFO, " rename %s error:%s", os.Args[0], err) + } + // extract + gLog.Println(LevelINFO, "extract files") + err = extract(filepath.Dir(os.Args[0]), tmpFile) + if err != nil { + gLog.Printf(LevelERROR, "extract error:%s. revert rename", err) + os.Rename(os.Args[0]+"0", os.Args[0]) + return err + } + return nil +} + +func extract(dst, src string) (err error) { + if runtime.GOOS == "windows" { + return unzip(dst, src) + } else { + return extractTgz(dst, src) + } +} + +func unzip(dst, src string) (err error) { + archive, err := zip.OpenReader(src) + if err != nil { + return err + } + defer archive.Close() + + for _, f := range archive.File { + filePath := filepath.Join(dst, f.Name) + fmt.Println("unzipping file ", filePath) + + // if !strings.HasPrefix(filePath, filepath.Clean(dst)+string(os.PathSeparator)) { + // fmt.Println("invalid file path") + // return + // } + if f.FileInfo().IsDir() { + fmt.Println("creating directory...") + os.MkdirAll(filePath, os.ModePerm) + continue + } + if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { + return err + } + dstFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) + if err != nil { + return err + } + fileInArchive, err := f.Open() + if err != nil { + return err + } + if _, err := io.Copy(dstFile, fileInArchive); err != nil { + return err + } + dstFile.Close() + fileInArchive.Close() + } + return nil +} + +func extractTgz(dst, src string) error { + gzipStream, err := os.Open(src) + if err != nil { + return err + } + uncompressedStream, err := gzip.NewReader(gzipStream) + if err != nil { + return err + } + tarReader := tar.NewReader(uncompressedStream) + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + switch header.Typeflag { + case tar.TypeDir: + if err := os.Mkdir(header.Name, 0755); err != nil { + return err + } + case tar.TypeReg: + filePath := filepath.Join(dst, header.Name) + outFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode)) + if err != nil { + return err + } + if err != nil { + return err + } + defer outFile.Close() + if _, err := io.Copy(outFile, tarReader); err != nil { + return err + } + default: + return err + } + } + return nil +}