Skip to content

Commit

Permalink
feat: version negotiation (yomorun#639)
Browse files Browse the repository at this point in the history
# Description

Adding version negotiation between `zipper` and `source/sfn`, if version
negotiation failed, the `source/sfn` cannot connect to `zipper`.

The version format must follow the `Major.Minor.Patch` formatting, and
the Major, Minor, and Patch components must be numeric.

The client‘s MAJOR and MINOR versions should equal to server's,
otherwise, the zipper will be considered has break-change, the handshake
will fail.


## Impact

This is a break-change. It will not be possible to establish a
connection with the `zipper` if the `source/sfn` does not send the
version field.
  • Loading branch information
woorui authored Dec 15, 2023
1 parent 2402767 commit 0ca2c1a
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 29 deletions.
1 change: 1 addition & 0 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (c *Client) connect(ctx context.Context, addr string) (frame.Conn, error) {
ObserveDataTags: c.opts.observeDataTags,
AuthName: c.opts.credential.Name(),
AuthPayload: c.opts.credential.Payload(),
Version: Version,
}

if err := conn.WriteFrame(hf); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func newContext(conn *Connection, route router.Route, df *frame.DataFrame) (c *C
return
}

// CloseWithError close dataStream with an error string.
// CloseWithError close connection with an error string.
func (c *Context) CloseWithError(errString string) {
c.Logger.Debug("connection closed", "err", errString)

Expand Down
11 changes: 8 additions & 3 deletions core/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,23 @@ func (f *DataFrame) Type() Type { return TypeDataFrame }
// It includes essential details required for the creation of a fresh connection.
// The server then generates the connection utilizing this provided information.
type HandshakeFrame struct {
// Name is the name of the dataStream that will be created.
// Name is the name of the connection that will be created.
Name string
// ID is the ID of the dataStream that will be created.
// ID is the ID of the connection that will be created.
ID string
// ClientType is the type of client.
ClientType byte
// ObserveDataTags is the ObserveDataTags of the dataStream that will be created.
// ObserveDataTags is the ObserveDataTags of the connection that will be created.
ObserveDataTags []Tag
// AuthName is the authentication name.
AuthName string
// AuthPayload is the authentication payload.
AuthPayload string
// Version is used by the source/sfn to communicate their version to the server.
// The Version format must follow the `Major.Minor.Patch`. otherwise, the handshake
// will fail. The client‘s MAJOR and MINOR versions should equal to server's,
// otherwise, the zipper will be considered has break-change, the handshake will fail.
Version string
}

// Type returns the type of HandshakeFrame.
Expand Down
66 changes: 46 additions & 20 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
yquic "github.com/yomorun/yomo/pkg/listener/quic"
pkgtls "github.com/yomorun/yomo/pkg/tls"
"github.com/yomorun/yomo/pkg/version"
oteltrace "go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -152,7 +153,22 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {
}
}

func (s *Server) handshake(fconn frame.Conn) (bool, router.Route, *Connection) {
func (s *Server) handleFrameConn(fconn frame.Conn, logger *slog.Logger) {
route, conn, err := s.handshake(fconn)
if err != nil {
logger.Error("handshake failed", "err", err)
return
}

s.connHandler(conn, route) // s.handleConnRoute(conn, route) with middlewares

if conn.ClientType() == ClientTypeStreamFunction {
_ = route.Remove(conn.ID())
}
_ = s.connector.Remove(conn.ID())
}

func (s *Server) handshake(fconn frame.Conn) (router.Route, *Connection, error) {
var gerr error

defer func() {
Expand All @@ -166,7 +182,7 @@ func (s *Server) handshake(fconn frame.Conn) (bool, router.Route, *Connection) {
first, err := fconn.ReadFrame()
if err != nil {
gerr = err
return false, nil, nil
return nil, nil, gerr
}
switch first.Type() {
case frame.TypeHandshakeFrame:
Expand All @@ -175,17 +191,17 @@ func (s *Server) handshake(fconn frame.Conn) (bool, router.Route, *Connection) {
conn, err := s.handleHandshakeFrame(fconn, hf)
if err != nil {
gerr = err
return false, nil, conn
return nil, conn, gerr
}

route, err := s.addSfnToRoute(hf, conn.Metadata())
if err != nil {
gerr = err
}
return true, route, conn
return route, conn, gerr
default:
gerr = fmt.Errorf("yomo: handshake read unexpected frame, read: %s", first.Type().String())
return false, nil, nil
return nil, nil, gerr
}
}

Expand Down Expand Up @@ -216,22 +232,8 @@ func (s *Server) handleConnRoute(conn *Connection, route router.Route) {
}
}

func (s *Server) handleFrameConn(fconn frame.Conn, logger *slog.Logger) {
ok, route, conn := s.handshake(fconn)
if !ok {
logger.Error("handshake failed")
return
}

s.connHandler(conn, route) // s.handleConnRoute(conn, route) with middlewares

if conn.ClientType() == ClientTypeStreamFunction {
_ = route.Remove(conn.ID())
}
_ = s.connector.Remove(conn.ID())
}

func (s *Server) handleHandshakeFrame(fconn frame.Conn, hf *frame.HandshakeFrame) (*Connection, error) {
// 1. authentication
md, ok := auth.Authenticate(s.opts.auths, hf)

if !ok {
Expand All @@ -243,6 +245,11 @@ func (s *Server) handleHandshakeFrame(fconn frame.Conn, hf *frame.HandshakeFrame
return nil, fmt.Errorf("authentication failed: client credential type is %s", hf.AuthName)
}

// 2. version negotiation
if err := negotiateVersion(hf.Version, Version); err != nil {
return nil, err
}

conn := newConnection(hf.Name, hf.ID, ClientType(hf.ClientType), md, hf.ObserveDataTags, fconn, s.logger)

return conn, s.connector.Store(hf.ID, conn)
Expand All @@ -263,6 +270,25 @@ func (s *Server) addSfnToRoute(hf *frame.HandshakeFrame, md metadata.M) (router.
return route, nil
}

func negotiateVersion(cVersion, sVersion string) error {
cv, err := version.Parse(cVersion)
if err != nil {
return err
}

sv, err := version.Parse(sVersion)
if err != nil {
return err
}

// If the Major and Minor versions are equal, the server can serve the client.
if cv.Major == sv.Major && cv.Minor == sv.Minor {
return nil
}

return fmt.Errorf("yomo: version negotiation failed, client=%s, server=%s", cVersion, sVersion)
}

func (s *Server) handleFrame(c *Context) {
// routing data frame.
if err := s.routingDataFrame(c); err != nil {
Expand Down
44 changes: 44 additions & 0 deletions core/server_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -44,3 +45,46 @@ func (s *mockConnectionInfo) Name() string { return s.name }
func (s *mockConnectionInfo) Metadata() metadata.M { return s.metadata }
func (s *mockConnectionInfo) ClientType() ClientType { return s.clientType }
func (s *mockConnectionInfo) ObserveDataTags() []frame.Tag { return s.observed }

func Test_negotiateVersion(t *testing.T) {
type args struct {
cVersion string
sVersion string
}
tests := []struct {
name string
args args
wantErr error
}{
{
name: "ok",
args: args{
cVersion: "1.16.3",
sVersion: "1.16.3",
},
wantErr: nil,
},
{
name: "client empty version",
args: args{
cVersion: "",
sVersion: "1.16.3",
},
wantErr: errors.New("invalid semantic version, params="),
},
{
name: "not ok",
args: args{
cVersion: "1.15.0",
sVersion: "1.16.3",
},
wantErr: errors.New("yomo: version negotiation failed, client=1.15.0, server=1.16.3"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := negotiateVersion(tt.args.cVersion, tt.args.sVersion)
assert.Equal(t, tt.wantErr, err)
})
}
}
4 changes: 4 additions & 0 deletions core/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package core

// Version is the current yomo version.
const Version = "1.17.0"
8 changes: 4 additions & 4 deletions pkg/frame-codec/y3codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func TestCodec(t *testing.T) {
ObserveDataTags: []uint32{'a', 'b', 'c'},
AuthName: "ddddd",
AuthPayload: "eeeee",
Version: "1.16.3",
},
data: []byte{0xb1, 0x31, 0x1, 0x8, 0x74, 0x68, 0x65, 0x2d, 0x6e, 0x61,
data: []byte{0xb1, 0x39, 0x1, 0x8, 0x74, 0x68, 0x65, 0x2d, 0x6e, 0x61,
0x6d, 0x65, 0x3, 0x6, 0x74, 0x68, 0x65, 0x2d, 0x69, 0x64, 0x2, 0x1,
0x68, 0x6, 0xc, 0x61, 0x0, 0x0, 0x0, 0x62, 0x0, 0x0, 0x0, 0x63, 0x0,
0x0, 0x0, 0x4, 0x5, 0x64, 0x64, 0x64, 0x64, 0x64, 0x5, 0x5, 0x65,
0x65, 0x65, 0x65, 0x65,
},
0x0, 0x0, 0x4, 0x5, 0x64, 0x64, 0x64, 0x64, 0x64, 0x5, 0x5, 0x65, 0x65,
0x65, 0x65, 0x65, 0x7, 0x6, 0x31, 0x2e, 0x31, 0x36, 0x2e, 0x33},
},
},
{
Expand Down
15 changes: 14 additions & 1 deletion pkg/frame-codec/y3codec/handshake_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func encodeHandshakeFrame(f *frame.HandshakeFrame) ([]byte, error) {
// auth payload
authPayloadBlock := y3.NewPrimitivePacketEncoder(tagAuthenticationPayload)
authPayloadBlock.SetStringValue(f.AuthPayload)
// version
versionBlock := y3.NewPrimitivePacketEncoder(tagHandshakeVersion)
versionBlock.SetStringValue(f.Version)

// handshake frame
handshake := y3.NewNodePacketEncoder(byte(f.Type()))
Expand All @@ -40,6 +43,7 @@ func encodeHandshakeFrame(f *frame.HandshakeFrame) ([]byte, error) {
handshake.AddPrimitivePacket(observeDataTagsBlock)
handshake.AddPrimitivePacket(authNameBlock)
handshake.AddPrimitivePacket(authPayloadBlock)
handshake.AddPrimitivePacket(versionBlock)

return handshake.Encode(), nil
}
Expand Down Expand Up @@ -98,15 +102,24 @@ func decodeHandshakeFrame(data []byte, f *frame.HandshakeFrame) error {
}
f.AuthPayload = authPayload
}
// version
if versionBlock, ok := node.PrimitivePackets[tagHandshakeVersion]; ok {
version, err := versionBlock.ToUTF8String()
if err != nil {
return err
}
f.Version = version
}

return nil
}

var (
const (
tagHandshakeName byte = 0x01
tagHandshakeClientType byte = 0x02
tagHandshakeID byte = 0x03
tagAuthenticationName byte = 0x04
tagAuthenticationPayload byte = 0x05
tagHandshakeObserveDataTags byte = 0x06
tagHandshakeVersion byte = 0x07
)
44 changes: 44 additions & 0 deletions pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Package version provides functionality for parsing versions..
package version

import (
"fmt"
"strconv"
"strings"
)

// Version is used by the source/sfn to communicate their version to the server.
type Version struct {
Major int
Minor int
Patch int
}

// Parse parses a string into a Version. The string format must follow the `Major.Minor.Patch`
// formatting, and the Major, Minor, and Patch components must be numeric. If they are not,
// a parse error will be returned.
func Parse(str string) (*Version, error) {
vs := strings.Split(str, ".")
if len(vs) != 3 {
return nil, fmt.Errorf("invalid semantic version, params=%s", str)
}

major, err := strconv.Atoi(vs[0])
if err != nil {
return nil, fmt.Errorf("invalid version major, params=%s", str)
}

minor, err := strconv.Atoi(vs[1])
if err != nil {
return nil, fmt.Errorf("invalid version minor, params=%s", str)
}

patch, err := strconv.Atoi(vs[2])
if err != nil {
return nil, fmt.Errorf("invalid version patch, params=%s", str)
}

ver := &Version{Major: major, Minor: minor, Patch: patch}

return ver, nil
}
Loading

0 comments on commit 0ca2c1a

Please sign in to comment.