From b40d86d6d37fe2d35dba80698deef8372a044bf7 Mon Sep 17 00:00:00 2001
From: Oleh Dokuka <oleh.dokuka@icloud.com>
Date: Thu, 10 Feb 2022 14:55:53 +0200
Subject: [PATCH] adds graceful shutdown support

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
---
 .../core/TestRequesterResponderSupport.java   |   4 +-
 .../src/main/java/io/rsocket/RSocket.java     |   2 +
 .../io/rsocket/core/RSocketConnector.java     |  15 +-
 .../io/rsocket/core/RSocketRequester.java     |  29 ++-
 .../io/rsocket/core/RSocketResponder.java     |  15 +-
 .../java/io/rsocket/core/RSocketServer.java   |  12 +-
 .../core/RequesterResponderSupport.java       |  67 ++++++-
 .../core/DefaultRSocketClientTests.java       |   7 +
 .../java/io/rsocket/core/KeepAliveTest.java   |   6 +
 .../io/rsocket/core/RSocketLeaseTest.java     |  13 +-
 .../core/RSocketRequesterSubscribersTest.java |   9 +
 .../io/rsocket/core/RSocketRequesterTest.java |  62 ++++++
 .../io/rsocket/core/RSocketResponderTest.java |  75 +++++++-
 .../java/io/rsocket/core/RSocketTest.java     |  13 +-
 .../io/rsocket/core/SetupRejectionTest.java   |  10 +
 .../core/TestRequesterResponderSupport.java   |   4 +-
 .../ClientStreamingToServer.java              |  93 +++++++++
 .../GracefulShutdownIntegrationTest.java      | 177 ++++++++++++++++++
 18 files changed, 593 insertions(+), 20 deletions(-)
 create mode 100644 rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java
 create mode 100644 rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java

diff --git a/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java b/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java
index 420da66ba..b5070f1ee 100644
--- a/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java
+++ b/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java
@@ -5,6 +5,7 @@
 import io.rsocket.DuplexConnection;
 import io.rsocket.RSocket;
 import io.rsocket.frame.decoder.PayloadDecoder;
+import reactor.core.publisher.Sinks;
 import reactor.util.annotation.Nullable;
 
 public class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket {
@@ -27,7 +28,8 @@ public TestRequesterResponderSupport(
         PayloadDecoder.ZERO_COPY,
         connection,
         streamIdSupplier,
-        __ -> null);
+        __ -> null,
+        Sinks.empty());
     this.requesterLeaseTracker = requesterLeaseTracker;
   }
 
diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java
index b05241365..c0a5646d3 100644
--- a/rsocket-core/src/main/java/io/rsocket/RSocket.java
+++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java
@@ -87,6 +87,8 @@ default double availability() {
   @Override
   default void dispose() {}
 
+  default void disposeGracefully() {}
+
   @Override
   default boolean isDisposed() {
     return false;
diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
index de494c4e3..f8b10fc21 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
@@ -655,10 +655,16 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
                                     requesterLeaseTracker = null;
                                   }
 
+                                  final Sinks.Empty<Void> requesterOnGracefulShutdownSink =
+                                      Sinks.unsafe().empty();
+                                  final Sinks.Empty<Void> responderOnGracefulShutdownSink =
+                                      Sinks.unsafe().empty();
                                   final Sinks.Empty<Void> requesterOnAllClosedSink =
                                       Sinks.unsafe().empty();
                                   final Sinks.Empty<Void> responderOnAllClosedSink =
                                       Sinks.unsafe().empty();
+                                  final Sinks.Empty<Void> requesterGracefulShutdownStartedSink =
+                                      Sinks.unsafe().empty();
 
                                   RSocket rSocketRequester =
                                       new RSocketRequester(
@@ -673,7 +679,12 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
                                           keepAliveHandler,
                                           interceptors::initRequesterRequestInterceptor,
                                           requesterLeaseTracker,
+                                          requesterGracefulShutdownStartedSink,
+                                          requesterOnGracefulShutdownSink,
                                           requesterOnAllClosedSink,
+                                          Mono.whenDelayError(
+                                              responderOnGracefulShutdownSink.asMono(),
+                                              requesterOnGracefulShutdownSink.asMono()),
                                           Mono.whenDelayError(
                                               responderOnAllClosedSink.asMono(),
                                               requesterOnAllClosedSink.asMono()));
@@ -725,7 +736,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
                                                                         leases.sender)
                                                         : interceptors
                                                             ::initResponderRequestInterceptor,
-                                                    responderOnAllClosedSink);
+                                                    responderOnGracefulShutdownSink,
+                                                    responderOnAllClosedSink,
+                                                    requesterGracefulShutdownStartedSink.asMono());
 
                                             return wrappedRSocketRequester;
                                           })
diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
index b8a9c00ff..85c562daa 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
@@ -23,6 +23,7 @@
 import io.rsocket.DuplexConnection;
 import io.rsocket.Payload;
 import io.rsocket.RSocket;
+import io.rsocket.exceptions.ConnectionCloseException;
 import io.rsocket.exceptions.ConnectionErrorException;
 import io.rsocket.exceptions.Exceptions;
 import io.rsocket.frame.ErrorFrameCodec;
@@ -67,6 +68,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
 
   @Nullable private final RequesterLeaseTracker requesterLeaseTracker;
 
+  private final Sinks.Empty<Void> onGracefulShutdownStartedSink;
   private final Sinks.Empty<Void> onThisSideClosedSink;
   private final Mono<Void> onAllClosed;
   private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
@@ -83,7 +85,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
       @Nullable KeepAliveHandler keepAliveHandler,
       Function<RSocket, RequestInterceptor> requestInterceptorFunction,
       @Nullable RequesterLeaseTracker requesterLeaseTracker,
+      Sinks.Empty<Void> onGracefulShutdownStartedSink,
+      Sinks.Empty<Void> onGracefulShutdownSink,
       Sinks.Empty<Void> onThisSideClosedSink,
+      Mono<Void> onGracefulShutdownDone,
       Mono<Void> onAllClosed) {
     super(
         mtu,
@@ -92,14 +97,17 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
         payloadDecoder,
         connection,
         streamIdSupplier,
-        requestInterceptorFunction);
+        requestInterceptorFunction,
+        onGracefulShutdownSink);
 
     this.requesterLeaseTracker = requesterLeaseTracker;
+    this.onGracefulShutdownStartedSink = onGracefulShutdownStartedSink;
     this.onThisSideClosedSink = onThisSideClosedSink;
     this.onAllClosed = onAllClosed;
 
     // DO NOT Change the order here. The Send processor must be subscribed to before receiving
     connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown);
+    onGracefulShutdownDone.subscribe(null, null, connection::dispose);
 
     connection.receive().subscribe(this::handleIncomingFrames, e -> {});
 
@@ -200,6 +208,17 @@ public void dispose() {
     getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
   }
 
+  @Override
+  public void disposeGracefully() {
+    getDuplexConnection()
+        .sendFrame(
+            0,
+            ErrorFrameCodec.encode(
+                getAllocator(), 0, new ConnectionCloseException("Graceful Shutdown")));
+    this.onGracefulShutdownStartedSink.tryEmitEmpty();
+    super.terminate();
+  }
+
   @Override
   public boolean isDisposed() {
     return terminationError != null;
@@ -352,6 +371,12 @@ private void tryTerminate(Supplier<Throwable> errorSupplier) {
     }
     if (terminationError == null) {
       Throwable e = errorSupplier.get();
+
+      if (e instanceof ConnectionCloseException) {
+        this.onGracefulShutdownStartedSink.tryEmitEmpty();
+        super.terminate();
+        return;
+      }
       if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
         terminate(e);
       } else {
@@ -418,7 +443,7 @@ private void terminate(Throwable e) {
       requesterLeaseTracker.dispose(e);
     }
 
-    final Collection<FrameHandler> activeStreamsCopy;
+    final Collection<FrameHandler> activeStreamsCopy; // in case of graceful shut down is empty
     synchronized (this) {
       final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
       activeStreamsCopy = new ArrayList<>(activeStreams.values());
diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
index 50c5ba54c..05f33043f 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
@@ -73,7 +73,9 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
       int maxFrameLength,
       int maxInboundPayloadSize,
       Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
-      Sinks.Empty<Void> onThisSideClosedSink) {
+      Sinks.Empty<Void> onGracefulShutdownSink,
+      Sinks.Empty<Void> onThisSideClosedSink,
+      Mono<Void> onRequesterGracefulShutdownStarted) {
     super(
         mtu,
         maxFrameLength,
@@ -81,7 +83,8 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
         payloadDecoder,
         connection,
         null,
-        requestInterceptorFunction);
+        requestInterceptorFunction,
+        onGracefulShutdownSink);
 
     this.requestHandler = requestHandler;
 
@@ -92,12 +95,18 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
         .onClose()
         .subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
 
+    onRequesterGracefulShutdownStarted.subscribe(null, null, this::onGracefulShutdownStarted);
+
     connection.receive().subscribe(this::handleFrame, e -> {});
   }
 
+  private void onGracefulShutdownStarted() {
+    super.terminate();
+    requestHandler.disposeGracefully();
+  }
+
   private void tryTerminateOnConnectionError(Throwable e) {
     if (LOGGER.isDebugEnabled()) {
-
       LOGGER.debug("Try terminate connection on responder side");
     }
     tryTerminate(() -> e);
diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
index e969c39d2..77f95ed25 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
@@ -438,8 +438,11 @@ private Mono<Void> acceptSetup(
             requesterLeaseTracker = null;
           }
 
+          final Sinks.Empty<Void> requesterOnGracefulShutdownSink = Sinks.unsafe().empty();
+          final Sinks.Empty<Void> responderOnGracefulShutdownSink = Sinks.unsafe().empty();
           final Sinks.Empty<Void> requesterOnAllClosedSink = Sinks.unsafe().empty();
           final Sinks.Empty<Void> responderOnAllClosedSink = Sinks.unsafe().empty();
+          final Sinks.Empty<Void> requesterGracefulShutdownStartedSink = Sinks.unsafe().empty();
 
           RSocket rSocketRequester =
               new RSocketRequester(
@@ -454,7 +457,12 @@ private Mono<Void> acceptSetup(
                   keepAliveHandler,
                   interceptors::initRequesterRequestInterceptor,
                   requesterLeaseTracker,
+                  requesterGracefulShutdownStartedSink,
+                  requesterOnGracefulShutdownSink,
                   requesterOnAllClosedSink,
+                  Mono.whenDelayError(
+                      responderOnGracefulShutdownSink.asMono(),
+                      requesterOnGracefulShutdownSink.asMono()),
                   Mono.whenDelayError(
                       responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono()));
 
@@ -495,7 +503,9 @@ wrappedDuplexConnection, rejectedSetupError(err)))
                                     interceptors.initResponderRequestInterceptor(
                                         rSocket, (RequestInterceptor) leases.sender)
                                 : interceptors::initResponderRequestInterceptor,
-                            responderOnAllClosedSink);
+                            responderOnGracefulShutdownSink,
+                            responderOnAllClosedSink,
+                            requesterGracefulShutdownStartedSink.asMono());
                   })
               .doFinally(signalType -> setupPayload.release())
               .then();
diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java b/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java
index bea7dc1aa..daee8dcb6 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java
@@ -5,10 +5,12 @@
 import io.netty.util.collection.IntObjectMap;
 import io.rsocket.DuplexConnection;
 import io.rsocket.RSocket;
+import io.rsocket.exceptions.CanceledException;
 import io.rsocket.frame.decoder.PayloadDecoder;
 import io.rsocket.plugins.RequestInterceptor;
 import java.util.Objects;
 import java.util.function.Function;
+import reactor.core.publisher.Sinks;
 import reactor.util.annotation.Nullable;
 
 class RequesterResponderSupport {
@@ -19,11 +21,15 @@ class RequesterResponderSupport {
   private final PayloadDecoder payloadDecoder;
   private final ByteBufAllocator allocator;
   private final DuplexConnection connection;
+  private final Sinks.Empty<Void> onGracefulShutdownSink;
   @Nullable private final RequestInterceptor requestInterceptor;
 
   @Nullable final StreamIdSupplier streamIdSupplier;
   final IntObjectMap<FrameHandler> activeStreams;
 
+  boolean terminating;
+  boolean terminated;
+
   public RequesterResponderSupport(
       int mtu,
       int maxFrameLength,
@@ -31,7 +37,8 @@ public RequesterResponderSupport(
       PayloadDecoder payloadDecoder,
       DuplexConnection connection,
       @Nullable StreamIdSupplier streamIdSupplier,
-      Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
+      Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
+      Sinks.Empty<Void> onGracefulShutdownSink) {
 
     this.activeStreams = new IntObjectHashMap<>();
     this.mtu = mtu;
@@ -41,6 +48,7 @@ public RequesterResponderSupport(
     this.allocator = connection.alloc();
     this.streamIdSupplier = streamIdSupplier;
     this.connection = connection;
+    this.onGracefulShutdownSink = onGracefulShutdownSink;
     this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this);
   }
 
@@ -88,6 +96,9 @@ public int getNextStreamId() {
     final StreamIdSupplier streamIdSupplier = this.streamIdSupplier;
     if (streamIdSupplier != null) {
       synchronized (this) {
+        if (this.terminating) {
+          throw new CanceledException("Disposed");
+        }
         return streamIdSupplier.nextStreamId(this.activeStreams);
       }
     } else {
@@ -107,6 +118,10 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
     if (streamIdSupplier != null) {
       final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
       synchronized (this) {
+        if (this.terminating) {
+          throw new CanceledException("Disposed");
+        }
+
         final int streamId = streamIdSupplier.nextStreamId(activeStreams);
 
         activeStreams.put(streamId, frameHandler);
@@ -119,6 +134,11 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
   }
 
   public synchronized boolean add(int streamId, FrameHandler frameHandler) {
+    if (this.terminating) {
+      throw new CanceledException(
+          "This RSocket is either disposed or disposing, and no longer accepting new requests");
+    }
+
     final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
     // copy of Map.putIfAbsent(key, value) without `streamId` boxing
     final FrameHandler previousHandler = activeStreams.get(streamId);
@@ -148,14 +168,45 @@ public synchronized FrameHandler get(int streamId) {
    * @return {@code true} if there is {@link FrameHandler} for the given {@code streamId} and the
    *     instance equals to the passed one
    */
-  public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
-    final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
-    // copy of Map.remove(key, value) without `streamId` boxing
-    final FrameHandler curValue = activeStreams.get(streamId);
-    if (!Objects.equals(curValue, frameHandler)) {
-      return false;
+  public boolean remove(int streamId, FrameHandler frameHandler) {
+    final boolean terminated;
+    synchronized (this) {
+      final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
+      // copy of Map.remove(key, value) without `streamId` boxing
+      final FrameHandler curValue = activeStreams.get(streamId);
+      if (!Objects.equals(curValue, frameHandler)) {
+        return false;
+      }
+      activeStreams.remove(streamId);
+      if (this.terminating && activeStreams.size() == 0) {
+        terminated = true;
+        this.terminated = true;
+      } else {
+        terminated = false;
+      }
+    }
+
+    if (terminated) {
+      onGracefulShutdownSink.tryEmitEmpty();
     }
-    activeStreams.remove(streamId);
     return true;
   }
+
+  public void terminate() {
+    final boolean terminated;
+    synchronized (this) {
+      this.terminating = true;
+
+      if (activeStreams.size() == 0) {
+        terminated = true;
+        this.terminated = true;
+      } else {
+        terminated = false;
+      }
+    }
+
+    if (terminated) {
+      onGracefulShutdownSink.tryEmitEmpty();
+    }
+  }
 }
diff --git a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java
index 84576e6ce..795555e68 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java
@@ -721,6 +721,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
     protected Runnable delayer;
     protected Sinks.One<RSocket> producer;
 
+    protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
+    protected Sinks.Empty<Void> thisGracefulShutdownSink;
     protected Sinks.Empty<Void> thisClosedSink;
 
     @Override
@@ -740,6 +742,8 @@ protected void doInit() {
 
     @Override
     protected RSocket newRSocket() {
+      this.onGracefulShutdownStartedSink = Sinks.empty();
+      this.thisGracefulShutdownSink = Sinks.empty();
       this.thisClosedSink = Sinks.empty();
       return new RSocketRequester(
           connection,
@@ -753,7 +757,10 @@ protected RSocket newRSocket() {
           null,
           __ -> null,
           null,
+          onGracefulShutdownStartedSink,
+          thisGracefulShutdownSink,
           thisClosedSink,
+          thisGracefulShutdownSink.asMono(),
           thisClosedSink.asMono());
     }
   }
diff --git a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java
index 5be59235c..b1b01ec58 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java
@@ -84,7 +84,10 @@ static RSocketState requester(int tickPeriod, int timeout) {
             new DefaultKeepAliveHandler(),
             r -> null,
             null,
+            Sinks.empty(),
+            Sinks.empty(),
             empty,
+            Sinks.<Void>empty().asMono(),
             empty.asMono());
     return new RSocketState(rSocket, allocator, connection, empty);
   }
@@ -117,7 +120,10 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) {
                 Mockito.mock(ResumeStateHolder.class)),
             __ -> null,
             null,
+            Sinks.empty(),
+            Sinks.empty(),
             onClose,
+            Sinks.<Void>empty().asMono(),
             onClose.asMono());
     return new ResumableRSocketState(rSocket, connection, resumableConnection, onClose, allocator);
   }
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java
index a461833d3..824169b23 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java
@@ -92,6 +92,9 @@ class RSocketLeaseTest {
 
   private Sinks.Many<Lease> leaseSender = Sinks.many().multicast().onBackpressureBuffer();
   private RequesterLeaseTracker requesterLeaseTracker;
+  protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
+  protected Sinks.Empty<Void> otherGracefulShutdownSink;
+  protected Sinks.Empty<Void> thisGracefulShutdownSink;
   protected Sinks.Empty<Void> thisClosedSink;
   protected Sinks.Empty<Void> otherClosedSink;
 
@@ -103,6 +106,9 @@ void setUp() {
     connection = new TestDuplexConnection(byteBufAllocator);
     requesterLeaseTracker = new RequesterLeaseTracker(TAG, 0);
     responderLeaseTracker = new ResponderLeaseTracker(TAG, connection, () -> leaseSender.asFlux());
+    this.onGracefulShutdownStartedSink = Sinks.empty();
+    this.otherGracefulShutdownSink = Sinks.empty();
+    this.thisGracefulShutdownSink = Sinks.empty();
     this.thisClosedSink = Sinks.empty();
     this.otherClosedSink = Sinks.empty();
 
@@ -121,7 +127,10 @@ void setUp() {
             null,
             __ -> null,
             requesterLeaseTracker,
+            onGracefulShutdownStartedSink,
+            thisGracefulShutdownSink,
             thisClosedSink,
+            otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
             otherClosedSink.asMono().and(thisClosedSink.asMono()));
 
     mockRSocketHandler = mock(RSocket.class);
@@ -190,7 +199,9 @@ protected void hookOnError(Throwable throwable) {
             FRAME_LENGTH_MASK,
             Integer.MAX_VALUE,
             __ -> null,
-            otherClosedSink);
+            otherGracefulShutdownSink,
+            otherClosedSink,
+            onGracefulShutdownStartedSink.asMono());
   }
 
   @AfterEach
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java
index 01eb998c7..2bafe1be3 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java
@@ -63,6 +63,9 @@ class RSocketRequesterSubscribersTest {
   private LeaksTrackingByteBufAllocator allocator;
   private RSocket rSocketRequester;
   private TestDuplexConnection connection;
+  protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
+  protected Sinks.Empty<Void> otherGracefulShutdownSink;
+  protected Sinks.Empty<Void> thisGracefulShutdownSink;
   protected Sinks.Empty<Void> thisClosedSink;
   protected Sinks.Empty<Void> otherClosedSink;
 
@@ -75,6 +78,9 @@ void tearDownAndCheckNoLeaks() {
   void setUp() {
     allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
     connection = new TestDuplexConnection(allocator);
+    this.onGracefulShutdownStartedSink = Sinks.empty();
+    this.otherGracefulShutdownSink = Sinks.empty();
+    this.thisGracefulShutdownSink = Sinks.empty();
     this.thisClosedSink = Sinks.empty();
     this.otherClosedSink = Sinks.empty();
     rSocketRequester =
@@ -90,7 +96,10 @@ void setUp() {
             null,
             __ -> null,
             null,
+            onGracefulShutdownStartedSink,
+            thisGracefulShutdownSink,
             thisClosedSink,
+            otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
             otherClosedSink.asMono().and(thisClosedSink.asMono()));
   }
 
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
index a1199f698..27e132665 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
@@ -82,6 +82,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Stream;
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -96,6 +97,7 @@
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
+import reactor.core.Disposable;
 import reactor.core.Scannable;
 import reactor.core.publisher.BaseSubscriber;
 import reactor.core.publisher.Flux;
@@ -1469,13 +1471,70 @@ public void testWorkaround959(String type) {
     }
   }
 
+  @Test
+  public void testDisposeGracefully() {
+    System.out.println(
+        FrameHeaderCodec.frameType(
+            Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump("000000012400"))));
+    final RSocketRequester rSocketRequester = rule.socket;
+    final AssertSubscriber<Void> onGracefulShutdownSubscriber =
+        rule.thisGracefulShutdownSink.asMono().subscribeWith(AssertSubscriber.create());
+    final AssertSubscriber<Void> onCloseSubscriber =
+        rSocketRequester.onClose().subscribeWith(new AssertSubscriber<>());
+
+    final Disposable stream = rSocketRequester.requestStream(EmptyPayload.INSTANCE).subscribe();
+
+    FrameAssert.assertThat(rule.connection.awaitFrame())
+        .typeOf(REQUEST_STREAM)
+        .hasClientSideStreamId()
+        .hasNoLeaks();
+
+    Assertions.assertThat(rSocketRequester.isDisposed()).isFalse();
+
+    rSocketRequester.disposeGracefully();
+    Assertions.assertThat(rSocketRequester.isDisposed()).isFalse();
+    onGracefulShutdownSubscriber.assertNotTerminated();
+
+    FrameAssert.assertThat(rule.connection.awaitFrame())
+        .typeOf(FrameType.ERROR)
+        .hasStreamIdZero()
+        .hasData("Graceful Shutdown")
+        .hasNoLeaks();
+
+    stream.dispose();
+    Assertions.assertThat(rSocketRequester.isDisposed()).isFalse();
+    Assertions.assertThat(rule.connection.isDisposed()).isFalse();
+    onGracefulShutdownSubscriber.assertTerminated();
+
+    FrameAssert.assertThat(rule.connection.awaitFrame())
+        .typeOf(CANCEL)
+        .hasClientSideStreamId()
+        .hasNoLeaks();
+
+    rule.otherGracefulShutdownSink.tryEmitEmpty();
+    Assertions.assertThat(rSocketRequester.isDisposed()).isTrue();
+    Assertions.assertThat(rule.connection.isDisposed()).isTrue();
+    onCloseSubscriber.assertNotTerminated();
+
+    rule.otherClosedSink.tryEmitEmpty();
+    onCloseSubscriber.assertTerminated();
+
+    rule.assertHasNoLeaks();
+  }
+
   public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
 
+    protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
+    protected Sinks.Empty<Void> otherGracefulShutdownSink;
+    protected Sinks.Empty<Void> thisGracefulShutdownSink;
     protected Sinks.Empty<Void> thisClosedSink;
     protected Sinks.Empty<Void> otherClosedSink;
 
     @Override
     protected RSocketRequester newRSocket() {
+      this.onGracefulShutdownStartedSink = Sinks.empty();
+      this.otherGracefulShutdownSink = Sinks.empty();
+      this.thisGracefulShutdownSink = Sinks.empty();
       this.thisClosedSink = Sinks.empty();
       this.otherClosedSink = Sinks.empty();
       return new RSocketRequester(
@@ -1490,7 +1549,10 @@ protected RSocketRequester newRSocket() {
           null,
           (__) -> null,
           null,
+          onGracefulShutdownStartedSink,
+          thisGracefulShutdownSink,
           thisClosedSink,
+          otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
           otherClosedSink.asMono().and(thisClosedSink.asMono()));
     }
 
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java
index 4f689e396..e48e5a2f0 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java
@@ -67,12 +67,14 @@
 import io.rsocket.util.ByteBufPayload;
 import io.rsocket.util.DefaultPayload;
 import io.rsocket.util.EmptyPayload;
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -1179,12 +1181,79 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
     rule.assertHasNoLeaks();
   }
 
+  @Test
+  void testGracefulShutdown() {
+    final AssertSubscriber<Void> onCloseSubscriber = AssertSubscriber.create();
+    final AssertSubscriber<Void> onGracefulShutdownSubscriber = AssertSubscriber.create();
+    final Sinks.Empty<Void> onDisposeGracefullySink = Sinks.unsafe().empty();
+
+    boolean[] disposed = new boolean[] {false};
+    boolean[] disposedGracefully = new boolean[] {false};
+
+    rule.setAcceptingSocket(
+        new RSocket() {
+
+          @Override
+          public Flux<Payload> requestStream(Payload payload) {
+            return Flux.interval(Duration.ofMillis(100))
+                .takeUntilOther(onDisposeGracefullySink.asMono())
+                .map(tick -> ByteBufPayload.create(String.valueOf(tick)));
+          }
+
+          @Override
+          public void dispose() {
+            disposed[0] = true;
+          }
+
+          @Override
+          public void disposeGracefully() {
+            disposedGracefully[0] = true;
+          }
+        });
+
+    rule.connection.addToReceivedBuffer(
+        RequestStreamFrameCodec.encode(
+            rule.allocator, 1, false, Long.MAX_VALUE, null, Unpooled.EMPTY_BUFFER));
+
+    rule.onCloseSink.asMono().subscribe(onCloseSubscriber);
+    rule.onGracefulShutdownSink.asMono().subscribe(onGracefulShutdownSubscriber);
+
+    rule.onGracefulShutdownStartedSink.tryEmitEmpty();
+    Assertions.assertThat(disposed[0]).isFalse();
+    Assertions.assertThat(disposedGracefully[0]).isTrue();
+    Assertions.assertThat(rule.connection.isDisposed()).isFalse();
+    onCloseSubscriber.assertNotTerminated();
+    onGracefulShutdownSubscriber.assertNotTerminated();
+
+    onDisposeGracefullySink.tryEmitEmpty();
+    Assertions.assertThat(disposed[0]).isFalse();
+    Assertions.assertThat(disposedGracefully[0]).isTrue();
+    Assertions.assertThat(rule.connection.isDisposed()).isFalse();
+    onCloseSubscriber.assertNotTerminated();
+    onGracefulShutdownSubscriber.assertTerminated();
+
+    ByteBuf possibleCompleteFrame = rule.connection.pollFrame();
+
+    if (possibleCompleteFrame != null) {
+      FrameAssert.assertThat(possibleCompleteFrame).typeOf(COMPLETE).hasNoLeaks();
+    }
+
+    rule.connection.dispose();
+    Assertions.assertThat(disposed[0]).isTrue();
+    Assertions.assertThat(disposedGracefully[0]).isTrue();
+    Assertions.assertThat(rule.connection.isDisposed()).isTrue();
+    onCloseSubscriber.assertTerminated();
+    onGracefulShutdownSubscriber.assertTerminated();
+  }
+
   public static class ServerSocketRule extends AbstractSocketRule<RSocketResponder> {
 
     private RSocket acceptingSocket;
     private volatile int prefetch;
     private RequestInterceptor requestInterceptor;
+    protected Sinks.Empty<Void> onGracefulShutdownSink;
     protected Sinks.Empty<Void> onCloseSink;
+    protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
 
     @Override
     protected void doInit() {
@@ -1221,7 +1290,9 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) {
 
     @Override
     protected RSocketResponder newRSocket() {
+      onGracefulShutdownSink = Sinks.empty();
       onCloseSink = Sinks.empty();
+      onGracefulShutdownStartedSink = Sinks.empty();
       return new RSocketResponder(
           connection,
           acceptingSocket,
@@ -1231,7 +1302,9 @@ protected RSocketResponder newRSocket() {
           maxFrameLength,
           maxInboundPayloadSize,
           __ -> requestInterceptor,
-          onCloseSink);
+          onGracefulShutdownSink,
+          onCloseSink,
+          onGracefulShutdownStartedSink.asMono());
     }
 
     private void sendRequest(int streamId, FrameType frameType) {
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java
index e01e6ebdc..43fb89586 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java
@@ -515,6 +515,9 @@ public static class SocketRule {
     private RSocket requestAcceptor;
 
     private LeaksTrackingByteBufAllocator allocator;
+    protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
+    protected Sinks.Empty<Void> otherGracefulShutdownSink;
+    protected Sinks.Empty<Void> thisGracefulShutdownSink;
     protected Sinks.Empty<Void> thisClosedSink;
     protected Sinks.Empty<Void> otherClosedSink;
 
@@ -527,6 +530,9 @@ public void init() {
       serverProcessor = Sinks.many().multicast().directBestEffort();
       clientProcessor = Sinks.many().multicast().directBestEffort();
 
+      this.onGracefulShutdownStartedSink = Sinks.empty();
+      this.otherGracefulShutdownSink = Sinks.empty();
+      this.thisGracefulShutdownSink = Sinks.empty();
       this.thisClosedSink = Sinks.empty();
       this.otherClosedSink = Sinks.empty();
 
@@ -578,7 +584,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
               FRAME_LENGTH_MASK,
               Integer.MAX_VALUE,
               __ -> null,
-              otherClosedSink);
+              otherGracefulShutdownSink,
+              otherClosedSink,
+              onGracefulShutdownStartedSink.asMono());
 
       crs =
           new RSocketRequester(
@@ -593,7 +601,10 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
               null,
               __ -> null,
               null,
+              onGracefulShutdownStartedSink,
+              thisGracefulShutdownSink,
               thisClosedSink,
+              otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
               otherClosedSink.asMono().and(thisClosedSink.asMono()));
     }
 
diff --git a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java
index 87c3a865f..cbfbccd77 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java
@@ -71,6 +71,8 @@ void requesterStreamsTerminatedOnZeroErrorFrame() {
     LeaksTrackingByteBufAllocator allocator =
         LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
     TestDuplexConnection conn = new TestDuplexConnection(allocator);
+    Sinks.Empty<Void> onGracefulShutdownStartedSink = Sinks.empty();
+    Sinks.Empty<Void> onGracefulShutdownSink = Sinks.empty();
     Sinks.Empty<Void> onThisSideClosedSink = Sinks.empty();
 
     RSocketRequester rSocket =
@@ -86,7 +88,10 @@ void requesterStreamsTerminatedOnZeroErrorFrame() {
             null,
             __ -> null,
             null,
+            onGracefulShutdownStartedSink,
+            onGracefulShutdownSink,
             onThisSideClosedSink,
+            onGracefulShutdownSink.asMono(),
             onThisSideClosedSink.asMono());
 
     String errorMsg = "error";
@@ -114,6 +119,8 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
     LeaksTrackingByteBufAllocator allocator =
         LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
     TestDuplexConnection conn = new TestDuplexConnection(allocator);
+    Sinks.Empty<Void> onGracefulShutdownStartedSink = Sinks.empty();
+    Sinks.Empty<Void> onGracefulShutdownSink = Sinks.empty();
     Sinks.Empty<Void> onThisSideClosedSink = Sinks.empty();
     RSocketRequester rSocket =
         new RSocketRequester(
@@ -128,7 +135,10 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
             null,
             __ -> null,
             null,
+            onGracefulShutdownStartedSink,
+            onGracefulShutdownSink,
             onThisSideClosedSink,
+            onGracefulShutdownSink.asMono(),
             onThisSideClosedSink.asMono());
 
     conn.addToReceivedBuffer(
diff --git a/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java b/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java
index e282d72d5..8fb8cd37e 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import org.assertj.core.api.Assertions;
 import reactor.core.Exceptions;
+import reactor.core.publisher.Sinks;
 import reactor.util.annotation.Nullable;
 
 final class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket {
@@ -58,7 +59,8 @@ final class TestRequesterResponderSupport extends RequesterResponderSupport impl
         PayloadDecoder.ZERO_COPY,
         connection,
         streamIdSupplier,
-        (__) -> requestInterceptor);
+        (__) -> requestInterceptor,
+        Sinks.empty());
     this.error = error;
   }
 
diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java
new file mode 100644
index 000000000..28eb65adf
--- /dev/null
+++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2015-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.examples.transport.tcp.gracefulshutdown;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.core.RSocketServer;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public final class ClientStreamingToServer {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientStreamingToServer.class);
+
+  public static void main(String[] args) throws InterruptedException {
+    RSocketServer.create(
+            (setup, sendingSocket) -> {
+              sendingSocket.disposeGracefully();
+
+              return Mono.just(
+                  new RSocket() {
+                    @Override
+                    public Flux<Payload> requestStream(Payload payload) {
+                      return Flux.interval(Duration.ofMillis(100))
+                          .map(aLong -> DefaultPayload.create("Interval: " + aLong));
+                    }
+
+                    @Override
+                    public void disposeGracefully() {}
+                  });
+            })
+        .bindNow(TcpServerTransport.create("localhost", 7000));
+
+    RSocket socket =
+        RSocketConnector.create()
+            .acceptor(
+                SocketAcceptor.with(
+                    new RSocket() {
+                      @Override
+                      public void disposeGracefully() {}
+                    }))
+            .setupPayload(DefaultPayload.create("test", "test"))
+            .connect(TcpClientTransport.create("localhost", 7000))
+            .block();
+
+    AtomicInteger counter = new AtomicInteger();
+
+    final Payload payload = DefaultPayload.create("Hello");
+    socket
+        .requestStream(payload)
+        .map(Payload::getDataUtf8)
+        .doOnNext(
+            msg -> {
+              logger.debug(msg);
+              counter.incrementAndGet();
+            })
+        .subscribe();
+
+    logger.debug("dispose gracefully");
+    socket.disposeGracefully();
+    //
+    //    Mono.delay(Duration.ofSeconds(10))
+    //        .doFinally((__) -> socket.dispose())
+    //        .subscribe();
+
+    socket.onClose().block();
+
+    logger.debug("Observe " + counter.get() + " of 100 events");
+  }
+}
diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java
new file mode 100644
index 000000000..9d8ca96e0
--- /dev/null
+++ b/rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2015-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.integration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.core.RSocketServer;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.transport.netty.server.CloseableChannel;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.ByteBufPayload;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
+import reactor.core.publisher.Sinks;
+import reactor.test.StepVerifier;
+
+public class GracefulShutdownIntegrationTest {
+  private RSocket handler;
+  private Sinks.Empty<Void> clientGracefulShutdownSink;
+  private Sinks.Empty<Void> serverGracefulShutdownSink;
+  private Disposable stream;
+
+  private CloseableChannel server;
+
+  @BeforeEach
+  public void startup() {
+    server =
+        RSocketServer.create(
+                (setup, sendingSocket) -> {
+                  stream =
+                      sendingSocket
+                          .requestStream(ByteBufPayload.create("REQUEST", "META"))
+                          .subscribe();
+                  return Mono.just(handler);
+                })
+            .bind(TcpServerTransport.create("localhost", 0))
+            .block();
+  }
+
+  @AfterEach
+  public void cleanup() {
+    server.dispose();
+  }
+
+  @Test
+  @Timeout(15_000L)
+  public void testCompleteWithoutNext() {
+    clientGracefulShutdownSink = Sinks.unsafe().empty();
+    serverGracefulShutdownSink = Sinks.unsafe().empty();
+    final AtomicBoolean gracefullyDisposedServer = new AtomicBoolean();
+    final AtomicBoolean gracefullyDisposedClient = new AtomicBoolean();
+    final AtomicBoolean disposedServer = new AtomicBoolean();
+    final AtomicBoolean disposedClient = new AtomicBoolean();
+    final Sinks.Empty<Void> requestHandled = Sinks.unsafe().empty();
+    handler =
+        new RSocket() {
+          @Override
+          public Flux<Payload> requestStream(Payload payload) {
+            payload.release();
+
+            requestHandled.tryEmitEmpty();
+            return Flux.<Payload>never().takeUntilOther(serverGracefulShutdownSink.asMono());
+          }
+
+          @Override
+          public void dispose() {
+            disposedServer.set(true);
+          }
+
+          @Override
+          public void disposeGracefully() {
+            gracefullyDisposedServer.set(true);
+          }
+        };
+    RSocket client =
+        RSocketConnector.create()
+            .acceptor(
+                SocketAcceptor.with(
+                    new RSocket() {
+                      @Override
+                      public Flux<Payload> requestStream(Payload payload) {
+                        payload.release();
+                        return Flux.<Payload>never()
+                            .takeUntilOther(clientGracefulShutdownSink.asMono());
+                      }
+
+                      @Override
+                      public void dispose() {
+                        disposedClient.set(true);
+                      }
+
+                      @Override
+                      public void disposeGracefully() {
+                        gracefullyDisposedClient.set(true);
+                      }
+                    }))
+            .connect(TcpClientTransport.create(server.address()))
+            .block();
+
+    AtomicReference<SignalType> terminalSignal = new AtomicReference<>();
+
+    StepVerifier clientRequestVerifier =
+        client
+            .requestStream(ByteBufPayload.create("REQUEST", "META"))
+            .onErrorResume(
+                t -> Mono.empty()) // FIXME: onComplete frame may not be delivered on time
+            .doFinally(terminalSignal::set)
+            .as(StepVerifier::create)
+            .expectSubscription()
+            .expectComplete()
+            .verifyLater();
+
+    requestHandled.asMono().block(Duration.ofSeconds(5));
+
+    assertThat(gracefullyDisposedServer).isFalse();
+    assertThat(gracefullyDisposedClient).isFalse();
+
+    client.disposeGracefully();
+
+    assertThat(client.isDisposed()).isFalse();
+    assertThat(gracefullyDisposedServer)
+        .as("gracefullyDisposedServer after disposeGracefully")
+        .isTrue();
+    assertThat(gracefullyDisposedClient)
+        .as("gracefullyDisposedClient after disposeGracefully")
+        .isTrue();
+
+    assertThat(disposedServer).as("disposedServer after disposeGracefully").isFalse();
+    assertThat(disposedClient).as("disposedClient after disposeGracefully").isFalse();
+    assertThat(terminalSignal).as("terminalSignal after disposeGracefully").hasValue(null);
+    assertThat(stream.isDisposed()).isFalse();
+
+    clientGracefulShutdownSink.tryEmitEmpty();
+
+    assertThat(client.isDisposed()).isFalse();
+    assertThat(terminalSignal)
+        .as("terminalSignal after clientGracefulShutdownSink.tryEmitEmpty")
+        .hasValue(null);
+    Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> stream.isDisposed());
+
+    serverGracefulShutdownSink.tryEmitEmpty();
+
+    clientRequestVerifier.verify(Duration.ofSeconds(5));
+    assertThat(client.isDisposed()).isTrue();
+    assertThat(stream.isDisposed()).isTrue();
+    Awaitility.waitAtMost(Duration.ofSeconds(5)).untilTrue(disposedServer);
+    Awaitility.waitAtMost(Duration.ofSeconds(5)).untilTrue(disposedClient);
+  }
+}