diff --git a/client.go b/client.go index bc32660d..cbb0e40e 100644 --- a/client.go +++ b/client.go @@ -138,7 +138,7 @@ func (c *Client) NewSession() (*Session, error) { begin, ok := fr.body.(*performBegin) if !ok { - s.Close() // deallocate session on error + _ = s.Close() // deallocate session on error return nil, errorErrorf("unexpected begin response: %+v", fr.body) } @@ -782,7 +782,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { // default to a more reasonable max and allow users to // change via LinkOption l.peerMaxMessageSize = maxSliceLen - if resp.MaxMessageSize != 0 && resp.MaxMessageSize < uint64(l.peerMaxMessageSize) { + if resp.MaxMessageSize != 0 && resp.MaxMessageSize < l.peerMaxMessageSize { l.peerMaxMessageSize = resp.MaxMessageSize } @@ -1274,7 +1274,7 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) { messageSize += len(fr.Payload) if messageSize > maxMessageSize { // TODO: send error - r.Close() + _ = r.Close() return nil, errorErrorf("received message larger than max size of ") } diff --git a/conn.go b/conn.go index d26b25ab..1e611a73 100644 --- a/conn.go +++ b/conn.go @@ -256,7 +256,7 @@ func (c *conn) start() error { // check if err occurred if c.err != nil { close(c.txDone) // close here since connWriter hasn't been started yet - c.Close() + _ = c.Close() return c.err } @@ -429,7 +429,7 @@ func (c *conn) connReader() { // need to read more if buf doesn't contain the complete frame // or there's not enough in buf to parse the header if frameInProgress || buf.len() < frameHeaderSize { - c.net.SetReadDeadline(time.Now().Add(c.idleTimeout)) + _ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout)) err := buf.readFromOnce(c.net) if err != nil { select { @@ -577,7 +577,7 @@ func (c *conn) connWriter() { // connection complete case <-c.done: // send close - c.writeFrame(frame{ + _ = c.writeFrame(frame{ type_: frameTypeAMQP, body: &performClose{}, }) @@ -590,7 +590,7 @@ func (c *conn) connWriter() { // by connWriter after initial negotiation. func (c *conn) writeFrame(fr frame) error { if c.connectTimeout != 0 { - c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout)) + _ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout)) } // writeFrame into txBuf @@ -614,7 +614,7 @@ func (c *conn) writeFrame(fr frame) error { // network func (c *conn) writeProtoHeader(pID protoID) error { if c.connectTimeout != 0 { - c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout)) + _ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout)) } _, err := c.net.Write([]byte{'A', 'M', 'Q', 'P', byte(pID), 1, 0, 0}) return err @@ -722,12 +722,12 @@ func (c *conn) startTLS() stateFunc { // this function will be executed by connReader c.connReaderRun <- func() { - c.net.SetReadDeadline(time.Time{}) // clear timeout + _ = c.net.SetReadDeadline(time.Time{}) // clear timeout // wrap existing net.Conn and perform TLS handshake tlsConn := tls.Client(c.net, c.tlsConfig) if c.connectTimeout != 0 { - tlsConn.SetWriteDeadline(time.Now().Add(c.connectTimeout)) + _ = tlsConn.SetWriteDeadline(time.Now().Add(c.connectTimeout)) } c.err = tlsConn.Handshake() @@ -739,7 +739,7 @@ func (c *conn) startTLS() stateFunc { } // set deadline to interrupt connReader - c.net.SetReadDeadline(time.Time{}.Add(1)) + _ = c.net.SetReadDeadline(time.Time{}.Add(1)) <-done diff --git a/decode.go b/decode.go index bce49499..bc9e0306 100644 --- a/decode.go +++ b/decode.go @@ -373,7 +373,7 @@ func unmarshalComposite(r *buffer, type_ amqpType, fields ...unmarshalField) err } // Unmarshal each of the received fields. - err := unmarshal(r, field.field) + err = unmarshal(r, field.field) if err != nil { return errorWrapf(err, "unmarshaling field %d", i) } @@ -406,8 +406,6 @@ type unmarshalField struct { type nullHandler func() error // readCompositeHeader reads and consumes the composite header from r. -// -// If the composite is null, errNull will be returned. func readCompositeHeader(r *buffer) (_ amqpType, fields int, _ error) { type_, err := r.readType() if err != nil { @@ -426,57 +424,57 @@ func readCompositeHeader(r *buffer) (_ amqpType, fields int, _ error) { } // fields are represented as a list - _, fields, err = readListHeader(r) + fields, err = readListHeader(r) return amqpType(v), fields, err } -func readListHeader(r *buffer) (size, length int, _ error) { +func readListHeader(r *buffer) (length int, _ error) { type_, err := r.readType() if err != nil { - return 0, 0, err + return 0, err } listLength := r.len() switch type_ { case typeCodeList0: - return 0, 0, nil + return 0, nil case typeCodeList8: buf, ok := r.next(2) if !ok { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } _ = buf[1] - size = int(buf[0]) - if int(size) > listLength-1 { - return 0, 0, errorNew("invalid length") + size := int(buf[0]) + if size > listLength-1 { + return 0, errorNew("invalid length") } length = int(buf[1]) case typeCodeList32: buf, ok := r.next(8) if !ok { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } _ = buf[7] - size = int(binary.BigEndian.Uint32(buf[:4])) + size := int(binary.BigEndian.Uint32(buf[:4])) if size > listLength-4 { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } length = int(binary.BigEndian.Uint32(buf[4:8])) default: - return 0, 0, errorErrorf("type code %#02x is not a recognized list type", type_) + return 0, errorErrorf("type code %#02x is not a recognized list type", type_) } - return size, length, nil + return length, nil } -func readArrayHeader(r *buffer) (size, length int, _ error) { +func readArrayHeader(r *buffer) (length int, _ error) { type_, err := r.readType() if err != nil { - return 0, 0, err + return 0, err } arrayLength := r.len() @@ -485,31 +483,31 @@ func readArrayHeader(r *buffer) (size, length int, _ error) { case typeCodeArray8: buf, ok := r.next(2) if !ok { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } _ = buf[1] - size = int(buf[0]) - if int(size) > arrayLength-1 { - return 0, 0, errorNew("invalid length") + size := int(buf[0]) + if size > arrayLength-1 { + return 0, errorNew("invalid length") } length = int(buf[1]) case typeCodeArray32: buf, ok := r.next(8) if !ok { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } _ = buf[7] - l := binary.BigEndian.Uint32(buf[:4]) - if int(l) > arrayLength-4 { - return 0, 0, errorErrorf("invalid length for type %02x", type_) + size := binary.BigEndian.Uint32(buf[:4]) + if int(size) > arrayLength-4 { + return 0, errorErrorf("invalid length for type %02x", type_) } length = int(binary.BigEndian.Uint32(buf[4:8])) default: - return 0, 0, errorErrorf("type code %#02x is not a recognized list type", type_) + return 0, errorErrorf("type code %#02x is not a recognized list type", type_) } - return size, length, nil + return length, nil } func readString(r *buffer) (string, error) { @@ -684,7 +682,7 @@ func readAnyMap(r *buffer) (interface{}, error) { stringKeys := true Loop: - for key, _ := range m { + for key := range m { switch key.(type) { case string: case symbol: @@ -829,7 +827,7 @@ func readComposite(r *buffer) (interface{}, error) { if len(buf) < 10 { return nil, errorNew("invalid length for ulong") } - compositeType = uint64(binary.BigEndian.Uint64(buf[2:])) + compositeType = binary.BigEndian.Uint64(buf[2:]) } if compositeType > math.MaxUint8 { @@ -1196,40 +1194,45 @@ func readUUID(r *buffer) (UUID, error) { return uuid, nil } -func readMapHeader(r *buffer) (size uint32, count uint32, _ error) { +func readMapHeader(r *buffer) (count uint32, _ error) { type_, err := r.readType() if err != nil { - return 0, 0, err + return 0, err } - var width int + length := r.len() + switch type_ { case typeCodeMap8: buf, ok := r.next(2) if !ok { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } _ = buf[1] - size = uint32(buf[0]) + size := int(buf[0]) + if size > length-1 { + return 0, errorNew("invalid length") + } count = uint32(buf[1]) - width = 1 case typeCodeMap32: buf, ok := r.next(8) if !ok { - return 0, 0, errorNew("invalid length") + return 0, errorNew("invalid length") } _ = buf[7] - size = binary.BigEndian.Uint32(buf[:4]) + size := int(binary.BigEndian.Uint32(buf[:4])) + if size > length-4 { + return 0, errorNew("invalid length") + } count = binary.BigEndian.Uint32(buf[4:8]) - width = 4 default: - return 0, 0, errorErrorf("invalid map type %#02x", type_) + return 0, errorErrorf("invalid map type %#02x", type_) } - if uint64(size) > uint64(r.len()+width) || uint64(count) > uint64(r.len()) { - return 0, 0, errorNew("invalid length") + if int(count) > r.len() { + return 0, errorNew("invalid length") } - return size, count, err + return count, nil } diff --git a/encode.go b/encode.go index fb2b5b77..0c1297fd 100644 --- a/encode.go +++ b/encode.go @@ -452,8 +452,11 @@ func writeMap(wr *buffer, m interface{}) error { case map[string]interface{}: pairs = len(m) * 2 for key, val := range m { - writeString(wr, key) - err := marshal(wr, val) + err := writeString(wr, key) + if err != nil { + return err + } + err = marshal(wr, val) if err != nil { return err } @@ -473,8 +476,11 @@ func writeMap(wr *buffer, m interface{}) error { case unsettled: pairs = len(m) * 2 for key, val := range m { - writeString(wr, key) - err := marshal(wr, val) + err := writeString(wr, key) + if err != nil { + return err + } + err = marshal(wr, val) if err != nil { return err } diff --git a/internal/testconn/testconn.go b/internal/testconn/testconn.go index 91e5c5ac..feae6db9 100644 --- a/internal/testconn/testconn.go +++ b/internal/testconn/testconn.go @@ -49,11 +49,17 @@ func (c *Conn) Close() error { } func (c *Conn) LocalAddr() net.Addr { - return &net.TCPAddr{net.IP{127, 0, 0, 1}, 49706, ""} + return &net.TCPAddr{ + IP: net.IP{127, 0, 0, 1}, + Port: 49706, + } } func (c *Conn) RemoteAddr() net.Addr { - return &net.TCPAddr{net.IP{127, 0, 0, 1}, 49706, ""} + return &net.TCPAddr{ + IP: net.IP{127, 0, 0, 1}, + Port: 49706, + } } func (c *Conn) SetDeadline(t time.Time) error { diff --git a/marshal_test.go b/marshal_test.go index 2788259a..4ca5594a 100644 --- a/marshal_test.go +++ b/marshal_test.go @@ -50,12 +50,12 @@ func TestFrameMarshalUnmarshal(t *testing.T) { err := writeFrame(&buf, tt.frame) if err != nil { - t.Error(fmt.Sprintf("%+v", err)) + t.Fatalf("%+v", err) } header, err := parseFrameHeader(&buf) if err != nil { - t.Errorf("%+v", err) + t.Fatalf("%+v", err) } want := tt.frame @@ -67,6 +67,9 @@ func TestFrameMarshalUnmarshal(t *testing.T) { } payload, err := parseFrameBody(&buf) + if err != nil { + t.Fatalf("%+v", err) + } cmpOpts := cmp.Options{ DeepAllowUnexported(want.body, payload), } @@ -216,12 +219,12 @@ func TestReadAny(t *testing.T) { var buf buffer err := marshal(&buf, type_) if err != nil { - t.Error(fmt.Sprintf("%+v", err)) + t.Errorf("%+v", err) } got, err := readAny(&buf) if err != nil { - t.Fatal(fmt.Sprintf("%+v", err)) + t.Fatalf("%+v", err) } cmpOpts := cmp.Options{ @@ -626,11 +629,11 @@ func DeepAllowUnexported(vs ...interface{}) cmp.Option { for _, v := range vs { structTypes(reflect.ValueOf(v), m) } - var typs []interface{} + var types []interface{} for t := range m { - typs = append(typs, reflect.New(t).Elem().Interface()) + types = append(types, reflect.New(t).Elem().Interface()) } - return cmp.AllowUnexported(typs...) + return cmp.AllowUnexported(types...) } func structTypes(v reflect.Value, m map[reflect.Type]struct{}) { diff --git a/types.go b/types.go index 1f810ecc..06d81175 100644 --- a/types.go +++ b/types.go @@ -566,7 +566,7 @@ func (u unsettled) marshal(wr *buffer) error { } func (u *unsettled) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) + count, err := readMapHeader(r) if err != nil { return err } @@ -595,7 +595,7 @@ func (f filter) marshal(wr *buffer) error { } func (f *filter) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) + count, err := readMapHeader(r) if err != nil { return err } @@ -1263,7 +1263,7 @@ type performTransfer struct { // optional channel to indicate to sender that transfer has completed done chan struct{} - // complete when receiver has responded with dispostion (ReceiverSettleMode = second) + // complete when receiver has responded with disposition (ReceiverSettleMode = second) // instead of when this message has been sent on network confirmSettlement bool } @@ -1617,7 +1617,7 @@ func (c *performClose) frameBody() {} func (c *performClose) marshal(wr *buffer) error { return marshalComposite(wr, typeCodeClose, []marshalField{ - marshalField{value: c.Error, omit: c.Error == nil}, + {value: c.Error, omit: c.Error == nil}, }) } @@ -1924,9 +1924,9 @@ func peekMessageType(buf []byte) (uint8, error) { if t == typeCodeSmallUlong { if len(buf[2:]) == 0 { - errorNew("invalid ulong") + return 0, errorNew("invalid ulong") } - return uint8(buf[2]), nil + return buf[2], nil } if t != typeCodeUlong { @@ -1959,7 +1959,7 @@ func (a Annotations) marshal(wr *buffer) error { } func (a *Annotations) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) + count, err := readMapHeader(r) if err != nil { return err } @@ -2458,7 +2458,7 @@ func (m mapAnyAny) marshal(wr *buffer) error { } func (m *mapAnyAny) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) + count, err := readMapHeader(r) if err != nil { return err } @@ -2497,7 +2497,7 @@ func (m mapStringAny) marshal(wr *buffer) error { } func (m *mapStringAny) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) + count, err := readMapHeader(r) if err != nil { return err } @@ -2526,13 +2526,13 @@ func (m mapSymbolAny) marshal(wr *buffer) error { return writeMap(wr, map[symbol]interface{}(m)) } -func (f *mapSymbolAny) unmarshal(r *buffer) error { - _, count, err := readMapHeader(r) +func (m *mapSymbolAny) unmarshal(r *buffer) error { + count, err := readMapHeader(r) if err != nil { return err } - m := make(mapSymbolAny, count/2) + mm := make(mapSymbolAny, count/2) for i := uint32(0); i < count; i += 2 { key, err := readString(r) if err != nil { @@ -2542,9 +2542,9 @@ func (f *mapSymbolAny) unmarshal(r *buffer) error { if err != nil { return err } - m[symbol(key)] = value + mm[symbol(key)] = value } - *f = m + *m = mm return nil } @@ -2609,28 +2609,19 @@ func (p *lifetimePolicy) unmarshal(r *buffer) error { return nil } -func (p lifetimePolicy) unmarshalConstant(r *buffer) error { - _, ok := r.next(4) - if !ok { - return errorErrorf("invalid size %d for lifetime-policy", r.len()) - } - return nil -} - +// Sender Settlement Modes const ( - // ModeMixed specifies the sender will send all deliveries initially - // unsettled to the receiver. + // Sender will send all deliveries initially unsettled to the receiver. ModeUnsettled SenderSettleMode = 0 - // ModeSettled specifies the sender will send all deliveries settled - // to the receiver. + // Sender will send all deliveries settled to the receiver. ModeSettled SenderSettleMode = 1 - // ModeMixed specifies the sender MAY send a mixture of settled and - // unsettled deliveries to the receiver. + // Sender MAY send a mixture of settled and unsettled deliveries to the receiver. ModeMixed SenderSettleMode = 2 ) +// SenderSettleMode specifies how the sender will settle messages. type SenderSettleMode uint8 func (m *SenderSettleMode) String() string { @@ -2663,14 +2654,14 @@ func (m *SenderSettleMode) unmarshal(r *buffer) error { return err } +// Receiver Settlement Modes const ( - // ModeFirst specifies the receiver will spontaneously settle all - // incoming transfers. + // Receiver will spontaneously settle all incoming transfers. ModeFirst ReceiverSettleMode = 0 - // ModeSecond specifies receiver will only settle after sending the - // disposition to the sender and receiving a disposition indicating - // settlement of the delivery from the sender. + // Receiver will only settle after sending the disposition to the + // sender and receiving a disposition indicating settlement of + // the delivery from the sender. // // BUG: When receiving messages, accepting/rejecting/releasing a // received message does not block and wait for the sender's @@ -2678,6 +2669,7 @@ const ( ModeSecond ReceiverSettleMode = 1 ) +// ReceiverSettleMode specifies how the receiver will settle messages. type ReceiverSettleMode uint8 func (m *ReceiverSettleMode) String() string { @@ -2723,9 +2715,14 @@ func (t describedType) marshal(wr *buffer) error { func (t *describedType) unmarshal(r *buffer) error { b, err := r.readByte() + if err != nil { + return err + } + if b != 0x0 { return errorErrorf("invalid described type header %02x", b) } + err = unmarshal(r, &t.descriptor) if err != nil { return err @@ -2756,7 +2753,7 @@ func (a ArrayUByte) marshal(wr *buffer) error { } func (a *ArrayUByte) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -2793,7 +2790,7 @@ func (a arrayInt8) marshal(wr *buffer) error { } func (a *arrayInt8) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -2841,7 +2838,7 @@ func (a arrayUint16) marshal(wr *buffer) error { } func (a *arrayUint16) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -2892,7 +2889,7 @@ func (a arrayInt16) marshal(wr *buffer) error { } func (a *arrayInt16) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -2959,7 +2956,7 @@ func (a arrayUint32) marshal(wr *buffer) error { } func (a *arrayUint32) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3052,7 +3049,7 @@ func (a arrayInt32) marshal(wr *buffer) error { } func (a *arrayInt32) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3136,7 +3133,7 @@ func (a arrayUint64) marshal(wr *buffer) error { } func (a *arrayUint64) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3229,7 +3226,7 @@ func (a arrayInt64) marshal(wr *buffer) error { } func (a *arrayInt64) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3297,7 +3294,7 @@ func (a arrayFloat) marshal(wr *buffer) error { } func (a *arrayFloat) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3349,7 +3346,7 @@ func (a arrayDouble) marshal(wr *buffer) error { } func (a *arrayDouble) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3405,7 +3402,7 @@ func (a arrayBool) marshal(wr *buffer) error { } func (a *arrayBool) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3489,7 +3486,7 @@ func (a arrayString) marshal(wr *buffer) error { } func (a *arrayString) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3580,7 +3577,7 @@ func (a arraySymbol) marshal(wr *buffer) error { } func (a *arraySymbol) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3670,7 +3667,7 @@ func (a arrayBinary) marshal(wr *buffer) error { } func (a *arrayBinary) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3743,7 +3740,7 @@ func (a arrayTimestamp) marshal(wr *buffer) error { } func (a *arrayTimestamp) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3773,7 +3770,7 @@ func (a *arrayTimestamp) unmarshal(r *buffer) error { for i := range aa { ms := int64(binary.BigEndian.Uint64(buf[bufIdx:])) bufIdx += typeSize - aa[i] = time.Unix(int64(ms)/1000, int64(ms%1000)*1000000).UTC() + aa[i] = time.Unix(ms/1000, (ms%1000)*1000000).UTC() } *a = aa @@ -3795,7 +3792,7 @@ func (a arrayUUID) marshal(wr *buffer) error { } func (a *arrayUUID) unmarshal(r *buffer) error { - _, length, err := readArrayHeader(r) + length, err := readArrayHeader(r) if err != nil { return err } @@ -3866,7 +3863,7 @@ func (l list) marshal(wr *buffer) error { } func (l *list) unmarshal(r *buffer) error { - _, length, err := readListHeader(r) + length, err := readListHeader(r) if err != nil { return err }