Skip to content

Commit

Permalink
test: rich test
Browse files Browse the repository at this point in the history
  • Loading branch information
woorui committed Aug 5, 2023
1 parent d9f6d13 commit ee7ba0b
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func TestFrameRoundTrip(t *testing.T) {

var (
obversedTag = frame.Tag(1)
backflowTag = frame.Tag(2)
payload = []byte("hello data frame")
backflow = []byte("hello backflow frame")
)

server := NewServer("zipper",
Expand All @@ -60,6 +62,8 @@ func TestFrameRoundTrip(t *testing.T) {
recorder := newFrameWriterRecorder("mockClient")
server.AddDownstreamServer("mockAddr", recorder)

assert.Equal(t, map[string]string{"mockAddr": "mockClient"}, server.Downstreams())

go func() {
server.ListenAndServe(ctx, testaddr)
}()
Expand All @@ -75,10 +79,13 @@ func TestFrameRoundTrip(t *testing.T) {
WithClientQuicConfig(DefalutQuicConfig),
WithClientTLSConfig(nil),
WithLogger(discardingLogger),
WithObserveDataTags(backflowTag),
WithConnectUntilSucceed(),
WithNonBlockWrite(),
)

source.SetBackflowFrameObserver(func(bf *frame.BackflowFrame) {
assert.Equal(t, string(payload), string(bf.Carriage))
assert.Equal(t, string(backflow), string(bf.Carriage))
})

err = source.Connect(ctx, testaddr)
Expand Down Expand Up @@ -124,32 +131,42 @@ func TestFrameRoundTrip(t *testing.T) {
exited = checkClientExited(sameNameSfn, time.Second)
assert.False(t, exited, "the new sfn stream should not exited")

mdBytes, _ := NewDefaultMetadata(source.clientID, true, "tid").Encode()

err = sameNameSfn.WriteFrame(&frame.DataFrame{Tag: backflowTag, Metadata: mdBytes, Payload: backflow})
assert.NoError(t, err)

stats := server.StatsFunctions()
nameList := []string{}
for _, name := range stats {
nameList = append(nameList, name)
}
assert.ElementsMatch(t, nameList, []string{"source", "sfn-1"})

md, _ := metadata.New(
md := metadata.New(
NewDefaultMetadata(source.clientID, true, "tid"),
metadata.M{
"foo": "bar",
},
).Encode()
)
mdBytes, _ = md.Encode()

dataFrame := &frame.DataFrame{
Tag: obversedTag,
Metadata: md,
Metadata: mdBytes,
Payload: payload,
}
dataFrameEncoded, _ := y3codec.Codec().Encode(dataFrame)

err = source.WriteFrame(dataFrame)
assert.NoError(t, err, "source write dataFrame must be success")

time.Sleep(2 * time.Second)
assert.Equal(t, recorder.frameBytes(), dataFrameEncoded)

reocrdTag, reocrdMD, reocrdPayload := recorder.dataFrameContent()
assert.Equal(t, reocrdTag, dataFrame.Tag)
// raw metadata cant be compared because metadata is not ordered.
assert.Equal(t, reocrdMD, md)
assert.Equal(t, reocrdPayload, dataFrame.Payload)

assert.NoError(t, source.Close(), "source client.Close() should not return error")
assert.NoError(t, sfn.Close(), "sfn client.Close() should not return error")
Expand Down Expand Up @@ -241,10 +258,12 @@ func (w *frameWriterRecorder) WriteFrame(f frame.Frame) error {
return err
}

func (w *frameWriterRecorder) frameBytes() []byte {
func (w *frameWriterRecorder) dataFrameContent() (frame.Tag, metadata.M, []byte) {
w.mu.Lock()
defer w.mu.Unlock()

bytes := w.buf.Bytes()
return bytes
dataFrame := new(frame.DataFrame)
y3codec.Codec().Decode(w.buf.Bytes(), dataFrame)
md, _ := metadata.Decode(dataFrame.Metadata)
return dataFrame.Tag, md, dataFrame.Payload
}

0 comments on commit ee7ba0b

Please sign in to comment.