WIP - PubSub working; WIP is timeout (general) and PSUB/PUNSUB - ADHOC TESTED

	NOTE - not entirely pleased with this API - SUBJECT TO FURTHER
CHANGES.

	TODO - Pattern based pubsub
	TODO - error handling on send SUB/UNSUB side (minor issue but there)
Joubin Houshyar committed Sep 24, 2012
1 parent f0702a8 commit bd940d0
Showing 4 changed files with 97 additions and 46 deletions.
104 changes: 68 additions & 36 deletions connection.go
@@ -19,6 +19,7 @@ import (
"fmt"
"log"
"net"
"reflect"
"time"
)

@@ -35,8 +36,8 @@ const (
DefaultRespChanSize = 1000000
DefaultTCPReadBuffSize = 1024 * 256
DefaultTCPWriteBuffSize = 1024 * 256
DefaultTCPReadTimeoutNSecs = 10 * time.Second
DefaultTCPWriteTimeoutNSecs = 10 * time.Second
DefaultTCPReadTimeoutNSecs = 1000 * time.Nanosecond
DefaultTCPWriteTimeoutNSecs = 1000 * time.Nanosecond
DefaultTCPLinger = 0 // -n: finish io; 0: discard, +n: wait for n secs to finish
DefaultTCPKeepalive = true
DefaultHeartbeatSecs = 1 * time.Second
@@ -189,13 +190,16 @@ type PendingResponse struct {

type PubSubConnection interface {
Subscriptions() map[string]*Subscription
ServiceRequest(cmd *Command, args [][]byte) (ok bool, err Error)
ServiceRequest(cmd *Command, args [][]byte) (pending map[string]FutureBool, err Error)
}

// REVU - why is this exported?
type Subscription struct {
activated chan bool
Channel chan []byte
IsActive bool
activated FutureBool
// closed FutureBool // REVU - for unsubscribe - necessary?
// activated chan bool
Channel chan []byte
IsActive bool // REVU - not necessary
}

// ----------------------------------------------------------------------------
@@ -261,12 +265,11 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl) {
}

func configureConn(conn net.Conn, spec *ConnectionSpec) {
// REVU [jh] - TODO look into this 09-13-2012
// these two -- the most important -- are causing problems on my osx/64
// where a "service unavailable" pops up in the async reads
// but we absolutely need to be able to use timeouts.
// conn.SetReadTimeout(spec.rTimeout);
// conn.SetWriteTimeout(spec.wTimeout);
// REVU - this requires a refact of protocol.go's error propagation
// starting from read or write op -- 09-22-2012
// TODO 09-23-2012
// Deadline can be set in a handful of callsites in connection.go
// but need a clean way to test for net.Error (as cause) and timeout
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetLinger(spec.lingerspec)
tcp.SetKeepAlive(spec.keepalive)
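
// A minimal sketch (not part of this commit) of the "clean way to test for
// net.Error (as cause) and timeout" that the REVU note above asks for. The
// helper name isTimeout is hypothetical; it assumes the raw net error is
// still reachable as the cause when a deadline-bounded read or write fails.
func isTimeout(e error) bool {
	if netErr, ok := e.(net.Error); ok {
		return netErr.Timeout()
	}
	return false
}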
@@ -595,7 +598,8 @@ func (c *asyncConnHdl) QueueRequest(cmd *Command, args [][]byte) (pending *Pendi
// PubSubConnection support (only)
// Accepts Redis commands (P)SUBSCRIBE and (P)UNSUBSCRIBE.
// Request is processed asynchronously but call semantics are sync/blocking.
func (c *asyncConnHdl) ServiceRequest(cmd *Command, args [][]byte) (ok bool, err Error) {
func (c *asyncConnHdl) ServiceRequest(cmd *Command, args [][]byte) (pending map[string]FutureBool, err Error) {
//func (c *asyncConnHdl) ServiceRequest(cmd *Command, args [][]byte) (ok bool, err Error) {

defer func() {
if re := recover(); re != nil {
@@ -620,30 +624,33 @@ func (c *asyncConnHdl) ServiceRequest(cmd *Command, args [][]byte) (ok bool, err
default:
}

// REVU - issue with this pattern is that request side errors
// REVU are not captured.
pending = make(map[string]FutureBool)

buff := CreateRequestBytes(cmd, args) // panics
for _, arg := range args {
topic := string(arg)
if s := c.subscriptions[topic]; s != nil {
panic(fmt.Errorf("already subscribed to topic %s", topic))
}
pendingActivation := newFutureBool()
pending[topic] = pendingActivation
subscription := &Subscription{
IsActive: false,
activated: make(chan bool, 1),
activated: pendingActivation,
Channel: make(chan []byte, 100), // TODO - from spec
}
c.subscriptions[topic] = subscription
// TODO - go routine for each sub to wait on the ack
// TODO - and we wait here for all
}

// REVU - errors on request side are conveyed via the future in request
// REVU - issue is how t
// future := CreateFuture(cmd)
// request := &asyncRequestInfo{0, 0, cmd, &buff, future, nil}
request := &asyncRequestInfo{0, 0, cmd, &buff, nil, nil}
c.pendingReqs <- request

// REVU - we can do timeout and mod the sig.
ok = true

return
}
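
// A hedged caller-side sketch (not part of this commit) of how the pending
// activation futures returned by ServiceRequest could be consumed once the
// blocking-subscribe semantics noted in the TODOs above are finished.
// subscribeAndWait is a hypothetical helper; it only relies on the
// FutureBool.Get() signature already used elsewhere in this change.
func subscribeAndWait(conn PubSubConnection, args [][]byte) (ok bool, err Error) {
	pending, err := conn.ServiceRequest(&SUBSCRIBE, args)
	if err != nil {
		return false, err
	}
	for _, ack := range pending {
		// block until the server acks this topic's subscription
		if acked, ferr := ack.Get(); ferr != nil || !acked {
			return false, ferr
		}
	}
	return true, nil
}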

@@ -692,6 +699,8 @@ func (c *asyncConnHdl) startup() {
rspProcTask = dbRspProcessingTask
case REDIS_PUBSUB:
rspProcTask = msgProcessingTask
// cmd := SUBSCRIBE
// c.pendingResps <- &asyncRequestInfo{0, 0, &cmd, nil, nil, nil}
}
go c.worker(responsehandler, "response-processor", rspProcTask, c.rspProcCtl, c.feedback)
c.rspProcCtl <- start
@@ -890,39 +899,58 @@ func dbRspProcessingTask(c *asyncConnHdl, ctl workerCtl) (sig *interrupt_code, t
return nil, &ok_status
}

// REVU - this is wrong
// Given that there is no symmetric REQ for RESPs (unlike db reqs/rsps)
// this needs to be a read on net with timeout so we need to mod
// or enhance the protocol funcs to deal with net read timeout.
func msgProcessingTask(c *asyncConnHdl, ctl workerCtl) (sig *interrupt_code, te *taskStatus) {
log.Println("<TEMP DEBUG> msgProcessingTask - S0 ")
log.Printf("<TEMP DEBUG> msgProcessingTask - c.pendingResps:%s\n", c.pendingResps)
var req asyncReqPtr
// log.Println("<TEMP DEBUG> msgProcessingTask - S0 ")
// log.Printf("<TEMP DEBUG> msgProcessingTask - c.pendingResps:%s\n", c.pendingResps)
// var req asyncReqPtr
timer := time.NewTimer(2 * time.Nanosecond)
select {
case sig := <-ctl:
// interrupted
return &sig, &ok_status
case req = <-c.pendingResps:
c.pendingResps <- req
// continue to process
case <-timer.C:
// continue
}

log.Println("<TEMP DEBUG> msgProcessingTask - S1 ")
// log.Println("<TEMP DEBUG> msgProcessingTask - S2 ")
reader := c.super.reader
// cmd := &SUBSCRIBE

deadline := time.Now().Add(100 * time.Second)
c.super.conn.SetReadDeadline(deadline)
message, e := GetPubSubResponse(reader)
if e != nil {
// REVU - need to check if it is a timeout error
// TODO - GetPubSubResponse needs to return cause
etype := reflect.TypeOf(e)
log.Printf("<TEMP><DEBUG> e: %s - etype:%s\n", e, etype)
// system error
log.Println("<TEMP DEBUG> on error in msgProcessingTask: ", e)
req.stat = rcverr
req.error = NewErrorWithCause(SYSTEM_ERR, "GetResponse os.Error", e)
c.faults <- req
return nil, &ok_status
// req.stat = rcverr
// req.error = NewErrorWithCause(SYSTEM_ERR, "GetResponse os.Error", e)
// c.faults <- req
// return nil, &taskStatus{rcverr, e}
}
s := c.subscriptions[message.Topic]
switch message.Type {
case SUBSCRIBE_ACK:
s.activated.set(true)
s.IsActive = true
// c.subscriptions[message.Topic].activated <- true
case UNSUBSCRIBE_ACK:
s.IsActive = false
close(s.Channel)
case MESSAGE:
log.Printf("MSG IN: %s\n", message)
s.Channel <- message.Body
default:
e := fmt.Errorf("BUG - TODO - unhandled message type - %s", message.Type)
return nil, &taskStatus{rcverr, e}
}
log.Printf("MSG IN: %s\n", message)
//
panic("msgProcessingTask not implemented")
// panic("msgProcessingTask not implemented")
return nil, &ok_status
}
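
// A hedged sketch (not part of this commit) of where the timeout REVUs above
// are headed: once GetPubSubResponse surfaces the underlying net.Error as the
// cause, an expired read deadline can be treated as a normal wakeup and the
// task can keep polling, faulting only on real errors. classifyReadError is a
// hypothetical helper and reuses the hypothetical isTimeout sketched near
// configureConn.
func classifyReadError(e error) (timedOut bool, status taskStatus) {
	if isTimeout(e) {
		return true, ok_status // deadline expired; poll again on the next cycle
	}
	return false, taskStatus{rcverr, e}
}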

// Pending further tests, this addresses bug in earlier version
@@ -997,9 +1025,13 @@ func (c *asyncConnHdl) processAsyncRequest(req asyncReqPtr) (blen int, e error)
e = re.(error)
log.Println("<BUG> lazy programmer >> ERROR in processRequest goroutine -req requeued for now")
// TODO: set stat on future & inform conn control and put it in faulted list
c.pendingReqs <- req
req.future.(FutureResult).onError(NewErrorWithCause(SYSTEM_ERR, "recovered panic in processAsyncRequest", e))
c.faults <- req
// c.pendingReqs <- req
}
}()

// REVU - where is error check on this?
sendRequest(c.writer, *req.outbuff)

req.outbuff = nil
13 changes: 10 additions & 3 deletions protocol.go
@@ -352,6 +352,7 @@ func newUnsubcribeAck(topic string, scnt int) *Message {
// PubSub message processing
// ----------------------------------------------------------------------------

// REVU - needs to return Error to capture cause e.g. net.Error (timeout)
func GetPubSubResponse(r *bufio.Reader) (msg *Message, err error) {
defer func() {
e := recover()
@@ -366,7 +367,7 @@ func GetPubSubResponse(r *bufio.Reader) (msg *Message, err error) {
num, e := strconv.ParseInt(string(buf[1:len(buf)]), 10, 64)
assertNotError(e, "in getPubSubResponse - ParseInt")
if num != 3 {
panic(fmt.Errorf("<BUG> Expecting *3 for len in response - got %d", num))
panic(fmt.Errorf("<BUG> Expecting *3 for len in response - got %d - buf: %s", num, buf))
}

header := readMultiBulkData(r, 2)
@@ -381,15 +382,18 @@

// TODO - REVU decision to conflate P/SUB and P/UNSUB
switch msgtype {
case "subscribe", "psubscribe":
case "subscribe":
assertCtlByte(buf, NUM_BYTE, "subscribe")
msg = newSubcribeAck(subid, n)
case "unsubscribe", "punsubscribe":
case "unsubscribe":
assertCtlByte(buf, NUM_BYTE, "unsubscribe")
msg = newUnsubcribeAck(subid, n)
case "message":
assertCtlByte(buf, SIZE_BYTE, "MESSAGE")
msg = newMessage(subid, readBulkData(r, int(n)))
// TODO
case "psubscribe", "punsubscribe", "pmessage":
panic(fmt.Errorf("<BUG> - pattern-based message type %s not implemented", msgtype))
}

return
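
// Hedged illustration (not part of this change): the replies handled above are
// all 3-element multi-bulks on the wire, which is what the *3 assertion at the
// top of this function relies on, e.g.
//
//	*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n              subscribe ack
//	*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$5\r\nhello\r\n       published message
//
// A pmessage reply is a 4-element multi-bulk ("pmessage", pattern, channel,
// payload), which is one reason the pattern-based cases remain TODO.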
@@ -406,6 +410,7 @@ func GetPubSubResponse(r *bufio.Reader) (msg *Message, err error) {
//
// panics on error

// REVU - needs to throw Error to capture cause e.g. net.Error (timeout)
func readToCRLF(r *bufio.Reader) []byte {
// var buf []byte
buf, err := r.ReadBytes(CR_BYTE)
@@ -424,6 +429,7 @@
return buf[0 : len(buf)-1]
}

// REVU - needs to throw Error to capture cause e.g. net.Error (timeout)
func readBulkData(r *bufio.Reader, n int) (data []byte) {
if n >= 0 {
buffsize := n + 2
@@ -440,6 +446,7 @@
return
}

// REVU - needs to throw Error to capture cause e.g. net.Error (timeout)
func readMultiBulkData(conn *bufio.Reader, num int) [][]byte {
data := make([][]byte, num)
for i := 0; i < num; i++ {
18 changes: 14 additions & 4 deletions pubsubclient.go
@@ -14,8 +14,6 @@

package redis

import ()

// -----------------------------------------------------------------------------
// pubsubClient - supports PubSubClient interface
// -----------------------------------------------------------------------------
@@ -46,9 +44,21 @@ func NewPubSubClientWithSpec(spec *ConnectionSpec) (PubSubClient, Error) {
}

func (c *pubsubClient) Messages(topic string) PubSubChannel {
if s := c.conn.Subscriptions()[topic]; s != nil && s.IsActive {
return s.Channel
// REVU - only after impl blocking subscribe in connection#ServiceRequest
if s := c.conn.Subscriptions()[topic]; s != nil {
ok, err := s.activated.Get()
if err != nil {
panic("BUG")
}
if ok {
return s.Channel
} else {
panic("BUG - isActivated.Get() returned nil err and false future results")
}
}
// if s := c.conn.Subscriptions()[topic]; s != nil {
// return s.Channel
// }
return nil
}
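
// A hedged consumer sketch (not part of this commit): once the activation
// future has resolved, Messages(topic) hands back the live channel and the
// caller can range over it until the channel is closed on unsubscribe (see
// msgProcessingTask in connection.go). drainTopic is a hypothetical helper and
// assumes the subscription for topic was already requested elsewhere.
func drainTopic(client PubSubClient, topic string, handle func([]byte)) {
	ch := client.Messages(topic)
	if ch == nil {
		return // no such subscription
	}
	for body := range ch {
		handle(body)
	}
}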

8 changes: 5 additions & 3 deletions specification.go
@@ -189,9 +189,11 @@ var (
INFO Command = Command{"INFO", NO_ARG, BULK}
MONITOR Command = Command{"MONITOR", NO_ARG, VIRTUAL}
// TODO SORT (RequestType.MULTI_KEY, ResponseType.MULTI_BULK),
PUBLISH Command = Command{"PUBLISH", KEY_VALUE, NUMBER}
SUBSCRIBE Command = Command{"PSUBSCRIBE", MULTI_KEY, MULTI_BULK}
UNSUBSCRIBE Command = Command{"PUNSUBSCRIBE", MULTI_KEY, MULTI_BULK}
PUBLISH Command = Command{"PUBLISH", KEY_VALUE, NUMBER}
SUBSCRIBE Command = Command{"SUBSCRIBE", MULTI_KEY, MULTI_BULK}
UNSUBSCRIBE Command = Command{"UNSUBSCRIBE", MULTI_KEY, MULTI_BULK}
PSUBSCRIBE Command = Command{"PSUBSCRIBE", MULTI_KEY, MULTI_BULK}
PUNSUBSCRIBE Command = Command{"PUNSUBSCRIBE", MULTI_KEY, MULTI_BULK}
)

// ----------------------------------------------------------------------
