diff --git a/server/core/nats2kafka_test.go b/server/core/nats2kafka_test.go index 926ba65..f39e1e4 100644 --- a/server/core/nats2kafka_test.go +++ b/server/core/nats2kafka_test.go @@ -664,10 +664,10 @@ func TestNATSConnectorError(t *testing.T) { n1 := len(tbs.Bridge.reconnect) tbs.Bridge.reconnectLock.Unlock() - tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("error!")) + tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("error")) // Should be a no-op. - tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("another error!")) + tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("another error")) tbs.Bridge.reconnectLock.Lock() n2 := len(tbs.Bridge.reconnect) diff --git a/server/core/server.go b/server/core/server.go index 99ce293..26cd065 100644 --- a/server/core/server.go +++ b/server/core/server.go @@ -31,6 +31,7 @@ import ( stan "github.com/nats-io/stan.go" ) +// Version specifies the command version. This should be set at compile time. var Version = "0.0-dev" // NATSKafkaBridge is the core structure for the server. diff --git a/server/core/stan2kafka_test.go b/server/core/stan2kafka_test.go index 354361b..086e384 100644 --- a/server/core/stan2kafka_test.go +++ b/server/core/stan2kafka_test.go @@ -939,7 +939,7 @@ func TestSTANConnectionLostClientIDRegistered(t *testing.T) { tbs.Bridge.config.ReconnectInterval = 125 sc1 := tbs.Bridge.stan tbs.Bridge.reconnectLock.Unlock() - tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection!")) + tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection")) time.Sleep(250 * time.Millisecond) @@ -968,7 +968,7 @@ func TestSTANConnectionLost(t *testing.T) { sc1 := tbs.Bridge.stan tbs.Bridge.config.STAN.ClientID += "new" tbs.Bridge.reconnectLock.Unlock() - tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection!")) + tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection")) time.Sleep(250 * time.Millisecond) diff --git a/server/kafka/consumer.go b/server/kafka/consumer.go index 9e92650..8f7fe12 100644 --- a/server/kafka/consumer.go +++ b/server/kafka/consumer.go @@ -208,9 +208,8 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) { Key: cmsg.Key, Value: deserializedValue, }, nil - } else { - return Message{}, err } + return Message{}, err case loopErr := <-c.consumeErrCh: return Message{}, loopErr } @@ -235,9 +234,8 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) { Key: cmsg.Key, Value: deserializedValue, }, nil - } else { - return Message{}, err } + return Message{}, err } } @@ -329,7 +327,7 @@ func (c *saramaConsumer) deserializePayload(payload []byte) ([]byte, error) { case srclient.Avro: value, err = c.deserializeAvro(schema, payload[5:]) case srclient.Json: - value, err = c.validateJsonSchema(schema, payload[5:]) + value, err = c.validateJSONSchema(schema, payload[5:]) case srclient.Protobuf: value, err = c.pbDeserializer.Deserialize(schema, payload[5:]) } @@ -355,7 +353,7 @@ func (c *saramaConsumer) deserializeAvro(schema *srclient.Schema, cleanPayload [ return value, nil } -func (c *saramaConsumer) validateJsonSchema(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) { +func (c *saramaConsumer) validateJSONSchema(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) { jsc, err := jsonschema.CompileString("schema.json", schema.Schema()) if err != nil { return nil, fmt.Errorf("unable to parse json schema: %w", err)