@@ -74,6 +74,7 @@ type ADNL struct {
74
74
ourAddresses unsafe.Pointer
75
75
76
76
activeQueries map [string ]chan tl.Serializable
77
+ activePings map [int64 ]chan MessagePong
77
78
78
79
customMessageHandler unsafe.Pointer // CustomMessageHandler
79
80
queryHandler unsafe.Pointer // QueryHandler
@@ -98,6 +99,7 @@ func (g *Gateway) initADNL() *ADNL {
98
99
closerCtx : closerCtx ,
99
100
closeFn : closeFn ,
100
101
msgParts : make (map [string ]* partitionedMessage , 128 ),
102
+ activePings : make (map [int64 ]chan MessagePong ),
101
103
activeQueries : map [string ]chan tl.Serializable {},
102
104
}
103
105
}
@@ -220,7 +222,16 @@ func (a *ADNL) processPacket(packet *PacketContent, fromChannel bool) (err error
220
222
func (a * ADNL ) processMessage (message any ) error {
221
223
switch ms := message .(type ) {
222
224
case MessagePong :
223
- // TODO: record
225
+ a .mx .RLock ()
226
+ ch := a .activePings [ms .Value ]
227
+ a .mx .RUnlock ()
228
+
229
+ if ch != nil {
230
+ select {
231
+ case ch <- ms :
232
+ default :
233
+ }
234
+ }
224
235
case MessagePing :
225
236
buf , err := a .buildRequest (MessagePong {Value : ms .Value })
226
237
if err != nil {
@@ -294,7 +305,7 @@ func (a *ADNL) processMessage(message any) error {
294
305
return fmt .Errorf ("failed to setup channel: %w" , err )
295
306
}
296
307
case MessagePart :
297
- msgID := hex . EncodeToString (ms .Hash )
308
+ msgID := string (ms .Hash )
298
309
299
310
a .mx .Lock ()
300
311
p , ok := a .msgParts [msgID ]
@@ -306,11 +317,17 @@ func (a *ADNL) processMessage(message any) error {
306
317
307
318
if len (a .msgParts ) > 100 {
308
319
// cleanup old stuck messages
320
+ tm := time .Now ().Add (- 7 * time .Second )
309
321
for s , pt := range a .msgParts {
310
- if time . Since (pt .startedAt ) > 10 * time . Second {
322
+ if tm . After (pt .startedAt ) {
311
323
delete (a .msgParts , s )
312
324
}
313
325
}
326
+
327
+ if len (a .msgParts ) > 16 * 1024 {
328
+ a .mx .Unlock ()
329
+ return fmt .Errorf ("too many incomplete messages" )
330
+ }
314
331
}
315
332
316
333
p = newPartitionedMessage (ms .TotalSize )
@@ -441,6 +458,47 @@ reSplit:
441
458
return nil
442
459
}
443
460
461
+ func (a * ADNL ) Ping (ctx context.Context ) (time.Duration , error ) {
462
+ val := time .Now ().UnixNano ()
463
+ req , err := a .buildRequest (MessagePing {
464
+ Value : val ,
465
+ })
466
+ if err != nil {
467
+ return 0 , err
468
+ }
469
+
470
+ ch := make (chan MessagePong , 1 )
471
+ a .mx .Lock ()
472
+ a .activePings [val ] = ch
473
+ a .mx .Unlock ()
474
+
475
+ defer func () {
476
+ a .mx .Lock ()
477
+ delete (a .activePings , val )
478
+ a .mx .Unlock ()
479
+ }()
480
+
481
+ for {
482
+ try , cancel := context .WithTimeout (ctx , 1 * time .Second )
483
+
484
+ if err = a .send (req ); err != nil {
485
+ cancel ()
486
+ return 0 , fmt .Errorf ("failed to send ping packet: %w" , err )
487
+ }
488
+
489
+ select {
490
+ case <- ctx .Done ():
491
+ cancel ()
492
+ return 0 , ctx .Err ()
493
+ case <- ch :
494
+ cancel ()
495
+ return time .Since (time .Unix (0 , val )), nil
496
+ case <- try .Done ():
497
+ continue
498
+ }
499
+ }
500
+ }
501
+
444
502
func (a * ADNL ) Query (ctx context.Context , req , result tl.Serializable ) error {
445
503
q , err := createQueryMessage (req )
446
504
if err != nil {
0 commit comments