Skip to content

Commit

Permalink
Fix golints
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Jan 10, 2022
1 parent 13ea138 commit bef0b41
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
4 changes: 2 additions & 2 deletions server/core/nats2kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,10 @@ func TestNATSConnectorError(t *testing.T) {
n1 := len(tbs.Bridge.reconnect)
tbs.Bridge.reconnectLock.Unlock()

tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("error!"))
tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("error"))

// Should be a no-op.
tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("another error!"))
tbs.Bridge.ConnectorError(tbs.Bridge.connectors[0], fmt.Errorf("another error"))

tbs.Bridge.reconnectLock.Lock()
n2 := len(tbs.Bridge.reconnect)
Expand Down
1 change: 1 addition & 0 deletions server/core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
stan "github.com/nats-io/stan.go"
)

// Version specifies the command version. This should be set at compile time.
var Version = "0.0-dev"

// NATSKafkaBridge is the core structure for the server.
Expand Down
4 changes: 2 additions & 2 deletions server/core/stan2kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func TestSTANConnectionLostClientIDRegistered(t *testing.T) {
tbs.Bridge.config.ReconnectInterval = 125
sc1 := tbs.Bridge.stan
tbs.Bridge.reconnectLock.Unlock()
tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection!"))
tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection"))

time.Sleep(250 * time.Millisecond)

Expand Down Expand Up @@ -968,7 +968,7 @@ func TestSTANConnectionLost(t *testing.T) {
sc1 := tbs.Bridge.stan
tbs.Bridge.config.STAN.ClientID += "new"
tbs.Bridge.reconnectLock.Unlock()
tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection!"))
tbs.Bridge.stanConnectionLost(sc1, fmt.Errorf("lost connection"))

time.Sleep(250 * time.Millisecond)

Expand Down
10 changes: 4 additions & 6 deletions server/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,8 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
Key: cmsg.Key,
Value: deserializedValue,
}, nil
} else {
return Message{}, err
}
return Message{}, err
case loopErr := <-c.consumeErrCh:
return Message{}, loopErr
}
Expand All @@ -235,9 +234,8 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
Key: cmsg.Key,
Value: deserializedValue,
}, nil
} else {
return Message{}, err
}
return Message{}, err
}
}

Expand Down Expand Up @@ -329,7 +327,7 @@ func (c *saramaConsumer) deserializePayload(payload []byte) ([]byte, error) {
case srclient.Avro:
value, err = c.deserializeAvro(schema, payload[5:])
case srclient.Json:
value, err = c.validateJsonSchema(schema, payload[5:])
value, err = c.validateJSONSchema(schema, payload[5:])
case srclient.Protobuf:
value, err = c.pbDeserializer.Deserialize(schema, payload[5:])
}
Expand All @@ -355,7 +353,7 @@ func (c *saramaConsumer) deserializeAvro(schema *srclient.Schema, cleanPayload [
return value, nil
}

func (c *saramaConsumer) validateJsonSchema(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) {
func (c *saramaConsumer) validateJSONSchema(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) {
jsc, err := jsonschema.CompileString("schema.json", schema.Schema())
if err != nil {
return nil, fmt.Errorf("unable to parse json schema: %w", err)
Expand Down

0 comments on commit bef0b41

Please sign in to comment.