diff --git a/cmd/dmsgweb/commands/dmsgweb.go b/cmd/dmsgweb/commands/dmsgweb.go index 8f74616a..5029e615 100644 --- a/cmd/dmsgweb/commands/dmsgweb.go +++ b/cmd/dmsgweb/commands/dmsgweb.go @@ -180,7 +180,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile dmsgWebLog.Info("dmsg client pk: ", pk.String()) if len(resolveDmsgAddr) > 0 { dialPK = make([]cipher.PubKey, len(resolveDmsgAddr)) - dmsgPorts = make([]uint, dmsgSessions) + dmsgPorts = make([]uint, len(resolveDmsgAddr)) for i, dmsgaddr := range resolveDmsgAddr { dmsgWebLog.Info("dmsg address to dial: ", dmsgaddr) dmsgAddr = strings.Split(dmsgaddr, ":") @@ -201,8 +201,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile } } } - dmsgWebLog.Info("test") - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) + dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk) if err != nil { dmsgWebLog.WithError(err).Fatal("failed to start dmsg") } @@ -272,15 +271,19 @@ dmsgweb conf file detected: ` + dmsgwebconffile if len(resolveDmsgAddr) == 0 && len(webPort) == 1 { if rawTCP[0] { + dmsgWebLog.Debug("proxyTCPConn(-1)") proxyTCPConn(-1) } else { + dmsgWebLog.Debug("proxyHTTPConn(-1)") proxyHTTPConn(-1) } } else { for i := range resolveDmsgAddr { if rawTCP[i] { + dmsgWebLog.Debug("proxyTCPConn(" + fmt.Sprintf("%v", i) + ")") proxyTCPConn(i) } else { + dmsgWebLog.Debug("proxyHTTPConn(" + fmt.Sprintf("%v", i) + ")") proxyHTTPConn(i) } } @@ -386,7 +389,7 @@ func proxyTCPConn(n int) { } wg.Add(1) - go func(conn net.Conn, n int) { + go func(conn net.Conn, n int, dmsgC *dmsg.Client) { defer wg.Done() defer conn.Close() //nolint @@ -400,7 +403,7 @@ func proxyTCPConn(n int) { go func() { _, err := io.Copy(dmsgConn, conn) if err != nil { - log.Printf("Error copying data to dmsg server: %v", err) + log.Printf("Error copying data to dmsg client: %v", err) } dmsgConn.Close() //nolint }() @@ -408,11 +411,11 @@ func proxyTCPConn(n int) { go func() { _, err := io.Copy(conn, dmsgConn) if err != nil { - log.Printf("Error copying data from dmsg server: %v", err) + log.Printf("Error copying data from dmsg client: %v", err) } conn.Close() //nolint }() - }(conn, n) + }(conn, n, dmsgC) } } diff --git a/cmd/dmsgweb/commands/root.go b/cmd/dmsgweb/commands/root.go index 091a3123..00d5e7c5 100644 --- a/cmd/dmsgweb/commands/root.go +++ b/cmd/dmsgweb/commands/root.go @@ -26,6 +26,7 @@ import ( var ( httpC http.Client dmsgC *dmsg.Client + closeDmsg func() dmsgDisc string dmsgSessions int dmsgAddr []string diff --git a/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go b/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go new file mode 100644 index 00000000..0b04782a --- /dev/null +++ b/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go @@ -0,0 +1,211 @@ +package main + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "os" + "sync" + + cc "github.com/ivanpirog/coloredcobra" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire-utilities/pkg/skyenv" + "github.com/spf13/cobra" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" +) + +func main() { + cc.Init(&cc.Config{ + RootCmd: srvCmd, + Headings: cc.HiBlue + cc.Bold, + Commands: cc.HiBlue + cc.Bold, + CmdShortDescr: cc.HiBlue, + Example: cc.HiBlue + cc.Italic, + ExecName: cc.HiBlue + cc.Bold, + Flags: cc.HiBlue + cc.Bold, + FlagsDescr: cc.HiBlue, + NoExtraNewlines: true, + NoBottomNewline: true, + }) + srvCmd.Execute() +} + +const help = "Usage:\r\n" + + " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + + "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + + "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + + "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + + "Flags:\r\n" + + "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + + "Global Flags:\r\n" + + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" + +var ( + httpC http.Client + dmsgC *dmsg.Client + closeDmsg func() + dmsgDisc string + dmsgSessions int + dmsgAddr []string + dialPK []cipher.PubKey + filterDomainSuffix string + sk cipher.SecKey + pk cipher.PubKey + dmsgWebLog *logging.Logger + logLvl string + webPort []uint + proxyPort uint + addProxy string + resolveDmsgAddr []string + wg sync.WaitGroup + isEnvs bool + dmsgPort uint + dmsgPorts []uint + dmsgSess int + wl []string + wlkeys []cipher.PubKey + localPort uint + err error + rawTCP []bool + RootCmd = srvCmd +) + +func init() { + srvCmd.Flags().UintVarP(&localPort, "lport", "l", 8086, "local application http interface port(s)") + srvCmd.Flags().UintVarP(&dmsgPort, "dport", "d", 8086, "dmsg port(s) to serve") + srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", skyenv.DmsgDiscAddr, "dmsg discovery url") + srvCmd.Flags().IntVarP(&dmsgSess, "dsess", "e", 1, "dmsg sessions") + srvCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") + + srvCmd.CompletionOptions.DisableDefaultCmd = true + var helpflag bool + srvCmd.SetUsageTemplate(help) + srvCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgweb") + srvCmd.SetHelpCommand(&cobra.Command{Hidden: true}) + srvCmd.PersistentFlags().MarkHidden("help") //nolint +} + +var srvCmd = &cobra.Command{ + Use: "srv", + Short: "serve raw TCP from local port over dmsg", + Long: `DMSG web server - serve http or raw TCP interface from local port over dmsg`, + Run: func(_ *cobra.Command, _ []string) { + server() + }, +} + +func server() { + log := logging.MustGetLogger("dmsgwebsrv") + + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + + defer cancel() + pk, err = sk.PubKey() + if err != nil { + pk, sk = cipher.GenerateKeyPair() + } + log.Infof("dmsg client pk: %v", pk.String()) + + dmsgC := dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, log), dmsg.DefaultConfig()) + defer func() { + if err := dmsgC.Close(); err != nil { + log.WithError(err).Error() + } + }() + + go dmsgC.Serve(context.Background()) + + select { + case <-ctx.Done(): + log.WithError(ctx.Err()).Warn() + return + + case <-dmsgC.Ready(): + } + + lis, err := dmsgC.Listen(uint16(dmsgPort)) + if err != nil { + log.Fatalf("Error listening on port %d: %v", dmsgPort, err) + } + + go func(l net.Listener, port uint) { + <-ctx.Done() + if err := l.Close(); err != nil { + log.Printf("Error closing listener on port %d: %v", port, err) + log.WithError(err).Error() + } + }(lis, dmsgPort) + + wg := new(sync.WaitGroup) + + wg.Add(1) + go func(localPort uint, lis net.Listener) { + defer wg.Done() + proxyTCPConnections(localPort, lis, log) + }(localPort, lis) + + wg.Wait() +} + +func proxyTCPConnections(localPort uint, lis net.Listener, log *logging.Logger) { + for { + conn, err := lis.Accept() + if err != nil { + log.Printf("Error accepting connection: %v", err) + return + } + + go handleTCPConnection(conn, localPort, log) + } +} + +func handleTCPConnection(dmsgConn net.Conn, localPort uint, log *logging.Logger) { + defer dmsgConn.Close() //nolint + + localConn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", localPort)) + if err != nil { + log.Printf("Error connecting to local port %d: %v", localPort, err) + return + } + defer localConn.Close() //nolint + + copyConn := func(dst net.Conn, src net.Conn) { + _, err := io.Copy(dst, src) + if err != nil { + log.Printf("Error during copy: %v", err) + } + } + + go copyConn(dmsgConn, localConn) + go copyConn(localConn, dmsgConn) +} + +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + os.Exit(0) + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgWebLog.Debug("Dmsg network ready.") + return dmsgC, stop, nil + } +} diff --git a/examples/tcp-proxy/tcp-proxy.go b/examples/tcp-proxy/tcp-proxy.go new file mode 100644 index 00000000..96271c1a --- /dev/null +++ b/examples/tcp-proxy/tcp-proxy.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "io" + "log" + "net" + "os" + "strconv" + "sync" +) + +func main() { + if len(os.Args) < 3 { + log.Fatalf("requires two arguments; usage: tcp1 ") + } + sourcePort, err := strconv.Atoi(os.Args[2]) + if err != nil { + log.Fatalf("Failed to parse tcp source port string \"%v\" to int: %v", sourcePort, err) + } + targetPort, err := strconv.Atoi(os.Args[1]) + if err != nil { + log.Fatalf("Failed to parse tcp target port string \"%v\" to int: %v", targetPort, err) + } + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", sourcePort)) + if err != nil { + log.Fatalf("Failed to start TCP listener on port %d: %v", sourcePort, err) + } + defer listener.Close() + log.Printf("TCP proxy started: Listening on port %d and forwarding to port %d", sourcePort, targetPort) + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept connection: %v", err) + continue + } + + go handleConnection(conn, targetPort) + } +} + +func handleConnection(conn net.Conn, targetPort int) { + defer conn.Close() + + targetAddr := fmt.Sprintf("localhost:%d", targetPort) + target, err := net.Dial("tcp", targetAddr) + if err != nil { + log.Printf("Failed to dial target server %s: %v", targetAddr, err) + return + } + defer target.Close() + + var wg sync.WaitGroup + wg.Add(2) + + // Copy from client to target + go func() { + _, err := io.Copy(target, conn) + if err != nil && !isClosedConnErr(err) { + log.Printf("Error copying from client to target: %v", err) + } + target.Close() // Close target side after copy + wg.Done() + }() + + // Copy from target to client + go func() { + _, err := io.Copy(conn, target) + if err != nil && !isClosedConnErr(err) { + log.Printf("Error copying from target to client: %v", err) + } + conn.Close() // Close client side after copy + wg.Done() + }() + + // Wait for both copies to finish + wg.Wait() +} + +// isClosedConnErr checks if the error indicates a closed connection. +func isClosedConnErr(err error) bool { + if err == io.EOF { + return true + } + netErr, ok := err.(net.Error) + return ok && netErr.Timeout() // Check for timeout error indicating closed connection +} diff --git a/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go b/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go new file mode 100644 index 00000000..155829a0 --- /dev/null +++ b/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go @@ -0,0 +1,230 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "sync" + "syscall" + + cc "github.com/ivanpirog/coloredcobra" + "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire-utilities/pkg/skyenv" + "github.com/spf13/cobra" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" +) + +func main() { + cc.Init(&cc.Config{ + RootCmd: RootCmd, + Headings: cc.HiBlue + cc.Bold, + Commands: cc.HiBlue + cc.Bold, + CmdShortDescr: cc.HiBlue, + Example: cc.HiBlue + cc.Italic, + ExecName: cc.HiBlue + cc.Bold, + Flags: cc.HiBlue + cc.Bold, + FlagsDescr: cc.HiBlue, + NoExtraNewlines: true, + NoBottomNewline: true, + }) + RootCmd.Execute() +} + +const help = "Usage:\r\n" + + " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + + "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + + "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + + "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + + "Flags:\r\n" + + "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + + "Global Flags:\r\n" + + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" + +var ( + httpC http.Client + dmsgC *dmsg.Client + closeDmsg func() + dmsgDisc string + dmsgSessions int + dmsgAddr []string + dialPK cipher.PubKey + sk cipher.SecKey + pk cipher.PubKey + dmsgWebLog *logging.Logger + logLvl string + webPort uint + resolveDmsgAddr string + wg sync.WaitGroup + dmsgPort uint + dmsgSess int + err error +) + +func init() { + RootCmd.Flags().UintVarP(&webPort, "port", "p", 8080, "port to serve the web application") + RootCmd.Flags().StringVarP(&resolveDmsgAddr, "resolve", "t", "", "resolve the specified dmsg address:port on the local port & disable proxy") + RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", skyenv.DmsgDiscAddr, "dmsg discovery url") + RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") + RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") +} + +// RootCmd contains the root command for dmsgweb +var RootCmd = &cobra.Command{ + Use: func() string { + return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] + }(), + Short: "DMSG reverse tcp proxy", + Long: "DMSG reverse tcp proxy", + SilenceErrors: true, + SilenceUsage: true, + DisableSuggestions: true, + DisableFlagsInUseLine: true, + Version: buildinfo.Version(), + Run: func(cmd *cobra.Command, _ []string) { + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) //nolint + go func() { + <-c + os.Exit(1) + }() + if dmsgWebLog == nil { + dmsgWebLog = logging.MustGetLogger("dmsgweb") + } + if logLvl != "" { + if lvl, err := logging.LevelFromString(logLvl); err == nil { + logging.SetLevel(lvl) + } + } + + if dmsgDisc == "" { + dmsgDisc = skyenv.DmsgDiscAddr + } + ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgWebLog) + defer cancel() + + pk, err := sk.PubKey() + if err != nil { + pk, sk = cipher.GenerateKeyPair() + } + dmsgWebLog.Info("dmsg client pk: ", pk.String()) + + dmsgWebLog.Info("dmsg address to dial: ", resolveDmsgAddr) + dmsgAddr = strings.Split(resolveDmsgAddr, ":") + var setpk cipher.PubKey + err = setpk.Set(dmsgAddr[0]) + if err != nil { + log.Fatalf("failed to parse dmsg
: : %v", err) + } + dialPK = setpk + if len(dmsgAddr) > 1 { + dport, err := strconv.ParseUint(dmsgAddr[1], 10, 64) + if err != nil { + log.Fatalf("Failed to parse dmsg port: %v", err) + } + dmsgPort = uint(dport) + } else { + dmsgPort = uint(80) + } + + dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk) + if err != nil { + dmsgWebLog.WithError(err).Fatal("failed to start dmsg") + } + defer closeDmsg() + + go func() { + <-ctx.Done() + cancel() + closeDmsg() + os.Exit(0) + }() + + proxyTCPConn() + wg.Wait() + }, +} + +func proxyTCPConn() { + listener, err := net.Listen("tcp", fmt.Sprintf(":%v", webPort)) + if err != nil { + dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", webPort, err) + } + defer listener.Close() //nolint + log.Printf("Serving TCP on 127.0.0.1:%v", webPort) + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept connection: %v", err) + continue + } + + wg.Add(1) + go func(conn net.Conn) { + defer wg.Done() + defer conn.Close() //nolint + + dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK, Port: uint16(dmsgPort)}) + if err != nil { + log.Printf("Failed to dial dmsg address %v:%v %v", dialPK.String(), dmsgPort, err) + return + } + defer dmsgConn.Close() //nolint + + go func() { + _, err := io.Copy(dmsgConn, conn) + if err != nil { + log.Printf("Error copying data to dmsg client: %v", err) + } + dmsgConn.Close() //nolint + }() + + go func() { + _, err := io.Copy(conn, dmsgConn) + if err != nil { + log.Printf("Error copying data from dmsg client: %v", err) + } + conn.Close() //nolint + }() + }(conn) + } +} + +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + os.Exit(0) + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgWebLog.Debug("Dmsg network ready.") + return dmsgC, stop, nil + } +} diff --git a/examples/tcp/tcp.go b/examples/tcp/tcp.go new file mode 100644 index 00000000..03f4acb6 --- /dev/null +++ b/examples/tcp/tcp.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + "net" + "os" +) + +func main() { + // Start a TCP server listening on port 8000 + listener, err := net.Listen("tcp", os.Args[1]) //":8000") + if err != nil { + fmt.Printf("Failed to start server: %v\n", err) + return + } + defer listener.Close() + fmt.Println("TCP server started on port " + os.Args[1]) + + // Accept and handle incoming connections + for { + conn, err := listener.Accept() + if err != nil { + fmt.Printf("Failed to accept connection: %v\n", err) + continue + } + go handleConnection(conn) + } +} + +func handleConnection(conn net.Conn) { + defer conn.Close() + + // Send a greeting message to the client + message := "Hello, World!\n" + _, err := conn.Write([]byte(message)) + if err != nil { + fmt.Printf("Error writing response: %v\n", err) + return + } +}