-
Notifications
You must be signed in to change notification settings - Fork 614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Be more deffensive with the StreamSubscriber
internal state
#3545
base: main
Are you sure you want to change the base?
Conversation
ba25fd6
to
bdb2835
Compare
private var inOnNextLoop: Boolean = _ | ||
private var inOnNextLoop: Boolean = false | ||
private var buffer: Array[Any] = null | ||
private var index: Int = _ | ||
private var index: Int = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK, these are equivalent, but just in case, I made sure inOnNextLoop
starts as false
, the index
shouldn't matter but just for consistency; I also thought of using -1
but that feels confusing.
cb.apply(Right(Some(Chunk.array(buffer)))) | ||
val chunk = Chunk.array(buffer) | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Right(Some(chunk))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reset the state BEFORE publishing the Chunk
.
cb.apply(Left(ex)) | ||
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Left(ex)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reset the state BEFORE publishing the Exception
.
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
if (canceled) { | ||
s.cancel() | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Right(None)) | ||
} else if (index == 0) { | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Right(None)) | ||
} else { | ||
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index)))) | ||
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
val chunk = Chunk.array(buffer, offset = 0, length = index) | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Right(Some(chunk))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure we reset the state BEFORE anything else.
s.cancel() | ||
val ex = Left(new InvalidStateException(operation = "Received request", state)) | ||
otherCB.apply(ex) | ||
cb.apply(ex) | ||
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
inOnNextLoop = false | ||
buffer = null | ||
s.cancel() | ||
val ex = Left(new InvalidStateException(operation = "Received request", state)) | ||
otherCB.apply(ex) | ||
cb.apply(ex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reset the state BEFORE publishing the Exception
.
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
inOnNextLoop = false | ||
buffer = null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant.
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
if (canceled) { | ||
s.cancel() | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Right(None)) | ||
} else if (index == 0) { | ||
} else if (buffer eq null) { | ||
inOnNextLoop = false | ||
cb.apply(Right(None)) | ||
} else { | ||
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index)))) | ||
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
val chunk = Chunk.array(buffer, offset = 0, length = index) | ||
inOnNextLoop = false | ||
buffer = null | ||
cb.apply(Right(Some(chunk))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reset the state BEFORE publishing the Chunk
(or lack of).
} else if (index == 0) { | ||
} else if (buffer eq null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than relying on the index
, we simply check if the buffer
hasn't been initialized.
// We do the updates here, | ||
// to ensure they happen after we have secured the state. | ||
inOnNextLoop = false | ||
index = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant.
The initial state is this, and finishing each loop leaves the state like this as well.
I think this fixes the flaky flow interop tests: #3541 #3542
Related to #3449
I think the main change is ensuring we reset the
StreamSubscriber
state BEFORE completing the callback with theChunk
.But, I also made sure we reset the state in any place where we complete a callback, even with exceptions, just in case.
c.c. @armanbilge