diff --git a/.vscode/targets.log b/.vscode/targets.log index edac7bc..aaa54c6 100644 --- a/.vscode/targets.log +++ b/.vscode/targets.log @@ -6,7 +6,7 @@ make all --print-data-base --no-builtin-variables --no-builtin-rules --question # This is free software: you are free to change and redistribute it. # There is NO WARRANTY, to the extent permitted by law. -# Make data base, printed on Sat Feb 4 16:13:37 2023 +# Make data base, printed on Sat Aug 12 08:13:38 2023 # Variables @@ -31,6 +31,8 @@ LC_NAME = es_NI.UTF-8 # environment LC_NUMERIC = es_NI.UTF-8 # environment +VSCODE_CRASH_REPORTER_PROCESS_TYPE = extensionHost +# environment VSCODE_CWD = /home/gmena # environment WINDOWPATH = 2 @@ -47,7 +49,6 @@ GOPATH = /home/gmena/go # environment VSCODE_HANDLES_UNCAUGHT_ERRORS = true # makefile (from 'Makefile', line 13) - BINARY = main # default .VARIABLES := @@ -62,7 +63,7 @@ XDG_DATA_DIRS = /usr/share/ubuntu:/usr/share/gnome:/home/gmena/.local/share/flat # automatic %F = $(notdir $%) # environment -VSCODE_CODE_CACHE_PATH = /home/gmena/.config/Code/CachedData/97dec172d3256f8ca4bfb2143f3f76b503ca0534 +VSCODE_CODE_CACHE_PATH = /home/gmena/.config/Code/CachedData/6c3e3dba23e8fadc360aed75ce363ba185c49794 # environment LANG = C # environment @@ -99,11 +100,10 @@ XDG_CONFIG_DIRS = /etc/xdg/xdg-ubuntu:/etc/xdg XDG_SESSION_DESKTOP = ubuntu # makefile (from 'Makefile', line 1) MAKEFILE_LIST := Makefile - # automatic @F = $(notdir $@) # environment -VSCODE_PID = 5099 +VSCODE_PID = 9684 # environment XDG_SESSION_TYPE = x11 # automatic @@ -111,7 +111,7 @@ XDG_SESSION_TYPE = x11 # makefile (from 'Makefile', line 8) PACKAGE = p2p-noise # environment -SESSION_MANAGER = local/gmena-ThinkPad-P70:@/tmp/.ICE-unix/2791,unix/gmena-ThinkPad-P70:/tmp/.ICE-unix/2791 +SESSION_MANAGER = local/gmena-ThinkPad-P70:@/tmp/.ICE-unix/3633,unix/gmena-ThinkPad-P70:/tmp/.ICE-unix/3633 # automatic *F = $(notdir $*) # environment @@ -123,7 +123,7 @@ DBUS_SESSION_BUS_ADDRESS = unix:path=/run/user/1000/bus # automatic a.old # make benchmark > b.new # benchcmp a.old b.new +.PHONY: benchmark ## run benchmark tests benchmark: - @go test ./... -bench=. -benchtime 100000x -count 5 + @perflock -governor=80% go test -run=^Benchmarck$ -benchtime 1s -bench=. -count=1 @echo "[OK] benchmark finished" @@ -54,35 +56,43 @@ benchmark: # For fancy visualization: # Could use Graphviz (https://graphviz.org/download/) # eg. go tool pprof -web bin/main-linux-amd64 cpu.prof - +.PHONY: profiling ## run profiling tests profiling: - @go test -bench=. -benchtime 100000x -run=^$ -cpuprofile=cpu.prof -memprofile=prof.mem + @perflock -governor=80% go test -run=^Benchmarck$ -benchmem -benchtime 1s -bench=. -cpu 1,2,4,8 -count=1 -memprofile mem.prof -cpuprofile cpu.prof @echo "[OK] profiling finished" +.PHONY: coverage ## run tests coverage coverage: @go test -v ./... -race -covermode=atomic -coverprofile coverage ./... @echo "[OK] coverage finished" - + +.PHONY: coverage-export ## run tests coverage export coverage-export: coverage @go tool cover -html=coverage @echo "[OK] code test coverage finished" # Allow to preview documentation. # Please verify your GOPATH before run this command +.PHONY: preview-doc ## run local documentation server preview-doc: @godoc -http=localhost:6060 -links=true +.PHONY: build ## compiles the command into and executable build: @go build -v ./... -code-fmt: +.PHONY: format ## automatically formats Go source cod +format: @go fmt ./... + @goimports -w . @echo "[OK] code format finished" -code-check: +.PHONY: check ## examines Go source code and reports suspicious constructs +check: @go vet -v ./... @echo "[OK] code check finished" +.PHONY: clean ## removes generated files and clean go cache clean: @go clean --cache ./... @rm -f mem.prof @@ -90,6 +100,7 @@ clean: @rm -rf bin @echo "[OK] cleaned" +.PHONY: compile-win ## compiles window exec compile-win: @GOOS=windows GOARCH=amd64 go build -o bin/${WIN_64} ${INPUT} @GOOS=windows GOARCH=386 go build -o bin/${WIN_32} ${INPUT} @@ -97,28 +108,39 @@ compile-win: #Go1.15 deprecates 32-bit macOS builds # go build -x to show compilation details #GOOS=darwin GOARCH=386 go build -o bin/main-mac-386 main.go +.PHONY: compile-mac ## compiles mac exec compile-mac: @GOOS=darwin GOARCH=amd64 go build -o bin/${OSX_64} ${INPUT} +.PHONY: compile-linux ## compiles linux exec compile-linux: @GOOS=linux GOARCH=amd64 go build -o bin/${LINUX_64} ${INPUT} @GOOS=linux GOARCH=386 go build -o bin/${LINUX_32} ${INPUT} +.PHONY: compile ## compiles all os exec compile: compile-linux compile-win compile-mac @echo "[OK] Compiling for every OS and Platform" -build-gc: - @go build -gcflags='-m -m' $(filter-out $@,$(MAKECMDGOALS)) - +.PHONY: run ## compiles and runs the named main Go package run: @go run ${INPUT} $(filter-out $@,$(MAKECMDGOALS)) +.PHONY: update-pkg-cache ## updated the package cache version update-pkg-cache: GOPROXY=https://proxy.golang.org GO111MODULE=on \ go get github.com/${USER}/${PACKAGE}@v${VERSION} +# https://go.dev/ref/mod#go-mod-vendor +.PHONY: vendorize ## lock dependencies vendorize: @go mod vendor @echo "[OK]" -all: build test check-test-coverage code-check compile \ No newline at end of file +all: build test check-test-coverage code-check compile + +.PHONY: help ## display this message +help: + @grep -E \ + '^.PHONY: .*?## .*$$' $(MAKEFILE_LIST) | \ + sort | \ + awk 'BEGIN {FS = ".PHONY: |## "}; {printf "\033[36m%-19s\033[0m %s\n", $$2, $$3}' \ No newline at end of file diff --git a/README.md b/README.md index 7e1aea2..98929fd 100644 --- a/README.md +++ b/README.md @@ -80,21 +80,21 @@ func main() { Using [perflock](https://github.com/aclements/perflock) to prevent our benchmarks from using too much CPU at once. ```text -perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=BenchmarkHandshakeProfile -cpu 1,2,4,8 -count 2 -memprofile mem.prof -cpuprofile cpu.prof +perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=. -cpu 1,2,4,8 -count=1 goos: linux goarch: amd64 pkg: github.com/geolffreym/p2p-noise cpu: Intel(R) Xeon(R) CPU E3-1505M v5 @ 2.80GHz -BenchmarkHandshakeProfile 962 1061028 ns/op 34391 B/op 267 allocs/op -BenchmarkHandshakeProfile 1322 919647 ns/op 34383 B/op 267 allocs/op -BenchmarkHandshakeProfile-2 1294 834506 ns/op 37095 B/op 285 allocs/op -BenchmarkHandshakeProfile-2 1398 858845 ns/op 36872 B/op 284 allocs/op -BenchmarkHandshakeProfile-4 1395 875618 ns/op 41912 B/op 323 allocs/op -BenchmarkHandshakeProfile-4 1341 914046 ns/op 41858 B/op 323 allocs/op -BenchmarkHandshakeProfile-8 1276 879535 ns/op 42055 B/op 324 allocs/op -BenchmarkHandshakeProfile-8 1279 929125 ns/op 41812 B/op 323 allocs/op +BenchmarkHandshakeProfile 726 1575256 ns/op 46959 B/op 363 allocs/op +BenchmarkHandshakeProfile-2 1548 1037351 ns/op 47100 B/op 364 allocs/op +BenchmarkHandshakeProfile-4 2460 908573 ns/op 49885 B/op 383 allocs/op +BenchmarkHandshakeProfile-8 2127 736442 ns/op 60454 B/op 457 allocs/op +BenchmarkNodesSecureMessageExchange 29032570 35.03 ns/op 0 B/op 0 allocs/op +BenchmarkNodesSecureMessageExchange-2 59745247 16.78 ns/op 0 B/op 0 allocs/op +BenchmarkNodesSecureMessageExchange-4 124446950 9.454 ns/op 0 B/op 0 allocs/op +BenchmarkNodesSecureMessageExchange-8 151214516 7.088 ns/op 0 B/op 0 allocs/op PASS -ok github.com/geolffreym/p2p-noise 13.201s +ok github.com/geolffreym/p2p-noise 18.865s ``` @@ -107,12 +107,12 @@ Some available capabilities for dev support: * **Test Coverage**: `make coverage` * **Benchmark**: `make benchmark` * **Profiling**: `make profiling` -* **Code check**: `make code-check` -* **Code format**: `make code-fmt` +* **Code check**: `make check` +* **Code format**: `make format` * **Flush cache**: `make clean` * **Build**: `make build` -Note: Please check [Makefile](https://github.com/geolffreym/p2p-noise/Makefile) for more capabilities. +Note: Run `make help` to check for more capabilities. ## More info diff --git a/config/config.go b/config/config.go index 1e452c7..e5ee816 100644 --- a/config/config.go +++ b/config/config.go @@ -1,5 +1,7 @@ // Package conf provide a "functional option" design pattern to handle node settings. -// See also: https://github.com/crazybber/awesome-patterns/blob/master/idiom/functional-options.md +// See more about [Functional Options]. +// +// [Functional Options]: https://github.com/crazybber/awesome-patterns/blob/master/idiom/functional-options.md package config import "time" diff --git a/handshake.go b/handshake.go index 8c6a734..9db848e 100644 --- a/handshake.go +++ b/handshake.go @@ -203,9 +203,9 @@ func newHandshake(conn net.Conn, initiator bool) (*handshake, error) { // Setup the max of size possible for tokens exchanged between peers. edKeyLen := ed25519.PublicKeySize // 32 bytes dhKeyLen := 2 * noise.DH25519.DHLen() // 64 bytes - cypherLen := 2 * chacha20poly1305.Overhead // 32 bytes + cipherLen := 2 * chacha20poly1305.Overhead // 32 bytes // Sum the needed memory size for pool - size := dhKeyLen + edKeyLen + cypherLen + headerSize + size := dhKeyLen + edKeyLen + cipherLen + headerSize pool := bpool.NewBytePool(bPools, size) // N bytes pool // Create a new session handler diff --git a/id.go b/id.go index 941be6a..1c76237 100644 --- a/id.go +++ b/id.go @@ -5,7 +5,8 @@ import ( "unsafe" ) -// [ID] it's identity provider for peer. +// [ID] serves as the identity for peers. +// It facilitates addressability in router table. type ID [32]byte // Bytes return a byte slice representation for id. @@ -14,14 +15,17 @@ func (i ID) Bytes() []byte { } // String return a string representation for 32-bytes hash. +// ref: https://go.dev/ref/spec#Conversions func (i ID) String() string { - return (string)(i[:]) + return string(i[:]) } // newIDFromString creates a new ID from string. // ref: https://stackoverflow.com/questions/59209493/how-to-use-unsafe-get-a-byte-slice-from-a-string-without-memory-copy +// ref: https://go.dev/ref/spec#Conversions func newIDFromString(s string) ID { // "no-copy" convert to ID from string. + // If the type starts with the operator * or <-, it must be parenthesized when necessary to avoid ambiguity. return *(*ID)(unsafe.Pointer( (*reflect.StringHeader)(unsafe.Pointer(&s)).Data), ) @@ -30,9 +34,7 @@ func newIDFromString(s string) ID { // newBlake2ID creates a new id blake2 hash based. func newBlake2ID(plaintext []byte) ID { var id ID - // Hash hash := blake2(plaintext) - // Populate id copy(id[:], hash) return id } diff --git a/node.go b/node.go index 5afdad7..220206e 100644 --- a/node.go +++ b/node.go @@ -1,7 +1,5 @@ -//Copyright (c) 2022, Geolffrey Mena - -// P2P Noise Library. -// Please read more about [Noise Protocol]. +// Package noise implements the Noise Protocol for peer-to-peer communication. +// For more information about the Noise Protocol, please visit: [Noise Protocol]. // // [Noise Protocol]: http://www.noiseprotocol.org/noise.html package noise @@ -16,7 +14,7 @@ import ( "github.com/oxtoacart/bpool" ) -// futureDeadline calculate a new time for deadline since now. +// futureDeadline calculate and return a new time for deadline since now. func futureDeadLine(deadline time.Duration) time.Time { if deadline == 0 { // deadline 0 = no deadline @@ -47,6 +45,8 @@ type Config interface { KeepAlive() time.Duration } +// Node represents a network node capable of handling connections, +// routing messages, and managing configurations. type Node struct { // Bound local network listener. listener net.Listener @@ -76,11 +76,14 @@ func New(config Config) *Node { } } -// Signals proxy channels to subscriber. -// The listening routine should be stopped using returned cancel func. +// Signals initiates the signaling process to proxy channels to subscribers. +// It returns a channel of type Signal to intercept events and a cancel function to stop the listening routine. +// The channel is closed during the cancellation of listening. func (n *Node) Signals() (<-chan Signal, context.CancelFunc) { ctx, cancel := context.WithCancel(context.Background()) + // this channel is closed during listening cancellation ch := make(chan Signal) + // forward signals to internal events signaling go n.events.Listen(ctx, ch) return ch, cancel // read only channel for raw messages } @@ -95,14 +98,16 @@ func (n *Node) Disconnect() { } } -// Send emit a new message using peer id. -// If peer id doesn't exists or peer is not connected return error. -// Calling Send extends write deadline. +// Send emits a new message using a peer ID. +// It returns the total bytes sent if there is no error; otherwise, it returns 0. +// If the peer ID doesn't exist or the peer is not connected, it returns an error. +// Calling Send extends the write deadline. func (n *Node) Send(rawID string, message []byte) (uint32, error) { id := newIDFromString(rawID) // Check if id exists in connected peers - peer := n.router.Query(id) - if peer == nil { + // check in-band error + peer, ok := n.router.Query(id) + if !ok { err := fmt.Errorf("remote peer disconnected: %s", id.String()) return 0, errSendingMessage(err) } @@ -115,9 +120,9 @@ func (n *Node) Send(rawID string, message []byte) (uint32, error) { return bytes, err } -// watch keep running waiting for incoming messages. -// After every new message the connection is verified, if local connection is closed or remote peer is disconnected the routine is stopped. -// Incoming message monitor is suggested to be processed in go routines. +// watch keeps running, waiting for incoming messages. +// After receiving each new message, the connection is verified. If the local connection is closed or the remote peer is disconnected, the routine stops. +// It is suggested to process incoming messages in separate goroutines. func (n *Node) watch(peer *peer) { KEEPALIVE: @@ -153,7 +158,9 @@ KEEPALIVE: } -// setupTCPConnection configure TCP connection behavior. +// setupTCPConnection configures the behavior of a TCP connection. +// It takes a net.TCPConn connection and modifies its settings according to the Node configuration. +// If any of the configurations cannot be fulfilled, it returns an error. func (n *Node) setupTCPConnection(conn *net.TCPConn) error { // If tcp enforce keep alive connection. // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection. @@ -171,11 +178,10 @@ func (n *Node) setupTCPConnection(conn *net.TCPConn) error { return nil } -// handshake starts a new handshake for incoming or dialed connection. -// After handshake completes a new session is created and a new peer is created to be added to router. -// Marshaling data to/from on the network path as a "chain of responsibility". -// If TCP protocol is used connection is enforced to keep alive. -// Return err if max peers connected exceed MaxPeerConnected otherwise return nil. +// handshake initiates a new handshake for an incoming or dialed connection. +// After the handshake completes, a new session is created, and a new peer is added to the router. +// If the TCP protocol is used, the connection is enforced to keep alive. +// Returns an error if the maximum number of connected peers exceeds MaxPeersConnected; otherwise, returns nil. func (n *Node) handshake(conn net.Conn, initialize bool) error { // Assertion for tcp connection to keep alive @@ -223,8 +229,8 @@ func (n *Node) handshake(conn net.Conn, initialize bool) error { return nil } -// routing initialize route in routing table from session. -// Return the recent added peer. +// routing initializes a route in the routing table from a session. +// It returns the recently added peer. func (n *Node) routing(conn *session) *peer { // Initial deadline for connection. // A deadline is an absolute time after which I/O operations @@ -295,17 +301,21 @@ func (n *Node) Close() error { // close peer connections go n.Disconnect() - // stop listener + + // stop listener for listening node only + if n.listener == nil { + return nil + } + if err := n.listener.Close(); err != nil { return err } - // runtime.GC() return nil } -// Dial attempt to connect to remote node and add connected peer to routing table. -// Return error if error occurred while dialing node. +// Dial attempts to connect to a remote node and adds the connected peer to the routing table. +// It returns an error if an error occurred while dialing the node. func (n *Node) Dial(addr string) error { protocol := n.config.Protocol() // eg. tcp timeout := n.config.DialTimeout() // max time waiting for dial. diff --git a/node_test.go b/node_test.go index 3ba01a5..54084f1 100644 --- a/node_test.go +++ b/node_test.go @@ -4,76 +4,45 @@ import ( "bufio" "bytes" "crypto/rand" - "fmt" "io/ioutil" "log" "os" - "sync" "testing" "time" "github.com/geolffreym/p2p-noise/config" ) -// TODO reutilizar los encoders/decoders -// TODO test performance with big file sharing -// TODO session test -// TODO handshake test - +// TODO test exchange big messages // phase 1: metrics for adaptive lookup // phase 2: compression using brotli vs gzip // phase 2 discovery module -func traceMessageBetweenTwoPeers(nodeA *Node, nodeB *Node, expected string) bool { +func traceMessageBetweenTwoPeers(nodeB *Node, expected string) bool { ready := make(chan bool) - // Lets send a message from A to B and see - // if we receive the expected decrypted message - go func(node *Node) { - // Node A events channel - signalsA, _ := node.Signals() - for signalA := range signalsA { - switch signalA.Type() { - case SelfListening: - ready <- true - case NewPeerDetected: - // send a message to node b after handshake ready - id := signalA.Payload() // here we receive the remote peer id - // Send a message to nodeB. - // Underneath the message is encrypted and signed with local Private Key before send. - nodeA.Send(id, []byte(expected)) - case PeerDisconnected: - break - } - } - }(nodeA) - - <-ready - - // wait until handshake is done - if err := nodeB.Dial(nodeA.LocalAddr().String()); err != nil { - fmt.Print(err) - return false - } // Node B events channel - signalsB, _ := nodeB.Signals() + signalsB, cancel := nodeB.Signals() for signalB := range signalsB { if signalB.Type() == MessageReceived { // When a new message is received: // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. got := signalB.Payload() + cancel() // stop the signaling return got == expected } } close(ready) - return true + // by default expected not message received as expected + return false } func matchExpectedLogs(expectedBehavior []string, t *testing.T, f func()) { // store logs in buffer while the function run. out := new(bytes.Buffer) log.SetFlags(0) + // store log output in buffer log.SetOutput(out) f() // Exec code to get log snapshot @@ -104,26 +73,23 @@ start: } -func whenReadyForIncomingDial(nodes []*Node) *sync.WaitGroup { +func whenReadyForIncomingDial(node *Node) <-chan bool { // Wait until all the nodes are ready for incoming connections. - var wg sync.WaitGroup - - for _, node := range nodes { - wg.Add(1) - go node.Listen() - // Populate wait group - go func(n *Node) { - signals, _ := n.Signals() - for signal := range signals { - if signal.Type() == SelfListening { - wg.Done() - return - } + ready := make(chan bool) + + go node.Listen() + // Populate wait group + go func(n *Node) { + signals, _ := n.Signals() + for signal := range signals { + if signal.Type() == SelfListening { + ready <- true + return } - }(node) - } + } + }(node) - return &wg + return ready } func TestWithZeroFutureDeadline(t *testing.T) { @@ -135,35 +101,6 @@ func TestWithZeroFutureDeadline(t *testing.T) { } -func BenchmarkNodesSecureMessageExchange(b *testing.B) { - // Discard logs to avoid extra allocations. - log.SetOutput(ioutil.Discard) - - expected := "Hello node B" - configurationA := config.New() - configurationB := config.New() - - b.ReportAllocs() - for n := 0; n < b.N; n++ { - nodeA := New(configurationA) - nodeB := New(configurationB) - - go nodeB.Listen() - go nodeA.Listen() - - b.StopTimer() - validMessageReceived := traceMessageBetweenTwoPeers(nodeA, nodeB, expected) - if !validMessageReceived { - b.Errorf("Expected incoming message equal to %s", expected) - } - - b.StartTimer() - nodeA.Close() - nodeB.Close() - } - -} - func TestTwoNodesHandshakeTrace(t *testing.T) { expectedBehavior := []string{ @@ -187,13 +124,17 @@ func TestTwoNodesHandshakeTrace(t *testing.T) { nodeA := New(configurationA) nodeB := New(configurationB) + go nodeA.Listen() + // then just close nodes + defer nodeA.Close() + defer nodeB.Close() + // wait until node is listening to start dialing - whenReadyForIncomingDial([]*Node{nodeA, nodeB}).Wait() + <-whenReadyForIncomingDial(nodeA) + // Just dial to start handshake and close. nodeB.Dial(nodeASocket) // wait until handshake is done - // then just close nodes - nodeA.Close() - nodeB.Close() + }) } @@ -202,29 +143,61 @@ func TestPoolBufferSizeForMessageExchange(t *testing.T) { log.SetOutput(ioutil.Discard) configurationA := config.New() configurationB := config.New() - // Growing byte size dynamically - for x := 0; x < 10; x++ { - byteSize := 1 << x - b := make([]byte, byteSize) - rand.Read(b) - expected := string(b) - configurationA.Write(config.SetPoolBufferSize(byteSize)) - configurationB.Write(config.SetPoolBufferSize(byteSize)) + byteSize := 1 << 4 + ready := make(chan bool) + b := make([]byte, byteSize) - nodeA := New(configurationA) - nodeB := New(configurationB) + rand.Read(b) // fill buffer with pseudorandom numbers + expected := string(b) + configurationA.Write(config.SetPoolBufferSize(byteSize)) + configurationB.Write(config.SetPoolBufferSize(byteSize)) - go nodeB.Listen() - go nodeA.Listen() + nodeA := New(configurationA) + nodeB := New(configurationB) + + go nodeB.Listen() + go nodeA.Listen() + defer nodeA.Close() + defer nodeB.Close() - validMessage := traceMessageBetweenTwoPeers(nodeA, nodeB, expected) - if !validMessage { - t.Errorf("expected valid message equal to %s", expected) + // Lets send a message from A to B and see + // if we receive the expected decrypted message + go func(node *Node) { + // Node A events channel + signalsA, _ := node.Signals() + for signalA := range signalsA { + switch signalA.Type() { + case SelfListening: + ready <- true + case NewPeerDetected: + // send a message to node b after handshake ready + id := signalA.Payload() // here we receive the remote peer id + // Start interaction with remote peer + // Underneath the message is encrypted and signed with local Private Key before send. + nodeA.Send(id, []byte(expected)) + } } + }(nodeA) + + <-ready - nodeA.Close() - nodeB.Close() + // Node B events channel + nodeB.Dial(nodeA.LocalAddr().String()) + signalsB, cancel := nodeB.Signals() + + for signalB := range signalsB { + if signalB.Type() == MessageReceived { + // When a new message is received: + // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. + got := signalB.Payload() + cancel() // stop the signaling + + if got != expected { + t.Errorf("expected valid message equal to %s", expected) + } + + } } } @@ -251,18 +224,11 @@ func TestSomeNodesHandshake(t *testing.T) { nodeC := New(configurationC) nodeD := New(configurationD) - var nodes = []*Node{ - nodeA, - nodeB, - nodeC, - nodeD, - } - // When all peers are listening then start dialing between them. - whenReadyForIncomingDial(nodes).Wait() + <-whenReadyForIncomingDial(nodeA) + nodeB.Dial(nodeASocket) nodeC.Dial(nodeASocket) - nodeC.Dial(nodeBSocket) nodeD.Dial(nodeBSocket) // Network events channel @@ -281,51 +247,121 @@ func TestSomeNodesHandshake(t *testing.T) { } -// go test -benchmem -run=^$ -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out -bench=BenchmarkHandshakeProfile -// go tool pprof {file} -func BenchmarkHandshakeProfile(b *testing.B) { +func BenchmarkHandshake(b *testing.B) { // Discard logs to avoid extra allocations. log.SetOutput(ioutil.Discard) + configurationA := config.New() + configurationA.Write( + config.SetPoolBufferSize(1 << 2), + ) + + nodeA := New(configurationA) + go nodeA.Listen() + defer nodeA.Close() + + <-whenReadyForIncomingDial(nodeA) + b.ResetTimer() b.ReportAllocs() - for n := 0; n < b.N; n++ { - b.StopTimer() - - var peers []*Node - var peersNumber int = 1 + b.RunParallel(func(pb *testing.PB) { - nodeAddress := "127.0.0.1:9095" - configurationA := config.New() - configurationA.Write( - config.SetSelfListeningAddress(nodeAddress), - config.SetPoolBufferSize(1<<2), - ) + b.StopTimer() + for pb.Next() { - nodeA := New(configurationA) - for i := 0; i < peersNumber; i++ { - address := "127.0.0.1:" + b.StopTimer() configuration := config.New() configuration.Write( - config.SetSelfListeningAddress(address), - config.SetPoolBufferSize(1<<2), + config.SetPoolBufferSize(1 << 2), ) + node := New(configuration) - peers = append(peers, node) + + // Start timer to measure the handshake process. + // Handshake start when two nodes are connected and isn't happening before dial. + // Avoid to add prev initialization. + b.StartTimer() + node.Dial(nodeA.LocalAddr().String()) + node.Close() + + } + + }) + +} + +func BenchmarkNodesSecureMessageExchange(b *testing.B) { + // Discard logs to avoid extra allocations. + log.SetOutput(ioutil.Discard) + + ready := make(chan bool) + configurationA := config.New() + configurationB := config.New() + + nodeA := New(configurationA) + go nodeA.Listen() + defer nodeA.Close() + + // Lets send a message from A to B and see + // if we receive the expected decrypted message + go func(node *Node) { + // Node A events channel + signalsA, _ := node.Signals() + for signalA := range signalsA { + switch signalA.Type() { + case SelfListening: + ready <- true + case MessageReceived: + // When a new message is received: + // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. + signalA.Reply([]byte("pong")) + } } + }(nodeA) + + // wait until node a gets ready + <-ready + + b.ResetTimer() + b.ReportAllocs() + + b.RunParallel(func(pb *testing.PB) { + + nodeB := New(configurationB) + defer nodeB.Close() + + // wait until handshake is done + nodeB.Dial(nodeA.LocalAddr().String()) + signalsB, cancel := nodeB.Signals() - whenReadyForIncomingDial(append(peers, nodeA)).Wait() - // Start timer to measure the handshake process. - // Handshake start when two nodes are connected and isn't happening before dial. - // Avoid to add prev initialization. b.StartTimer() - for _, peer := range peers { - peer.Dial(nodeAddress) - peer.Close() + + for pb.Next() { + + // we need to measure message exchange only so we start time here + // sign + encryption + marshall + transmission + // Node B events channel + + for signalB := range signalsB { + switch signalB.Type() { + case NewPeerDetected: + // send a message to node b after handshake ready + id := signalB.Payload() // here we receive the remote peer id + // Start interaction with remote peer + // Underneath the message is encrypted and signed with local Private Key before send. + nodeB.Send(id, []byte("ping")) + case MessageReceived: + // When a new message is received: + // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. + if signalB.Payload() == "pong" { + cancel() + } + } + } + } - nodeA.Close() - } + }) } diff --git a/peer.go b/peer.go index 12fa03d..575d7f0 100644 --- a/peer.go +++ b/peer.go @@ -18,10 +18,6 @@ type packet struct { Msg []byte // 24 byte Digest } -// TODO marshall using embed encoded to reduce overhead? -// TODO Usar pipeline -> compress, cypher and sign? -// https://go.dev/blog/pipelines - // TODO Establecer de manera dinámica el send buffer y receiver buffer en el peer y no en el nodo, de modo que se pued la establecerlo usando las métricas // https://community.f5.com/t5/technical-articles/the-tcp-send-buffer-in-depth/ta-p/290760 @@ -44,8 +40,7 @@ func unmarshall(b []byte) packet { return p } -// peer its the trusty remote peer. -// Provide needed methods to interact with the secured session. +// peer represents a trusty remote peer, providing necessary methods to interact with the secured session. type peer struct { // Optimizing space with ordered types. // the attributes orders matters. @@ -80,7 +75,7 @@ func (p *peer) Close() error { return p.s.Close() } -// Close its a forward method for internal `SetDeadline` method in session. +// SetDeadline forward method for internal `SetDeadline` method in session. func (p *peer) SetDeadline(t time.Time) error { return p.s.SetDeadline(t) } @@ -100,19 +95,19 @@ func (p *peer) Send(msg []byte) (uint32, error) { // Encrypt packet with message and signature inside. // we need to re-slice the buffer to avoid overflow slice in internal append. - digest, err := p.s.Encrypt(buffer[:0], packed.Bytes()) + ciphertext, err := p.s.Encrypt(buffer[:0], packed.Bytes()) if err != nil { return 0, err } // 4 bytes for message size. - err = binary.Write(p.s, binary.BigEndian, uint32(len(digest))) + err = binary.Write(p.s, binary.BigEndian, uint32(len(ciphertext))) if err != nil { return 0, err } // stream encrypted packet - bytes, err := p.s.Write(digest) + bytes, err := p.s.Write(ciphertext) if err != nil { return 0, err } @@ -146,9 +141,9 @@ func (p *peer) Listen() ([]byte, error) { if err == nil { // decrypt incoming messages - digest := buffer[:size] + ciphertext := buffer[:size] // Reuse the buffer[:0] = reset slice from byte pool. - raw, err := p.s.Decrypt(buffer[:0], digest) + raw, err := p.s.Decrypt(buffer[:0], ciphertext) if err != nil { return nil, err } diff --git a/router.go b/router.go index aef2216..4ed2e85 100644 --- a/router.go +++ b/router.go @@ -5,11 +5,13 @@ import ( "sync/atomic" ) -// router keep a hash table to associate ID with peer. -// It implements a unstructured mesh topology. +// router keeps a hash table to associate IDs with peers. +// It implements an unstructured mesh topology. +// Unstructured P2P topologies do not attempt to organize all peers into a single, structured topology. +// Rather, each peer attempts to keep a "sensible" set of other peers in its routing table. type router struct { - sync.Map - counter uint32 + sync.Map // embed map + counter uint32 } func newRouter() *router { @@ -18,8 +20,10 @@ func newRouter() *router { // Table return fan out channel with routed peers. func (r *router) Table() <-chan *peer { + // buffered channel ch := make(chan *peer, r.Len()) // ref: https://pkg.go.dev/sync#Map.Range + // generate valid peers from table r.Range(func(_, value any) bool { if p, ok := value.(*peer); ok { ch <- p @@ -34,16 +38,18 @@ func (r *router) Table() <-chan *peer { } // Query return connection interface based on socket parameter. -func (r *router) Query(id ID) *peer { - // exist socket related peer? +// In-band error returned. This return value may be an error, or a boolean when no explanation is needed. +// refer: https://go.dev/wiki/CodeReviewComments +func (r *router) Query(id ID) (*peer, bool) { + p, exists := r.Load(id) peer, ok := p.(*peer) if !exists || !ok { - return nil + return nil, false } - return peer + return peer, true } // Add forward method to internal sync.Map store for peer. diff --git a/router_test.go b/router_test.go index bf2b328..d60aec1 100644 --- a/router_test.go +++ b/router_test.go @@ -19,7 +19,7 @@ func TestAdd(t *testing.T) { for _, e := range expected { t.Run(fmt.Sprintf("%x", e), func(t *testing.T) { // Match recently added peer - if p := router.Query(e); p == nil { + if _, ok := router.Query(e); !ok { t.Errorf("expected routed peer id %x", e.String()) } }) @@ -39,7 +39,7 @@ func TestQuery(t *testing.T) { for _, e := range expected { t.Run(fmt.Sprintf("%x", e), func(t *testing.T) { // Return the socket related peer - if peer := router.Query(e); peer == nil { + if peer, ok := router.Query(e); !ok { t.Errorf("expected peer for valid socket %#v, got %v", e.String(), peer) } }) @@ -51,7 +51,7 @@ func TestQuery(t *testing.T) { func TestInvalidQuery(t *testing.T) { router := newRouter() id := mockID(PeerBPb) - if peer := router.Query(id); peer != nil { + if peer, ok := router.Query(id); ok { t.Errorf("expected nil for invalid socket %#v, got %v", PeerBPb, peer) } @@ -79,15 +79,21 @@ func TestTable(t *testing.T) { router.Add(peerA) router.Add(peerB) - var counter int = 0 +LOOP: for peer := range router.Table() { got := peer.ID().String() - e := expected[counter] - if got != e { - t.Errorf("expected corresponding table peer entry %x, got %x", e, got) + + for _, expect := range expected { + if expect == got { + // move to loop and start again with next + // this approach is equivalent to run a needle in a haystack + // and avoid the error if match found forwarding the iteration to the main loop + continue LOOP + } } - counter++ + + t.Errorf("expected corresponding table matching entry %x", got) } } @@ -106,11 +112,11 @@ func TestDelete(t *testing.T) { router.Remove(peerB) router.Remove(peerE) - if router.Query(peerB.ID()) != nil { + if _, ok := router.Query(peerB.ID()); ok { t.Errorf("expected %v not registered in router after delete", peerB.ID()) } - if router.Query(peerE.ID()) != nil { + if _, ok := router.Query(peerE.ID()); ok { t.Errorf("expected %v not registered in router after delete", peerE.ID()) } diff --git a/session.go b/session.go index b35414a..8c7c32e 100644 --- a/session.go +++ b/session.go @@ -41,8 +41,7 @@ type session struct { // Create a new secure session func newSession(conn net.Conn, kr KeyRing) (*session, error) { - var pb PublicKey - return &session{conn, kr, pb, nil, nil}, nil + return &session{conn, kr, PublicKey{}, nil, nil}, nil } // Set encryption/decryption state for session. diff --git a/subscriber.go b/subscriber.go index 2647738..fe29d08 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,8 +2,8 @@ package noise import "context" -// subscriber intercept Signal from already subscribed topics in broker -// Handle actions to emit or receive events. +// subscriber intercept Signal from already subscribed topics in broker. +// It handles actions to emit or receive events. type subscriber struct { // No, you don't need to close the channel // https://stackoverflow.com/questions/8593645/is-it-ok-to-leave-a-channel-open @@ -29,6 +29,10 @@ func (s *subscriber) Listen(ctx context.Context, ch chan<- Signal) { // select await both of these values simultaneously, executing each one as it arrives. select { case <-ctx.Done(): + // It's OK to leave a Go channel open forever and never close it. + // When the channel is no longer used, it will be garbage collected. + // But "Closing the channel is a control signal on the channel indicating that no more data follows." + close(ch) return case msg := <-s.notification: ch <- msg // write only channel chan<-