Skip to content

Commit

Permalink
code health: all requests use request objects
Browse files Browse the repository at this point in the history
The patch is a refactoring of an internal logic. It replaces the usage
of closures to request objects to construct a request body. After the
patch all Connection.* requests use request objects inside.

Closes #126
  • Loading branch information
oleg-jukovec committed Jun 20, 2022
1 parent 9ce0ee5 commit 0343412
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 258 deletions.
79 changes: 39 additions & 40 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,35 @@ func (conn *Connection) dial() (err error) {
return
}

func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
request := &Future{
requestId: 0,
requestCode: AuthRequestCode,
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
hl := h.Len()
h.Write([]byte{
0xce, 0, 0, 0, 0, // Length.
0x82, // 2 element map.
KeyCode, byte(req.Code()), // Request code.
KeySync, 0xce,
byte(reqid >> 24), byte(reqid >> 16),
byte(reqid >> 8), byte(reqid),
})

if err = req.Body(res, enc); err != nil {
return
}

l := uint32(h.Len() - 5 - hl)
h.b[hl+1] = byte(l >> 24)
h.b[hl+2] = byte(l >> 16)
h.b[hl+3] = byte(l >> 8)
h.b[hl+4] = byte(l)

return
}

func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
var packet smallWBuf
err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error {
return enc.Encode(map[uint32]interface{}{
KeyUserName: conn.opts.User,
KeyTuple: []interface{}{string("chap-sha1"), string(scramble)},
})
})
req := newAuthRequest(conn.opts.User, string(scramble))
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)

if err != nil {
return errors.New("auth: pack error " + err.Error())
}
Expand Down Expand Up @@ -704,7 +721,7 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
}
}

func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
func (conn *Connection) newFuture() (fut *Future) {
fut = NewFuture()
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
Expand All @@ -720,7 +737,6 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
}
}
fut.requestId = conn.nextRequestId()
fut.requestCode = requestCode
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.rmut.Lock()
Expand Down Expand Up @@ -769,23 +785,16 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
return
}

func (conn *Connection) sendFuture(fut *Future, body func(*msgpack.Encoder) error) *Future {
func (conn *Connection) send(req Request) *Future {
fut := conn.newFuture()
if fut.ready == nil {
return fut
}
conn.putFuture(fut, body)
return fut
}

func (conn *Connection) failFuture(fut *Future, err error) *Future {
if f := conn.fetchFuture(fut.requestId); f == fut {
fut.SetError(err)
conn.markDone(fut)
}
conn.putFuture(fut, req)
return fut
}

func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) {
func (conn *Connection) putFuture(fut *Future, req Request) {
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.bufmut.Lock()
Expand All @@ -801,10 +810,11 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
shard.enc = msgpack.NewEncoder(&shard.buf)
}
blen := shard.buf.Len()
if err := fut.pack(&shard.buf, shard.enc, body); err != nil {
reqid := fut.requestId
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
shard.buf.Trunc(blen)
shard.bufmut.Unlock()
if f := conn.fetchFuture(fut.requestId); f == fut {
if f := conn.fetchFuture(reqid); f == fut {
fut.SetError(err)
conn.markDone(fut)
} else if f != nil {
Expand Down Expand Up @@ -983,10 +993,7 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
// An error is returned if the request was formed incorrectly, or failure to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) Do(req Request) (*Response, error) {
fut, err := conn.DoAsync(req)
if err != nil {
return nil, err
}
fut := conn.DoAsync(req)
return fut.Get()
}

Expand All @@ -995,24 +1002,16 @@ func (conn *Connection) Do(req Request) (*Response, error) {
// An error is returned if the request was formed incorrectly, or failure to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) DoTyped(req Request, result interface{}) error {
fut, err := conn.DoAsync(req)
if err != nil {
return err
}
fut := conn.DoAsync(req)
return fut.GetTyped(result)
}

// DoAsync verifies, sends the request and returns a future.
//
// An error is returned if the request was formed incorrectly, or failure to
// create the future.
func (conn *Connection) DoAsync(req Request) (*Future, error) {
bodyFunc, err := req.BodyFunc(conn.Schema)
if err != nil {
return nil, err
}
future := conn.newFuture(req.Code())
return conn.sendFuture(future, bodyFunc), nil
func (conn *Connection) DoAsync(req Request) *Future {
return conn.send(req)
}

// ConfiguredTimeout returns a timeout from connection config.
Expand Down
4 changes: 2 additions & 2 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,10 @@ func (connPool *ConnectionPool) DoTyped(req tarantool.Request, result interface{
}

// DoAsync sends the request and returns a future.
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) (*tarantool.Future, error) {
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *tarantool.Future {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return nil, err
return tarantool.NewErrorFuture(err)
}

return conn.DoAsync(req)
Expand Down
2 changes: 1 addition & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ type Connector interface {

Do(req Request) (resp *Response, err error)
DoTyped(req Request, result interface{}) (err error)
DoAsync(req Request) (fut *Future, err error)
DoAsync(req Request) (fut *Future)
}
15 changes: 3 additions & 12 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,7 @@ func ExampleSelectRequest() {
Index("primary").
Limit(100).
Key(tarantool.IntKey{1111})
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async select request is %v", err)
}
fut := conn.DoAsync(req)
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async select request is %v", err)
Expand Down Expand Up @@ -177,10 +174,7 @@ func ExampleUpdateRequest() {
Index("primary").
Key(tarantool.IntKey{1111}).
Operations(tarantool.NewOperations().Assign(1, "hello"))
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async update request is %v", err)
}
fut := conn.DoAsync(req)
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async update request is %v", err)
Expand Down Expand Up @@ -210,10 +204,7 @@ func ExampleUpsertRequest() {
req = tarantool.NewUpsertRequest("test").
Tuple([]interface{}{uint(1113), "second", "second"}).
Operations(tarantool.NewOperations().Assign(2, "updated"))
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async upsert request is %v", err)
}
fut := conn.DoAsync(req)
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async upsert request is %v", err)
Expand Down
46 changes: 9 additions & 37 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,19 @@ package tarantool
import (
"sync"
"time"

"gopkg.in/vmihailenco/msgpack.v2"
)

// Future is a handle for asynchronous request.
type Future struct {
requestId uint32
requestCode int32
timeout time.Duration
mutex sync.Mutex
pushes []*Response
resp *Response
err error
ready chan struct{}
done chan struct{}
next *Future
}

func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) {
rid := fut.requestId
hl := h.Len()
h.Write([]byte{
0xce, 0, 0, 0, 0, // Length.
0x82, // 2 element map.
KeyCode, byte(fut.requestCode), // Request code.
KeySync, 0xce,
byte(rid >> 24), byte(rid >> 16),
byte(rid >> 8), byte(rid),
})

if err = body(enc); err != nil {
return
}

l := uint32(h.Len() - 5 - hl)
h.b[hl+1] = byte(l >> 24)
h.b[hl+2] = byte(l >> 16)
h.b[hl+3] = byte(l >> 8)
h.b[hl+4] = byte(l)

return
requestId uint32
next *Future
timeout time.Duration
mutex sync.Mutex
pushes []*Response
resp *Response
err error
ready chan struct{}
done chan struct{}
}

func (fut *Future) wait() {
Expand Down
2 changes: 1 addition & 1 deletion multi/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,6 @@ func (connMulti *ConnectionMulti) DoTyped(req tarantool.Request, result interfac
}

// DoAsync sends the request and returns a future.
func (connMulti *ConnectionMulti) DoAsync(req tarantool.Request) (*tarantool.Future, error) {
func (connMulti *ConnectionMulti) DoAsync(req tarantool.Request) *tarantool.Future {
return connMulti.getCurrentConnection().DoAsync(req)
}
Loading

0 comments on commit 0343412

Please sign in to comment.