Skip to content

Commit

Permalink
DTLS transport (#164)
Browse files Browse the repository at this point in the history
DTLS transport implementation including transport establishment and tunneling as well as UDP packet handling in the rust detector portion of the station.
  • Loading branch information
mingyech authored Jul 26, 2023
1 parent bccc9aa commit 0acef79
Show file tree
Hide file tree
Showing 44 changed files with 2,576 additions and 364 deletions.
158 changes: 158 additions & 0 deletions cmd/application/connectingStats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package main

import (
"fmt"
"sync/atomic"

cj "github.com/refraction-networking/conjure/pkg/station/lib"
)

var _ cj.ConnectingTpStats = &connStats{}

type connectingCounts struct {
numCreatedConnecting int64
numDialSuccessfulConnecting int64
numListenSuccessfulConnecting int64
numSuccessfulConnecting int64
numTimeoutConnecting int64
numAuthFailConnecting int64
numOtherFailConnecting int64
}

func (c *connStats) AddCreatedConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numCreatedConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numCreatedConnecting, 1)
}
}

func (c *connStats) AddCreatedToListenSuccessfulConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numCreatedConnecting, -1)
atomic.AddInt64(&c.numListenSuccessfulConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numCreatedConnecting, -1)
atomic.AddInt64(&c.v4geoIPMap[asn].numListenSuccessfulConnecting, 1)
}
}

func (c *connStats) AddCreatedToDialSuccessfulConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numCreatedConnecting, -1)
atomic.AddInt64(&c.numDialSuccessfulConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numCreatedConnecting, -1)
atomic.AddInt64(&c.v4geoIPMap[asn].numDialSuccessfulConnecting, 1)
}
}

func (c *connStats) AddCreatedToSuccessfulConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numCreatedConnecting, -1)
atomic.AddInt64(&c.numSuccessfulConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numCreatedConnecting, -1)
atomic.AddInt64(&c.v4geoIPMap[asn].numSuccessfulConnecting, 1)
}
}

func (c *connStats) AddCreatedToTimeoutConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numCreatedConnecting, -1)
atomic.AddInt64(&c.numTimeoutConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numCreatedConnecting, -1)
atomic.AddInt64(&c.v4geoIPMap[asn].numTimeoutConnecting, 1)
}
}

func (c *connStats) AddSuccessfulToDiscardedConnecting(asn uint, cc string, tp string) {
}

func (c *connStats) AddAuthFailConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numAuthFailConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numAuthFailConnecting, 1)
}

}

func (c *connStats) AddOtherFailConnecting(asn uint, cc string, tp string) {
atomic.AddInt64(&c.numOtherFailConnecting, 1)

if isValidCC(cc) {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.v4geoIPMap[asn]; !ok {
// We haven't seen asn before, so add it to the map
c.v4geoIPMap[asn] = &asnCounts{}
c.v4geoIPMap[asn].cc = cc
}
atomic.AddInt64(&c.v4geoIPMap[asn].numOtherFailConnecting, 1)
}

}

func (c *connStats) resetConnecting() {
c.connectingCounts = connectingCounts{}
}

func (c *connectingCounts) string() string {
totalEndStates := atomic.LoadInt64(&c.numDialSuccessfulConnecting) + atomic.LoadInt64(&c.numListenSuccessfulConnecting) + atomic.LoadInt64(&c.numTimeoutConnecting) + atomic.LoadInt64(&c.numAuthFailConnecting) + atomic.LoadInt64(&c.numOtherFailConnecting)
if totalEndStates < 1 {
totalEndStates = 0
}
return fmt.Sprintf("%d %d %d %d %d %d %d",
atomic.LoadInt64(&c.numCreatedConnecting),
atomic.LoadInt64(&c.numDialSuccessfulConnecting),
atomic.LoadInt64(&c.numListenSuccessfulConnecting),
atomic.LoadInt64(&c.numTimeoutConnecting),
atomic.LoadInt64(&c.numAuthFailConnecting),
atomic.LoadInt64(&c.numOtherFailConnecting),
totalEndStates,
)
}
12 changes: 10 additions & 2 deletions cmd/application/conns.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ type statCounts struct {
totalTransitions int64 // Number of all transitions tracked
numNewConns int64 // Number new connections potentially handshaking
numResolved int64 // Number connections that have reached a terminal state.

connectingCounts
}

type asnCounts struct {
Expand All @@ -418,6 +420,8 @@ type connStats struct {
ipv6 statCounts
v4geoIPMap map[uint]*asnCounts
v6geoIPMap map[uint]*asnCounts

connectingCounts
}

func (c *connStats) PrintAndReset(logger *log.Logger) {
Expand All @@ -433,7 +437,7 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
}

if numASNs > 0 {
logger.Infof("conn-stats (IPv4): %d %d %d %d %d %.3f %d %.3f %d %.3f %d %.3f %d %.3f %d",
logger.Infof("conn-stats (IPv4): %d %d %d %d %d %.3f %d %.3f %d %.3f %d %.3f %d %.3f %d %s",
atomic.LoadInt64(&c.ipv4.numCreated),
atomic.LoadInt64(&c.ipv4.numReading),
atomic.LoadInt64(&c.ipv4.numChecking),
Expand All @@ -449,6 +453,7 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
atomic.LoadInt64(&c.ipv4.numClosed),
1000*float64(atomic.LoadInt64(&c.ipv4.numClosed))/epochDur,
numASNs,
c.connectingCounts.string(),
)
}

Expand Down Expand Up @@ -484,7 +489,7 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
}
for asn, counts := range val {
var tt = math.Max(1, float64(atomic.LoadInt64(&counts.totalTransitions)))
logger.Infof("conn-stats-verbose (IPv%d): %d %s %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %d %d %d %d",
logger.Infof("conn-stats-verbose (IPv%d): %d %s %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %d %d %d %d %s",
ip_ver,
asn,
counts.cc,
Expand Down Expand Up @@ -530,6 +535,7 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
atomic.LoadInt64(&counts.numNewConns),
atomic.LoadInt64(&c.ipv6.numResolved),
atomic.LoadInt64(&counts.numResolved),
counts.connectingCounts.string(),
)
}
}
Expand Down Expand Up @@ -602,6 +608,8 @@ func (c *connStats) reset() {
c.v6geoIPMap = make(map[uint]*asnCounts)

c.epochStart = time.Now()

c.resetConnecting()
}

func (c *connStats) addCreated(asn uint, cc string, isIPv4 bool) {
Expand Down
35 changes: 33 additions & 2 deletions cmd/application/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"net"
"os"
"os/signal"
"strconv"
Expand All @@ -12,6 +13,7 @@ import (

cj "github.com/refraction-networking/conjure/pkg/station/lib"
"github.com/refraction-networking/conjure/pkg/station/log"
"github.com/refraction-networking/conjure/pkg/transports/connecting/dtls"
"github.com/refraction-networking/conjure/pkg/transports/wrapping/min"
"github.com/refraction-networking/conjure/pkg/transports/wrapping/obfs4"
"github.com/refraction-networking/conjure/pkg/transports/wrapping/prefix"
Expand Down Expand Up @@ -53,7 +55,38 @@ func main() {
}
log.SetLevel(logLevel)

connManager := newConnManager(nil)

conf.RegConfig.ConnectingStats = connManager

regManager := cj.NewRegistrationManager(conf.RegConfig)

logIPDTLS := func(logger func(asn uint, cc, tp string)) func(*net.IP) {
return func(ip *net.IP) {
cc, err := regManager.GeoIP.CC(*ip)
if err != nil {
return
}

var asn uint = 0
if cc != "unk" {
asn, err = regManager.GeoIP.ASN(*ip)
if err != nil {
return
}
}

logger(asn, cc, "dtls")
}
}

dtlsTransport, err := dtls.NewTransport(logIPDTLS(connManager.AddAuthFailConnecting), logIPDTLS(connManager.AddOtherFailConnecting), logIPDTLS(connManager.AddCreatedToDialSuccessfulConnecting), logIPDTLS(connManager.AddCreatedToListenSuccessfulConnecting))

if err != nil {
log.Fatalf("failed to setup dtls: %v", err)
}
enabledTransports[pb.TransportType_DTLS] = dtlsTransport

sharedLogger = regManager.Logger
logger := sharedLogger
defer regManager.Cleanup()
Expand Down Expand Up @@ -98,8 +131,6 @@ func main() {
logger.Fatal("error creating ZMQ Ingest: %w", err)
}

connManager := newConnManager(nil)

cj.Stat().AddStatsModule(zmqIngester, false)
cj.Stat().AddStatsModule(regManager.LivenessTester, false)
cj.Stat().AddStatsModule(cj.GetProxyStats(), false)
Expand Down
2 changes: 2 additions & 0 deletions cmd/registration-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/refraction-networking/conjure/pkg/regserver/apiregserver"
"github.com/refraction-networking/conjure/pkg/regserver/dnsregserver"
"github.com/refraction-networking/conjure/pkg/station/lib"
"github.com/refraction-networking/conjure/pkg/transports/connecting/dtls"
"github.com/refraction-networking/conjure/pkg/transports/wrapping/min"
"github.com/refraction-networking/conjure/pkg/transports/wrapping/obfs4"
"github.com/refraction-networking/conjure/pkg/transports/wrapping/prefix"
Expand Down Expand Up @@ -52,6 +53,7 @@ var defaultTransports = map[pb.TransportType]lib.Transport{
pb.TransportType_Min: min.Transport{},
pb.TransportType_Obfs4: obfs4.Transport{},
pb.TransportType_Prefix: prefix.DefaultSet(),
pb.TransportType_DTLS: dtls.Transport{},
// [transports:enable]
}

Expand Down
15 changes: 11 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,48 @@ go 1.18

require (
github.com/BurntSushi/toml v1.2.1
github.com/Psiphon-Labs/psiphon-tunnel-core v0.0.14-beta-ios.0.20230714185657-14bf1ee6651a
github.com/flynn/noise v1.0.0
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru v0.6.0
github.com/libp2p/go-reuseport v0.3.0
github.com/mingyech/transport v0.1.1
github.com/mroth/weightedrand v1.0.0
github.com/oschwald/geoip2-golang v1.8.0
github.com/pebbe/zmq4 v1.2.9
github.com/pelletier/go-toml v1.9.5
github.com/pion/logging v0.2.2
github.com/pion/sctp v1.8.7
github.com/pion/stun v0.3.5
github.com/refraction-networking/ed25519 v0.1.2
github.com/refraction-networking/gotapdance v1.5.6
github.com/refraction-networking/gotapdance v1.6.0-dtlsbeta
github.com/refraction-networking/obfs4 v0.1.1
github.com/refraction-networking/utls v1.2.0
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib v1.4.0
golang.org/x/crypto v0.11.0
golang.org/x/net v0.12.0
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.31.0
)

require github.com/pion/randutil v0.1.0 // indirect

require (
github.com/andybalholm/brotli v1.0.5-0.20220518190645-786ec621f618 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dchest/siphash v1.2.3 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gopacket v1.1.19
github.com/klauspost/compress v1.15.12 // indirect
github.com/mingyech/dtls v0.1.0
github.com/oschwald/maxminddb-golang v1.10.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sergeyfrolov/bsbuffer v0.0.0-20180903213811-94e85abb8507 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/sys v0.10.0
golang.org/x/text v0.11.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 0acef79

Please sign in to comment.