diff --git a/.circleci/config.yml b/.circleci/config.yml
index 67d03a7be..c16afcf63 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -178,6 +178,83 @@ jobs:
       entrypoint: *entrypoint
     steps: *steps
 
+  kafka-370-kraft:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "3.7.0"
+
+      # Need to skip nettest to avoid these kinds of errors:
+      #  --- FAIL: TestConn/nettest (17.56s)
+      #    --- FAIL: TestConn/nettest/PingPong (7.40s)
+      #      conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
+      #      conntest.go:118: mismatching value: got 77, want 78
+      #      conntest.go:118: mismatching value: got 78, want 79
+      # ...
+      #
+      # TODO: Figure out why these are happening and fix them (they don't appear to be new).
+      KAFKA_SKIP_NETTEST: "1"
+    docker:
+      - image: circleci/golang
+      - image: bitnami/kafka:3.7.0
+        ports:
+          - 9092:9092
+          - 9093:9093
+        environment: &kraft-env
+          KAFKA_KRAFT_MODE: "true"
+          KAFKA_CFG_NODE_ID: 1
+          KAFKA_CFG_BROKER_ID: 1
+          KAFKA_CLUSTER_ID: 1
+          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+          KAFKA_CFG_PROCESS_ROLES: broker,controller
+          KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT
+          KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093
+          KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093
+          KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN
+          KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
+          ALLOW_PLAINTEXT_LISTENER: yes
+          KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
+          KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
+          KAFKA_CFG_DELETE_TOPIC_ENABLE: true
+          KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+          KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
+          KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+          KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
+          KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
+          KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
+          KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
+          KAFKA_INTER_BROKER_USER: adminscram512
+          KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
+          KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+        entrypoint: *entrypoint
+    steps: *steps
+
+  kafka-400:
+    working_directory: *working_directory
+    environment:
+      KAFKA_VERSION: "4.0.0"
+
+      # Need to skip nettest to avoid these kinds of errors:
+      #  --- FAIL: TestConn/nettest (17.56s)
+      #    --- FAIL: TestConn/nettest/PingPong (7.40s)
+      #      conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
+      #      conntest.go:118: mismatching value: got 77, want 78
+      #      conntest.go:118: mismatching value: got 78, want 79
+      # ...
+      #
+      # TODO: Figure out why these are happening and fix them (they don't appear to be new).
+      KAFKA_SKIP_NETTEST: "1"
+    docker:
+      - image: circleci/golang
+      - image: bitnami/kafka:4.0.0
+        ports:
+          - 9092:9092
+          - 9093:9093
+        environment: *kraft-env
+    steps: *steps
 workflows:
   version: 2
   run:
@@ -187,3 +264,5 @@ workflows:
     - kafka-270
     - kafka-281
     - kafka-370
+    - kafka-370-kraft
+    - kafka-400
\ No newline at end of file
diff --git a/alterclientquotas_test.go b/alterclientquotas_test.go
index d61c745e3..3bb1023e2 100644
--- a/alterclientquotas_test.go
+++ b/alterclientquotas_test.go
@@ -3,9 +3,11 @@ package kafka
 import (
 	"context"
 	"testing"
+	"time"
 
-	ktesting "github.com/segmentio/kafka-go/testing"
 	"github.com/stretchr/testify/assert"
+
+	ktesting "github.com/segmentio/kafka-go/testing"
 )
 
 func TestClientAlterClientQuotas(t *testing.T) {
@@ -65,6 +67,8 @@ func TestClientAlterClientQuotas(t *testing.T) {
 
 	assert.Equal(t, expectedAlterResp, *alterResp)
 
+	time.Sleep(1 * time.Second) // wait for the quota to be applie (Kafka 4.0.0+)
+
 	describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
 		Components: []DescribeClientQuotasRequestComponent{
 			{
diff --git a/alterpartitionreassignments_test.go b/alterpartitionreassignments_test.go
index 7bbce8fff..48974c7c5 100644
--- a/alterpartitionreassignments_test.go
+++ b/alterpartitionreassignments_test.go
@@ -3,6 +3,7 @@ package kafka
 import (
 	"context"
 	"testing"
+	"time"
 
 	ktesting "github.com/segmentio/kafka-go/testing"
 )
@@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) {
 					BrokerIDs:   []int{1},
 				},
 			},
+			Timeout: 5 * time.Second,
 		},
 	)
 
@@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) {
 					BrokerIDs:   []int{1},
 				},
 			},
+			Timeout: 5 * time.Second,
 		},
 	)
 
diff --git a/conn.go b/conn.go
index 2b51afbd5..9f9f25903 100644
--- a/conn.go
+++ b/conn.go
@@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) {
 
 // DeleteTopics deletes the specified topics.
 func (c *Conn) DeleteTopics(topics ...string) error {
-	_, err := c.deleteTopics(deleteTopicsRequestV0{
+	_, err := c.deleteTopics(deleteTopicsRequest{
 		Topics: topics,
 	})
 	return err
@@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
 // joinGroup attempts to join a consumer group
 //
 // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
-func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
-	var response joinGroupResponseV1
+func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) {
+	version, err := c.negotiateVersion(joinGroup, v1, v2)
+	if err != nil {
+		return joinGroupResponse{}, err
+	}
 
-	err := c.writeOperation(
+	response := joinGroupResponse{v: version}
+
+	err = c.writeOperation(
 		func(deadline time.Time, id int32) error {
-			return c.writeRequest(joinGroup, v1, id, request)
+			return c.writeRequest(joinGroup, version, id, request)
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
 		},
 	)
 	if err != nil {
-		return joinGroupResponseV1{}, err
+		return joinGroupResponse{}, err
 	}
 	if response.ErrorCode != 0 {
-		return joinGroupResponseV1{}, Error(response.ErrorCode)
+		return joinGroupResponse{}, Error(response.ErrorCode)
 	}
 
 	return response, nil
diff --git a/conn_test.go b/conn_test.go
index bdce327e0..ef5ce3071 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -13,8 +13,9 @@ import (
 	"testing"
 	"time"
 
-	ktesting "github.com/segmentio/kafka-go/testing"
 	"golang.org/x/net/nettest"
+
+	ktesting "github.com/segmentio/kafka-go/testing"
 )
 
 type timeout struct{}
@@ -679,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
 func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) {
 	waitForCoordinator(t, conn, groupID)
 
-	join := func() (joinGroup joinGroupResponseV1) {
+	join := func() (joinGroup joinGroupResponse) {
 		var err error
 		for attempt := 0; attempt < 10; attempt++ {
-			joinGroup, err = conn.joinGroup(joinGroupRequestV1{
+			joinGroup, err = conn.joinGroup(joinGroupRequest{
 				GroupID:          groupID,
 				SessionTimeout:   int32(time.Minute / time.Millisecond),
 				RebalanceTimeout: int32(time.Second / time.Millisecond),
@@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
 }
 
 func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) {
-	_, err := conn.joinGroup(joinGroupRequestV1{})
+	_, err := conn.joinGroup(joinGroupRequest{})
 	if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) {
 		t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err)
 	}
@@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) {
 	groupID := makeGroupID()
 	waitForCoordinator(t, conn, groupID)
 
-	_, err := conn.joinGroup(joinGroupRequestV1{
+	_, err := conn.joinGroup(joinGroupRequest{
 		GroupID: groupID,
 	})
 	if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) {
@@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) {
 	groupID := makeGroupID()
 	waitForCoordinator(t, conn, groupID)
 
-	_, err := conn.joinGroup(joinGroupRequestV1{
+	_, err := conn.joinGroup(joinGroupRequest{
 		GroupID:        groupID,
 		SessionTimeout: int32(3 * time.Second / time.Millisecond),
 	})
diff --git a/consumergroup.go b/consumergroup.go
index f4bb382cb..b32f90162 100644
--- a/consumergroup.go
+++ b/consumergroup.go
@@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
 type coordinator interface {
 	io.Closer
 	findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
-	joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
+	joinGroup(joinGroupRequest) (joinGroupResponse, error)
 	syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
 	leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
 	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find
 	return t.conn.findCoordinator(req)
 }
 
-func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
 	// in the case of join group, the consumer group coordinator may wait up
 	// to rebalance timeout in order to wait for all members to join.
 	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
-		return joinGroupResponseV1{}, err
+		return joinGroupResponse{}, err
 	}
 	return t.conn.joinGroup(req)
 }
@@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
 //  * InvalidSessionTimeout:
 //  * GroupAuthorizationFailed:
 func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
-	request, err := cg.makeJoinGroupRequestV1(memberID)
+	request, err := cg.makeJoinGroupRequest(memberID)
 	if err != nil {
 		return "", 0, nil, err
 	}
@@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
 
 // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
 // request.
-func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
-	request := joinGroupRequestV1{
+func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) {
+	request := joinGroupRequest{
 		GroupID:          cg.config.ID,
 		MemberID:         memberID,
 		SessionTimeout:   int32(cg.config.SessionTimeout / time.Millisecond),
@@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
 	for _, balancer := range cg.config.GroupBalancers {
 		userData, err := balancer.UserData()
 		if err != nil {
-			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
+			return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
 		}
 		request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
 			ProtocolName: balancer.ProtocolName(),
@@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
 
 // assignTopicPartitions uses the selected GroupBalancer to assign members to
 // their various partitions.
-func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
+func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
 	cg.withLogger(func(l Logger) {
 		l.Printf("selected as leader for group, %s\n", cg.config.ID)
 	})
@@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
 }
 
 // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
-func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
+func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {
 	members := make([]GroupMember, 0, len(in))
 	for _, item := range in {
 		metadata := groupMetadata{}
diff --git a/consumergroup_test.go b/consumergroup_test.go
index 0d3e290a9..da41fc3df 100644
--- a/consumergroup_test.go
+++ b/consumergroup_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"reflect"
+	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -15,7 +16,7 @@ var _ coordinator = mockCoordinator{}
 type mockCoordinator struct {
 	closeFunc           func() error
 	findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
-	joinGroupFunc       func(joinGroupRequestV1) (joinGroupResponseV1, error)
+	joinGroupFunc       func(joinGroupRequest) (joinGroupResponse, error)
 	syncGroupFunc       func(syncGroupRequestV0) (syncGroupResponseV0, error)
 	leaveGroupFunc      func(leaveGroupRequestV0) (leaveGroupResponseV0, error)
 	heartbeatFunc       func(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -38,9 +39,9 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor
 	return c.findCoordinatorFunc(req)
 }
 
-func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+func (c mockCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
 	if c.joinGroupFunc == nil {
-		return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified")
+		return joinGroupResponse{}, errors.New("no joinGroup behavior specified")
 	}
 	return c.joinGroupFunc(req)
 }
@@ -140,33 +141,36 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 		},
 	}
 
-	newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 {
-		resp := joinGroupResponseV1{
-			GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
-		}
+	newJoinGroupResponse := func(topicsByMemberID map[string][]string) func(v apiVersion) joinGroupResponse {
+		return func(v apiVersion) joinGroupResponse {
+			resp := joinGroupResponse{
+				v:             v,
+				GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
+			}
 
-		for memberID, topics := range topicsByMemberID {
-			resp.Members = append(resp.Members, joinGroupResponseMemberV1{
-				MemberID: memberID,
-				MemberMetadata: groupMetadata{
-					Topics: topics,
-				}.bytes(),
-			})
-		}
+			for memberID, topics := range topicsByMemberID {
+				resp.Members = append(resp.Members, joinGroupResponseMember{
+					MemberID: memberID,
+					MemberMetadata: groupMetadata{
+						Topics: topics,
+					}.bytes(),
+				})
+			}
 
-		return resp
+			return resp
+		}
 	}
 
 	testCases := map[string]struct {
-		Members     joinGroupResponseV1
+		MembersFunc func(v apiVersion) joinGroupResponse
 		Assignments GroupMemberAssignments
 	}{
 		"nil": {
-			Members:     newJoinGroupResponseV1(nil),
+			MembersFunc: newJoinGroupResponse(nil),
 			Assignments: GroupMemberAssignments{},
 		},
 		"one member, one topic": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1"},
 			}),
 			Assignments: GroupMemberAssignments{
@@ -176,7 +180,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 			},
 		},
 		"one member, two topics": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1", "topic-2"},
 			}),
 			Assignments: GroupMemberAssignments{
@@ -187,7 +191,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 			},
 		},
 		"two members, one topic": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1"},
 				"member-2": {"topic-1"},
 			}),
@@ -201,7 +205,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 			},
 		},
 		"two members, two unshared topics": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1"},
 				"member-2": {"topic-2"},
 			}),
@@ -216,21 +220,24 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 		},
 	}
 
+	supportedVersions := []apiVersion{v1, v2} // joinGroup versions
 	for label, tc := range testCases {
-		t.Run(label, func(t *testing.T) {
-			cg := ConsumerGroup{}
-			cg.config.GroupBalancers = []GroupBalancer{
-				RangeGroupBalancer{},
-				RoundRobinGroupBalancer{},
-			}
-			assignments, err := cg.assignTopicPartitions(conn, tc.Members)
-			if err != nil {
-				t.Fatalf("bad err: %v", err)
-			}
-			if !reflect.DeepEqual(tc.Assignments, assignments) {
-				t.Errorf("expected %v; got %v", tc.Assignments, assignments)
-			}
-		})
+		for _, v := range supportedVersions {
+			t.Run(label+"_v"+strconv.Itoa(int(v)), func(t *testing.T) {
+				cg := ConsumerGroup{}
+				cg.config.GroupBalancers = []GroupBalancer{
+					RangeGroupBalancer{},
+					RoundRobinGroupBalancer{},
+				}
+				assignments, err := cg.assignTopicPartitions(conn, tc.MembersFunc(v))
+				if err != nil {
+					t.Fatalf("bad err: %v", err)
+				}
+				if !reflect.DeepEqual(tc.Assignments, assignments) {
+					t.Errorf("expected %v; got %v", tc.Assignments, assignments)
+				}
+			})
+		}
 	}
 }
 
@@ -243,12 +250,12 @@ func TestConsumerGroup(t *testing.T) {
 			scenario: "Next returns generations",
 			function: func(t *testing.T, ctx context.Context, cg *ConsumerGroup) {
 				gen1, err := cg.Next(ctx)
-				if gen1 == nil {
-					t.Fatalf("expected generation 1 not to be nil")
-				}
 				if err != nil {
 					t.Fatalf("expected no error, but got %+v", err)
 				}
+				if gen1 == nil {
+					t.Fatalf("expected generation 1 not to be nil")
+				}
 				// returning from this function should cause the generation to
 				// exit.
 				gen1.Start(func(context.Context) {})
@@ -419,8 +426,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 						},
 					}, nil
 				}
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{}, errors.New("join group failed")
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{}, errors.New("join group failed")
 				}
 				// NOTE : no stub for leaving the group b/c the member never joined.
 			},
@@ -449,8 +456,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 						},
 					}, nil
 				}
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{
 						ErrorCode: int16(InvalidTopic),
 					}, nil
 				}
@@ -472,8 +479,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 		{
 			scenario: "fails to join group (leader, unsupported protocol)",
 			prepare: func(mc *mockCoordinator) {
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{
 						GenerationID:  12345,
 						GroupProtocol: "foo",
 						LeaderID:      "abc",
@@ -498,8 +505,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 		{
 			scenario: "fails to sync group (general error)",
 			prepare: func(mc *mockCoordinator) {
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{
 						GenerationID:  12345,
 						GroupProtocol: "range",
 						LeaderID:      "abc",
diff --git a/createtopics.go b/createtopics.go
index 4244b3117..40f180f58 100644
--- a/createtopics.go
+++ b/createtopics.go
@@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
 }
 
 // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
-type createTopicsRequestV0 struct {
+type createTopicsRequest struct {
+	v apiVersion // v0, v1, v2
+
 	// Topics contains n array of single topic creation requests. Can not
 	// have multiple entries for the same topic.
 	Topics []createTopicsRequestV0Topic
@@ -270,86 +272,136 @@ type createTopicsRequestV0 struct {
 	// Timeout ms to wait for a topic to be completely created on the
 	// controller node. Values <= 0 will trigger topic creation and return immediately
 	Timeout int32
+
+	// If true, check that the topics can be created as specified, but don't create anything.
+	// Internal use only for Kafka 4.0 support.
+	ValidateOnly bool
 }
 
-func (t createTopicsRequestV0) size() int32 {
-	return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
+func (t createTopicsRequest) size() int32 {
+	sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
 		sizeofInt32(t.Timeout)
+	if t.v >= v1 {
+		sz += 1
+	}
+	return sz
 }
 
-func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
+func (t createTopicsRequest) writeTo(wb *writeBuffer) {
 	wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
 	wb.writeInt32(t.Timeout)
+	if t.v >= v1 {
+		wb.writeBool(t.ValidateOnly)
+	}
 }
 
-type createTopicsResponseV0TopicError struct {
+type createTopicsResponseTopicError struct {
+	v apiVersion
+
 	// Topic name
 	Topic string
 
 	// ErrorCode holds response error code
 	ErrorCode int16
+
+	// ErrorMessage holds responce error message string
+	ErrorMessage string
 }
 
-func (t createTopicsResponseV0TopicError) size() int32 {
-	return sizeofString(t.Topic) +
+func (t createTopicsResponseTopicError) size() int32 {
+	sz := sizeofString(t.Topic) +
 		sizeofInt16(t.ErrorCode)
+	if t.v >= v1 {
+		sz += sizeofString(t.ErrorMessage)
+	}
+	return sz
 }
 
-func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
+func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) {
 	wb.writeString(t.Topic)
 	wb.writeInt16(t.ErrorCode)
+	if t.v >= v1 {
+		wb.writeString(t.ErrorMessage)
+	}
 }
 
-func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	if remain, err = readString(r, size, &t.Topic); err != nil {
 		return
 	}
 	if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
 		return
 	}
+	if t.v >= v1 {
+		if remain, err = readString(r, remain, &t.ErrorMessage); err != nil {
+			return
+		}
+	}
 	return
 }
 
 // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
-type createTopicsResponseV0 struct {
-	TopicErrors []createTopicsResponseV0TopicError
+type createTopicsResponse struct {
+	v apiVersion
+
+	ThrottleTime int32 // v2+
+	TopicErrors  []createTopicsResponseTopicError
 }
 
-func (t createTopicsResponseV0) size() int32 {
-	return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
+func (t createTopicsResponse) size() int32 {
+	sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
+	if t.v >= v2 {
+		sz += sizeofInt32(t.ThrottleTime)
+	}
+	return sz
 }
 
-func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
+func (t createTopicsResponse) writeTo(wb *writeBuffer) {
+	if t.v >= v2 {
+		wb.writeInt32(t.ThrottleTime)
+	}
 	wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) })
 }
 
-func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
-		var topic createTopicsResponseV0TopicError
-		if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil {
+		topic := createTopicsResponseTopicError{v: t.v}
+		if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil {
 			return
 		}
 		t.TopicErrors = append(t.TopicErrors, topic)
 		return
 	}
-	if remain, err = readArrayWith(r, size, fn); err != nil {
+	remain = size
+	if t.v >= v2 {
+		if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
+			return
+		}
+	}
+	if remain, err = readArrayWith(r, remain, fn); err != nil {
 		return
 	}
 
 	return
 }
 
-func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) {
-	var response createTopicsResponseV0
+func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) {
+	version, err := c.negotiateVersion(createTopics, v0, v1, v2)
+	if err != nil {
+		return createTopicsResponse{}, err
+	}
+
+	request.v = version
+	response := createTopicsResponse{v: version}
 
-	err := c.writeOperation(
+	err = c.writeOperation(
 		func(deadline time.Time, id int32) error {
 			if request.Timeout == 0 {
 				now := time.Now()
 				deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
 				request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
 			}
-			return c.writeRequest(createTopics, v0, id, request)
+			return c.writeRequest(createTopics, version, id, request)
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -383,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
 			t.toCreateTopicsRequestV0Topic())
 	}
 
-	_, err := c.createTopics(createTopicsRequestV0{
+	_, err := c.createTopics(createTopicsRequest{
 		Topics: requestV0Topics,
 	})
 	return err
diff --git a/createtopics_test.go b/createtopics_test.go
index 38819c382..119d17094 100644
--- a/createtopics_test.go
+++ b/createtopics_test.go
@@ -160,32 +160,37 @@ func TestClientCreateTopics(t *testing.T) {
 	}
 }
 
-func TestCreateTopicsResponseV0(t *testing.T) {
-	item := createTopicsResponseV0{
-		TopicErrors: []createTopicsResponseV0TopicError{
-			{
-				Topic:     "topic",
-				ErrorCode: 2,
+func TestCreateTopicsResponse(t *testing.T) {
+	supportedVersions := []apiVersion{v0, v1, v2}
+	for _, v := range supportedVersions {
+		item := createTopicsResponse{
+			v: v,
+			TopicErrors: []createTopicsResponseTopicError{
+				{
+					v:         v,
+					Topic:     "topic",
+					ErrorCode: 2,
+				},
 			},
-		},
-	}
+		}
 
-	b := bytes.NewBuffer(nil)
-	w := &writeBuffer{w: b}
-	item.writeTo(w)
+		b := bytes.NewBuffer(nil)
+		w := &writeBuffer{w: b}
+		item.writeTo(w)
 
-	var found createTopicsResponseV0
-	remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
-	if err != nil {
-		t.Error(err)
-		t.FailNow()
-	}
-	if remain != 0 {
-		t.Errorf("expected 0 remain, got %v", remain)
-		t.FailNow()
-	}
-	if !reflect.DeepEqual(item, found) {
-		t.Error("expected item and found to be the same")
-		t.FailNow()
+		found := createTopicsResponse{v: v}
+		remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
+		if err != nil {
+			t.Error(err)
+			t.FailNow()
+		}
+		if remain != 0 {
+			t.Errorf("expected 0 remain, got %v", remain)
+			t.FailNow()
+		}
+		if !reflect.DeepEqual(item, found) {
+			t.Error("expected item and found to be the same")
+			t.FailNow()
+		}
 	}
 }
diff --git a/deletetopics.go b/deletetopics.go
index d758d9fd6..ff73d553b 100644
--- a/deletetopics.go
+++ b/deletetopics.go
@@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D
 }
 
 // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
-type deleteTopicsRequestV0 struct {
+type deleteTopicsRequest struct {
 	// Topics holds the topic names
 	Topics []string
 
@@ -77,41 +77,57 @@ type deleteTopicsRequestV0 struct {
 	Timeout int32
 }
 
-func (t deleteTopicsRequestV0) size() int32 {
+func (t deleteTopicsRequest) size() int32 {
 	return sizeofStringArray(t.Topics) +
 		sizeofInt32(t.Timeout)
 }
 
-func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
+func (t deleteTopicsRequest) writeTo(wb *writeBuffer) {
 	wb.writeStringArray(t.Topics)
 	wb.writeInt32(t.Timeout)
 }
 
-type deleteTopicsResponseV0 struct {
+type deleteTopicsResponse struct {
+	v apiVersion // v0, v1
+
+	ThrottleTime int32
 	// TopicErrorCodes holds per topic error codes
 	TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
 }
 
-func (t deleteTopicsResponseV0) size() int32 {
-	return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
+func (t deleteTopicsResponse) size() int32 {
+	sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
+	if t.v >= v1 {
+		sz += sizeofInt32(t.ThrottleTime)
+	}
+	return sz
 }
 
-func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
 		var item deleteTopicsResponseV0TopicErrorCode
-		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
+		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
 			return
 		}
 		t.TopicErrorCodes = append(t.TopicErrorCodes, item)
 		return
 	}
-	if remain, err = readArrayWith(r, size, fn); err != nil {
+	remain = size
+	if t.v >= v1 {
+		if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
+			return
+		}
+	}
+	if remain, err = readArrayWith(r, remain, fn); err != nil {
 		return
 	}
 	return
 }
 
-func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
+func (t deleteTopicsResponse) writeTo(wb *writeBuffer) {
+	if t.v >= v1 {
+		wb.writeInt32(t.ThrottleTime)
+	}
 	wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
 }
 
@@ -146,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
 // deleteTopics deletes the specified topics.
 //
 // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
-func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
-	var response deleteTopicsResponseV0
-	err := c.writeOperation(
+func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) {
+	version, err := c.negotiateVersion(deleteTopics, v0, v1)
+	if err != nil {
+		return deleteTopicsResponse{}, err
+	}
+
+	response := deleteTopicsResponse{
+		v: version,
+	}
+
+	err = c.writeOperation(
 		func(deadline time.Time, id int32) error {
 			if request.Timeout == 0 {
 				now := time.Now()
 				deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
 				request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
 			}
-			return c.writeRequest(deleteTopics, v0, id, request)
+			return c.writeRequest(deleteTopics, version, id, request)
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -164,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse
 		},
 	)
 	if err != nil {
-		return deleteTopicsResponseV0{}, err
+		return deleteTopicsResponse{}, err
 	}
 	for _, c := range response.TopicErrorCodes {
 		if c.ErrorCode != 0 {
diff --git a/deletetopics_test.go b/deletetopics_test.go
index 3caffe840..4dc681831 100644
--- a/deletetopics_test.go
+++ b/deletetopics_test.go
@@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) {
 }
 
 func TestDeleteTopicsResponseV1(t *testing.T) {
-	item := deleteTopicsResponseV0{
+	item := deleteTopicsResponse{
 		TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{
 			{
 				Topic:     "a",
@@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) {
 	w := &writeBuffer{w: b}
 	item.writeTo(w)
 
-	var found deleteTopicsResponseV0
+	var found deleteTopicsResponse
 	remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
 	if err != nil {
 		t.Fatal(err)
diff --git a/describegroups_test.go b/describegroups_test.go
index ad5890988..5d907366a 100644
--- a/describegroups_test.go
+++ b/describegroups_test.go
@@ -32,7 +32,7 @@ func TestClientDescribeGroups(t *testing.T) {
 		Topic: topic,
 	})
 
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0
 	defer cancel()
 
 	err := w.WriteMessages(
diff --git a/electleaders_test.go b/electleaders_test.go
index 3dbaa4704..8933bb4c1 100644
--- a/electleaders_test.go
+++ b/electleaders_test.go
@@ -3,6 +3,7 @@ package kafka
 import (
 	"context"
 	"testing"
+	"time"
 
 	ktesting "github.com/segmentio/kafka-go/testing"
 )
@@ -26,6 +27,7 @@ func TestClientElectLeaders(t *testing.T) {
 		&ElectLeadersRequest{
 			Topic:      topic,
 			Partitions: []int{0, 1},
+			Timeout:    5 * time.Second,
 		},
 	)
 
diff --git a/joingroup.go b/joingroup.go
index 30823a69a..f3d90a937 100644
--- a/joingroup.go
+++ b/joingroup.go
@@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
 	wb.writeBytes(t.ProtocolMetadata)
 }
 
-type joinGroupRequestV1 struct {
+type joinGroupRequest struct {
 	// GroupID holds the unique group identifier
 	GroupID string
 
@@ -264,7 +264,7 @@ type joinGroupRequestV1 struct {
 	GroupProtocols []joinGroupRequestGroupProtocolV1
 }
 
-func (t joinGroupRequestV1) size() int32 {
+func (t joinGroupRequest) size() int32 {
 	return sizeofString(t.GroupID) +
 		sizeofInt32(t.SessionTimeout) +
 		sizeofInt32(t.RebalanceTimeout) +
@@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 {
 		sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
 }
 
-func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
+func (t joinGroupRequest) writeTo(wb *writeBuffer) {
 	wb.writeString(t.GroupID)
 	wb.writeInt32(t.SessionTimeout)
 	wb.writeInt32(t.RebalanceTimeout)
@@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
 	wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
 }
 
-type joinGroupResponseMemberV1 struct {
+type joinGroupResponseMember struct {
 	// MemberID assigned by the group coordinator
 	MemberID       string
 	MemberMetadata []byte
 }
 
-func (t joinGroupResponseMemberV1) size() int32 {
+func (t joinGroupResponseMember) size() int32 {
 	return sizeofString(t.MemberID) +
 		sizeofBytes(t.MemberMetadata)
 }
 
-func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
+func (t joinGroupResponseMember) writeTo(wb *writeBuffer) {
 	wb.writeString(t.MemberID)
 	wb.writeBytes(t.MemberMetadata)
 }
 
-func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	if remain, err = readString(r, size, &t.MemberID); err != nil {
 		return
 	}
@@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain
 	return
 }
 
-type joinGroupResponseV1 struct {
+type joinGroupResponse struct {
+	v apiVersion // v1, v2
+
+	ThrottleTime int32
+
 	// ErrorCode holds response error code
 	ErrorCode int16
 
@@ -323,19 +327,26 @@ type joinGroupResponseV1 struct {
 
 	// MemberID assigned by the group coordinator
 	MemberID string
-	Members  []joinGroupResponseMemberV1
+	Members  []joinGroupResponseMember
 }
 
-func (t joinGroupResponseV1) size() int32 {
-	return sizeofInt16(t.ErrorCode) +
+func (t joinGroupResponse) size() int32 {
+	sz := sizeofInt16(t.ErrorCode) +
 		sizeofInt32(t.GenerationID) +
 		sizeofString(t.GroupProtocol) +
 		sizeofString(t.LeaderID) +
 		sizeofString(t.MemberID) +
 		sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
+	if t.v >= v2 {
+		sz += sizeofInt32(t.ThrottleTime)
+	}
+	return sz
 }
 
-func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
+func (t joinGroupResponse) writeTo(wb *writeBuffer) {
+	if t.v >= v2 {
+		wb.writeInt32(t.ThrottleTime)
+	}
 	wb.writeInt16(t.ErrorCode)
 	wb.writeInt32(t.GenerationID)
 	wb.writeString(t.GroupProtocol)
@@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
 	wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
 }
 
-func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
-	if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
+func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+	remain = size
+	if t.v >= v2 {
+		if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil {
+			return
+		}
+	}
+	if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
 		return
 	}
 	if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
@@ -362,7 +379,7 @@ func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, e
 	}
 
 	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
-		var item joinGroupResponseMemberV1
+		var item joinGroupResponseMember
 		if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
 			return
 		}
diff --git a/joingroup_test.go b/joingroup_test.go
index 926f5b4a6..73922d6a0 100644
--- a/joingroup_test.go
+++ b/joingroup_test.go
@@ -217,37 +217,41 @@ func TestMemberMetadata(t *testing.T) {
 	}
 }
 
-func TestJoinGroupResponseV1(t *testing.T) {
-	item := joinGroupResponseV1{
-		ErrorCode:     2,
-		GenerationID:  3,
-		GroupProtocol: "a",
-		LeaderID:      "b",
-		MemberID:      "c",
-		Members: []joinGroupResponseMemberV1{
-			{
-				MemberID:       "d",
-				MemberMetadata: []byte("blah"),
+func TestJoinGroupResponse(t *testing.T) {
+	supportedVersions := []apiVersion{v1, v2}
+	for _, v := range supportedVersions {
+		item := joinGroupResponse{
+			v:             v,
+			ErrorCode:     2,
+			GenerationID:  3,
+			GroupProtocol: "a",
+			LeaderID:      "b",
+			MemberID:      "c",
+			Members: []joinGroupResponseMember{
+				{
+					MemberID:       "d",
+					MemberMetadata: []byte("blah"),
+				},
 			},
-		},
-	}
+		}
 
-	b := bytes.NewBuffer(nil)
-	w := &writeBuffer{w: b}
-	item.writeTo(w)
+		b := bytes.NewBuffer(nil)
+		w := &writeBuffer{w: b}
+		item.writeTo(w)
 
-	var found joinGroupResponseV1
-	remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
-	if err != nil {
-		t.Error(err)
-		t.FailNow()
-	}
-	if remain != 0 {
-		t.Errorf("expected 0 remain, got %v", remain)
-		t.FailNow()
-	}
-	if !reflect.DeepEqual(item, found) {
-		t.Error("expected item and found to be the same")
-		t.FailNow()
+		found := joinGroupResponse{v: v}
+		remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
+		if err != nil {
+			t.Error(err)
+			t.FailNow()
+		}
+		if remain != 0 {
+			t.Errorf("expected 0 remain, got %v", remain)
+			t.FailNow()
+		}
+		if !reflect.DeepEqual(item, found) {
+			t.Error("expected item and found to be the same")
+			t.FailNow()
+		}
 	}
 }
diff --git a/listgroups.go b/listgroups.go
index 229de9352..5034b5440 100644
--- a/listgroups.go
+++ b/listgroups.go
@@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int,
 
 	fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
 		var item listGroupsResponseGroupV1
-		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
+		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
 			return
 		}
 		t.Groups = append(t.Groups, item)
diff --git a/listgroups_test.go b/listgroups_test.go
index 8c389d712..a148718be 100644
--- a/listgroups_test.go
+++ b/listgroups_test.go
@@ -55,7 +55,7 @@ func TestClientListGroups(t *testing.T) {
 		Topic: topic,
 	})
 
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0
 	defer cancel()
 
 	err := w.WriteMessages(
diff --git a/offsetfetch.go b/offsetfetch.go
index b85bc5c83..ce80213f8 100644
--- a/offsetfetch.go
+++ b/offsetfetch.go
@@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem
 
 	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
 		item := offsetFetchResponseV1PartitionResponse{}
-		if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
+		if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
 			return
 		}
 		t.PartitionResponses = append(t.PartitionResponses, item)
diff --git a/reader_test.go b/reader_test.go
index d179c2858..63d81816e 100644
--- a/reader_test.go
+++ b/reader_test.go
@@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
 
 	conn.SetDeadline(time.Now().Add(10 * time.Second))
 
-	_, err = conn.createTopics(createTopicsRequestV0{
+	_, err = conn.createTopics(createTopicsRequest{
 		Topics: []createTopicsRequestV0Topic{
 			{
 				Topic:             topic,
diff --git a/sasl/sasl_test.go b/sasl/sasl_test.go
index a4101391a..57ff8b7cf 100644
--- a/sasl/sasl_test.go
+++ b/sasl/sasl_test.go
@@ -18,6 +18,11 @@ const (
 )
 
 func TestSASL(t *testing.T) {
+	scramUsers := map[scram.Algorithm]string{scram.SHA256: "adminscram", scram.SHA512: "adminscram"}
+	// kafka 4.0.0 test environment supports only different users for different scram algorithms.
+	if ktesting.KafkaIsAtLeast("4.0.0") {
+		scramUsers = map[scram.Algorithm]string{scram.SHA256: "adminscram256", scram.SHA512: "adminscram512"}
+	}
 	tests := []struct {
 		valid    func() sasl.Mechanism
 		invalid  func() sasl.Mechanism
@@ -39,22 +44,22 @@ func TestSASL(t *testing.T) {
 		},
 		{
 			valid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256")
+				mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "admin-secret-256")
 				return mech
 			},
 			invalid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword")
+				mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "badpassword")
 				return mech
 			},
 			minKafka: "0.10.2.0",
 		},
 		{
 			valid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512")
+				mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "admin-secret-512")
 				return mech
 			},
 			invalid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword")
+				mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "badpassword")
 				return mech
 			},
 			minKafka: "0.10.2.0",
diff --git a/writer_test.go b/writer_test.go
index 6f894ecd3..def7d8a63 100644
--- a/writer_test.go
+++ b/writer_test.go
@@ -856,7 +856,8 @@ func testWriterAutoCreateTopic(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		defer cancel()
 		err = w.WriteMessages(ctx, msg)
-		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) ||
+			errors.Is(err, context.DeadlineExceeded) {
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}
@@ -924,7 +925,8 @@ func testWriterSasl(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		defer cancel()
 		err = w.WriteMessages(ctx, msg)
-		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) ||
+			errors.Is(err, context.DeadlineExceeded) {
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}