Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: Correct handling for calling dekaf::connector::unary_materialize #2018

Merged
merged 1 commit into from
Mar 19, 2025

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Mar 19, 2025

Description:

Originally the Dekaf special-case lived in Runtime::unary_materialize, but #2001 changed the way unary RPCs are invoked in a way that bypassed that logic. Moving it into Runtime::serve_materialize fixes things.


This change is Reviewable

@jgraettinger
Copy link
Member

For a better way to do this, please take a look at how derive-sqlite is implemented:

https://github.com/estuary/flow/blob/master/crates/runtime/src/derive/connector.rs#L91-L94

You should be able to implement an equivalent connector stream for Dekaf and just call it from materialize::connector::start(), without needing to be aware of it elsewhere

@jgraettinger
Copy link
Member

You don't actually need to spawn it in a tokio task -- those are details that are specific to how sqlite needs to work.

You can instead model off of the validation noop implementation. It only needs to serve Spec and Validate, and can error on anything else.

@jshearer
Copy link
Contributor Author

Ok, the noop implementation makes sense -- basically just a coroutine loop that passes Spec and Validate calls to dekaf::connector::unary_materialize() and errors otherwise.

@jgraettinger
Copy link
Member

jgraettinger commented Mar 19, 2025

I would get rid of dekaf::connector::unary_materialize() altogether, and implement a dekaf connector stream in terms of coroutine that services Spec and Validate, and errors on anything else.

I'd expect dekaf::connector(connector_rx) to return an impl Stream of materialization responses, pretty much identically to how derive_sqlite::connector(connector_rx) works, except you can implement it using a simple coroutine rather than a tokio-spawned task.

…alize`

Originally there was a special-case for Dekaf in `Runtime::unary_materialize`, but #2001 changed the way unary RPCs are invoked in a way that bypassed that logic.
So rolling with the change to implement all connector invocations in terms of streaming RPCs , this refactors `dekaf::connector` to expose a
`connector()` method that implements a streaming interface. It still only handles Spec and Validate.
@jshearer jshearer force-pushed the runtime/update_handling_for_dekaf branch from a0d45bc to 454c312 Compare March 19, 2025 15:51
@jshearer
Copy link
Contributor Author

That turned out nice and clean! Best reviewed with ?w=1 due to whitespace-only changes

Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jshearer jshearer merged commit 261ef3e into master Mar 19, 2025
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants