Skip to content

Commit 0b4cc81

Browse files
authored
Merge pull request #185 from cbusbey/donotsend_fix
fixes donotsend logic
2 parents 2dab51a + c512e38 commit 0b4cc81

File tree

2 files changed

+66
-18
lines changed

2 files changed

+66
-18
lines changed

session.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,12 @@ func (s *session) resend(msg Message) error {
218218
}
219219

220220
//queueForSend will validate, persist, and queue the message for send
221-
func (s *session) queueForSend(msg Message) (err error) {
221+
func (s *session) queueForSend(msg Message) error {
222222
s.sendMutex.Lock()
223223
defer s.sendMutex.Unlock()
224224

225-
if err = s.prepMessageForSend(&msg); err != nil {
226-
return
225+
if doNotSend, err := s.prepMessageForSend(&msg); err != nil || doNotSend {
226+
return err
227227
}
228228

229229
s.toSend = append(s.toSend, msg)
@@ -233,69 +233,74 @@ func (s *session) queueForSend(msg Message) (err error) {
233233
default:
234234
}
235235

236-
return
236+
return nil
237237
}
238238

239239
//send will validate, persist, queue the message and send all messages in the queue
240-
func (s *session) send(msg Message) (err error) {
240+
func (s *session) send(msg Message) error {
241241
s.sendMutex.Lock()
242242
defer s.sendMutex.Unlock()
243243

244-
if err = s.prepMessageForSend(&msg); err != nil {
245-
return
244+
if doNotSend, err := s.prepMessageForSend(&msg); err != nil || doNotSend {
245+
return err
246246
}
247247

248248
s.toSend = append(s.toSend, msg)
249249
s.sendQueued()
250250

251-
return
251+
return nil
252252
}
253253

254254
//dropAndSend will optionally reset the store, validate and persist the message, then drops the send queue and sends the message.
255-
func (s *session) dropAndSend(msg Message, resetStore bool) (err error) {
255+
func (s *session) dropAndSend(msg Message, resetStore bool) error {
256256

257257
s.sendMutex.Lock()
258258
defer s.sendMutex.Unlock()
259259

260260
if resetStore {
261-
if err = s.store.Reset(); err != nil {
261+
if err := s.store.Reset(); err != nil {
262262
return err
263263
}
264264
}
265265

266-
if err = s.prepMessageForSend(&msg); err != nil {
267-
return
266+
if doNotSend, err := s.prepMessageForSend(&msg); err != nil || doNotSend {
267+
return err
268268
}
269269

270270
s.dropQueued()
271271
s.toSend = append(s.toSend, msg)
272272
s.sendQueued()
273273

274-
return
274+
return nil
275275
}
276276

277-
func (s *session) prepMessageForSend(msg *Message) (err error) {
277+
func (s *session) prepMessageForSend(msg *Message) (doNotSend bool, err error) {
278278
s.fillDefaultHeader(*msg)
279279
seqNum := s.store.NextSenderMsgSeqNum()
280280
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))
281281

282282
var msgType FIXString
283283
if err = msg.Header.GetField(tagMsgType, &msgType); err != nil {
284-
return err
284+
return
285285
}
286286

287287
if isAdminMessageType(string(msgType)) {
288288
s.application.ToAdmin(*msg, s.sessionID)
289289
} else {
290-
s.application.ToApp(*msg, s.sessionID)
290+
if doNotSendErr := s.application.ToApp(*msg, s.sessionID); doNotSendErr != nil {
291+
s.log.OnEventf("Do Not Send: %v", doNotSendErr)
292+
doNotSend = true
293+
return
294+
}
291295
}
292296

293297
var msgBytes []byte
294298
if msgBytes, err = msg.Build(); err != nil {
295299
return
296300
}
297301

298-
return s.persist(seqNum, msgBytes)
302+
err = s.persist(seqNum, msgBytes)
303+
return
299304
}
300305

301306
func (s *session) persist(seqNum int, msgBytes []byte) error {
@@ -492,7 +497,11 @@ func (s *session) verifySelect(msg Message, checkTooHigh bool, checkTooLow bool)
492497

493498
func (s *session) fromCallback(msg Message) MessageRejectError {
494499
var msgType FIXString
495-
if msg.Header.GetField(tagMsgType, &msgType); isAdminMessageType(string(msgType)) {
500+
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
501+
return err
502+
}
503+
504+
if isAdminMessageType(string(msgType)) {
496505
return s.application.FromAdmin(msg, s.sessionID)
497506
}
498507

session_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package quickfix
22

33
import (
4+
"fmt"
45
"testing"
56
"time"
67

@@ -252,6 +253,7 @@ func TestSession_CheckTargetTooLow(t *testing.T) {
252253
}
253254

254255
type TestClient struct {
256+
rejectToApp bool
255257
adminMessages []Message
256258
appMessages []Message
257259
}
@@ -275,6 +277,11 @@ func (e *TestClient) ToAdmin(msg Message, sessionID SessionID) {
275277

276278
func (e *TestClient) ToApp(msg Message, sessionID SessionID) (err error) {
277279
e.appMessages = append(e.appMessages, msg)
280+
281+
if e.rejectToApp {
282+
return fmt.Errorf("Rejecting ToApp")
283+
}
284+
278285
return nil
279286
}
280287

@@ -331,6 +338,17 @@ func (suite *SessionSendTestSuite) Logon() Message {
331338
return msg
332339
}
333340

341+
func (suite *SessionSendTestSuite) rejectToApp() {
342+
suite.TestClient.rejectToApp = true
343+
}
344+
345+
func (suite *SessionSendTestSuite) shouldNotPersistMessage() {
346+
suite.Equal(1, suite.store.NextSenderMsgSeqNum(), "The next sender sequence number should not be incremented")
347+
persistedMessages, err := suite.store.GetMessages(1, 1)
348+
suite.Nil(err)
349+
suite.Len(persistedMessages, 0, "The message should not be persisted")
350+
}
351+
334352
func (suite *SessionSendTestSuite) shouldPersistMessage() {
335353
suite.Equal(2, suite.store.NextSenderMsgSeqNum(), "The next sender sequence number should be incremented")
336354
persistedMessages, err := suite.store.GetMessages(1, 1)
@@ -399,6 +417,18 @@ func (suite *SessionSendTestSuite) TestQueueForSendAppMessage() {
399417
suite.shouldNotSendMessage()
400418
}
401419

420+
func (suite *SessionSendTestSuite) TestQueueForSendDoNotSendAppMessage() {
421+
suite.rejectToApp()
422+
require.Nil(suite.T(), suite.queueForSend(suite.NewOrderSingle()))
423+
424+
suite.shouldCallToApp()
425+
suite.shouldNotPersistMessage()
426+
suite.shouldNotSendMessage()
427+
428+
require.Nil(suite.T(), suite.send(suite.Heartbeat()))
429+
suite.shouldSendMessages(1)
430+
}
431+
402432
func (suite *SessionSendTestSuite) TestQueueForSendAdminMessage() {
403433
require.Nil(suite.T(), suite.queueForSend(suite.Heartbeat()))
404434

@@ -415,6 +445,15 @@ func (suite *SessionSendTestSuite) TestSendAppMessage() {
415445
suite.shouldSendMessage()
416446
}
417447

448+
func (suite *SessionSendTestSuite) TestSendAppDoNotSendMessage() {
449+
suite.rejectToApp()
450+
require.Nil(suite.T(), suite.send(suite.NewOrderSingle()))
451+
452+
suite.shouldCallToApp()
453+
suite.shouldNotPersistMessage()
454+
suite.shouldNotSendMessage()
455+
}
456+
418457
func (suite *SessionSendTestSuite) TestSendAdminMessage() {
419458
require.Nil(suite.T(), suite.send(suite.Heartbeat()))
420459

0 commit comments

Comments
 (0)