Skip to content

Commit

Permalink
Support for new, flexible wire protocol (V4):
Browse files Browse the repository at this point in the history
The previous protocol is still supported for communication with servers that do not yet support V4. The
version negotation is internal and automatic; however, use of V4 features will fail
at runtime when attempted with an older server. Failure may be an empty or
undefined result or an exception if the request cannot be serviced at all. The following
new features or interfaces depend on the new protocol version:
 - added Durability to QueryRequest for queries that modify data
 - added pagination information to TableUsageResult and TableUsageRequest
 - added shard percent usage information to TableUsageResult
 - added IndexInfo.FieldTypes to return the type information on an index on a JSON field
 - added the ability to ask for and receive the schema of a query using
     * PrepareRequest.GetQuerySchema
     * PreparedStatement.GetQuerySchema
 - Cloud only: added use of ETags, DefinedTags and FreeFormTags in TableRequest and TableResult
  • Loading branch information
connelly38 committed Dec 15, 2022
1 parent e779c76 commit 4e26107
Show file tree
Hide file tree
Showing 25 changed files with 4,833 additions and 1,630 deletions.
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,25 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/).

## Unreleased
## 1.4.0 - 2022-12-15

### Added
Support for new, flexible wire protocol (V4):

The previous protocol is still supported for communication with servers that do not yet support V4. The
version negotation is internal and automatic; however, use of V4 features will fail
at runtime when attempted with an older server. Failure may be an empty or
undefined result or an exception if the request cannot be serviced at all. The following
new features or interfaces depend on the new protocol version:
- added Durability to QueryRequest for queries that modify data
- added pagination information to TableUsageResult and TableUsageRequest
- added shard percent usage information to TableUsageResult
- added IndexInfo.FieldTypes to return the type information on an index on a JSON field
- added the ability to ask for and receive the schema of a query using
* PrepareRequest.GetQuerySchema
* PreparedStatement.GetQuerySchema
- Cloud only: added use of ETags, DefinedTags and FreeFormTags in TableRequest and TableResult

- Latest Oracle Cloud Infrastructure regions and region codes: SGU, IFP, GCN

## 1.3.2 - 2022-10-18
Expand Down
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ testcases ?=
options ?=
examples := basic delete index

GOTEST := $(GOENV) $(GO) test -timeout 20m -count 1 -run "$(testcases)" -v $(options)
# Enable to get code coverage from tests
# afterwards, run go tool cover -html=nosqldb/cover.out
#COVER := -coverprofile cover.out

GOTEST := $(GOENV) $(GO) test $(COVER) -timeout 20m -count 1 -run "$(testcases)" -v $(options)

.PHONY: all build test cloudsim-test onprem-test clean lint build-examples release $(examples) help

all: build

# compile all packages
build:
cd $(SRC) && $(GOENV) $(GO) build -v ./...
cd $(SRC) && $(GOENV) $(GO) build -gcflags="-e" -v ./...

# run tests
test:
Expand All @@ -54,7 +58,7 @@ clean:

# lint check
lint:
cd $(SRC) && golint -set_exit_status -min_confidence 0.3 ./...
cd $(SRC) && $(GO) vet

# compile examples
build-examples: $(examples)
Expand Down
20 changes: 11 additions & 9 deletions internal/test/cloudsim_config.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{
"version": "1.3.2",
"tablePrefix": "Go",
"reCreateTables": true,
"dropTablesOnTearDown": true,
"clientConfig": {
"mode": "cloudsim",
"endpoint": "http://localhost:8080"
}
}
"version": "21.2.19",
"tablePrefix": "Go",
"reCreateTables": true,
"verbose": true,
"dropTablesOnTearDown": true,
"clientConfig": {
"mode": "cloudsim",
"endpoint": "http://localhost:8080",
"rateLimitingEnabled": true
}
}
11 changes: 11 additions & 0 deletions internal/test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Config struct {

// For extended testing
RunExtended bool `json:"runExtended"`

// For testing
SerialVersion int16 `json:"serialVersion"`
}

// newConfig creates a test configuration object from the specified JSON file.
Expand Down Expand Up @@ -148,6 +151,14 @@ func createClient(cfg *Config) (*nosqldb.Client, error) {
return nil, err
}

// if specified, force a specific serial version
if cfg.SerialVersion != 0 {
if cfg.Verbose {
fmt.Printf("Setting client serial version to %d\n", cfg.SerialVersion)
}
client.SetSerialVersion(cfg.SerialVersion)
}

// this will set the protocol serial version according to the connected server.
// ignore errors here, they may be expected.
client.VerifyConnection()
Expand Down
7 changes: 5 additions & 2 deletions nosqldb/bad_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (suite *BadProtocolTestSuite) SetupSuite() {
suite.bpTestClient, err = nosqldb.NewClient(suite.Client.Config)
suite.Require().NoErrorf(err, "failed to create a client, got error %v", err)

// Currently this test requires V3 or lower
suite.bpTestClient.SetSerialVersion(3)

// this will set the serial protocol version. Ignore errors from it.
suite.bpTestClient.VerifyConnection()

Expand Down Expand Up @@ -111,7 +114,7 @@ func (suite *BadProtocolTestSuite) createTableAndIndex() {

// processTestResponse is a custom handleResponse function for the Client.
// It checks error code from the response, does not parse the response content.
func processTestResponse(httpResp *http.Response, req nosqldb.Request) (nosqldb.Result, error) {
func processTestResponse(httpResp *http.Response, req nosqldb.Request, serialVerUsed int16) (nosqldb.Result, error) {
data, err := ioutil.ReadAll(httpResp.Body)
httpResp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -765,7 +768,7 @@ func (suite *BadProtocolTestSuite) TestBadWriteMultipleRequest() {
suite.wr.Reset()
suite.wr.WriteOpCode(v)
copy(data[off:], suite.wr.Bytes())
suite.doBadProtoTest(req, data, desc, nosqlerr.BadProtocolMessage)
suite.doBadProtoTest2(req, data, desc, nosqlerr.BadProtocolMessage, nosqlerr.IllegalArgument)
}

}
Expand Down
51 changes: 37 additions & 14 deletions nosqldb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
//"os"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Client struct {
// handleResponse specifies a function that is used to handle the response
// returned from server.
// This is used internally by tests for customizing response processing.
handleResponse func(httpResp *http.Response, req Request) (Result, error)
handleResponse func(httpResp *http.Response, req Request, serialVerUsed int16) (Result, error)

// isCloud represents whether the client connects to the cloud service or
// cloud simulator.
Expand Down Expand Up @@ -1103,7 +1104,7 @@ func (c *Client) doExecute(ctx context.Context, req Request, data []byte, serial
continue
}

result, err = c.handleResponse(httpResp, req)
result, err = c.handleResponse(httpResp, req, serialVerUsed)
// Cancel request context after response body has been read.
reqCancel()
if err != nil {
Expand Down Expand Up @@ -1348,14 +1349,20 @@ func (c *Client) signHTTPRequest(httpReq *http.Request) error {
// will be sent to the server. The serial version is always written followed by
// the actual request payload.
func (c *Client) serializeRequest(req Request) (data []byte, serialVerUsed int16, err error) {
wr := binary.NewWriter()
serialVerUsed = c.serialVersion
wr := binary.NewWriter()
if _, err = wr.WriteSerialVersion(serialVerUsed); err != nil {
return nil, 0, err
}

if err = req.serialize(wr, serialVerUsed); err != nil {
return nil, 0, err
if serialVerUsed >= 4 {
if err = req.serialize(wr, serialVerUsed); err != nil {
return nil, 0, err
}
} else {
if err = req.serializeV3(wr, serialVerUsed); err != nil {
return nil, 0, err
}
}

return wr.Bytes(), serialVerUsed, nil
Expand All @@ -1366,7 +1373,7 @@ func (c *Client) serializeRequest(req Request) (data []byte, serialVerUsed int16
// If the http response status code is 200, this method reads in response
// content and parses them as an appropriate result suitable for the request.
// Otherwise, it returns the http error.
func (c *Client) processResponse(httpResp *http.Response, req Request) (Result, error) {
func (c *Client) processResponse(httpResp *http.Response, req Request, serialVerUsed int16) (Result, error) {
data, err := ioutil.ReadAll(httpResp.Body)
httpResp.Body.Close()
if err != nil {
Expand All @@ -1375,31 +1382,41 @@ func (c *Client) processResponse(httpResp *http.Response, req Request) (Result,

if httpResp.StatusCode == http.StatusOK {
c.setSessionCookie(httpResp.Header)
return c.processOKResponse(data, req)
return c.processOKResponse(data, req, serialVerUsed)
}

return nil, c.processNotOKResponse(data, httpResp.StatusCode)
}

func (c *Client) processOKResponse(data []byte, req Request) (Result, error) {
func (c *Client) processOKResponse(data []byte, req Request, serialVerUsed int16) (res Result, err error) {
buf := bytes.NewBuffer(data)
rd := binary.NewReader(buf)
code, err := rd.ReadByte()

var code int
if serialVerUsed >= 4 {
if res, code, err = req.deserialize(rd, serialVerUsed); err != nil {
return nil, wrapResponseErrors(int(code), err.Error())
}
if queryReq, ok := req.(*QueryRequest); ok && !queryReq.isSimpleQuery() {
queryReq.driver.client = c
}
return res, nil
}

// V3
bcode, err := rd.ReadByte()
if err != nil {
return nil, err
}

code = int(bcode)
// A zero byte represents the operation succeeded.
if code == 0 {
res, err := req.deserialize(rd, c.serialVersion)
if err != nil {
if res, err = req.deserializeV3(rd, serialVerUsed); err != nil {
return nil, err
}

if queryReq, ok := req.(*QueryRequest); ok && !queryReq.isSimpleQuery() {
queryReq.driver.client = c
}

return res, nil
}

Expand Down Expand Up @@ -1548,6 +1565,7 @@ func (c *Client) decrementSerialVersion(serialVerUsed int16) bool {
}
if c.serialVersion > 2 {
c.serialVersion--
c.logger.Fine("Decremented serial version to %d\n", c.serialVersion)
return true
}
return false
Expand All @@ -1558,6 +1576,11 @@ func (c *Client) GetSerialVersion() int16 {
return c.serialVersion
}

// SetSerialVersion is used for tests. Do not use this in regular client code.
func (c *Client) SetSerialVersion(sVer int16) {
c.serialVersion = sVer
}

func (c *Client) oneTimeMessage(msg string) {
if _, ok := c.oneTimeMessages[msg]; ok == false {
c.oneTimeMessages[msg] = struct{}{}
Expand Down
5 changes: 5 additions & 0 deletions nosqldb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
func TestExecuteErrorHandling(t *testing.T) {
client, err := newMockClient()
require.NoErrorf(t, err, "failed to create client, got error %v.", err)

// This test is very specific to V2/3 protocol.
// TODO: V4 protocol error tests
client.SetSerialVersion(3)

// GetRequest is a retryable request.
getReq := &GetRequest{
TableName: "T1",
Expand Down
2 changes: 1 addition & 1 deletion nosqldb/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// This file exports functions/methods that are used in test codes.

type HandleResponse func(httpResp *http.Response, req Request) (Result, error)
type HandleResponse func(httpResp *http.Response, req Request, serialVerUsed int16) (Result, error)

func (c *Client) SetResponseHandler(fn HandleResponse) {
c.handleResponse = fn
Expand Down
25 changes: 18 additions & 7 deletions nosqldb/internal/proto/binary/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package binary

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -28,25 +29,25 @@ import (
// Reader implements the io.Reader and io.ByteReader interfaces.
type Reader struct {
// The underlying io.Reader.
rd io.Reader
rd *bytes.Buffer

// A buffer that holds the bytes for decoding.
buf []byte
}

// NewReader creates a reader for the binary protocol.
// If the provided io.Reader is already a binary protocol Reader, it returns
// the provided one without creating a new Reader.
func NewReader(r io.Reader) *Reader {
if r, ok := r.(*Reader); ok {
return r
}
func NewReader(r *bytes.Buffer) *Reader {
return &Reader{
rd: r,
buf: make([]byte, 64, 256),
}
}

// GetBuffer returns the underlying bytes Buffer.
func (r *Reader) GetBuffer() *bytes.Buffer {
return r.rd
}

// Read reads up to len(p) bytes into p.
// It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
func (r *Reader) Read(p []byte) (n int, err error) {
Expand Down Expand Up @@ -181,6 +182,16 @@ func (r *Reader) ReadString() (*string, error) {
return &s, nil
}

// ReadNonNilString reads a string. If there is an error, it will return
// an empty string and the error.
func (r *Reader) ReadNonNilString() (string, error) {
str, err := r.ReadString()
if str == nil || err != nil {
return "", err
}
return *str, nil
}

// ReadVersion reads byte sequences and decodes as a types.Version.
func (r *Reader) ReadVersion() (types.Version, error) {
return r.ReadByteArray()
Expand Down
Loading

0 comments on commit 4e26107

Please sign in to comment.