Skip to content

Conversation

jerrypeng
Copy link
Contributor

What changes were proposed in this pull request?

Add a new in memory sink called "ContinuousMemorySink" to facilitate RTM testing. This sink differentiates from the existing MemorySink by immediately sending output back to the driver once the output is generated and not just at the end of the batch which is what the current MemorySink does.

Why are the changes needed?

To facilitate RTM testing

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added simple test. There will be many RTM related tests that will be added in future PRs.

Was this patch authored or co-authored using generative AI tooling?

/**
* A sink that stores the results in memory. This [[org.apache.spark.sql.execution.streaming.Sink]]
* is primarily intended for use in unit tests and does not provide durability.
* This is mostly copied from MemorySink, except that the data needs to be availalbe not in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* This is mostly copied from MemorySink, except that the data needs to be availalbe not in
* This is mostly copied from MemorySink, except that the data needs to be available not in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines +166 to +168
if (endpointRef != null) {
memoryEndpoint.rpcEnv.stop(endpointRef)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this commit is called for each batch, does it mean that the endpoint stops to work after that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh okay, I see. For each batch, there will be a new MemoryRealTimeRpcEndpoint created.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

* for production-quality sinks. It's intended for use in tests.
*
*/
case class RealTimeRowWriterFactory(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this doesn't follow the same naming pattern like ContinuousMemoryRowWriterFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good question. The reason is because this code is going to be shared with the future RTM version of ConsoleStreamingWrite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I can rename it if you fee strongly about it.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants