Compatibility with Apache Kafka 4.0 #1360
I believe kafka-go will work with Kafka 4.0 in general: Kafka 2.x protocol support was added to kafka-go, and that is not going away with Kafka 4. You will not get the very latest protocol versions with kafka-go yet, but the existing protocols should generally work. It appears that even if Apache removed support for all pre-2.1 protocol versions, kafka-go would still function much as it does today. We don't yet have plans to test against Kafka 4, as it is still most likely 1-2 months from release. |
Thanks for the response. Let me ask a more concrete question: the following file seems to show V1 but not V2+, while KIP-896 removes both V0 and V1 for the JoinGroup request. Won't that be an issue? https://github.com/segmentio/kafka-go/blob/main/joingroup.go |
I believe this is actually the file you would want to look at to determine which versions are supported. You can see the various fields handled differently for the various versions, as appropriate. In the code you linked, there is a hardcoded V1 request struct for whatever reason, while the more generic JoinGroupRequest struct should accommodate up to v7, as far as I understand the code. I don't think the struct names (V1, etc.) are indicative of what exactly is supported anymore. If you look at the Kafka protocol here (https://kafka.apache.org/protocol.html#The_Messages_JoinGroup), the fields up to v7 are accounted for in the kafka-go code. I spot-checked several of these myself before my first message and did not find any APIs that didn't accommodate Kafka 2.1. The V1 code dates back to this extremely old PR. |
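To make the "fields handled differently per version" point concrete, here is a tiny illustration of how the JoinGroup request's field set grows across protocol versions, following the spec linked above. This is a sketch only; `joinGroupFields` is a hypothetical helper, not kafka-go's actual encoding code:

```go
package main

import "fmt"

// joinGroupFields sketches which JoinGroup request fields exist at a
// given protocol version, per the Kafka protocol spec. Illustration
// only; this is not kafka-go's actual encoding logic.
func joinGroupFields(version int16) []string {
	fields := []string{"group_id", "session_timeout_ms"}
	if version >= 1 { // rebalance_timeout_ms was added in v1
		fields = append(fields, "rebalance_timeout_ms")
	}
	fields = append(fields, "member_id")
	if version >= 5 { // group_instance_id (static membership) was added in v5
		fields = append(fields, "group_instance_id")
	}
	return append(fields, "protocol_type", "protocols")
}

func main() {
	fmt.Println("v1:", joinGroupFields(1))
	fmt.Println("v7:", joinGroupFields(7))
}
```

A single request struct can therefore cover v1 through v7 by gating fields on the negotiated version, which is why the `V1` struct name alone says little about what is supported.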
Thanks. I was unsure what is actually used, given that some things were reverted in #1027. It is indeed great if the current version of the client works with Apache Kafka 4.0. The other thing that would be useful to understand is the oldest kafka-go version that works with Apache Kafka 4.0. Is there a way to know that? |
I tested kafka-go with Apache Kafka 4.0 (building and running the branch mentioned above). The latest version (0.4.47) doesn't seem to work with Kafka 4.0, as it sends a JoinGroupRequest v1 (code ref). I verified this with Wireshark, too, to see what the client was actually sending out rather than basing it only on my reading of the code. It fails with a message like this in the logs:
This is what my code looks like:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func ConsumeUntilEnd() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{"localhost:9092"},
		Topic:       "test-topic-1part",
		GroupID:     "test-group",
		Logger:      kafka.LoggerFunc(logf), // just logs everything with a newline
		ErrorLogger: kafka.LoggerFunc(logf),
	})
	done := false // note: written from the goroutine below without synchronization
	go func() {
		// Stop the loop after 5s.
		time.Sleep(5 * time.Second)
		done = true
	}()
	for !done {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		m, err := r.ReadMessage(ctx)
		cancel() // cancel each iteration; a defer here would pile up until the function returns
		if err != nil && ctx.Err() != context.DeadlineExceeded {
			panic(err)
		} else if ctx.Err() == context.DeadlineExceeded {
			continue
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",
			m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
	fmt.Println("Closing...")
	if err := r.Close(); err != nil {
		panic("could not close reader: " + err.Error())
	}
}
```

Hopefully this is helpful and I'm not making a mistake in the testing code. |
Thanks @milindl. For the project maintainers, would it be possible to fix this before the Apache Kafka 4.0 release? It would be very useful to have a working client version when the release lands. |
@petedannemann Would you be the right person to comment on this? |
We will be happy to review a PR for supporting Kafka 4.0 if it is needed |
We will leave it to the maintainers/community of this project to work on a patch. We wanted to make sure you were aware that the client will not work with Apache Kafka 4.0 without some (seemingly minor) changes. |
server-side log:
|
Apache Kafka 4.0 was released this week: https://kafka.apache.org/blog#apache_kafka_400_release_announcement |
Is there any update on this issue? I'm seeing the same server-side error as above when upgrading to
I also used The client-side error message is: |
I did a quick audit of the protocol changes, and this is my summary of what we are affected by. It's possible I've missed something here. Note that everything affected is only in Conn and the things that use it, like Reader. The Client appears to support Kafka 4.0. Affected:
Unaffected:
|
Given that we claim to support > Kafka 0.10 in the README, it seems like we could just update the protocol versions for these APIs and push out a new v0.5.0. The best long-term fix would be to adapt Conn to use Client, which by design negotiates API versions and selects the latest, and which supports many more API versions. That is a big lift, though, and had issues in the past (#1027).
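For anyone unfamiliar with what "negotiates API versions and selects the latest" means in practice: the client intersects the version range it implements with the range the broker advertises (via ApiVersions) and picks the highest common version. A minimal sketch with hypothetical types and version numbers, not the actual kafka-go implementation:

```go
package main

import "fmt"

// apiRange holds an inclusive [min, max] protocol version range.
type apiRange struct{ min, max int16 }

// negotiate returns the highest version both sides support,
// or ok=false when the ranges do not overlap.
func negotiate(client, broker apiRange) (v int16, ok bool) {
	lo := client.min
	if broker.min > lo {
		lo = broker.min
	}
	hi := client.max
	if broker.max < hi {
		hi = broker.max
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	// Hypothetical JoinGroup ranges: a client implementing v0-v7
	// against a Kafka 4.0 broker advertising v2-v9 (post KIP-896).
	if v, ok := negotiate(apiRange{0, 7}, apiRange{2, 9}); ok {
		fmt.Println("use version", v)
	}
	// A client hardcoded to v1 only cannot negotiate at all.
	if _, ok := negotiate(apiRange{1, 1}, apiRange{2, 9}); !ok {
		fmt.Println("no common version")
	}
}
```

This is why a negotiating client keeps working when a broker raises its minimum versions, while a hardcoded-version Conn breaks.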
Can someone from Confluent comment on this issue? It seems like Kafka 4.0 memory usage is unpredictable and JVM configurations for heap memory are no longer respected. Kafka 4.0 requests often fail due to running out of heap memory regardless of how much memory Kafka is given, and this makes it difficult for us to test against. It's possible this is just a Bitnami image issue, but I haven't been able to find sufficient examples of SASL with the Apache Kafka image to get that working.
I gave Kafka 4GB of memory and the tests now sometimes pass without running out of memory. Maybe using KRaft mode dramatically increases the memory requirements. Previously I think Kafka used around 200MB of memory during our tests.
Neither of these is expected, and there haven't been such reports filed in the issue tracker either. |
We have only seen the test failures due to Kafka running out of memory for tests involving consumer groups. VisualVM shows a dramatic increase in allocations from the "group-coordinator-event-processor" Kafka threads when running these tests. This memory never seems to get reclaimed, and the increase occurs when we create several consumer groups.
@petedannemann This is weird. I am not aware of any issues in 4.0 affecting the memory allocation. Is there a way to reproduce the test? Could you share the heap dump? |
I'm getting the same errors mentioned above for my code, and I have the latest Kafka and the latest kafka-go:

```go
func NewReader(cfg *config.Config, topic string) *kafka.Reader {
}
```
@dajac to reproduce this issue you can check out the branch associated with this PR, where we fixed the API deprecation issues and added a docker-compose file for running Kafka 4.0. Then run this:

```shell
# Launch Kafka 4.0
docker compose -f docker_compose_versions/docker-compose-400.yml up -d

# KAFKA_VERSION env var is required to run tests that hit Kafka running locally
export KAFKA_VERSION=4.0.0

# Memory appears to increase each time the tests are run, up to some limit
go test -count 10 -run TestReaderConsumerGroup github.com/segmentio/kafka-go
```

I'll try to attach a heap dump sometime soon. |
Apache Kafka 4.0 will remove a number of very old protocol API versions, as specified by KIP-896. I was trying to understand whether this client will work correctly with it, and it was not clear, particularly when it comes to the consumer group APIs.
So, I thought it would be simplest to ask the project maintainers. :) Note that the 4.0 branch of Apache Kafka contains the KIP-896 changes in case you would like to test your client.