-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53847] Add ContinuousMemorySink for Real-time Mode testing #52550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
/** | ||
* A sink that stores the results in memory. This [[org.apache.spark.sql.execution.streaming.Sink]] | ||
* is primarily intended for use in unit tests and does not provide durability. | ||
* This is mostly copied from MemorySink, except that the data needs to be availalbe not in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* This is mostly copied from MemorySink, except that the data needs to be availalbe not in | |
* This is mostly copied from MemorySink, except that the data needs to be available not in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if (endpointRef != null) { | ||
memoryEndpoint.rpcEnv.stop(endpointRef) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this commit is called for each batch, does it mean that the endpoint stops to work after that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh okay, I see. For each batch, there will be a new MemoryRealTimeRpcEndpoint created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
* for production-quality sinks. It's intended for use in tests. | ||
* | ||
*/ | ||
case class RealTimeRowWriterFactory( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why this doesn't follow the same naming pattern like ContinuousMemoryRowWriterFactory
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good question. The reason is because this code is going to be shared with the future RTM version of ConsoleStreamingWrite
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though I can rename it if you fee strongly about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @HeartSaVioR
What changes were proposed in this pull request?
Add a new in memory sink called "ContinuousMemorySink" to facilitate RTM testing. This sink differentiates from the existing MemorySink by immediately sending output back to the driver once the output is generated and not just at the end of the batch which is what the current MemorySink does.
Why are the changes needed?
To facilitate RTM testing
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added simple test. There will be many RTM related tests that will be added in future PRs.
Was this patch authored or co-authored using generative AI tooling?