From 2e02f3700766d1bfa6af17499bbb625c98fdaf59 Mon Sep 17 00:00:00 2001
From: Rob Hansen
Date: Mon, 15 Nov 2021 08:44:04 -0800
Subject: [PATCH] Nettest fix (#788)

* don't skip messages when offset == highwatermark
---
 .circleci/config.yml | 33 ---------------------------------
 conn.go              | 28 ++++++++++------------------
 2 files changed, 10 insertions(+), 51 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index b0024d1b..03feaee4 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -176,17 +176,6 @@ jobs:
     working_directory: *working_directory
     environment:
       KAFKA_VERSION: "2.4.1"
-
-      # Need to skip nettest to avoid these kinds of errors:
-      # --- FAIL: TestConn/nettest (17.56s)
-      # --- FAIL: TestConn/nettest/PingPong (7.40s)
-      # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
-      # conntest.go:118: mismatching value: got 77, want 78
-      # conntest.go:118: mismatching value: got 78, want 79
-      # ...
-      #
-      # TODO: Figure out why these are happening and fix them (they don't appear to be new).
-      KAFKA_SKIP_NETTEST: "1"
     docker:
       - image: circleci/golang
       - image: wurstmeister/zookeeper
@@ -203,17 +192,6 @@ jobs:
     working_directory: *working_directory
     environment:
       KAFKA_VERSION: "2.6.0"
-
-      # Need to skip nettest to avoid these kinds of errors:
-      # --- FAIL: TestConn/nettest (17.56s)
-      # --- FAIL: TestConn/nettest/PingPong (7.40s)
-      # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
-      # conntest.go:118: mismatching value: got 77, want 78
-      # conntest.go:118: mismatching value: got 78, want 79
-      # ...
-      #
-      # TODO: Figure out why these are happening and fix them (they don't appear to be new).
-      KAFKA_SKIP_NETTEST: "1"
     docker:
       - image: circleci/golang
       - image: wurstmeister/zookeeper
@@ -230,17 +208,6 @@ jobs:
     working_directory: *working_directory
     environment:
       KAFKA_VERSION: "2.7.1"
-
-      # Need to skip nettest to avoid these kinds of errors:
-      # --- FAIL: TestConn/nettest (17.56s)
-      # --- FAIL: TestConn/nettest/PingPong (7.40s)
-      # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
-      # conntest.go:118: mismatching value: got 77, want 78
-      # conntest.go:118: mismatching value: got 78, want 79
-      # ...
-      #
-      # TODO: Figure out why these are happening and fix them (they don't appear to be new).
-      KAFKA_SKIP_NETTEST: "1"
     docker:
       - image: circleci/golang
       - image: wurstmeister/zookeeper
diff --git a/conn.go b/conn.go
index 2a2411ad..53ff3670 100644
--- a/conn.go
+++ b/conn.go
@@ -133,11 +133,9 @@ const (
 	ReadCommitted IsolationLevel = 1
 )
 
-var (
-	// DefaultClientID is the default value used as ClientID of kafka
-	// connections.
-	DefaultClientID string
-)
+// DefaultClientID is the default value used as ClientID of kafka
+// connections.
+var DefaultClientID string
 
 func init() {
 	progname := filepath.Base(os.Args[0])
@@ -263,10 +261,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
 	}
 	for _, brokerMeta := range res.Brokers {
 		if brokerMeta.NodeID == res.ControllerID {
-			broker = Broker{ID: int(brokerMeta.NodeID),
+			broker = Broker{
+				ID:   int(brokerMeta.NodeID),
 				Port: int(brokerMeta.Port),
 				Host: brokerMeta.Host,
-				Rack: brokerMeta.Rack}
+				Rack: brokerMeta.Rack,
+			}
 			break
 		}
 	}
@@ -322,7 +322,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
 	err := c.readOperation(
 		func(deadline time.Time, id int32) error {
 			return c.writeRequest(findCoordinator, v0, id, request)
-
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -752,9 +751,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
 // ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
 // with the default values in ReadBatchConfig except for minBytes and maxBytes.
 func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
-
 	var adjustedDeadline time.Time
-	var maxFetch = int(c.fetchMaxBytes)
+	maxFetch := int(c.fetchMaxBytes)
 
 	if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
 		return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
@@ -859,11 +857,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
 	var msgs *messageSetReader
 	if err == nil {
-		if highWaterMark == offset {
-			msgs = &messageSetReader{empty: true}
-		} else {
-			msgs, err = newMessageSetReader(&c.rbuf, remain)
-		}
+		msgs, err = newMessageSetReader(&c.rbuf, remain)
 	}
 	if err == errShortRead {
 		err = checkTimeoutErr(adjustedDeadline)
 	}
@@ -959,7 +953,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
 // connection. If there are none, the method fetches all partitions of the kafka
 // cluster.
 func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
-
 	if len(topics) == 0 {
 		if len(c.topic) != 0 {
 			defaultTopics := [...]string{c.topic}
@@ -1188,7 +1181,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
 			}
 			return size, err
 		}
-
 	})
 	if err != nil {
 		return size, err
 	}
@@ -1556,7 +1548,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
 		return nil, err
 	}
 	if version == v1 {
-		var request = saslAuthenticateRequestV0{Data: data}
+		request := saslAuthenticateRequestV0{Data: data}
 		var response saslAuthenticateResponseV0
 		err := c.writeOperation(
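Note on the conn.go change (not part of the patch itself): the deleted branch in ReadBatchWith substituted an empty messageSetReader whenever the fetched offset equaled the partition's high water mark, which is how messages at that boundary could be skipped; after the patch the fetch response is always handed to newMessageSetReader. Below is a minimal sketch of the public code path this affects, reading a batch directly off a kafka.Conn. The broker address "localhost:9092", topic "my-topic", and partition 0 are placeholder assumptions, not values from the patch.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker, topic, and partition; point these at a real cluster.
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatal("dial leader:", err)
	}
	defer conn.Close()

	// ReadBatchWith is the method touched by the patch: the fetch response is
	// now parsed even when the requested offset sits at the high water mark.
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	batch := conn.ReadBatchWith(kafka.ReadBatchConfig{
		MinBytes: 1,
		MaxBytes: 1e6, // cap each fetch at ~1 MB
	})

	for {
		msg, err := batch.ReadMessage()
		if err != nil {
			break // batch exhausted or read failed; Close surfaces the error
		}
		fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
	}

	if err := batch.Close(); err != nil {
		log.Fatal("close batch:", err)
	}
}
```

Closing the batch matters in this sketch: it discards any unread remainder of the fetch response and returns the first error encountered, which is where a timeout near the high water mark would show up.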