Skip to content

Fix/signal issue #3896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions signal/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
"net/http"
"runtime"

// nolint:gosec
_ "net/http/pprof"
"strings"
Expand Down Expand Up @@ -176,9 +179,29 @@

func startPprof() {
go func() {
log.Debugf("Starting pprof server on 127.0.0.1:6060")
if err := http.ListenAndServe("127.0.0.1:6060", nil); err != nil {
log.Fatalf("pprof server failed: %v", err)
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/heap", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/block", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/mutex", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/cmdline", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/profile", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/symbol", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/trace", http.DefaultServeMux.ServeHTTP)
mux.HandleFunc("/debug/pprof/threadcreate", http.DefaultServeMux.ServeHTTP)

mux.HandleFunc("/debug/memstats", func(w http.ResponseWriter, r *http.Request) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(m)

Check failure on line 197 in signal/cmd/run.go

View workflow job for this annotation

GitHub Actions / Linux

Error return value of `(*encoding/json.Encoder).Encode` is not checked (errcheck)

Check failure on line 197 in signal/cmd/run.go

View workflow job for this annotation

GitHub Actions / Darwin

Error return value of `(*encoding/json.Encoder).Encode` is not checked (errcheck)

Check failure on line 197 in signal/cmd/run.go

View workflow job for this annotation

GitHub Actions / Windows

Error return value of `(*encoding/json.Encoder).Encode` is not checked (errcheck)
})

debugAddr := "127.0.0.1:6060"
log.Infof("started pprof and memstats server on %s", debugAddr)
err := http.ListenAndServe(debugAddr, mux)
if err != nil {
log.Errorf("failed to serve debug endpoint: %s", err)
}
}()
}
Expand Down
60 changes: 60 additions & 0 deletions signal/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import (
"sync"
"time"

"errors"

log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/signal/metrics"
"github.com/netbirdio/netbird/signal/proto"
)

var (
ErrPeerAlreadyRegistered = errors.New("peer already registered")
)

// Peer representation of a connected Peer
type Peer struct {
// a unique id of the Peer (e.g. sha256 fingerprint of the Wireguard public key)
Expand All @@ -23,6 +29,8 @@ type Peer struct {

// registration time
RegisteredAt time.Time

Cancel context.CancelFunc
}

// NewPeer creates a new instance of a connected Peer
Expand All @@ -35,6 +43,17 @@ func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
}
}

// NewPeer creates a new instance of a connected Peer
func NewPeerPool(id string, stream proto.SignalExchange_ConnectStreamServer, cancel context.CancelFunc) *Peer {
return &Peer{
Id: id,
Stream: stream,
StreamID: time.Now().UnixNano(),
RegisteredAt: time.Now(),
Cancel: cancel,
}
}

// Registry that holds all currently connected Peers
type Registry struct {
// Peer.key -> Peer
Expand Down Expand Up @@ -94,6 +113,38 @@ func (registry *Registry) Register(peer *Peer) {
registry.metrics.Registrations.Add(context.Background(), 1)
}

// Register registers peer in the registry
func (registry *Registry) RegisterPool(peer *Peer) error {
start := time.Now()

// can be that peer already exists, but it is fine (e.g. reconnect)
p, loaded := registry.Peers.LoadOrStore(peer.Id, peer)
if loaded {
pp := p.(*Peer)
if peer.StreamID > pp.StreamID {
log.Tracef("peer [%s] is already registered [new streamID %d, previous StreamID %d]. Will override stream.",
peer.Id, peer.StreamID, pp.StreamID)
if swapped := registry.Peers.CompareAndSwap(peer.Id, pp, peer); !swapped {
return registry.RegisterPool(peer)
}
pp.Cancel()
log.Debugf("peer re-registered [%s]", peer.Id)
return nil
}
return ErrPeerAlreadyRegistered
}

log.Debugf("peer registered [%s]", peer.Id)
registry.metrics.ActivePeers.Add(context.Background(), 1)

// record time as milliseconds
registry.metrics.RegistrationDelay.Record(context.Background(), float64(time.Since(start).Nanoseconds())/1e6)

registry.metrics.Registrations.Add(context.Background(), 1)

return nil
}

// Deregister Peer from the Registry (usually once it disconnects)
func (registry *Registry) Deregister(peer *Peer) {
registry.regMutex.Lock()
Expand All @@ -113,3 +164,12 @@ func (registry *Registry) Deregister(peer *Peer) {
registry.metrics.Deregistrations.Add(context.Background(), 1)
}
}

// Deregister Peer from the Registry (usually once it disconnects)
func (registry *Registry) DeregisterPool(peer *Peer) {
if deleted := registry.Peers.CompareAndDelete(peer.Id, peer); deleted {
registry.metrics.ActivePeers.Add(context.Background(), -1)
log.Debugf("peer deregistered [%s]", peer.Id)
registry.metrics.Deregistrations.Add(context.Background(), 1)
}
}
Loading
Loading