dekaf: Add connection+buffer size limit, fix network errors being categorized as auth errors #2070
Conversation
Force-pushed from 54b33fc to 3577ca8
Force-pushed from 3577ca8 to 91b8f64
Doing some testing of this. librdkafka says some things, including the following, when a connection is rejected:

It works fine, in that the consumer retries and, once it's able to establish the connection, consumes data. But I'm somewhat concerned that we're just setting ourselves up for more confused users in the future, asking "Why are you using super old Kafka brokers?" or some such.

The "real" solution is probably to accept all connections and "poison" the ones that are accepted after the limit is reached. A poisoned session would take its earliest opportunity to return an appropriate error code.

Edit: I tried:

In the end, I think the cleanest solution is what I had originally: disconnecting connections immediately.
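For reference, a minimal sketch of that immediate-disconnect approach, assuming tokio. This is illustrative only, not Dekaf's actual code, and the `MAX_CONNECTIONS` value here is made up:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::net::TcpListener;

// Illustrative limit; in the PR this is a tunable.
const MAX_CONNECTIONS: usize = 500;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:9092").await?;
    let live_sessions = Arc::new(AtomicUsize::new(0));

    loop {
        let (socket, addr) = listener.accept().await?;

        // Reserve a slot; if we were already at the cap, release it and
        // drop the socket immediately. Kafka clients retry on their own.
        if live_sessions.fetch_add(1, Ordering::SeqCst) >= MAX_CONNECTIONS {
            live_sessions.fetch_sub(1, Ordering::SeqCst);
            eprintln!("over connection limit, disconnecting {addr}");
            drop(socket);
            continue;
        }

        let live_sessions = live_sessions.clone();
        tokio::spawn(async move {
            // handle_session(socket).await — per-session logic elided.
            let _ = socket;
            // A real implementation should decrement even if the session
            // task panics, e.g. via a drop guard.
            live_sessions.fetch_sub(1, Ordering::SeqCst);
        });
    }
}
```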
Force-pushed from 91b8f64 to 039e7d7
Force-pushed from 039e7d7 to 58404d7
LGTM
```diff
         legacy_mode_kafka_urls.clone(),
         legacy_broker_username.to_string(),
         legacy_broker_password.to_string()
     ),
     socket,
     addr,
     cli.idle_session_timeout,
-    task_cancellation
+    task_cancellation,
+    connection_limit.clone()
```
I think this is OK as it is for now, but I'll suggest that you take a look at tower, because I think it might help simplify a lot of this. There's built-in support for concurrency limits and load shedding, and it generally has a nice design for composing whatever other middleware we might want.
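A minimal sketch of what that composition could look like, assuming tower 0.4 with the `limit`, `load-shed`, and `util` features plus tokio; the port, limit value, and the inlined session stub are all made up for illustration:

```rust
use std::convert::Infallible;
use tokio::net::{TcpListener, TcpStream};
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("0.0.0.0:9092").await?;

    // Stack middleware: `load_shed` fails fast instead of queueing when
    // the `concurrency_limit` of in-flight sessions is exhausted.
    let mut svc = ServiceBuilder::new()
        .load_shed()
        .concurrency_limit(500)
        .service_fn(|socket: TcpStream| async move {
            // Per-session logic would go here; this stub just drops it.
            drop(socket);
            Ok::<(), Infallible>(())
        });

    loop {
        let (socket, addr) = listener.accept().await?;
        // `load_shed` is always "ready"; if the limit is full, the call
        // itself resolves to an overloaded error instead of waiting.
        let call = svc.ready().await?.call(socket);
        tokio::spawn(async move {
            if let Err(err) = call.await {
                eprintln!("shed connection from {addr}: {err}");
            }
        });
    }
}
```

The appeal is that the concurrency limit and the shed-instead-of-queue policy are each a reusable layer, so further middleware (timeouts, rate limits) can be added to the same stack rather than hand-rolled per server.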
Ooo, that looks great! I'll take a look, thanks
Tunables

We were seeing issues caused by large numbers of incoming connection requests causing OOMs, so I'm hoping that adding and subsequently tuning these knobs will let us prevent that in the future. It's possible that we'll want to introduce the ability to adjust the chunk cap on a per-materialization basis, but for the moment tuning it globally is good enough.

`MAX_CONNECTIONS` limits the total number of connections per Dekaf pod. The limit is enforced by immediately disconnecting new connections if we're over the limit.

`READ_BUFFER_CHUNK_LIMIT` allows for configuring the buffer size we ask `read_json_lines()` to maintain. Previously this was hardcoded to 30, which means we can have up to 4MB buffered per session. If we have 300 sessions, that's ~1.2GB in just `ReadJsonLine` buffers.
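(Worked out: 4MB per session across 30 chunks implies roughly 4MB / 30 ≈ 133KiB per chunk, and 300 sessions × 4MB ≈ 1.2GB, which is where the numbers above come from.)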
Error codes

We're getting lots of complaints about unexpected authentication errors. This is because we're currently categorizing _all_ errors encountered during `sasl_authenticate` as `SaslAuthenticationFailed`, whereas most of the real causes are transient network-related errors. We could be even better about breaking down the error variants, but this should be enough to stop the bleeding.
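To illustrate the shape of the categorization fix, here's a hedged sketch; `SessionError` and `check_credentials` are hypothetical stand-ins, not Dekaf's actual types. Only a genuine credential rejection maps to the auth-failure variant, while I/O failures get their own retriable variant:

```rust
use std::io;

// Hypothetical session-level error type for illustration.
#[derive(Debug)]
enum SessionError {
    // A genuine credential failure: surfaced to the client as
    // SaslAuthenticationFailed.
    AuthenticationFailed(String),
    // A transient network problem (e.g. the upstream check timed out):
    // surfaced as a retriable network error instead of an auth error.
    Network(io::Error),
}

// Hypothetical upstream credential check that can fail with I/O errors.
async fn check_credentials(username: &str, password: &str) -> Result<bool, io::Error> {
    let _ = (username, password);
    Ok(true) // stub
}

async fn sasl_authenticate(username: &str, password: &str) -> Result<(), SessionError> {
    match check_credentials(username, password).await {
        Ok(true) => Ok(()),
        Ok(false) => Err(SessionError::AuthenticationFailed(format!(
            "invalid credentials for {username}"
        ))),
        // Previously this arm was also reported as SaslAuthenticationFailed;
        // categorizing it separately is the point of the fix.
        Err(err) => Err(SessionError::Network(err)),
    }
}
```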
Fixes #2064