REFACT - extensive - Cleanup connection cruft from 2009 - TESTED
	- REFACT protocol.go
	- BUG FIX for TYPE command
	- MOD WIP rethink of PubSubClient et al
Joubin Houshyar committed Sep 18, 2012
1 parent 1b5dac4 commit b2a64aa
Showing 7 changed files with 371 additions and 300 deletions.
2 changes: 1 addition & 1 deletion asynchclient.go
@@ -305,7 +305,7 @@ func (c *asyncClient) Randomkey() (result FutureString, err Error) {
var resp *PendingResponse
resp, err = c.conn.QueueRequest(&RANDOMKEY, [][]byte{})
if err == nil {
result = resp.future.(FutureString)
result = resp.future.(FutureString) // REVU - this is broken
}
return result, err
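
The freshly added "REVU - this is broken" note points at the unchecked runtime type assertion: if the future stashed in the PendingResponse is not actually a FutureString, the assertion panics instead of surfacing an error. A minimal, self-contained sketch of the comma-ok form that avoids the panic; FutureString and FutureBytes here are stand-in types, not the library's definitions:

package main

import "fmt"

// Stand-in future types, used only to illustrate the assertion pattern.
type FutureString chan string
type FutureBytes chan []byte

func main() {
	// Suppose the queued request actually produced a byte-oriented future.
	var future interface{} = make(FutureBytes, 1)

	// An unchecked future.(FutureString) would panic here; the comma-ok
	// form lets the caller turn the mismatch into an error instead.
	if fs, ok := future.(FutureString); ok {
		fmt.Println("string future:", fs)
	} else {
		fmt.Println("not a FutureString - report an error rather than panicking")
	}
}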

81 changes: 71 additions & 10 deletions connection.go
@@ -180,6 +180,26 @@ type PendingResponse struct {
future interface{} // TODO: stop using runtime casts
}

// ----------------------------------------------------------------------------
// PubSubConnection API
// ----------------------------------------------------------------------------

// Defines the service contract supported by publish/subscribe (message push)
// connections.

// REVU - does this need its own special connection structure or
// can we get away with just using the asyncConnHdl structure?
// REVU - BEST is to reuse asyncConnHdl and simply have that support this
// interface.
type PubSubConnection interface {
// TODO - this connector needs to be given a channel to feed responses
// into as it does not use futures.
// SetOutputChannel(<-chan message) bool
// TODO - then either a generic ServiceRequest or explicit Sub/UnSub/Quit
// REVU - best to keep it simple and use a single method so using Command
// -
}
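
Reading the REVU/TODO notes together, one plausible shape for this contract is a channel-fed connection with a single request entry point, so that asyncConnHdl can implement it directly. The sketch below is an assumption layered on the notes above, not the library's API: Message, SetOutputChannel and ServicePubSubRequest are hypothetical names, and the snippet presumes it sits in the same package as Command and Error.

// Message is a hypothetical carrier for one pub/sub delivery.
type Message struct {
	Channel string
	Body    []byte
}

// pubSubConnection sketches the channel-based contract hinted at above:
// no futures, one output channel, one generic request method.
type pubSubConnection interface {
	// SetOutputChannel installs the channel the connection writes
	// incoming messages to; it reports whether the channel was accepted.
	SetOutputChannel(out chan<- Message) bool

	// ServicePubSubRequest covers SUBSCRIBE, UNSUBSCRIBE and QUIT alike,
	// keeping the single-method style of the existing connections.
	ServicePubSubRequest(cmd *Command, args [][]byte) Error
}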

// ----------------------------------------------------------------------------
// Generic Conn handle and methods - supports SyncConnection interface
// ----------------------------------------------------------------------------
@@ -317,7 +337,10 @@ func (hdl *connHdl) ServiceRequest(cmd *Command, args [][]byte) (resp Response,
if e == nil {
e = sendRequest(hdl.conn, buff)
if e == nil {
// REVU - this demands resp to be non-nil even in case of io errors
// TODO - refactor this
resp, e = GetResponse(hdl.reader, cmd)
// fmt.Printf("DEBUG-TEMP - resp: %s\n", resp) // REVU REMOVE TODO
if e == nil {
if resp.IsError() {
redismsg := fmt.Sprintf(" [%s]: %s", cmd.Code, resp.GetMessage())
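
The two added REVU/TODO lines flag a fragile contract: the caller checks e == nil and then immediately dereferences resp, so GetResponse is forced to hand back a non-nil response even when the read failed. A hedged sketch of the stricter convention the TODO seems to aim for, where the (response, error) pair is always mutually exclusive; getResponseStrict is a hypothetical helper assumed to live in the library's package alongside Response, Error and NewError:

// getResponseStrict wraps a read step so callers can rely on a simple rule:
// a non-nil error always means a nil response, and a nil error always means
// the response is safe to use.
func getResponseStrict(read func() (Response, Error)) (Response, Error) {
	resp, e := read()
	if e != nil {
		return nil, e // io or protocol errors never leak a half-built response
	}
	if resp == nil {
		return nil, NewError(SYSTEM_ERR, "BUG: nil response without an error")
	}
	return resp, nil
}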
@@ -535,6 +558,46 @@ func (c *asyncConnHdl) QueueRequest(cmd *Command, args [][]byte) (*PendingRespon
return &PendingResponse{future}, nil
}

// ----------------------------------------------------------------------------
// asyncConnHdl support for PubSubConnection interface
// ----------------------------------------------------------------------------

//// TODO Quit - see REVU notes added for adding Quit to async in body
//func (c *asyncConnHdl) ServicePubSubRequest(cmd *Command, args [][]byte) (, Error) {
//
// if c.isShutdown {
// return nil, NewError(SYSTEM_ERR, "Connection is shutdown.")
// }
//
// select {
// case <-c.shutdown:
// c.isShutdown = true
// c.shutdown <- true // put it back REVU likely to be a bug under heavy load ..
// // log.Println("<DEBUG> we're shutdown and not accepting any more requests ...")
// return nil, NewError(SYSTEM_ERR, "Connection is shutdown.")
// default:
// }
//
// future := CreateFuture(cmd)
// request := &asyncRequestInfo{0, 0, cmd, nil, future, nil}
//
// buff, e1 := CreateRequestBytes(cmd, args)
// if e1 == nil {
// request.outbuff = &buff
// c.pendingReqs <- request // REVU - opt 1 TODO is handling QUIT and sending stop to workers
// } else {
// errmsg := fmt.Sprintf("Failed to create asynchrequest - %s aborted", cmd.Code)
// request.stat = inierr
// request.error = NewErrorWithCause(SYSTEM_ERR, errmsg, e1) // only makes sense if using go ...
// request.future.(FutureResult).onError(request.error)
//
// return nil, request.error // remove if restoring go
// }
// //}();
// // done.
// return &PendingResponse{future}, nil
//}
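
The draft above carries over the existing shutdown idiom — drain c.shutdown, then send the token straight back — and the inline REVU already suspects that re-send is racy under load, since two goroutines can both observe the token and only one restores it. A small self-contained sketch of the usual alternative, closing the channel exactly once so every reader sees shutdown without anything to put back; the names are illustrative only:

package main

import (
	"fmt"
	"sync"
)

type conn struct {
	shutdown chan struct{}
	once     sync.Once
}

// stop closes the channel exactly once; every later receive returns immediately.
func (c *conn) stop() { c.once.Do(func() { close(c.shutdown) }) }

// queue rejects work once the connection has been shut down.
func (c *conn) queue(cmd string) error {
	select {
	case <-c.shutdown: // closed channel: all callers see this, no token to re-send
		return fmt.Errorf("%s rejected: connection is shutdown", cmd)
	default:
		return nil // accept the request
	}
}

func main() {
	c := &conn{shutdown: make(chan struct{})}
	fmt.Println(c.queue("SUBSCRIBE")) // <nil>
	c.stop()
	fmt.Println(c.queue("PUBLISH")) // rejected
}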

// ----------------------------------------------------------------------------
// asyncConnHdl internal ops
// ----------------------------------------------------------------------------
@@ -571,8 +634,10 @@ func (c *asyncConnHdl) startup() {

var rspProcTask workerTask
switch protocol {
case REDIS_DB: rspProcTask = dbRspProcessingTask
case REDIS_PUBSUB: rspProcTask = msgProcessingTask
case REDIS_DB:
rspProcTask = dbRspProcessingTask
case REDIS_PUBSUB:
rspProcTask = msgProcessingTask
}
go c.worker(heartbeatworker, "response-processor", rspProcTask, c.rspProcCtl, c.feedback)
c.rspProcCtl <- start
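
For orientation, startup() selects a response-processing task per protocol, hands it to a worker together with a control channel and a feedback channel, and then pushes start into the control channel. The worker body is not part of this diff, so the loop below is only a guess at the harness shape implied by those arguments; workerCtl, taskStatus and the start/stop tokens are stand-ins, not the library's types:

package main

import "fmt"

// Stand-ins for the library's worker plumbing.
type workerCtl chan string            // carries "start" / "stop"
type taskStatus struct{ note string } // terminal report
type task func() (done bool, ts taskStatus)

// worker waits for "start", then runs the task until it reports done or a
// "stop" arrives, and posts its final status on the feedback channel.
func worker(name string, run task, ctl workerCtl, feedback chan<- taskStatus) {
	<-ctl // block until the owner sends "start"
	for {
		select {
		case <-ctl: // any further control message stops the worker here
			feedback <- taskStatus{name + ": stopped"}
			return
		default:
			if done, ts := run(); done {
				feedback <- ts
				return
			}
		}
	}
}

func main() {
	ctl := make(workerCtl)
	fb := make(chan taskStatus, 1)
	n := 0
	go worker("response-processor", func() (bool, taskStatus) {
		n++
		return n >= 3, taskStatus{"processed 3 responses"}
	}, ctl, fb)
	ctl <- "start"
	fmt.Println((<-fb).note)
}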
@@ -776,13 +841,9 @@ func msgProcessingTask(c *asyncConnHdl, ctl workerCtl) (sig *interrupt_code, te
// continue to process
}
reader := c.super.reader
// cmd := &SUBSCRIBE

// REVU can optimize by just reading MultiBulkResponse
// REVU it is NOT a multi bulk, it is a mix
// TODO getMessageResponse(reader)
// Almost like a multibulk but always *3, then 2 $size//string then number response
resp, e3 := getMultiBulkResponse(reader, nil)
message, e3 := getPubSubResponse(reader, nil)
if e3 != nil {
// system error
log.Println("<TEMP DEBUG> on error in msgProcessingTask: ", e3)
@@ -791,12 +852,12 @@
c.faults <- req
return nil, &taskStatus{rcverr, e3}
}
log.Printf("MSG IN: %s\n", resp.GetMultiBulkData())
log.Printf("MSG IN: %s\n", message)
//
panic("msgProcessingTask not implemented")
}
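
The comments removed in this hunk explain why a plain multi-bulk read was never quite right: every pub/sub frame is a three-element array, but subscribe/unsubscribe acknowledgements end with an integer count while actual messages end with a bulk-string payload, hence the dedicated getPubSubResponse. The raw frames below follow the standard Redis protocol and are shown purely as an illustration of that mix:

package main

import "fmt"

func main() {
	// Acknowledgement of SUBSCRIBE news: array of 3, last element an integer.
	ack := "*3\r\n$9\r\nsubscribe\r\n$4\r\nnews\r\n:1\r\n"

	// A published message on channel news: array of 3, all bulk strings.
	msg := "*3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$5\r\nhello\r\n"

	fmt.Printf("ack frame: %q\n", ack)
	fmt.Printf("msg frame: %q\n", msg)
}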

// Pending further tests, this addresses bug in earlier version
// and can be interrupted

func reqProcessingTask(c *asyncConnHdl, ctl workerCtl) (ic *interrupt_code, ts *taskStatus) {