-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathmessage.go
190 lines (159 loc) · 5.06 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package nsq
import (
"bufio"
"encoding/binary"
"io"
"strconv"
"time"
"github.com/pkg/errors"
)
// MessageID is the numeric form of an NSQ message ID. On the wire the ID is
// exchanged as 16 hexadecimal characters (see WriteTo).
type MessageID uint64

// ParseMessageID parses s as the hexadecimal representation of a message ID
// that fits in 8 bytes.
//
// The value and error are forwarded unchanged from strconv.ParseUint, so on
// failure the returned ID is whatever ParseUint produced and must not be used.
func ParseMessageID(s string) (id MessageID, err error) {
	raw, err := strconv.ParseUint(s, 16, 64)
	return MessageID(raw), err
}

// String returns the message ID formatted as lowercase hexadecimal, without
// any zero padding.
func (id MessageID) String() string {
	const hexBase = 16
	value := uint64(id)
	return strconv.FormatUint(value, hexBase)
}

// WriteTo writes the message ID to w as exactly 16 lowercase hexadecimal
// characters, left-padded with '0'. It returns the number of bytes written
// and any error reported by w.
//
// This method satisfies the io.WriterTo interface.
func (id MessageID) WriteTo(w io.Writer) (int64, error) {
	const hexDigits = "0123456789abcdef"

	var out [16]byte
	rest := uint64(id)

	// Fill from the least significant nibble backwards; once rest reaches
	// zero the remaining leading positions naturally receive '0'.
	for pos := len(out) - 1; pos >= 0; pos-- {
		out[pos] = hexDigits[rest&0xf]
		rest >>= 4
	}

	written, err := w.Write(out[:])
	return int64(written), err
}
// Message is a frame type representing an NSQ message.
//
// A Message carries the decoded frame fields plus an unexported command
// channel. Finish and Requeue send their command on that channel and then
// clear it; Complete reports whether the channel has been cleared.
type Message struct {
// The ID of the message.
ID MessageID
// Attempts is set to the number of attempts made to deliver the message.
Attempts uint16
// Body contains the raw data of the message frame.
Body []byte
// Timestamp is the time at which the message was published. Write encodes
// it as nanoseconds since the Unix epoch.
Timestamp time.Time
// Unexported fields set by the consumer connections.
//
// cmdChan is the send-only channel on which Finish and Requeue publish
// their commands; a nil value means the message is considered complete.
cmdChan chan<- Command
}
// NewMessage builds a Message from its ID, body and command channel. It is
// mostly useful in tests; regular consumers normally never need to call it.
//
// cmdChan is the channel on which the message publishes its commands (such as
// "Finish" or "Requeue"). In a test you can create the channel yourself and
// inspect what the message sends on it to write assertions.
//
// Attempts and Timestamp are left at their zero values.
func NewMessage(id MessageID, body []byte, cmdChan chan<- Command) *Message {
	msg := new(Message)
	msg.ID = id
	msg.Body = body
	msg.cmdChan = cmdChan
	return msg
}
// Finish must be called on every message received from a consumer to let the
// NSQ server know that the message was successfully processed.
//
// One of Finish or Requeue should be called on every message, and the methods
// will panic if they are called more than once.
//
// A panic raised while sending the command (for instance because the
// connection was closed asynchronously) is swallowed; the message is still
// marked as complete in that case.
func (m *Message) Finish() {
	if m.Complete() {
		panic("(*Message).Finish or (*Message).Requeue has already been called")
	}

	defer func() {
		// Clear the channel here rather than after the send so the message is
		// marked complete even when the send panics; otherwise Complete would
		// keep returning false and the double-call guard above would never
		// trigger.
		m.cmdChan = nil

		// The connection may have been closed asynchronously, making the send
		// below panic; that failure is deliberately ignored.
		recover()
	}()

	m.cmdChan <- Fin{MessageID: m.ID}
}
// Requeue must be called on messages received from a consumer to let the NSQ
// server know that the message could not be processed and should be retried.
// The timeout is the amount of time the NSQ server waits before offering this
// message again to its consumers.
//
// One of Finish or Requeue should be called on every message, and the methods
// will panic if they are called more than once.
//
// A panic raised while sending the command (for instance because the
// connection was closed asynchronously) is swallowed; the message is still
// marked as complete in that case.
func (m *Message) Requeue(timeout time.Duration) {
	if m.Complete() {
		panic("(*Message).Finish or (*Message).Requeue has already been called")
	}

	defer func() {
		// Clear the channel here rather than after the send so the message is
		// marked complete even when the send panics; otherwise Complete would
		// keep returning false and the double-call guard above would never
		// trigger.
		m.cmdChan = nil

		// The connection may have been closed asynchronously, making the send
		// below panic; that failure is deliberately ignored.
		recover()
	}()

	m.cmdChan <- Req{MessageID: m.ID, Timeout: timeout}
}
// Complete reports whether the message has no command channel left, which is
// the state Finish and Requeue leave it in once they have sent their command.
func (m *Message) Complete() bool {
	completed := m.cmdChan == nil
	return completed
}
// FrameType always reports FrameTypeMessage. It is part of the Frame
// interface.
func (Message) FrameType() FrameType {
	const frameType = FrameTypeMessage
	return frameType
}
// Write serializes the message frame to the buffered writer w. It is part of
// the Frame interface.
//
// After the frame header, the payload is emitted in this order: the timestamp
// as a big-endian int64 of nanoseconds since the Unix epoch, the attempts
// counter as a big-endian uint16, the message ID as written by
// MessageID.WriteTo, and finally the raw body. The first failing step aborts
// the serialization and its error is returned with "writing message" context.
func (m Message) Write(w *bufio.Writer) (err error) {
	// 26 is the size of the fixed fields written ahead of the body.
	payloadSize := len(m.Body) + 26

	if err = writeFrameHeader(w, FrameTypeMessage, payloadSize); err != nil {
		return errors.WithMessage(err, "writing message")
	}

	nanos := m.Timestamp.UnixNano()
	if err = binary.Write(w, binary.BigEndian, nanos); err != nil {
		return errors.Wrap(err, "writing message")
	}

	attempts := m.Attempts
	if err = binary.Write(w, binary.BigEndian, attempts); err != nil {
		return errors.Wrap(err, "writing message")
	}

	if _, err = m.ID.WriteTo(w); err != nil {
		return errors.Wrap(err, "writing message")
	}

	if _, err = w.Write(m.Body); err != nil {
		return errors.Wrap(err, "writing message")
	}

	return nil
}
func readMessage(n int, r *bufio.Reader) (msg Message, err error) {
var timestamp int64
var attempts uint16
var msgID MessageID
var hexID [16]byte
var body = make([]byte, n-26)
if err = binary.Read(r, binary.BigEndian, ×tamp); err != nil {
err = errors.Wrap(err, "reading message timestamp")
return
}
if err = binary.Read(r, binary.BigEndian, &attempts); err != nil {
err = errors.Wrap(err, "reading message attempts")
return
}
if _, err = io.ReadFull(r, hexID[:]); err != nil {
err = errors.Wrap(err, "reading message ID")
return
}
if _, err = io.ReadFull(r, body); err != nil {
err = errors.Wrap(err, "reading message body")
return
}
if msgID, err = ParseMessageID(string(hexID[:])); err != nil {
err = errors.Wrap(err, "parsing message ID")
return
}
msg = Message{
ID: msgID,
Attempts: attempts,
Body: body,
Timestamp: time.Unix(0, timestamp),
}
return
}