Skip to content

Commit 7166a5d

Browse files
authored
Merge pull request #2421 from openziti/no-common-xg-header
Remove common header type from xgress messages since the overlap isn't complete
2 parents 87a866d + 776db43 commit 7166a5d

File tree

9 files changed

+241
-176
lines changed

9 files changed

+241
-176
lines changed

router/xgress/acker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import (
77
"sync/atomic"
88
)
99

10-
var acker *Acker
10+
var acker ackSender
11+
12+
type ackSender interface {
13+
ack(ack *Acknowledgement, address Address)
14+
}
1115

1216
func InitAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{}) {
1317
acker = NewAcker(forwarder, metrics, closeNotify)

router/xgress/messages.go

Lines changed: 90 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -62,80 +62,45 @@ func (o Originator) String() string {
6262
return "Terminator"
6363
}
6464

65-
type PayloadFlag uint32
65+
type Flag uint32
6666

6767
const (
68-
PayloadFlagCircuitEnd PayloadFlag = 1
69-
PayloadFlagOriginator PayloadFlag = 2
70-
PayloadFlagCircuitStart PayloadFlag = 4
71-
PayloadFlagChunk PayloadFlag = 8
68+
PayloadFlagCircuitEnd Flag = 1
69+
PayloadFlagOriginator Flag = 2
70+
PayloadFlagCircuitStart Flag = 4
71+
PayloadFlagChunk Flag = 8
7272
)
7373

74-
type Header struct {
74+
func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
75+
return &Acknowledgement{
76+
CircuitId: circuitId,
77+
Flags: SetOriginatorFlag(0, originator),
78+
}
79+
}
80+
81+
type Acknowledgement struct {
7582
CircuitId string
7683
Flags uint32
7784
RecvBufferSize uint32
7885
RTT uint16
86+
Sequence []int32
7987
}
8088

81-
func (header *Header) GetCircuitId() string {
82-
return header.CircuitId
89+
func (ack *Acknowledgement) GetCircuitId() string {
90+
return ack.CircuitId
8391
}
8492

85-
func (header *Header) GetFlags() uint32 {
86-
return header.Flags
93+
func (ack *Acknowledgement) GetFlags() uint32 {
94+
return ack.Flags
8795
}
8896

89-
func (header *Header) GetOriginator() Originator {
90-
if isPayloadFlagSet(header.Flags, PayloadFlagOriginator) {
97+
func (ack *Acknowledgement) GetOriginator() Originator {
98+
if isFlagSet(ack.Flags, PayloadFlagOriginator) {
9199
return Terminator
92100
}
93101
return Initiator
94102
}
95103

96-
func (header *Header) unmarshallHeader(msg *channel.Message) error {
97-
circuitId, ok := msg.Headers[HeaderKeyCircuitId]
98-
if !ok {
99-
return fmt.Errorf("no circuitId found in xgress payload message")
100-
}
101-
102-
// If no flags are present, it just means no flags have been set
103-
flags, _ := msg.GetUint32Header(HeaderKeyFlags)
104-
105-
header.CircuitId = string(circuitId)
106-
header.Flags = flags
107-
if header.RecvBufferSize, ok = msg.GetUint32Header(HeaderKeyRecvBufferSize); !ok {
108-
header.RecvBufferSize = math.MaxUint32
109-
}
110-
111-
header.RTT, _ = msg.GetUint16Header(HeaderKeyRTT)
112-
113-
return nil
114-
}
115-
116-
func (header *Header) marshallHeader(msg *channel.Message) {
117-
msg.Headers[HeaderKeyCircuitId] = []byte(header.CircuitId)
118-
if header.Flags != 0 {
119-
msg.PutUint32Header(HeaderKeyFlags, header.Flags)
120-
}
121-
122-
msg.PutUint32Header(HeaderKeyRecvBufferSize, header.RecvBufferSize)
123-
}
124-
125-
func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
126-
return &Acknowledgement{
127-
Header: Header{
128-
CircuitId: circuitId,
129-
Flags: SetOriginatorFlag(0, originator),
130-
},
131-
}
132-
}
133-
134-
type Acknowledgement struct {
135-
Header
136-
Sequence []int32
137-
}
138-
139104
func (ack *Acknowledgement) GetSequence() []int32 {
140105
return ack.Sequence
141106
}
@@ -174,16 +139,33 @@ func (ack *Acknowledgement) unmarshallSequence(data []byte) error {
174139
func (ack *Acknowledgement) Marshall() *channel.Message {
175140
msg := channel.NewMessage(ContentTypeAcknowledgementType, ack.marshallSequence())
176141
msg.PutUint16Header(HeaderKeyRTT, ack.RTT)
177-
ack.marshallHeader(msg)
142+
msg.Headers[HeaderKeyCircuitId] = []byte(ack.CircuitId)
143+
if ack.Flags != 0 {
144+
msg.PutUint32Header(HeaderKeyFlags, ack.Flags)
145+
}
146+
msg.PutUint32Header(HeaderKeyRecvBufferSize, ack.RecvBufferSize)
178147
return msg
179148
}
180149

181150
func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error) {
182151
ack := &Acknowledgement{}
183152

184-
if err := ack.unmarshallHeader(msg); err != nil {
185-
return nil, err
153+
circuitId, ok := msg.Headers[HeaderKeyCircuitId]
154+
if !ok {
155+
return nil, fmt.Errorf("no circuitId found in xgress payload message")
186156
}
157+
158+
// If no flags are present, it just means no flags have been set
159+
flags, _ := msg.GetUint32Header(HeaderKeyFlags)
160+
161+
ack.CircuitId = string(circuitId)
162+
ack.Flags = flags
163+
if ack.RecvBufferSize, ok = msg.GetUint32Header(HeaderKeyRecvBufferSize); !ok {
164+
ack.RecvBufferSize = math.MaxUint32
165+
}
166+
167+
ack.RTT, _ = msg.GetUint16Header(HeaderKeyRTT)
168+
187169
if err := ack.unmarshallSequence(msg.Body); err != nil {
188170
return nil, err
189171
}
@@ -201,10 +183,12 @@ func (ack *Acknowledgement) GetLoggerFields() logrus.Fields {
201183
}
202184

203185
type Payload struct {
204-
Header
205-
Sequence int32
206-
Headers map[uint8][]byte
207-
Data []byte
186+
CircuitId string
187+
Flags uint32
188+
RTT uint16
189+
Sequence int32
190+
Headers map[uint8][]byte
191+
Data []byte
208192
}
209193

210194
func (payload *Payload) GetSequence() int32 {
@@ -213,17 +197,25 @@ func (payload *Payload) GetSequence() int32 {
213197

214198
func (payload *Payload) Marshall() *channel.Message {
215199
msg := channel.NewMessage(ContentTypePayloadType, payload.Data)
216-
for key, value := range payload.Headers {
217-
msgHeaderKey := MinHeaderKey + int32(key)
218-
msg.Headers[msgHeaderKey] = value
200+
addPayloadHeadersToMsg(msg, payload.Headers)
201+
msg.Headers[HeaderKeyCircuitId] = []byte(payload.CircuitId)
202+
if payload.Flags != 0 {
203+
msg.PutUint32Header(HeaderKeyFlags, payload.Flags)
219204
}
220-
payload.marshallHeader(msg)
205+
221206
msg.PutUint64Header(HeaderKeySequence, uint64(payload.Sequence))
222207
msg.PutUint16Header(HeaderKeyRTT, uint16(info.NowInMilliseconds()))
223208

224209
return msg
225210
}
226211

212+
func addPayloadHeadersToMsg(msg *channel.Message, headers map[uint8][]byte) {
213+
for key, value := range headers {
214+
msgHeaderKey := MinHeaderKey + int32(key)
215+
msg.Headers[msgHeaderKey] = value
216+
}
217+
}
218+
227219
func UnmarshallPayload(msg *channel.Message) (*Payload, error) {
228220
var headers map[uint8][]byte
229221
for key, val := range msg.Headers {
@@ -241,10 +233,19 @@ func UnmarshallPayload(msg *channel.Message) (*Payload, error) {
241233
Data: msg.Body,
242234
}
243235

244-
if err := payload.unmarshallHeader(msg); err != nil {
245-
return nil, err
236+
circuitId, ok := msg.Headers[HeaderKeyCircuitId]
237+
if !ok {
238+
return nil, fmt.Errorf("no circuitId found in xgress payload message")
246239
}
247240

241+
// If no flags are present, it just means no flags have been set
242+
flags, _ := msg.GetUint32Header(HeaderKeyFlags)
243+
244+
payload.CircuitId = string(circuitId)
245+
payload.Flags = flags
246+
247+
payload.RTT, _ = msg.GetUint16Header(HeaderKeyRTT)
248+
248249
sequence, ok := msg.GetUint64Header(HeaderKeySequence)
249250
if !ok {
250251
return nil, fmt.Errorf("no sequence found in xgress payload message")
@@ -254,20 +255,35 @@ func UnmarshallPayload(msg *channel.Message) (*Payload, error) {
254255
return payload, nil
255256
}
256257

257-
func isPayloadFlagSet(flags uint32, flag PayloadFlag) bool {
258-
return PayloadFlag(flags)&flag == flag
258+
func isFlagSet(flags uint32, flag Flag) bool {
259+
return Flag(flags)&flag == flag
259260
}
260261

261-
func setPayloadFlag(flags uint32, flag PayloadFlag) uint32 {
262-
return uint32(PayloadFlag(flags) | flag)
262+
func setPayloadFlag(flags uint32, flag Flag) uint32 {
263+
return uint32(Flag(flags) | flag)
264+
}
265+
266+
func (payload *Payload) GetCircuitId() string {
267+
return payload.CircuitId
268+
}
269+
270+
func (payload *Payload) GetFlags() uint32 {
271+
return payload.Flags
263272
}
264273

265274
func (payload *Payload) IsCircuitEndFlagSet() bool {
266-
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitEnd)
275+
return isFlagSet(payload.Flags, PayloadFlagCircuitEnd)
267276
}
268277

269278
func (payload *Payload) IsCircuitStartFlagSet() bool {
270-
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitStart)
279+
return isFlagSet(payload.Flags, PayloadFlagCircuitStart)
280+
}
281+
282+
func (payload *Payload) GetOriginator() Originator {
283+
if isFlagSet(payload.Flags, PayloadFlagOriginator) {
284+
return Terminator
285+
}
286+
return Initiator
271287
}
272288

273289
func SetOriginatorFlag(flags uint32, originator Originator) uint32 {

router/xgress/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ var buffersBlockedByRemoteWindow int64
2020
var outstandingPayloads int64
2121
var outstandingPayloadBytes int64
2222

23-
func InitMetrics(registry metrics.UsageRegistry) {
23+
func InitMetrics(registry metrics.Registry) {
2424
droppedPayloadsMeter = registry.Meter("xgress.dropped_payloads")
2525
retransmissions = registry.Meter("xgress.retransmissions")
2626
retransmissionFailures = registry.Meter("xgress.retransmission_failures")

router/xgress/ordering_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,12 @@ func Test_Ordering(t *testing.T) {
9090
data := make([]byte, 8)
9191
binary.LittleEndian.PutUint64(data, uint64(i))
9292
payload := &Payload{
93-
Header: Header{
94-
CircuitId: "test",
95-
Flags: SetOriginatorFlag(0, Terminator),
96-
RecvBufferSize: 16000,
97-
RTT: 0,
98-
},
99-
Sequence: int32(i),
100-
Headers: nil,
101-
Data: data,
93+
CircuitId: "test",
94+
Flags: SetOriginatorFlag(0, Terminator),
95+
RTT: 0,
96+
Sequence: int32(i),
97+
Headers: nil,
98+
Data: data,
10299
}
103100
if err := x.SendPayload(payload); err != nil {
104101
errorCh <- err

0 commit comments

Comments
 (0)