Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pion/logging"
"github.com/pion/stun"
"github.com/pion/turn/v2/internal/proto"
"github.com/pion/turn/v2/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -227,7 +228,7 @@ func TestTCPClient(t *testing.T) {
require.NoError(t, err)

client, err := NewClient(&ClientConfig{
Conn: NewSTUNConn(conn),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this breaking the API by removing NewSTUNConn?

I agree that isnt a nice API. And I would much like to see it moved to pion/stun (see #308 for a related issue).

I think we currently have a lot STUN related code in pion/turn. E.g. PerformTransaction which would be better located in pion/stun. However, this is a different issue which we should not address in this PR.

Conn: utils.NewSTUNConn(conn),
STUNServerAddr: serverAddr,
TURNServerAddr: serverAddr,
Username: "foo",
Expand Down
3 changes: 2 additions & 1 deletion examples/turn-client/tcp-alloc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pion/logging"
"github.com/pion/turn/v2"
"github.com/pion/turn/v2/utils"
)

func setupSignalingChannel(addrCh chan string, signaling bool, relayAddr string) {
Expand Down Expand Up @@ -98,7 +99,7 @@ func main() {
cfg := &turn.ClientConfig{
STUNServerAddr: turnServerAddr,
TURNServerAddr: turnServerAddr,
Conn: turn.NewSTUNConn(conn),
Conn: utils.NewSTUNConn(conn),
Username: cred[0],
Password: cred[1],
Realm: *realm,
Expand Down
3 changes: 2 additions & 1 deletion examples/turn-client/tcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pion/logging"
"github.com/pion/turn/v2"
"github.com/pion/turn/v2/utils"
)

func main() {
Expand Down Expand Up @@ -51,7 +52,7 @@ func main() {
cfg := &turn.ClientConfig{
STUNServerAddr: turnServerAddr,
TURNServerAddr: turnServerAddr,
Conn: turn.NewSTUNConn(conn),
Conn: utils.NewSTUNConn(conn),
Username: cred[0],
Password: cred[1],
Realm: *realm,
Expand Down
1 change: 1 addition & 0 deletions examples/turn-server/tls/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func main() {
RelayAddress: net.ParseIP(*publicIP),
Address: "0.0.0.0",
},
Protocol: "tls",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we break this for existing users if they dont set the protocol?

},
},
})
Expand Down
182 changes: 163 additions & 19 deletions internal/allocation/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"net"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/pion/logging"
"github.com/pion/stun"
"github.com/pion/turn/v2/internal/ipnet"
"github.com/pion/turn/v2/internal/proto"
"golang.org/x/sys/unix"
)

type allocationResponse struct {
Expand All @@ -24,18 +26,23 @@ type allocationResponse struct {
// Allocation is tied to a FiveTuple and relays traffic
// use CreateAllocation and GetAllocation to operate
type Allocation struct {
RelayAddr net.Addr
Protocol Protocol
TurnSocket net.PacketConn
RelaySocket net.PacketConn
fiveTuple *FiveTuple
permissionsLock sync.RWMutex
permissions map[string]*Permission
channelBindingsLock sync.RWMutex
channelBindings []*ChannelBind
lifetimeTimer *time.Timer
closed chan interface{}
log logging.LeveledLogger
RelayAddr net.Addr
Protocol Protocol
RequestedTransportProtocol proto.Protocol
TurnSocket net.PacketConn
RelaySocket net.PacketConn
RelayListener net.Listener
fiveTuple *FiveTuple
permissionsLock sync.RWMutex
permissions map[string]*Permission
channelBindingsLock sync.RWMutex
channelBindings []*ChannelBind
lifetimeTimer *time.Timer
closed chan interface{}
log logging.LeveledLogger
connsLock sync.RWMutex
addrToConn map[string]net.Conn
cidToConn map[proto.ConnectionID]net.Conn

// Some clients (Firefox or others using resiprocate's nICE lib) may retry allocation
// with same 5 tuple when received 413, for compatible with these clients,
Expand All @@ -45,13 +52,16 @@ type Allocation struct {
}

// NewAllocation creates a new instance of NewAllocation.
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger) *Allocation {
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger, requestedTransportProtocol proto.Protocol) *Allocation {
return &Allocation{
TurnSocket: turnSocket,
fiveTuple: fiveTuple,
permissions: make(map[string]*Permission, 64),
closed: make(chan interface{}),
log: log,
TurnSocket: turnSocket,
RequestedTransportProtocol: requestedTransportProtocol,
fiveTuple: fiveTuple,
permissions: make(map[string]*Permission, 64),
addrToConn: make(map[string]net.Conn),
cidToConn: make(map[proto.ConnectionID]net.Conn),
closed: make(chan interface{}),
log: log,
}
}

Expand Down Expand Up @@ -208,7 +218,11 @@ func (a *Allocation) Close() error {
}
a.channelBindingsLock.RUnlock()

return a.RelaySocket.Close()
if a.RequestedTransportProtocol == proto.ProtoTCP {
return a.RelayListener.Close()
} else {
return a.RelaySocket.Close()
}
}

// https://tools.ietf.org/html/rfc5766#section-10.3
Expand Down Expand Up @@ -284,3 +298,133 @@ func (a *Allocation) packetHandler(m *Manager) {
}
}
}

func (a *Allocation) GetConnectionByAddr(peerAddr string) net.Conn {
a.connsLock.RLock()
defer a.connsLock.RUnlock()
return a.addrToConn[peerAddr]
}

func (a *Allocation) GetConnectionByID(cid proto.ConnectionID) net.Conn {
a.connsLock.RLock()
defer a.connsLock.RUnlock()
return a.cidToConn[cid]
}

func (a *Allocation) newConnection(cid proto.ConnectionID, dst string) error {
a.connsLock.Lock()
a.addrToConn[dst] = nil
a.cidToConn[cid] = nil
a.connsLock.Unlock()

dialer := &net.Dialer{
LocalAddr: a.RelayAddr,
Control: func(network, address string, c syscall.RawConn) error {
var err error
c.Control(func(fd uintptr) {
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR|unix.SO_REUSEPORT, 1)
})
return err
},
}

conn, err := dialer.Dial("tcp", dst)
if err != nil {
return err
}

a.connsLock.Lock()
a.addrToConn[dst] = conn
a.cidToConn[cid] = conn
a.connsLock.Unlock()

return nil
}

func (a *Allocation) removeConnection(cid proto.ConnectionID, dst string) {
a.connsLock.Lock()
c := a.cidToConn[cid]
delete(a.addrToConn, dst)
delete(a.cidToConn, cid)
a.connsLock.Unlock()
c.Close()
}

func (a *Allocation) connectionHandler(m *Manager) {
for {
// When a server receives an incoming TCP connection on a relayed
// transport address, it processes the request as follows.
// The server MUST accept the connection. If it is not successful,
// nothing is sent to the client over the control connection.
conn, err := a.RelayListener.Accept()
if err != nil {
m.DeleteAllocation(a.fiveTuple)
return
}

a.log.Debugf("relay listener %s received connection from %s",
a.RelayListener.Addr().String(),
conn.RemoteAddr().String())

// If the connection is successfully accepted, it is now called a peer
// data connection. The server MUST buffer any data received from the
// peer. The server adjusts its advertised TCP receive window to
// reflect the amount of empty buffer space.

// If no permission for this peer has been installed for this
// allocation, the server MUST close the connection with the peer
// immediately after it has been accepted.

if p := a.GetPermission(conn.RemoteAddr()); p == nil {
a.log.Infof("No Permission or Channel exists for %v on allocation %v", conn.RemoteAddr(), a.RelayAddr.String())
conn.Close()
continue
}

// Otherwise, the server sends a ConnectionAttempt indication to the
// client over the control connection. The indication MUST include an
// XOR-PEER-ADDRESS attribute containing the peer's transport address,
// as well as a CONNECTION-ID attribute uniquely identifying the peer
// data connection.
cid := m.newCID(a)

a.connsLock.Lock()
a.addrToConn[conn.RemoteAddr().String()] = conn
a.cidToConn[cid] = conn
a.connsLock.Unlock()

msg, err := stun.Build(
stun.TransactionID,
stun.NewType(stun.MethodConnectionAttempt, stun.ClassIndication),
)
if err != nil {
a.log.Errorf("Failed to build MethodConnectionAttempt message %v", err)
continue
}

addr, ok := conn.RemoteAddr().(*net.TCPAddr)
if !ok {
a.log.Errorf("Failed to parse remote tcp address")
continue
}

peerAddr := proto.PeerAddress{}
peerAddr.IP = addr.IP
peerAddr.Port = addr.Port

if err = peerAddr.AddTo(msg); err != nil {
a.log.Errorf("Failed to build MethodConnectionAttempt message %v", err)
return
}

attrCid := proto.ConnectionID(cid)
attrCid.AddTo(msg)
a.TurnSocket.WriteTo(msg.Raw, a.fiveTuple.SrcAddr)

// If no ConnectionBind request associated with this peer data
// connection is received after 30 seconds, the peer data connection
// MUST be closed.

go m.removeAfter30(cid, conn.RemoteAddr())
}
}
Loading