From b065e480daa79bc0f708fa59054ac3ca6cd00ed9 Mon Sep 17 00:00:00 2001 From: geolffreym Date: Sun, 7 Apr 2024 09:51:48 -0600 Subject: [PATCH 1/9] refactor: closing signal channel after listening get closed. Closing the channel is a control signal on the channel indicating that no more data follows. --- .vscode/targets.log | 41 ++++++++++---------- Makefile | 2 +- node.go | 11 ++++-- node_test.go | 92 +++++++++++++++++++++++++++------------------ peer.go | 2 +- router.go | 4 +- session.go | 3 +- subscriber.go | 4 ++ 8 files changed, 92 insertions(+), 67 deletions(-) diff --git a/.vscode/targets.log b/.vscode/targets.log index edac7bc..aaa54c6 100644 --- a/.vscode/targets.log +++ b/.vscode/targets.log @@ -6,7 +6,7 @@ make all --print-data-base --no-builtin-variables --no-builtin-rules --question # This is free software: you are free to change and redistribute it. # There is NO WARRANTY, to the extent permitted by law. -# Make data base, printed on Sat Feb 4 16:13:37 2023 +# Make data base, printed on Sat Aug 12 08:13:38 2023 # Variables @@ -31,6 +31,8 @@ LC_NAME = es_NI.UTF-8 # environment LC_NUMERIC = es_NI.UTF-8 # environment +VSCODE_CRASH_REPORTER_PROCESS_TYPE = extensionHost +# environment VSCODE_CWD = /home/gmena # environment WINDOWPATH = 2 @@ -47,7 +49,6 @@ GOPATH = /home/gmena/go # environment VSCODE_HANDLES_UNCAUGHT_ERRORS = true # makefile (from 'Makefile', line 13) - BINARY = main # default .VARIABLES := @@ -62,7 +63,7 @@ XDG_DATA_DIRS = /usr/share/ubuntu:/usr/share/gnome:/home/gmena/.local/share/flat # automatic %F = $(notdir $%) # environment -VSCODE_CODE_CACHE_PATH = /home/gmena/.config/Code/CachedData/97dec172d3256f8ca4bfb2143f3f76b503ca0534 +VSCODE_CODE_CACHE_PATH = /home/gmena/.config/Code/CachedData/6c3e3dba23e8fadc360aed75ce363ba185c49794 # environment LANG = C # environment @@ -99,11 +100,10 @@ XDG_CONFIG_DIRS = /etc/xdg/xdg-ubuntu:/etc/xdg XDG_SESSION_DESKTOP = ubuntu # makefile (from 'Makefile', line 1) MAKEFILE_LIST := Makefile - # automatic @F = $(notdir $@) # environment -VSCODE_PID = 5099 +VSCODE_PID = 9684 # environment XDG_SESSION_TYPE = x11 # automatic @@ -111,7 +111,7 @@ XDG_SESSION_TYPE = x11 # makefile (from 'Makefile', line 8) PACKAGE = p2p-noise # environment -SESSION_MANAGER = local/gmena-ThinkPad-P70:@/tmp/.ICE-unix/2791,unix/gmena-ThinkPad-P70:/tmp/.ICE-unix/2791 +SESSION_MANAGER = local/gmena-ThinkPad-P70:@/tmp/.ICE-unix/3633,unix/gmena-ThinkPad-P70:/tmp/.ICE-unix/3633 # automatic *F = $(notdir $*) # environment @@ -123,7 +123,7 @@ DBUS_SESSION_BUS_ADDRESS = unix:path=/run/user/1000/bus # automatic Date: Mon, 8 Apr 2024 13:24:14 -0600 Subject: [PATCH 2/9] test(refactor): refactored performance test + parallel run --- Makefile | 4 +- node_test.go | 327 ++++++++++++++++++++++++++----------------------- peer.go | 1 - router.go | 2 + router_test.go | 16 ++- 5 files changed, 187 insertions(+), 163 deletions(-) diff --git a/Makefile b/Makefile index 6c6dc9b..b062ad4 100644 --- a/Makefile +++ b/Makefile @@ -75,11 +75,11 @@ preview-doc: build: @go build -v ./... -code-fmt: +format: @go fmt ./... @echo "[OK] code format finished" -code-check: +check: @go vet -v ./... @echo "[OK] code check finished" diff --git a/node_test.go b/node_test.go index 3f629d5..ff0e423 100644 --- a/node_test.go +++ b/node_test.go @@ -7,22 +7,16 @@ import ( "io/ioutil" "log" "os" - "sync" "testing" "time" "github.com/geolffreym/p2p-noise/config" ) -// TODO reutilizar los encoders/decoders -// TODO test performance with big file sharing -// TODO session test -// TODO handshake test - // phase 1: metrics for adaptive lookup // phase 2: compression using brotli vs gzip // phase 2 discovery module -func traceMessageBetweenTwoPeers(nodeA *Node, nodeB *Node, expected string) bool { +func traceMessageBetweenTwoPeers(nodeB *Node, expected string) bool { ready := make(chan bool) // Node B events channel @@ -78,26 +72,23 @@ start: } -func whenReadyForIncomingDial(nodes []*Node) *sync.WaitGroup { +func whenReadyForIncomingDial(node *Node) <-chan bool { // Wait until all the nodes are ready for incoming connections. - var wg sync.WaitGroup - - for _, node := range nodes { - wg.Add(1) - go node.Listen() - // Populate wait group - go func(n *Node) { - signals, _ := n.Signals() - for signal := range signals { - if signal.Type() == SelfListening { - wg.Done() - return - } + ready := make(chan bool) + + go node.Listen() + // Populate wait group + go func(n *Node) { + signals, _ := n.Signals() + for signal := range signals { + if signal.Type() == SelfListening { + ready <- true + return } - }(node) - } + } + }(node) - return &wg + return ready } func TestWithZeroFutureDeadline(t *testing.T) { @@ -109,81 +100,6 @@ func TestWithZeroFutureDeadline(t *testing.T) { } -func BenchmarkNodesSecureMessageExchange(b *testing.B) { - // Discard logs to avoid extra allocations. - log.SetOutput(ioutil.Discard) - - expected := "Hello node B" - ready := make(chan bool) - configurationA := config.New() - configurationB := config.New() - - b.ResetTimer() - b.ReportAllocs() - - for n := 0; n < b.N; n++ { - b.StopTimer() - nodeA := New(configurationA) - nodeB := New(configurationB) - - go nodeB.Listen() - go nodeA.Listen() - - // Lets send a message from A to B and see - // if we receive the expected decrypted message - go func(node *Node) { - // Node A events channel - signalsA, _ := node.Signals() - for signalA := range signalsA { - switch signalA.Type() { - case SelfListening: - ready <- true - case NewPeerDetected: - // send a message to node b after handshake ready - id := signalA.Payload() // here we receive the remote peer id - // Send a message to nodeB. - // Underneath the message is encrypted and signed with local Private Key before send. - nodeA.Send(id, []byte(expected)) - case PeerDisconnected: - // fmt.Printf("peerDisconnected %x \n", signalA.Payload()) - return - } - } - }(nodeA) - - // wait until node a gets ready - <-ready - - // wait until handshake is done - nodeB.Dial(nodeA.LocalAddr().String()) - - // we need to measure message exchange only so we start time here - // sign + encryption + marshall + transmission - - b.StartTimer() - // Node B events channel - signalsB, cancel := nodeB.Signals() - for signalB := range signalsB { - switch signalB.Type() { - case MessageReceived: - // When a new message is received: - // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. - got := signalB.Payload() - cancel() // stop the signaling if not the loop will ream open forever - if got != expected { - b.Errorf("Expected incoming message equal to %s", expected) - } - - } - } - - b.StopTimer() - nodeA.Close() - nodeB.Close() - } - -} - func TestTwoNodesHandshakeTrace(t *testing.T) { expectedBehavior := []string{ @@ -207,13 +123,17 @@ func TestTwoNodesHandshakeTrace(t *testing.T) { nodeA := New(configurationA) nodeB := New(configurationB) + go nodeA.Listen() + // then just close nodes + defer nodeA.Close() + defer nodeB.Close() + // wait until node is listening to start dialing - whenReadyForIncomingDial([]*Node{nodeA, nodeB}).Wait() + <-whenReadyForIncomingDial(nodeA) + // Just dial to start handshake and close. nodeB.Dial(nodeASocket) // wait until handshake is done - // then just close nodes - nodeA.Close() - nodeB.Close() + }) } @@ -222,29 +142,61 @@ func TestPoolBufferSizeForMessageExchange(t *testing.T) { log.SetOutput(ioutil.Discard) configurationA := config.New() configurationB := config.New() - // Growing byte size dynamically - for x := 0; x < 10; x++ { - byteSize := 1 << x - b := make([]byte, byteSize) - rand.Read(b) - expected := string(b) - configurationA.Write(config.SetPoolBufferSize(byteSize)) - configurationB.Write(config.SetPoolBufferSize(byteSize)) + byteSize := 1 << 4 + ready := make(chan bool) + b := make([]byte, byteSize) - nodeA := New(configurationA) - nodeB := New(configurationB) + rand.Read(b) // fill buffer with pseudorandom numbers + expected := string(b) + configurationA.Write(config.SetPoolBufferSize(byteSize)) + configurationB.Write(config.SetPoolBufferSize(byteSize)) - go nodeB.Listen() - go nodeA.Listen() + nodeA := New(configurationA) + nodeB := New(configurationB) - validMessage := traceMessageBetweenTwoPeers(nodeA, nodeB, expected) - if !validMessage { - t.Errorf("expected valid message equal to %s", expected) + go nodeB.Listen() + go nodeA.Listen() + defer nodeA.Close() + defer nodeB.Close() + + // Lets send a message from A to B and see + // if we receive the expected decrypted message + go func(node *Node) { + // Node A events channel + signalsA, _ := node.Signals() + for signalA := range signalsA { + switch signalA.Type() { + case SelfListening: + ready <- true + case NewPeerDetected: + // send a message to node b after handshake ready + id := signalA.Payload() // here we receive the remote peer id + // Start interaction with remote peer + // Underneath the message is encrypted and signed with local Private Key before send. + nodeA.Send(id, []byte(expected)) + } } + }(nodeA) + + <-ready - nodeA.Close() - nodeB.Close() + // Node B events channel + nodeB.Dial(nodeA.LocalAddr().String()) + signalsB, cancel := nodeB.Signals() + + for signalB := range signalsB { + if signalB.Type() == MessageReceived { + // When a new message is received: + // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. + got := signalB.Payload() + cancel() // stop the signaling + + if got != expected { + t.Errorf("expected valid message equal to %s", expected) + } + + } } } @@ -271,18 +223,11 @@ func TestSomeNodesHandshake(t *testing.T) { nodeC := New(configurationC) nodeD := New(configurationD) - var nodes = []*Node{ - nodeA, - nodeB, - nodeC, - nodeD, - } - // When all peers are listening then start dialing between them. - whenReadyForIncomingDial(nodes).Wait() + <-whenReadyForIncomingDial(nodeA) + nodeB.Dial(nodeASocket) nodeC.Dial(nodeASocket) - nodeC.Dial(nodeBSocket) nodeD.Dial(nodeBSocket) // Network events channel @@ -308,44 +253,116 @@ func BenchmarkHandshakeProfile(b *testing.B) { // Discard logs to avoid extra allocations. log.SetOutput(ioutil.Discard) + configurationA := config.New() + configurationA.Write( + config.SetPoolBufferSize(1 << 2), + ) + + nodeA := New(configurationA) + go nodeA.Listen() + defer nodeA.Close() + + <-whenReadyForIncomingDial(nodeA) + b.ResetTimer() b.ReportAllocs() - for n := 0; n < b.N; n++ { - b.StopTimer() - - var peers []*Node - var peersNumber int = 1 + b.RunParallel(func(pb *testing.PB) { - nodeAddress := "127.0.0.1:9095" - configurationA := config.New() - configurationA.Write( - config.SetSelfListeningAddress(nodeAddress), - config.SetPoolBufferSize(1<<2), - ) + b.StopTimer() + for pb.Next() { - nodeA := New(configurationA) - for i := 0; i < peersNumber; i++ { - address := "127.0.0.1:" + b.StopTimer() configuration := config.New() configuration.Write( - config.SetSelfListeningAddress(address), - config.SetPoolBufferSize(1<<2), + config.SetPoolBufferSize(1 << 2), ) + node := New(configuration) - peers = append(peers, node) + + // Start timer to measure the handshake process. + // Handshake start when two nodes are connected and isn't happening before dial. + // Avoid to add prev initialization. + b.StartTimer() + node.Dial(nodeA.LocalAddr().String()) + node.Close() + + } + + }) + +} + +func BenchmarkNodesSecureMessageExchange(b *testing.B) { + // Discard logs to avoid extra allocations. + log.SetOutput(ioutil.Discard) + + ready := make(chan bool) + configurationA := config.New() + configurationB := config.New() + + nodeA := New(configurationA) + go nodeA.Listen() + defer nodeA.Close() + + // Lets send a message from A to B and see + // if we receive the expected decrypted message + go func(node *Node) { + // Node A events channel + signalsA, _ := node.Signals() + for signalA := range signalsA { + switch signalA.Type() { + case SelfListening: + ready <- true + case MessageReceived: + // When a new message is received: + // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. + signalA.Reply([]byte("pong")) + } } + }(nodeA) + + // wait until node a gets ready + <-ready + + b.ResetTimer() + b.ReportAllocs() + + b.RunParallel(func(pb *testing.PB) { + + nodeB := New(configurationB) + defer nodeB.Close() + + // wait until handshake is done + nodeB.Dial(nodeA.LocalAddr().String()) + signalsB, cancel := nodeB.Signals() - whenReadyForIncomingDial(append(peers, nodeA)).Wait() - // Start timer to measure the handshake process. - // Handshake start when two nodes are connected and isn't happening before dial. - // Avoid to add prev initialization. b.StartTimer() - for _, peer := range peers { - peer.Dial(nodeAddress) - peer.Close() + + for pb.Next() { + + // we need to measure message exchange only so we start time here + // sign + encryption + marshall + transmission + // Node B events channel + + for signalB := range signalsB { + switch signalB.Type() { + case NewPeerDetected: + // send a message to node b after handshake ready + id := signalB.Payload() // here we receive the remote peer id + // Start interaction with remote peer + // Underneath the message is encrypted and signed with local Private Key before send. + nodeB.Send(id, []byte("ping")) + case MessageReceived: + // When a new message is received: + // Underneath the message is verified with remote PublicKey and decrypted with DH SharedKey. + if signalB.Payload() == "pong" { + cancel() + } + } + } + } - nodeA.Close() - } + }) } diff --git a/peer.go b/peer.go index ef8273e..810fdce 100644 --- a/peer.go +++ b/peer.go @@ -18,7 +18,6 @@ type packet struct { Msg []byte // 24 byte Digest } -// TODO marshall using embed encoded to reduce overhead? // TODO Usar pipeline -> compress, cypher and sign? // https://go.dev/blog/pipelines diff --git a/router.go b/router.go index 0acc0f2..59abb0f 100644 --- a/router.go +++ b/router.go @@ -7,6 +7,8 @@ import ( // router keep a hash table to associate ID with peer. // It implements a unstructured mesh topology. +// Unstructured P2P topologies do not attempt to organize all peers into a single, structured topology. +// Rather, each peer attempts to keep a "sensible" set of other peers in its routing table type router struct { sync.Map // embed map counter uint32 diff --git a/router_test.go b/router_test.go index bf2b328..fc0029f 100644 --- a/router_test.go +++ b/router_test.go @@ -79,15 +79,21 @@ func TestTable(t *testing.T) { router.Add(peerA) router.Add(peerB) - var counter int = 0 +LOOP: for peer := range router.Table() { got := peer.ID().String() - e := expected[counter] - if got != e { - t.Errorf("expected corresponding table peer entry %x, got %x", e, got) + + for _, expect := range expected { + if expect == got { + // move to loop and start again with next + // this approach is equivalent to run a needle in a haystack + // and avoid the error if match found forwarding the iteration to the main loop + continue LOOP + } } - counter++ + + t.Errorf("expected corresponding table matching entry %x", got) } } From 0e3ab9d531eb5368db78025475738b6dee548cb2 Mon Sep 17 00:00:00 2001 From: geolffreym Date: Mon, 8 Apr 2024 13:38:59 -0600 Subject: [PATCH 3/9] docs: update README performance stats --- Makefile | 2 +- README.md | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index b062ad4..b68cfba 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,7 @@ benchmark: # eg. go tool pprof -web bin/main-linux-amd64 cpu.prof profiling: - @perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=. -cpu 1,2,4,8 -count 2 -memprofile mem.prof -cpup rofile cpu.prof + @perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=. -cpu 1,2,4,8 -count=1 -memprofile mem.prof -cpup rofile cpu.prof @echo "[OK] profiling finished" coverage: diff --git a/README.md b/README.md index f5f7aa5..d6df1e6 100644 --- a/README.md +++ b/README.md @@ -80,21 +80,21 @@ func main() { Using [perflock](https://github.com/aclements/perflock) to prevent our benchmarks from using too much CPU at once. ```text -perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=BenchmarkHandshakeProfile -cpu 1,2,4,8 -count 2 -memprofile mem.prof -cpuprofile cpu.prof +perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=. -cpu 1,2,4,8 -count=1 goos: linux goarch: amd64 pkg: github.com/geolffreym/p2p-noise cpu: Intel(R) Xeon(R) CPU E3-1505M v5 @ 2.80GHz -BenchmarkHandshakeProfile 962 1061028 ns/op 34391 B/op 267 allocs/op -BenchmarkHandshakeProfile 1322 919647 ns/op 34383 B/op 267 allocs/op -BenchmarkHandshakeProfile-2 1294 834506 ns/op 37095 B/op 285 allocs/op -BenchmarkHandshakeProfile-2 1398 858845 ns/op 36872 B/op 284 allocs/op -BenchmarkHandshakeProfile-4 1395 875618 ns/op 41912 B/op 323 allocs/op -BenchmarkHandshakeProfile-4 1341 914046 ns/op 41858 B/op 323 allocs/op -BenchmarkHandshakeProfile-8 1276 879535 ns/op 42055 B/op 324 allocs/op -BenchmarkHandshakeProfile-8 1279 929125 ns/op 41812 B/op 323 allocs/op +BenchmarkHandshakeProfile 726 1575256 ns/op 46959 B/op 363 allocs/op +BenchmarkHandshakeProfile-2 1548 1037351 ns/op 47100 B/op 364 allocs/op +BenchmarkHandshakeProfile-4 2460 908573 ns/op 49885 B/op 383 allocs/op +BenchmarkHandshakeProfile-8 2127 736442 ns/op 60454 B/op 457 allocs/op +BenchmarkNodesSecureMessageExchange 29032570 35.03 ns/op 0 B/op 0 allocs/op +BenchmarkNodesSecureMessageExchange-2 59745247 16.78 ns/op 0 B/op 0 allocs/op +BenchmarkNodesSecureMessageExchange-4 124446950 9.454 ns/op 0 B/op 0 allocs/op +BenchmarkNodesSecureMessageExchange-8 151214516 7.088 ns/op 0 B/op 0 allocs/op PASS -ok github.com/geolffreym/p2p-noise 13.201s +ok github.com/geolffreym/p2p-noise 18.865s ``` From 79d73f54c1d65138b071047660e53ad335038a94 Mon Sep 17 00:00:00 2001 From: geolffreym Date: Wed, 10 Apr 2024 07:31:00 -0600 Subject: [PATCH 4/9] refactor: removed nested condition to improve readability --- node.go | 13 ++++++++----- node_test.go | 1 + router.go | 2 ++ 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/node.go b/node.go index 2189a94..e3c8005 100644 --- a/node.go +++ b/node.go @@ -297,11 +297,14 @@ func (n *Node) Close() error { // close peer connections go n.Disconnect() - if n.listener != nil { - // stop listener for listening node only - if err := n.listener.Close(); err != nil { - return err - } + + // stop listener for listening node only + if n.listener == nil { + return nil + } + + if err := n.listener.Close(); err != nil { + return err } return nil diff --git a/node_test.go b/node_test.go index ff0e423..3188b89 100644 --- a/node_test.go +++ b/node_test.go @@ -13,6 +13,7 @@ import ( "github.com/geolffreym/p2p-noise/config" ) +// TODO test exchange big messages // phase 1: metrics for adaptive lookup // phase 2: compression using brotli vs gzip // phase 2 discovery module diff --git a/router.go b/router.go index 59abb0f..58adad6 100644 --- a/router.go +++ b/router.go @@ -20,8 +20,10 @@ func newRouter() *router { // Table return fan out channel with routed peers. func (r *router) Table() <-chan *peer { + // buffered channel ch := make(chan *peer, r.Len()) // ref: https://pkg.go.dev/sync#Map.Range + // generate valid peers from table r.Range(func(_, value any) bool { if p, ok := value.(*peer); ok { ch <- p From 9f4c25b2701be655e2b1df948e4855a8db0ab940 Mon Sep 17 00:00:00 2001 From: geolffreym Date: Wed, 10 Apr 2024 09:34:42 -0600 Subject: [PATCH 5/9] refactor: in band error check for query peer in route table --- config/config.go | 4 +++- node.go | 8 +++----- router.go | 7 ++++--- router_test.go | 10 +++++----- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/config/config.go b/config/config.go index 1e452c7..e5ee816 100644 --- a/config/config.go +++ b/config/config.go @@ -1,5 +1,7 @@ // Package conf provide a "functional option" design pattern to handle node settings. -// See also: https://github.com/crazybber/awesome-patterns/blob/master/idiom/functional-options.md +// See more about [Functional Options]. +// +// [Functional Options]: https://github.com/crazybber/awesome-patterns/blob/master/idiom/functional-options.md package config import "time" diff --git a/node.go b/node.go index e3c8005..e58be46 100644 --- a/node.go +++ b/node.go @@ -1,5 +1,3 @@ -//Copyright (c) 2022, Geolffrey Mena - // P2P Noise Library. // Please read more about [Noise Protocol]. // @@ -103,8 +101,9 @@ func (n *Node) Disconnect() { func (n *Node) Send(rawID string, message []byte) (uint32, error) { id := newIDFromString(rawID) // Check if id exists in connected peers - peer := n.router.Query(id) - if peer == nil { + // check in-band error + peer, ok := n.router.Query(id) + if !ok { err := fmt.Errorf("remote peer disconnected: %s", id.String()) return 0, errSendingMessage(err) } @@ -175,7 +174,6 @@ func (n *Node) setupTCPConnection(conn *net.TCPConn) error { // handshake starts a new handshake for incoming or dialed connection. // After handshake completes a new session is created and a new peer is created to be added to router. -// Marshaling data to/from on the network path as a "chain of responsibility". // If TCP protocol is used connection is enforced to keep alive. // Return err if max peers connected exceed MaxPeerConnected otherwise return nil. func (n *Node) handshake(conn net.Conn, initialize bool) error { diff --git a/router.go b/router.go index 58adad6..56c4da4 100644 --- a/router.go +++ b/router.go @@ -38,16 +38,17 @@ func (r *router) Table() <-chan *peer { } // Query return connection interface based on socket parameter. -func (r *router) Query(id ID) *peer { +// In-band error returned +func (r *router) Query(id ID) (*peer, bool) { // exist socket related peer? p, exists := r.Load(id) peer, ok := p.(*peer) if !exists || !ok { - return nil + return nil, false } - return peer + return peer, ok } // Add forward method to internal sync.Map store for peer. diff --git a/router_test.go b/router_test.go index fc0029f..d60aec1 100644 --- a/router_test.go +++ b/router_test.go @@ -19,7 +19,7 @@ func TestAdd(t *testing.T) { for _, e := range expected { t.Run(fmt.Sprintf("%x", e), func(t *testing.T) { // Match recently added peer - if p := router.Query(e); p == nil { + if _, ok := router.Query(e); !ok { t.Errorf("expected routed peer id %x", e.String()) } }) @@ -39,7 +39,7 @@ func TestQuery(t *testing.T) { for _, e := range expected { t.Run(fmt.Sprintf("%x", e), func(t *testing.T) { // Return the socket related peer - if peer := router.Query(e); peer == nil { + if peer, ok := router.Query(e); !ok { t.Errorf("expected peer for valid socket %#v, got %v", e.String(), peer) } }) @@ -51,7 +51,7 @@ func TestQuery(t *testing.T) { func TestInvalidQuery(t *testing.T) { router := newRouter() id := mockID(PeerBPb) - if peer := router.Query(id); peer != nil { + if peer, ok := router.Query(id); ok { t.Errorf("expected nil for invalid socket %#v, got %v", PeerBPb, peer) } @@ -112,11 +112,11 @@ func TestDelete(t *testing.T) { router.Remove(peerB) router.Remove(peerE) - if router.Query(peerB.ID()) != nil { + if _, ok := router.Query(peerB.ID()); ok { t.Errorf("expected %v not registered in router after delete", peerB.ID()) } - if router.Query(peerE.ID()) != nil { + if _, ok := router.Query(peerE.ID()); ok { t.Errorf("expected %v not registered in router after delete", peerE.ID()) } From 940d675618a16007ab67d24d754fb51e706baafb Mon Sep 17 00:00:00 2001 From: geolffreym Date: Wed, 10 Apr 2024 10:14:11 -0600 Subject: [PATCH 6/9] refactor: rename cyphertext variable to domain name --- id.go | 2 -- node.go | 2 +- peer.go | 10 +++++----- router.go | 7 ++++--- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/id.go b/id.go index 941be6a..8e6ad96 100644 --- a/id.go +++ b/id.go @@ -30,9 +30,7 @@ func newIDFromString(s string) ID { // newBlake2ID creates a new id blake2 hash based. func newBlake2ID(plaintext []byte) ID { var id ID - // Hash hash := blake2(plaintext) - // Populate id copy(id[:], hash) return id } diff --git a/node.go b/node.go index e58be46..d212193 100644 --- a/node.go +++ b/node.go @@ -1,4 +1,4 @@ -// P2P Noise Library. +// Noise based P2P Library. // Please read more about [Noise Protocol]. // // [Noise Protocol]: http://www.noiseprotocol.org/noise.html diff --git a/peer.go b/peer.go index 810fdce..e210ad4 100644 --- a/peer.go +++ b/peer.go @@ -99,19 +99,19 @@ func (p *peer) Send(msg []byte) (uint32, error) { // Encrypt packet with message and signature inside. // we need to re-slice the buffer to avoid overflow slice in internal append. - digest, err := p.s.Encrypt(buffer[:0], packed.Bytes()) + ciphertext, err := p.s.Encrypt(buffer[:0], packed.Bytes()) if err != nil { return 0, err } // 4 bytes for message size. - err = binary.Write(p.s, binary.BigEndian, uint32(len(digest))) + err = binary.Write(p.s, binary.BigEndian, uint32(len(ciphertext))) if err != nil { return 0, err } // stream encrypted packet - bytes, err := p.s.Write(digest) + bytes, err := p.s.Write(ciphertext) if err != nil { return 0, err } @@ -145,9 +145,9 @@ func (p *peer) Listen() ([]byte, error) { if err == nil { // decrypt incoming messages - digest := buffer[:size] + ciphertext := buffer[:size] // Reuse the buffer[:0] = reset slice from byte pool. - raw, err := p.s.Decrypt(buffer[:0], digest) + raw, err := p.s.Decrypt(buffer[:0], ciphertext) if err != nil { return nil, err } diff --git a/router.go b/router.go index 56c4da4..6ca0fe7 100644 --- a/router.go +++ b/router.go @@ -38,9 +38,10 @@ func (r *router) Table() <-chan *peer { } // Query return connection interface based on socket parameter. -// In-band error returned +// In-band error returned. This return value may be an error, or a boolean when no explanation is needed. +// refer: https://go.dev/wiki/CodeReviewComments func (r *router) Query(id ID) (*peer, bool) { - // exist socket related peer? + p, exists := r.Load(id) peer, ok := p.(*peer) @@ -48,7 +49,7 @@ func (r *router) Query(id ID) (*peer, bool) { return nil, false } - return peer, ok + return peer, true } // Add forward method to internal sync.Map store for peer. From 1e16e0fea3737fa06971b94c244aae7805e3c7d8 Mon Sep 17 00:00:00 2001 From: geolffreym Date: Wed, 10 Apr 2024 12:03:24 -0600 Subject: [PATCH 7/9] refactor: rename typo cypher => cipher --- handshake.go | 4 ++-- id.go | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/handshake.go b/handshake.go index 8c6a734..9db848e 100644 --- a/handshake.go +++ b/handshake.go @@ -203,9 +203,9 @@ func newHandshake(conn net.Conn, initiator bool) (*handshake, error) { // Setup the max of size possible for tokens exchanged between peers. edKeyLen := ed25519.PublicKeySize // 32 bytes dhKeyLen := 2 * noise.DH25519.DHLen() // 64 bytes - cypherLen := 2 * chacha20poly1305.Overhead // 32 bytes + cipherLen := 2 * chacha20poly1305.Overhead // 32 bytes // Sum the needed memory size for pool - size := dhKeyLen + edKeyLen + cypherLen + headerSize + size := dhKeyLen + edKeyLen + cipherLen + headerSize pool := bpool.NewBytePool(bPools, size) // N bytes pool // Create a new session handler diff --git a/id.go b/id.go index 8e6ad96..1c76237 100644 --- a/id.go +++ b/id.go @@ -5,7 +5,8 @@ import ( "unsafe" ) -// [ID] it's identity provider for peer. +// [ID] serves as the identity for peers. +// It facilitates addressability in router table. type ID [32]byte // Bytes return a byte slice representation for id. @@ -14,14 +15,17 @@ func (i ID) Bytes() []byte { } // String return a string representation for 32-bytes hash. +// ref: https://go.dev/ref/spec#Conversions func (i ID) String() string { - return (string)(i[:]) + return string(i[:]) } // newIDFromString creates a new ID from string. // ref: https://stackoverflow.com/questions/59209493/how-to-use-unsafe-get-a-byte-slice-from-a-string-without-memory-copy +// ref: https://go.dev/ref/spec#Conversions func newIDFromString(s string) ID { // "no-copy" convert to ID from string. + // If the type starts with the operator * or <-, it must be parenthesized when necessary to avoid ambiguity. return *(*ID)(unsafe.Pointer( (*reflect.StringHeader)(unsafe.Pointer(&s)).Data), ) From 1f596fa0347bedf8080170f1cfe705c7979cb22e Mon Sep 17 00:00:00 2001 From: geolffreym Date: Fri, 12 Apr 2024 15:05:47 -0600 Subject: [PATCH 8/9] refactor: added makefile help --- Makefile | 41 +++++++++++++++++++++++++++++++++-------- README.md | 6 +++--- node.go | 46 ++++++++++++++++++++++++++-------------------- node_test.go | 4 +--- peer.go | 3 +-- router.go | 6 +++--- subscriber.go | 4 ++-- 7 files changed, 69 insertions(+), 41 deletions(-) diff --git a/Makefile b/Makefile index b68cfba..9f7109a 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,7 @@ OSX_64=${BINARY_LINUX}-${ARCH_64} # -count 1 idiomatic no cached testing # -race test race condition for routines # @ = dont echo the output +.PHONY: test ## run tests test: @go test -v ./... -count 1 -race -covermode=atomic @echo "[OK] test finished" @@ -36,8 +37,9 @@ test: # make benchmark > a.old # make benchmark > b.new # benchcmp a.old b.new +.PHONY: benchmark ## run benchmark tests benchmark: - @go test ./... -bench=. -benchtime 100000x -count 5 + @perflock -governor=80% go test -run=^Benchmarck$ -benchtime 1s -bench=. -count=1 @echo "[OK] benchmark finished" @@ -54,35 +56,46 @@ benchmark: # For fancy visualization: # Could use Graphviz (https://graphviz.org/download/) # eg. go tool pprof -web bin/main-linux-amd64 cpu.prof - +.PHONY: profiling ## run profiling tests profiling: - @perflock -governor=80% go test -benchmem -run=^$ -benchtime 1s -bench=. -cpu 1,2,4,8 -count=1 -memprofile mem.prof -cpup rofile cpu.prof + @perflock -governor=80% go test -run=^Benchmarck$ -benchmem -benchtime 1s -bench=. -cpu 1,2,4,8 -count=1 -memprofile mem.prof -cpuprofile cpu.prof @echo "[OK] profiling finished" +.PHONY: coverage ## run tests coverage coverage: @go test -v ./... -race -covermode=atomic -coverprofile coverage ./... @echo "[OK] coverage finished" - + +.PHONY: coverage-export ## run tests coverage export coverage-export: coverage @go tool cover -html=coverage @echo "[OK] code test coverage finished" # Allow to preview documentation. # Please verify your GOPATH before run this command +.PHONY: preview-doc ## run local documentation server preview-doc: @godoc -http=localhost:6060 -links=true +.PHONY: build ## compiles the command into and executable build: @go build -v ./... +imports: + @goimport + +.PHONY: format ## automatically formats Go source cod format: @go fmt ./... + @goimports -w . @echo "[OK] code format finished" +.PHONY: check ## examines Go source code and reports suspicious constructs check: @go vet -v ./... @echo "[OK] code check finished" +.PHONY: clean ## removes generated files and clean go cache clean: @go clean --cache ./... @rm -f mem.prof @@ -90,6 +103,7 @@ clean: @rm -rf bin @echo "[OK] cleaned" +.PHONY: compile-win ## compiles window exec compile-win: @GOOS=windows GOARCH=amd64 go build -o bin/${WIN_64} ${INPUT} @GOOS=windows GOARCH=386 go build -o bin/${WIN_32} ${INPUT} @@ -97,28 +111,39 @@ compile-win: #Go1.15 deprecates 32-bit macOS builds # go build -x to show compilation details #GOOS=darwin GOARCH=386 go build -o bin/main-mac-386 main.go +.PHONY: compile-mac ## compiles mac exec compile-mac: @GOOS=darwin GOARCH=amd64 go build -o bin/${OSX_64} ${INPUT} +.PHONY: compile-linux ## compiles linux exec compile-linux: @GOOS=linux GOARCH=amd64 go build -o bin/${LINUX_64} ${INPUT} @GOOS=linux GOARCH=386 go build -o bin/${LINUX_32} ${INPUT} +.PHONY: compile ## compiles all os exec compile: compile-linux compile-win compile-mac @echo "[OK] Compiling for every OS and Platform" -build-gc: - @go build -gcflags='-m -m' $(filter-out $@,$(MAKECMDGOALS)) - +.PHONY: run ## compiles and runs the named main Go package run: @go run ${INPUT} $(filter-out $@,$(MAKECMDGOALS)) +.PHONY: update-pkg-cache ## updated the package cache version update-pkg-cache: GOPROXY=https://proxy.golang.org GO111MODULE=on \ go get github.com/${USER}/${PACKAGE}@v${VERSION} +# https://go.dev/ref/mod#go-mod-vendor +.PHONY: vendorize ## lock dependencies vendorize: @go mod vendor @echo "[OK]" -all: build test check-test-coverage code-check compile \ No newline at end of file +all: build test check-test-coverage code-check compile + +.PHONY: help ## display this message +help: + @grep -E \ + '^.PHONY: .*?## .*$$' $(MAKEFILE_LIST) | \ + sort | \ + awk 'BEGIN {FS = ".PHONY: |## "}; {printf "\033[36m%-19s\033[0m %s\n", $$2, $$3}' \ No newline at end of file diff --git a/README.md b/README.md index d6df1e6..525fc13 100644 --- a/README.md +++ b/README.md @@ -107,12 +107,12 @@ Some available capabilities for dev support: * **Test Coverage**: `make coverage` * **Benchmark**: `make benchmark` * **Profiling**: `make profiling` -* **Code check**: `make code-check` -* **Code format**: `make code-fmt` +* **Code check**: `make check` +* **Code format**: `make format` * **Flush cache**: `make clean` * **Build**: `make build` -Note: Please check [Makefile](https://github.com/geolffreym/p2p-noise/Makefile) for more capabilities. +Note: Run `make help` to check for more capabilities. ## More info diff --git a/node.go b/node.go index d212193..220206e 100644 --- a/node.go +++ b/node.go @@ -1,5 +1,5 @@ -// Noise based P2P Library. -// Please read more about [Noise Protocol]. +// Package noise implements the Noise Protocol for peer-to-peer communication. +// For more information about the Noise Protocol, please visit: [Noise Protocol]. // // [Noise Protocol]: http://www.noiseprotocol.org/noise.html package noise @@ -14,7 +14,7 @@ import ( "github.com/oxtoacart/bpool" ) -// futureDeadline calculate a new time for deadline since now. +// futureDeadline calculate and return a new time for deadline since now. func futureDeadLine(deadline time.Duration) time.Time { if deadline == 0 { // deadline 0 = no deadline @@ -45,6 +45,8 @@ type Config interface { KeepAlive() time.Duration } +// Node represents a network node capable of handling connections, +// routing messages, and managing configurations. type Node struct { // Bound local network listener. listener net.Listener @@ -74,8 +76,9 @@ func New(config Config) *Node { } } -// Signals proxy channels to subscriber. -// The listening routine should be stopped using returned cancel func. +// Signals initiates the signaling process to proxy channels to subscribers. +// It returns a channel of type Signal to intercept events and a cancel function to stop the listening routine. +// The channel is closed during the cancellation of listening. func (n *Node) Signals() (<-chan Signal, context.CancelFunc) { ctx, cancel := context.WithCancel(context.Background()) // this channel is closed during listening cancellation @@ -95,9 +98,10 @@ func (n *Node) Disconnect() { } } -// Send emit a new message using peer id. -// If peer id doesn't exists or peer is not connected return error. -// Calling Send extends write deadline. +// Send emits a new message using a peer ID. +// It returns the total bytes sent if there is no error; otherwise, it returns 0. +// If the peer ID doesn't exist or the peer is not connected, it returns an error. +// Calling Send extends the write deadline. func (n *Node) Send(rawID string, message []byte) (uint32, error) { id := newIDFromString(rawID) // Check if id exists in connected peers @@ -116,9 +120,9 @@ func (n *Node) Send(rawID string, message []byte) (uint32, error) { return bytes, err } -// watch keep running waiting for incoming messages. -// After every new message the connection is verified, if local connection is closed or remote peer is disconnected the routine is stopped. -// Incoming message monitor is suggested to be processed in go routines. +// watch keeps running, waiting for incoming messages. +// After receiving each new message, the connection is verified. If the local connection is closed or the remote peer is disconnected, the routine stops. +// It is suggested to process incoming messages in separate goroutines. func (n *Node) watch(peer *peer) { KEEPALIVE: @@ -154,7 +158,9 @@ KEEPALIVE: } -// setupTCPConnection configure TCP connection behavior. +// setupTCPConnection configures the behavior of a TCP connection. +// It takes a net.TCPConn connection and modifies its settings according to the Node configuration. +// If any of the configurations cannot be fulfilled, it returns an error. func (n *Node) setupTCPConnection(conn *net.TCPConn) error { // If tcp enforce keep alive connection. // SetKeepAlive sets whether the operating system should send keep-alive messages on the connection. @@ -172,10 +178,10 @@ func (n *Node) setupTCPConnection(conn *net.TCPConn) error { return nil } -// handshake starts a new handshake for incoming or dialed connection. -// After handshake completes a new session is created and a new peer is created to be added to router. -// If TCP protocol is used connection is enforced to keep alive. -// Return err if max peers connected exceed MaxPeerConnected otherwise return nil. +// handshake initiates a new handshake for an incoming or dialed connection. +// After the handshake completes, a new session is created, and a new peer is added to the router. +// If the TCP protocol is used, the connection is enforced to keep alive. +// Returns an error if the maximum number of connected peers exceeds MaxPeersConnected; otherwise, returns nil. func (n *Node) handshake(conn net.Conn, initialize bool) error { // Assertion for tcp connection to keep alive @@ -223,8 +229,8 @@ func (n *Node) handshake(conn net.Conn, initialize bool) error { return nil } -// routing initialize route in routing table from session. -// Return the recent added peer. +// routing initializes a route in the routing table from a session. +// It returns the recently added peer. func (n *Node) routing(conn *session) *peer { // Initial deadline for connection. // A deadline is an absolute time after which I/O operations @@ -308,8 +314,8 @@ func (n *Node) Close() error { return nil } -// Dial attempt to connect to remote node and add connected peer to routing table. -// Return error if error occurred while dialing node. +// Dial attempts to connect to a remote node and adds the connected peer to the routing table. +// It returns an error if an error occurred while dialing the node. func (n *Node) Dial(addr string) error { protocol := n.config.Protocol() // eg. tcp timeout := n.config.DialTimeout() // max time waiting for dial. diff --git a/node_test.go b/node_test.go index 3188b89..54084f1 100644 --- a/node_test.go +++ b/node_test.go @@ -247,9 +247,7 @@ func TestSomeNodesHandshake(t *testing.T) { } -// go test -benchmem -run=^$ -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out -bench=BenchmarkHandshakeProfile -// go tool pprof {file} -func BenchmarkHandshakeProfile(b *testing.B) { +func BenchmarkHandshake(b *testing.B) { // Discard logs to avoid extra allocations. log.SetOutput(ioutil.Discard) diff --git a/peer.go b/peer.go index e210ad4..1fafcf5 100644 --- a/peer.go +++ b/peer.go @@ -43,8 +43,7 @@ func unmarshall(b []byte) packet { return p } -// peer its the trusty remote peer. -// Provide needed methods to interact with the secured session. +// peer represents a trusty remote peer, providing necessary methods to interact with the secured session. type peer struct { // Optimizing space with ordered types. // the attributes orders matters. diff --git a/router.go b/router.go index 6ca0fe7..4ed2e85 100644 --- a/router.go +++ b/router.go @@ -5,10 +5,10 @@ import ( "sync/atomic" ) -// router keep a hash table to associate ID with peer. -// It implements a unstructured mesh topology. +// router keeps a hash table to associate IDs with peers. +// It implements an unstructured mesh topology. // Unstructured P2P topologies do not attempt to organize all peers into a single, structured topology. -// Rather, each peer attempts to keep a "sensible" set of other peers in its routing table +// Rather, each peer attempts to keep a "sensible" set of other peers in its routing table. type router struct { sync.Map // embed map counter uint32 diff --git a/subscriber.go b/subscriber.go index cacb996..fe29d08 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,8 +2,8 @@ package noise import "context" -// subscriber intercept Signal from already subscribed topics in broker -// Handle actions to emit or receive events. +// subscriber intercept Signal from already subscribed topics in broker. +// It handles actions to emit or receive events. type subscriber struct { // No, you don't need to close the channel // https://stackoverflow.com/questions/8593645/is-it-ok-to-leave-a-channel-open From 481e2e2097be7cd77096f77818c377428ed7de91 Mon Sep 17 00:00:00 2001 From: geolffreym Date: Sun, 14 Apr 2024 08:36:10 -0600 Subject: [PATCH 9/9] refactor: added makefile help --- Makefile | 3 --- peer.go | 3 --- 2 files changed, 6 deletions(-) diff --git a/Makefile b/Makefile index 9f7109a..8ffdebd 100644 --- a/Makefile +++ b/Makefile @@ -81,9 +81,6 @@ preview-doc: build: @go build -v ./... -imports: - @goimport - .PHONY: format ## automatically formats Go source cod format: @go fmt ./... diff --git a/peer.go b/peer.go index 1fafcf5..575d7f0 100644 --- a/peer.go +++ b/peer.go @@ -18,9 +18,6 @@ type packet struct { Msg []byte // 24 byte Digest } -// TODO Usar pipeline -> compress, cypher and sign? -// https://go.dev/blog/pipelines - // TODO Establecer de manera dinámica el send buffer y receiver buffer en el peer y no en el nodo, de modo que se pued la establecerlo usando las métricas // https://community.f5.com/t5/technical-articles/the-tcp-send-buffer-in-depth/ta-p/290760