From ffafbe49c064642212a7457375c539d58512e2fc Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Tue, 20 Aug 2019 17:27:42 -0700 Subject: [PATCH] Fix settlement handling and add Message.SendSettled * Message.SendSettled allows sending messages settled when LinkSenderSettle is ModeMixed. --- client.go | 19 +++++++++---------- integration_test.go | 13 ++----------- types.go | 13 +++++++++---- 3 files changed, 20 insertions(+), 25 deletions(-) diff --git a/client.go b/client.go index 1264046c..ec17044c 100644 --- a/client.go +++ b/client.go @@ -390,8 +390,7 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, er var ( maxPayloadSize = int64(s.link.session.conn.peerMaxFrameSize) - maxTransferFrameHeader sndSettleMode = s.link.senderSettleMode - rcvSettleMode = s.link.receiverSettleMode - senderSettled = sndSettleMode != nil && *sndSettleMode == ModeSettled + senderSettled = sndSettleMode != nil && (*sndSettleMode == ModeSettled || (*sndSettleMode == ModeMixed && msg.SendSettled)) deliveryID = atomic.AddUint32(&s.link.session.nextDeliveryID, 1) ) @@ -416,16 +415,16 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, er fr.Payload = append([]byte(nil), buf...) fr.More = s.buf.len() > 0 if !fr.More { + // SSM=settled: overrides RSM; no acks. + // SSM=unsettled: sender should wait for receiver to ack + // RSM=first: receiver considers it settled immediately, but must still send ack (SSM=unsettled only) + // RSM=second: receiver sends ack and waits for return ack from sender (SSM=unsettled only) + // mark final transfer as settled when sender mode is settled fr.Settled = senderSettled - // set done on last frame to be closed after network transmission - // - // If confirmSettlement is true (ReceiverSettleMode == "second"), - // Session.mux will intercept the done channel and close it when the - // receiver has confirmed settlement instead of on net transmit. + // set done on last frame fr.done = make(chan deliveryState, 1) - fr.confirmSettlement = rcvSettleMode != nil && *rcvSettleMode == ModeSecond } select { @@ -750,9 +749,9 @@ func (s *Session) mux(remoteBegin *performBegin) { delete(handlesByDeliveryID, deliveryID) } - // if confirmSettlement requested, add done chan to map + // if not settled, add done chan to map // and clear from frame so conn doesn't close it. - if fr.confirmSettlement && fr.done != nil { + if !fr.Settled && fr.done != nil { settlementByDeliveryID[deliveryID] = fr.done fr.done = nil } diff --git a/integration_test.go b/integration_test.go index f755c769..87a67dfd 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1131,6 +1131,8 @@ func TestIssue48_ReceiverModeSecond(t *testing.T) { // Create a sender sender, err := session.NewSender( amqp.LinkTargetAddress(hubName), + amqp.LinkSenderSettle(amqp.ModeUnsettled), + amqp.LinkReceiverSettle(amqp.ModeFirst), ) if err != nil { t.Fatalf("%+v\n", err) @@ -1144,17 +1146,6 @@ func TestIssue48_ReceiverModeSecond(t *testing.T) { []byte("there"), }, }) - time.Sleep(1 * time.Second) // Have to wait long enough for disposition to come through. - if err != nil { - t.Fatalf("Unexpected error response: %+v", err) - } - - // Second send should get async error - err = sender.Send(context.Background(), &amqp.Message{ - Data: [][]byte{ - []byte("hello"), - }, - }) if err == nil { t.Fatal("Expected error, got nil") } diff --git a/types.go b/types.go index 06c496d1..7e030e51 100644 --- a/types.go +++ b/types.go @@ -1262,10 +1262,10 @@ type performTransfer struct { Payload []byte // optional channel to indicate to sender that transfer has completed + // + // Settled=true: closed when the transferred on network. + // Settled=false: closed when the receiver has confirmed settlement. done chan deliveryState - // complete when receiver has responded with disposition (ReceiverSettleMode = second) - // instead of when this message has been sent on network - confirmSettlement bool } func (t *performTransfer) frameBody() {} @@ -1726,6 +1726,11 @@ type Message struct { // encryption details). Footer Annotations + // Mark the message as settled when LinkSenderSettle is ModeMixed. + // + // This field is ignored when LinkSenderSettle is not ModeMixed. + SendSettled bool + receiver *Receiver // Receiver the message was received from deliveryID uint32 // used when sending disposition settled bool // whether transfer was settled by sender @@ -1810,7 +1815,7 @@ func (m *Message) MarshalBinary() ([]byte, error) { } func (m *Message) shouldSendDisposition() bool { - return !m.settled || (m.receiver.link.receiverSettleMode != nil && *m.receiver.link.receiverSettleMode == ModeSecond) + return !m.settled } func (m *Message) marshal(wr *buffer) error {