@@ -367,18 +367,18 @@ protected override void Unregister()
367
367
/// <summary>
368
368
/// Waits until a packet of the specified type is received and matches the given condition.
369
369
/// </summary>
370
- /// <typeparam name="T ">The type of the packet.</typeparam>
370
+ /// <typeparam name="TPacket ">The type of the packet.</typeparam>
371
371
/// <param name="condition">A function that evaluates the packet and returns true if the condition is met.</param>
372
372
/// <param name="cancellationToken">A token to cancel the wait for the matching packet.</param>
373
373
/// <returns>A task that completes once a packet matching the condition is received.</returns>
374
- public Task WaitForPacketWhere < T > ( Func < T , Task < bool > > condition , CancellationToken cancellationToken = default )
375
- where T : IPacket
374
+ public Task < TPacket > WaitForPacketWhere < TPacket > ( Func < TPacket , Task < bool > > condition , CancellationToken cancellationToken = default )
375
+ where TPacket : IPacket
376
376
{
377
377
// linked token is required to cancel the task when the client is disconnected
378
378
var cts = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken , CancellationToken ) ;
379
379
var token = cts . Token ;
380
- var tcs = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
381
- async Task PacketHandler ( T packet )
380
+ var tcs = new TaskCompletionSource < TPacket > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
381
+ async Task PacketHandler ( TPacket packet )
382
382
{
383
383
try
384
384
{
@@ -388,7 +388,7 @@ async Task PacketHandler(T packet)
388
388
}
389
389
if ( await condition ( packet ) . WaitAsync ( token ) )
390
390
{
391
- tcs . TrySetResult ( ) ;
391
+ tcs . TrySetResult ( packet ) ;
392
392
}
393
393
}
394
394
catch ( OperationCanceledException e )
@@ -400,31 +400,33 @@ async Task PacketHandler(T packet)
400
400
tcs . TrySetException ( e ) ;
401
401
}
402
402
}
403
- var packetRegistration = On < T > ( PacketHandler ) ;
403
+ var packetRegistration = On < TPacket > ( PacketHandler ) ;
404
404
if ( packetRegistration == null )
405
405
{
406
406
// TODO: Can this occur?
407
407
cts . Dispose ( ) ;
408
408
throw new InvalidOperationException ( "Could not register packet handler" ) ;
409
409
}
410
- return tcs . Task . ContinueWith ( _ =>
410
+ // this registration is required because otherwise the task will only get cancelled when the next packet of that ype is received
411
+ var cancellationRegistration = token . Register ( ( ) =>
412
+ {
413
+ // cancelling the tcs will later dispose the other stuff
414
+ tcs . TrySetCanceled ( token ) ;
415
+ } ) ;
416
+ tcs . Task . ContinueWith ( _ =>
411
417
{
418
+ cancellationRegistration . Dispose ( ) ;
412
419
packetRegistration . Dispose ( ) ;
413
420
cts . Dispose ( ) ;
414
421
} , TaskContinuationOptions . ExecuteSynchronously ) ;
422
+ return tcs . Task ;
415
423
}
416
424
417
- /// <summary>
418
- /// Waits until a packet of the specified type is received and matches the given condition.
419
- /// </summary>
420
- /// <typeparam name="T">The type of the packet.</typeparam>
421
- /// <param name="condition">A function that evaluates the packet and returns true if the condition is met.</param>
422
- /// <param name="cancellationToken">A token to cancel the wait for the matching packet.</param>
423
- /// <returns>A task that completes once a packet matching the condition is received.</returns>
424
- public Task WaitForPacketWhere < T > ( Func < T , bool > condition , CancellationToken cancellationToken = default )
425
- where T : IPacket
425
+ /// <inheritdoc cref="WaitForPacketWhere{TPacket}(Func{TPacket, Task{bool}}, CancellationToken)"/>
426
+ public Task < TPacket > WaitForPacketWhere < TPacket > ( Func < TPacket , bool > condition , CancellationToken cancellationToken = default )
427
+ where TPacket : IPacket
426
428
{
427
- return WaitForPacketWhere < T > ( packet => Task . FromResult ( condition ( packet ) ) , cancellationToken ) ;
429
+ return WaitForPacketWhere < TPacket > ( packet => Task . FromResult ( condition ( packet ) ) , cancellationToken ) ;
428
430
}
429
431
430
432
/// <summary>
@@ -550,7 +552,7 @@ internal void HandleBundleDelimiter()
550
552
551
553
private async Task ProcessBundledPackets ( ConcurrentQueue < ( PacketType , PacketBuffer ) > packets )
552
554
{
553
- Logger . Debug ( $ "Processing { packets . Count } bundled packets") ;
555
+ Logger . Trace ( $ "Processing { packets . Count } bundled packets") ;
554
556
try
555
557
{
556
558
// wiki.vg: the client is guaranteed to process every packet in the bundle on the same tick
@@ -581,8 +583,9 @@ private async Task StreamLoop()
581
583
try
582
584
{
583
585
// run both tasks in parallel
584
- var receiveTask = Task . Factory . StartNew ( ReceivePackets , TaskCreationOptions . LongRunning ) ;
585
- var sendTask = Task . Factory . StartNew ( SendPackets , TaskCreationOptions . LongRunning ) ;
586
+ // because the task factory does not unwrap the tasks (like Task.Run) we need to do it manually
587
+ var receiveTask = Task . Factory . StartNew ( ReceivePackets , TaskCreationOptions . LongRunning ) . Unwrap ( ) ;
588
+ var sendTask = Task . Factory . StartNew ( SendPackets , TaskCreationOptions . LongRunning ) . Unwrap ( ) ;
586
589
587
590
// extract the exception from the task that finished first
588
591
await await Task . WhenAny ( receiveTask , sendTask ) ;
@@ -601,79 +604,97 @@ private async Task StreamLoop()
601
604
602
605
private async Task ReceivePackets ( )
603
606
{
604
- while ( true )
607
+ try
605
608
{
606
- CancellationToken . ThrowIfCancellationRequested ( ) ;
609
+ while ( true )
610
+ {
611
+ CancellationToken . ThrowIfCancellationRequested ( ) ;
607
612
608
- var buffer = stream ! . ReadPacket ( ) ;
613
+ var buffer = stream ! . ReadPacket ( ) ;
609
614
610
- var packetId = buffer . ReadVarInt ( ) ;
611
- var gameState = gameStatePacketHandler . GameState ;
612
- var packetType = Data . Protocol . GetPacketType ( PacketFlow . Clientbound , gameState , packetId ) ;
615
+ var packetId = buffer . ReadVarInt ( ) ;
616
+ var gameState = gameStatePacketHandler . GameState ;
617
+ var packetType = Data . Protocol . GetPacketType ( PacketFlow . Clientbound , gameState , packetId ) ;
613
618
614
- Logger . Trace ( "Received packet {PacketType}. GameState = {GameState}, PacketId = {PacketId}" , packetType , gameState , packetId ) ;
619
+ Logger . Trace ( "Received packet {PacketType}. GameState = {GameState}, PacketId = {PacketId}" , packetType , gameState , packetId ) ;
615
620
616
- // handle BundleDelimiter packet here, because there is a race condition where some
617
- // packets may be read before HandleBundleDelimiter is invoked through a handler
618
- if ( packetType == PacketType . CB_Play_BundleDelimiter )
619
- {
620
- HandleBundleDelimiter ( ) ;
621
- continue ;
622
- }
621
+ // handle BundleDelimiter packet here, because there is a race condition where some
622
+ // packets may be read before HandleBundleDelimiter is invoked through a handler
623
+ if ( packetType == PacketType . CB_Play_BundleDelimiter )
624
+ {
625
+ HandleBundleDelimiter ( ) ;
626
+ continue ;
627
+ }
623
628
624
- if ( gameState != GameState . Play )
625
- {
626
- await HandleIncomingPacket ( packetType , buffer ) ;
627
- }
628
- else
629
- {
630
- var bundledPackets = this . bundledPackets ;
631
- if ( bundledPackets != null )
629
+ if ( gameState != GameState . Play )
632
630
{
633
- bundledPackets . Enqueue ( ( packetType , buffer ) ) ;
631
+ await HandleIncomingPacket ( packetType , buffer ) ;
634
632
}
635
633
else
636
634
{
637
- // handle the packet in a new task to prevent blocking the stream loop
638
- _ = Task . Run ( ( ) => HandleIncomingPacket ( packetType , buffer ) ) ;
635
+ var bundledPackets = this . bundledPackets ;
636
+ if ( bundledPackets != null )
637
+ {
638
+ bundledPackets . Enqueue ( ( packetType , buffer ) ) ;
639
+ }
640
+ else
641
+ {
642
+ // handle the packet in a new task to prevent blocking the stream loop
643
+ _ = Task . Run ( ( ) => HandleIncomingPacket ( packetType , buffer ) ) ;
644
+ }
639
645
}
640
646
}
641
647
}
648
+ catch ( Exception e )
649
+ {
650
+ Logger . Debug ( e , "ReceivePackets loop ended with exception." ) ;
651
+ throw ;
652
+ }
653
+ // can never exit without exception because infinite loop without break
642
654
}
643
655
644
656
private async Task SendPackets ( )
645
657
{
646
- await foreach ( var task in packetQueue . ReceiveAllAsync ( ) )
658
+ try
647
659
{
648
- if ( task . Token . IsCancellationRequested )
660
+ await foreach ( var task in packetQueue . ReceiveAllAsync ( ) )
649
661
{
650
- task . Task . TrySetCanceled ( ) ;
651
- continue ;
652
- }
662
+ if ( task . Token . IsCancellationRequested )
663
+ {
664
+ task . Task . TrySetCanceled ( ) ;
665
+ continue ;
666
+ }
653
667
654
- try
655
- {
656
- DispatchPacket ( task . Packet ) ;
657
- task . Task . TrySetResult ( ) ;
658
- }
659
- catch ( OperationCanceledException e )
660
- {
661
- task . Task . TrySetCanceled ( e . CancellationToken ) ;
662
- // we should stop. So we do by rethrowing the exception
663
- throw ;
664
- }
665
- catch ( Exception e )
666
- {
667
- Logger . Error ( e , "Encountered exception while dispatching packet {PacketType}" , task . Packet . Type ) ;
668
- task . Task . TrySetException ( e ) ;
669
- if ( e is SocketException )
668
+ try
669
+ {
670
+ DispatchPacket ( task . Packet ) ;
671
+ task . Task . TrySetResult ( ) ;
672
+ }
673
+ catch ( OperationCanceledException e )
670
674
{
671
- // break the loop to prevent further packets from being sent
672
- // because the connection is probably dead
675
+ task . Task . TrySetCanceled ( e . CancellationToken ) ;
676
+ // we should stop. So we do by rethrowing the exception
673
677
throw ;
674
678
}
679
+ catch ( Exception e )
680
+ {
681
+ Logger . Error ( e , "Encountered exception while dispatching packet {PacketType}" , task . Packet . Type ) ;
682
+ task . Task . TrySetException ( e ) ;
683
+ if ( e is SocketException )
684
+ {
685
+ // break the loop to prevent further packets from being sent
686
+ // because the connection is probably dead
687
+ throw ;
688
+ }
689
+ }
675
690
}
676
691
}
692
+ catch ( Exception e )
693
+ {
694
+ Logger . Debug ( e , "SendPackets loop ended with exception." ) ;
695
+ throw ;
696
+ }
697
+ // can never exit without exception because infinite loop without break (because we never complete the BufferBlock we only cancel it)
677
698
}
678
699
679
700
private void DispatchPacket ( IPacket packet )
0 commit comments