Skip to content

Commit 14cf9a0

Browse files
committed
fix: singleflight needs check for existing #171
1 parent 91edefa commit 14cf9a0

File tree

5 files changed

+112
-18
lines changed

5 files changed

+112
-18
lines changed

client.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,28 @@ func (c *Client) requestTransaction(ctx context.Context, req *sip.Request) (sip.
174174
return c.tx.Request(ctx, req)
175175
}
176176

177+
func (c *Client) newTransaction(ctx context.Context, req *sip.Request, onConnection func(conn sip.Connection) error) (sip.ClientTransaction, error) {
178+
if c.TxRequester != nil {
179+
return c.TxRequester.Request(ctx, req)
180+
}
181+
182+
tx, err := c.tx.NewClientTransaction(ctx, req)
183+
if err != nil {
184+
return nil, err
185+
}
186+
187+
if err := onConnection(tx.Connection()); err != nil {
188+
tx.Terminate()
189+
return nil, err
190+
}
191+
192+
err = tx.Init()
193+
if err != nil {
194+
tx.Terminate()
195+
}
196+
return tx, err
197+
}
198+
177199
// Do request is HTTP client like Do request/response.
178200
// It returns on final response.
179201
// NOTE: Canceling ctx WILL not send Cancel Request which is needed for INVITE. Use dialog API for dealing with dialogs

client_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package sipgo
22

33
import (
44
"context"
5+
"io"
6+
"net"
57
"os"
8+
"runtime"
69
"sort"
710
"strings"
11+
"sync"
812
"testing"
913

1014
"github.com/emiago/sipgo/sip"
@@ -291,6 +295,40 @@ func TestDigestAuthLowerCase(t *testing.T) {
291295
require.NoError(t, err)
292296
}
293297

298+
func TestClientParalelDialing(t *testing.T) {
299+
if os.Getenv("TEST_INTEGRATION") == "" {
300+
t.Skip("Use TEST_INTEGRATION env value to run this test")
301+
return
302+
}
303+
304+
ua, err := NewUA()
305+
require.NoError(t, err)
306+
307+
l, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 15090})
308+
require.NoError(t, err)
309+
go func() {
310+
io.ReadAll(l)
311+
}()
312+
313+
c, err := NewClient(ua,
314+
WithClientHostname("10.0.0.0"),
315+
WithClientConnectionAddr("127.0.0.1:15080"),
316+
)
317+
require.NoError(t, err)
318+
wg := sync.WaitGroup{}
319+
for i := 0; i < 2*runtime.NumCPU(); i++ {
320+
wg.Add(1)
321+
go func() {
322+
defer wg.Done()
323+
req := sip.NewRequest(sip.INVITE, sip.Uri{Host: "127.0.0.1", Port: 15090})
324+
err := c.WriteRequest(req)
325+
require.NoError(t, err)
326+
}()
327+
}
328+
329+
wg.Wait()
330+
}
331+
294332
func BenchmarkClientTransactionRequestBuild(t *testing.B) {
295333
ua, err := NewUA()
296334
require.Nil(t, err)

dialog_client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,17 @@ func (s *DialogClientSession) Do(ctx context.Context, req *sip.Request) (*sip.Re
9090
// This ensures that you have proper request done within dialog. You should avoid setting any Dialog header (cseq, from, to, callid)
9191
func (s *DialogClientSession) TransactionRequest(ctx context.Context, req *sip.Request) (sip.ClientTransaction, error) {
9292
s.buildReq(req)
93+
94+
// Overrides contact header with local connection addr
95+
if cont := req.Contact(); cont.Address.Host == "" {
96+
tx, err := s.UA.Client.newTransaction(ctx, req, func(conn sip.Connection) error {
97+
var err error
98+
cont.Address.Host, cont.Address.Port, err = sip.ParseAddr(conn.LocalAddr().String())
99+
return err
100+
})
101+
return tx, err
102+
}
103+
93104
// Passing option to avoid CSEQ apply
94105
return s.UA.Client.TransactionRequest(ctx, req, s.requestValidate)
95106
}

sip/transaction_layer.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,19 @@ func (txl *TransactionLayer) handleResponse(res *Response) error {
193193
}
194194

195195
func (txl *TransactionLayer) Request(ctx context.Context, req *Request) (*ClientTx, error) {
196+
tx, err := txl.NewClientTransaction(ctx, req)
197+
if err != nil {
198+
return nil, err
199+
}
200+
201+
if err := tx.Init(); err != nil {
202+
tx.Terminate()
203+
return nil, err
204+
}
205+
return tx, nil
206+
}
207+
208+
func (txl *TransactionLayer) NewClientTransaction(ctx context.Context, req *Request) (*ClientTx, error) {
196209
if req.IsAck() {
197210
return nil, fmt.Errorf("ACK request must be sent directly through transport")
198211
}
@@ -202,7 +215,11 @@ func (txl *TransactionLayer) Request(ctx context.Context, req *Request) (*Client
202215
return nil, err
203216
}
204217

205-
return txl.clientTxRequest(ctx, req, key)
218+
tx, err := txl.clientTxRequest(ctx, req, key)
219+
if err != nil {
220+
return nil, err
221+
}
222+
return tx, nil
206223
}
207224

208225
func (txl *TransactionLayer) clientTxRequest(ctx context.Context, req *Request, key string) (*ClientTx, error) {
@@ -232,11 +249,6 @@ func (txl *TransactionLayer) clientTxCreate(ctx context.Context, req *Request, k
232249
}
233250

234251
tx := NewClientTx(key, req, conn, txl.log)
235-
if err := tx.Init(); err != nil {
236-
tx.Terminate()
237-
return nil, err
238-
}
239-
240252
return tx, nil
241253
}
242254

sip/transport_connection_pool.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,34 @@ func (p *ConnectionPool) addSingleflight(raddr Addr, laddr Addr, reuse bool, do
5555

5656
if laddr.Port > 0 || reuse {
5757
// TODO: remplement this here to avoid type conversion
58-
conn, err, shared := p.sf.Do(laddr.String()+raddr.String(), func() (any, error) {
59-
return do()
60-
})
61-
if err != nil {
62-
return nil, err
58+
if c := p.Get(a); c != nil {
59+
return c, nil
6360
}
64-
c := conn.(Connection)
6561

66-
if shared {
67-
return c, nil
62+
laddrStr := laddr.String()
63+
if laddr.Port > 0 {
64+
if c := p.Get(laddrStr); c != nil {
65+
return c, nil
66+
}
6867
}
6968

70-
p.Lock()
71-
defer p.Unlock()
69+
conn, err, _ := p.sf.Do(laddrStr+a, func() (any, error) {
70+
c, err := do()
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
p.Lock()
76+
defer p.Unlock()
7277

73-
p.m[a] = c
74-
p.m[c.LocalAddr().String()] = c
78+
p.m[a] = c
79+
p.m[c.LocalAddr().String()] = c
80+
return c, nil
81+
})
82+
if err != nil {
83+
return nil, err
84+
}
85+
c := conn.(Connection)
7586
return c, nil
7687
}
7788

0 commit comments

Comments
 (0)