diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java b/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java index 56a19de9e6..2eb9cfa553 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java @@ -18,7 +18,6 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.IntConsumer; @@ -61,9 +60,7 @@ final class FluxReceive extends Flux implements Subscription, Disposable volatile IntConsumer receiverCancel; - volatile int once; - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once"); + boolean subscribedOnce; // Please note, in this specific case WIP is non-volatile since all operation that // involves work-in-progress pattern is within Netty Event-Loops which guarantees @@ -153,11 +150,12 @@ public void subscribe(CoreSubscriber s) { } final void startReceiver(CoreSubscriber s) { - if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + if (!subscribedOnce) { + subscribedOnce = true; if (log.isDebugEnabled()) { log.debug(format(channel, "{}: subscribing inbound receiver"), this); } - if (inboundDone && getPending() == 0) { + if ((inboundDone && getPending() == 0) || isCancelled()) { if (inboundError != null) { Operators.error(s, inboundError); return; diff --git a/reactor-netty-core/src/test/java/reactor/netty/channel/FluxReceiveTest.java b/reactor-netty-core/src/test/java/reactor/netty/channel/FluxReceiveTest.java new file mode 100644 index 0000000000..e1c531c73f --- /dev/null +++ b/reactor-netty-core/src/test/java/reactor/netty/channel/FluxReceiveTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.channel; + +import java.time.Duration; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.jupiter.api.Test; +import reactor.netty.NettyInbound; +import reactor.netty.NettyOutbound; +import reactor.test.subscriber.TestSubscriber; +import reactor.test.util.RaceTestUtils; + +public class FluxReceiveTest { + + @Test + void disposeAndSubscribeRaceTest() { + for (int i = 0; i < 100; i++) { + ChannelOperations operations = + new ChannelOperations<>(EmbeddedChannel::new, (connection, newState) -> { + }); + FluxReceive receive = new FluxReceive(operations); + TestSubscriber subscriber = TestSubscriber.create(); + RaceTestUtils.race(receive::dispose, () -> receive.subscribe(subscriber)); + + subscriber.block(Duration.ofSeconds(5)); + } + } +}