Skip to content

Commit

Permalink
MOD (extensive) - properly support QUIT for async clients -- TESTED
Browse files Browse the repository at this point in the history
	- ADD support for VIRTUAL resps to protocol.go
	- MOD connection.go and workers to handle QUIT
	- ADD QUIT test
	- NOP cleanup mentions of RedisClient
  • Loading branch information
Joubin Houshyar committed Sep 14, 2012
1 parent e854f26 commit bfefff6
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 44 deletions.
16 changes: 8 additions & 8 deletions asynchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func NewAsynchClientWithSpec(spec *ConnectionSpec) (client AsyncClient, err Erro

// Redis QUIT command.
func (c *asyncClient) Quit() (stat FutureBool, err Error) {
log.Println("<BUG> Lazy programmer hasn't implemented Quit!")
return nil, NewError(SYSTEM_ERR, "<BUG> Lazy programmer hasn't implemented Quit!")
// resp, err := c.conn.QueueRequest(&QUIT, [][]byte{})
// if err == nil {
// stat = resp.future.(FutureBool)
// }
//
// return
// log.Println("<BUG> Lazy programmer hasn't implemented Quit!")
// return nil, NewError(SYSTEM_ERR, "<BUG> Lazy programmer hasn't implemented Quit!")
resp, err := c.conn.QueueRequest(&QUIT, [][]byte{})
if err == nil {
stat = resp.future.(FutureBool)
}

return
}

// Redis GET command.
Expand Down
38 changes: 37 additions & 1 deletion asynchclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,43 @@ func TestAsyncClientConnectWithSpec(t *testing.T) {
} else if client == nil {
t.Error("BUG: client is nil")
}
client.Quit()

// quit once -- OK
futureBool, err := client.Quit()
if err != nil {
t.Errorf("BUG - initial Quit on asyncClient should not return error - %s ", err)
}
if futureBool == nil {
t.Errorf("BUG - non-error asyncClient response should not return nil future")
}
// block until we get results
ok, fe := futureBool.Get()
if fe != nil {
t.Errorf("BUG - non-Error Quit future result get must never return error - got: %s", fe)
}
if !ok {
t.Errorf("BUG - non-Error Quit future result must always be true ")
}

// subsequent quit should raise error
futureBool, err = client.Quit()
if err == nil {
t.Errorf("BUG - Quit on shutdown asyncClient should return error")
}
if futureBool != nil {
t.Errorf("BUG - Quit on shutdown asyncClient should not return future. got: %s", futureBool)
}

futureBool, err = client.Quit()
if err == nil {
t.Errorf("BUG - Quit on shutdown asyncClient should return error")
}
if futureBool != nil {
t.Errorf("BUG - Quit on shutdown asyncClient should not return future. got: %s", futureBool)
}

// ch := make(chan bool, 0)
// <-ch
}

/* --------------- KEEP THIS AS LAST FUNCTION -------------- */
Expand Down
2 changes: 1 addition & 1 deletion bench/asyncbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func benchTask(taskspec taskSpec, iterations int, workers int, printReport bool)
return -1, e
}

defer client.RedisClient().Quit()
defer client.Quit()

// panics
setup(client)
Expand Down
2 changes: 1 addition & 1 deletion bench/synchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func doOne(cnt int) error {
return failedTest("NewSynchClient returned nil!")
}
// defer client.Quit() // will be deprecated soon
defer client.RedisClient().Quit()
defer client.Quit()

client.Flushdb()

Expand Down
2 changes: 1 addition & 1 deletion compliance/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func getDefinedMethods(ctype clientType) (map[string]string, *error_) {
if client == nil {
return mmap, &error_{"client is nil", nil}
} else {
defer client.(redis.RedisClient).Quit()
defer client.Quit()
}

tc := reflect.TypeOf(client)
Expand Down
58 changes: 42 additions & 16 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ const (
ready
working
faulted
quit_processed
)

type workerStatus struct {
Expand Down Expand Up @@ -405,7 +406,8 @@ type asyncConnHdl struct {

feedback chan workerStatus

shutdown chan bool
shutdown chan bool
isShutdown bool
}

func (c *asyncConnHdl) String() string {
Expand Down Expand Up @@ -436,6 +438,8 @@ func newAsyncConnHdl(spec *ConnectionSpec) (async *asyncConnHdl, err Error) {
async.feedback = make(chan workerStatus)
async.shutdown = make(chan bool, 1)

async.isShutdown = false

return
}
}
Expand Down Expand Up @@ -465,21 +469,29 @@ func NewAsynchConnection(spec *ConnectionSpec) (conn AsyncConnection, err Error)
// asyncConnHdl support for AsyncConnection interface
// ----------------------------------------------------------------------------

// TODO Quit - see REVU notes added for adding Quit to async in body
func (c *asyncConnHdl) QueueRequest(cmd *Command, args [][]byte) (*PendingResponse, Error) {

if c.isShutdown {
return nil, NewError(SYSTEM_ERR, "Connection is shutdown.")
}

select {
case <-c.shutdown:
log.Println("<DEBUG> we're shutdown and not accepting any more requests ...")
c.isShutdown = true
c.shutdown <- true // put it back REVU likely to be a bug under heavy load ..
// log.Println("<DEBUG> we're shutdown and not accepting any more requests ...")
return nil, NewError(SYSTEM_ERR, "Connection is shutdown.")
default:
}

future := CreateFuture(cmd)

request := &asyncRequestInfo{0, 0, cmd, nil, future, nil}

buff, e1 := CreateRequestBytes(cmd, args)
if e1 == nil {
request.outbuff = &buff
c.pendingReqs <- request
c.pendingReqs <- request // REVU - opt 1 TODO is handling QUIT and sending stop to workers
} else {
errmsg := fmt.Sprintf("Failed to create asynchrequest - %s aborted", cmd.Code)
request.stat = inierr
Expand All @@ -503,8 +515,11 @@ func (c *asyncConnHdl) connect() (e Error) {
return c.super.connect()
}

// REVU - TODO opt 2 for Quit here
func (c *asyncConnHdl) disconnect() (e Error) {
return

panic("asyncConnHdl.disconnect NOT IMLEMENTED!")
// return
}

// responsible for managing the various moving parts of the asyncConnHdl
Expand Down Expand Up @@ -581,7 +596,7 @@ before_stop:
// fmt.Println(name, "_worker: before_stop!")
// TODO: add shutdown hook for worker

log.Println(name, "_worker: STOPPED!")
log.Printf("<INFO> %s - %s STOPPED.", c, name)
}

// ----------------------------------------------------------------------------
Expand All @@ -604,18 +619,19 @@ func managementTask(c *asyncConnHdl, ctl workerCtl) (sig *interrupt_code, te *ta
// log.Println("MGR: do task ...")
select {
case stat := <-c.feedback:
log.Println("MGR: Feedback from one of my minions: ", stat)
// do the shutdown for now -- TODO: try reconnect
if stat.event == faulted {
log.Println("MGR: Shutting down due to fault in ", stat.id)
if stat.event == faulted || stat.event == quit_processed {
if stat.event == faulted {
log.Printf("<INFO> - %s (manager task) FAULT EVENT ", c)
}
log.Printf("<INFO> %s - (manager task) SHUTTING DOWN ...", c)
c.shutdown <- true

log.Printf("<INFO> %s - (manager task) RAISING SIGNAL STOP ...", c)
go func() { c.reqProcCtl <- stop }()
go func() { c.rspProcCtl <- stop }()
go func() { c.heartbeatCtl <- stop }()

log.Println("MGR: Signal SHUTDOWN ... ")
c.shutdown <- true
// stop self // TODO: should manager really be a task or a FSM?
c.managerCtl <- stop
go func() { c.managerCtl <- stop }()
}
case s := <-ctl:
return &s, &ok_status
Expand Down Expand Up @@ -685,7 +701,7 @@ func rspProcessingTask(c *asyncConnHdl, ctl workerCtl) (sig *interrupt_code, te
reader := c.super.reader
cmd := req.cmd

resp, e3 := GetResponse(reader, cmd)
resp, e3 := GetResponse(reader, cmd) // REVU - protocol modified to handle VIRTUALS
if e3 != nil {
// system error
log.Println("<TEMP DEBUG> Request sent to faults chan on error in GetResponse: ", e3)
Expand All @@ -694,8 +710,18 @@ func rspProcessingTask(c *asyncConnHdl, ctl workerCtl) (sig *interrupt_code, te
c.faults <- req
return nil, &taskStatus{rcverr, e3}
}
SetFutureResult(req.future, cmd, resp)

// if responsed processed was for cmd QUIT then signal the rest of the crew
// REVU - ok, a bit hacky but it works.
if cmd == &QUIT {
c.feedback <- workerStatus{0, quit_processed, nil, nil}
fakesig := pause
c.isShutdown = true
SetFutureResult(req.future, cmd, resp)
return &fakesig, &ok_status
}

SetFutureResult(req.future, cmd, resp)
return nil, &ok_status
}

Expand Down
20 changes: 12 additions & 8 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func CreateFuture(cmd *Command) (future interface{}) {
case NUMBER:
future = newFutureInt64()
case STATUS:
// future = newFutureString();
future = newFutureBool()
case STRING:
future = newFutureString()
// case VIRTUAL: // TODO
// resp, err = getVirtualResponse ();
case VIRTUAL:
// REVU - treating virtual futures as FutureBools (always true)
future = newFutureBool()
}
return
}
Expand All @@ -122,12 +122,12 @@ func SetFutureResult(future interface{}, cmd *Command, r Response) {
case NUMBER:
future.(FutureInt64).set(r.GetNumberValue())
case STATUS:
// future.(FutureString).set(r.GetMessage());
future.(FutureBool).set(true)
case STRING:
future.(FutureString).set(r.GetStringValue())
// case VIRTUAL: // TODO
// resp, err = getVirtualResponse ();
case VIRTUAL:
// REVU - OK to treat virtual commands as FutureBool
future.(FutureBool).set(true)
}
}
}
Expand All @@ -151,8 +151,12 @@ func GetResponse(reader *bufio.Reader, cmd *Command) (resp Response, err error)
resp, err = getStatusResponse(reader, cmd)
case STRING:
resp, err = getStringResponse(reader, cmd)
// case VIRTUAL:
// resp, err = getVirtualResponse ();
case VIRTUAL:
// REVU - no data expected from reader so no parsing
// - response status is always OK/True
// - no error expected
resp = newBooleanResponse(true, false)
err = nil
}
return
}
Expand Down
8 changes: 0 additions & 8 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ import (
"flag"
)

//// Common interface supported by all clients
//// to consolidate common ops
//type RedisClient interface {
//
// // Redis QUIT command.
// Quit() (err Error)
//}

// The synchronous call semantics Client interface.
//
// Method names map one to one to the Redis command set.
Expand Down

0 comments on commit bfefff6

Please sign in to comment.