Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be more deffensive with the StreamSubscriber internal state #3545

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

BalmungSan
Copy link
Contributor

I think this fixes the flaky flow interop tests: #3541 #3542
Related to #3449

I think the main change is ensuring we reset the StreamSubscriber state BEFORE completing the callback with the Chunk.
But, I also made sure we reset the state in any place where we complete a callback, even with exceptions, just in case.

c.c. @armanbilge

@BalmungSan BalmungSan force-pushed the fix-flow-stream-subscriber branch from ba25fd6 to bdb2835 Compare March 10, 2025 15:27
Comment on lines -69 to +71
private var inOnNextLoop: Boolean = _
private var inOnNextLoop: Boolean = false
private var buffer: Array[Any] = null
private var index: Int = _
private var index: Int = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, these are equivalent, but just in case, I made sure inOnNextLoop starts as false, the index shouldn't matter but just for consistency; I also thought of using -1 but that feels confusing.

Comment on lines -191 to +194
cb.apply(Right(Some(Chunk.array(buffer))))
val chunk = Chunk.array(buffer)
inOnNextLoop = false
buffer = null
cb.apply(Right(Some(chunk)))
Copy link
Contributor Author

@BalmungSan BalmungSan Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset the state BEFORE publishing the Chunk.

Comment on lines -217 to +222
cb.apply(Left(ex))
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
cb.apply(Left(ex))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset the state BEFORE publishing the Exception.

Comment on lines 248 to 263
// We do the updates here,
// to ensure they happen after we have secured the state.
if (canceled) {
s.cancel()
inOnNextLoop = false
buffer = null
cb.apply(Right(None))
} else if (index == 0) {
inOnNextLoop = false
buffer = null
cb.apply(Right(None))
} else {
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index))))
// We do the updates here,
// to ensure they happen after we have secured the state.
val chunk = Chunk.array(buffer, offset = 0, length = index)
inOnNextLoop = false
buffer = null
cb.apply(Right(Some(chunk)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure we reset the state BEFORE anything else.

Comment on lines -286 to +307
s.cancel()
val ex = Left(new InvalidStateException(operation = "Received request", state))
otherCB.apply(ex)
cb.apply(ex)
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
s.cancel()
val ex = Left(new InvalidStateException(operation = "Received request", state))
otherCB.apply(ex)
cb.apply(ex)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset the state BEFORE publishing the Exception.

Comment on lines -209 to -212
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
buffer = null
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant.

Comment on lines +242 to +256
// We do the updates here,
// to ensure they happen after we have secured the state.
if (canceled) {
s.cancel()
inOnNextLoop = false
buffer = null
cb.apply(Right(None))
} else if (index == 0) {
} else if (buffer eq null) {
inOnNextLoop = false
cb.apply(Right(None))
} else {
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index))))
// We do the updates here,
// to ensure they happen after we have secured the state.
val chunk = Chunk.array(buffer, offset = 0, length = index)
inOnNextLoop = false
buffer = null
cb.apply(Right(Some(chunk)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset the state BEFORE publishing the Chunk (or lack of).

Comment on lines -246 to +249
} else if (index == 0) {
} else if (buffer eq null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than relying on the index, we simply check if the buffer hasn't been initialized.

Comment on lines -271 to -274
// We do the updates here,
// to ensure they happen after we have secured the state.
inOnNextLoop = false
index = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant.
The initial state is this, and finishing each loop leaves the state like this as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant