Skip to content

Commit

Permalink
ADD PubSub - support PubSub PUBLISH - TESTED
Browse files Browse the repository at this point in the history
	ADD - sync PUBLISH
	ADD - async PUBLISH
  • Loading branch information
Joubin Houshyar committed Sep 18, 2012
1 parent 3296f9c commit 711ab51
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 0 deletions.
12 changes: 12 additions & 0 deletions asynchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,3 +941,15 @@ func (c *asyncClient) Lastsave() (result FutureInt64, err Error) {
return result, err

}

func (c *asyncClient) Publish(arg0 string, arg1 []byte) (result FutureInt64, err Error) {
arg0bytes := []byte(arg0)
arg1bytes := arg1

var resp *PendingResponse
resp, err = c.conn.QueueRequest(&PUBLISH, [][]byte{arg0bytes, arg1bytes})
if err == nil {
result = resp.future.(FutureInt64)
}
return result, err
}
109 changes: 109 additions & 0 deletions examples/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2009-2012 Joubin Houshyar
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package main

import (
"flag"
"fmt"
"log"
"redis"
)

// PubSub - PubSub message publisher example demonstrates publishing messages
// via Redis PubSub's PUBLISH command. Both sync and async client examples are
// provided.
//
// Messages are published to a Redis channel named "example/pubsub/channel".
// Ideally, you will want to telnet to the spec'd redis server and subscribe
// to the same channel. Each client type will send 100 messages.
//
// This example works whether or not there are subscribers to the channel, but
// naturally the receiver count from Publish() method will depend on the number
// of subscribers.

func main() {

// Parse command-line flags; needed to let flags used by Go-Redis be parsed.
flag.Parse()

// create the client. Here we are using a synchronous client.
// Using the default ConnectionSpec, we are specifying the client to connect
// to db 13 (e.g. SELECT 13), and a password of go-redis (e.g. AUTH go-redis)

spec := redis.DefaultSpec().Password("go-redis")
channel := "example/pubsub/channel"

// publish using sync client
syncPublish(spec, channel)

// publish using async client
asyncPublish(spec, channel)

}

func syncPublish(spec *redis.ConnectionSpec, channel string) {

client, e := redis.NewSynchClientWithSpec(spec)
if e != nil {
log.Println("failed to create the sync client", e)
return
}

for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("this is message # %d (using sync client)!", i))
rcvCnt, err := client.Publish(channel, msg)
if err != nil {
fmt.Printf("Error on Publish - %s", err)
} else {
fmt.Printf("Message sent to %d subscribers\n", rcvCnt)
}
}

client.Quit()
}

func asyncPublish(spec *redis.ConnectionSpec, channel string) {

client, e := redis.NewAsynchClientWithSpec(spec)
if e != nil {
log.Println("failed to create the async client", e)
return
}

// ref will ultimately point to the last future returned from Publish()
var rcvCntFuture redis.FutureInt64
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("this is message # %d (using async client)!", i))
var err redis.Error
// publish the message and don't wait for the future
// we only care if an error was raised on publish
rcvCntFuture, err = client.Publish(channel, msg)
if err != nil {
fmt.Printf("Error on Publish - %s", err)
}
}

// ok, now let's wait until the last publish's future is done
// before quiting.
rcvCnt, fe := rcvCntFuture.Get()
if fe != nil {
fmt.Printf("Error on future get - %s\n", fe)
return
} else {
fmt.Printf("(LAST) Message sent to %d subscribers\n", rcvCnt)
}

client.Quit()
}
43 changes: 43 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ type Client interface {

// Redis LASTSAVE command.
Lastsave() (result int64, err Error)

// Redis PUBLISH command.
// Publishes a message to the named channels. This is a blocking call.
//
// Returns the number of PubSub subscribers that received the message.
// OR error if any.
Publish(channel string, message []byte) (recieverCout int64, err Error)
}

// The asynchronous client interface provides asynchronous call semantics with
Expand Down Expand Up @@ -506,6 +513,42 @@ type AsyncClient interface {

// Redis LASTSAVE command.
Lastsave() (result FutureInt64, err Error)

// Redis PUBLISH command.
// Publishes a message to the named channels.
//
// Returns the future for number of PubSub subscribers that received the message.
// OR error if any.
Publish(channel string, message []byte) (recieverCountFuture FutureInt64, err Error)
}

// PubSub Client
type PubSubClient interface {
// // REVU - publish can/should be be a method on both sync/async
// Publish(channel string, message []byte) (recieverCout int, err Error)

// returns the incoming messages channel for this client.
// Never nil.
// If the client has not currently subscribed to any PubSub channels
// then (obviously) nothing ever appears on this channel.
Channel() <-chan []byte

// return the subscribed channel ids
Subscriptions() []string

// unsubscribe from 1 or more pubsub channels.
//
// Returns the number of currently subscribed channels OR error (if any)
Unsubscribe(channel string, otherChannels ...string) (subscriptionCount int, err Error)

// Subscribes to one or more pubsub channels.
//
// Returns the number of currently subscribed channels OR error (if any)
Subscribe(channel string, otherChannels ...string) (subscriptionCount int, err Error)

// Quit closes the client and client reference can be disposed.
// Returns error, if any, e.g. network issues.
Quit() Error
}

// ----------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions specification.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,7 @@ var (
INFO Command = Command{"INFO", NO_ARG, BULK}
MONITOR Command = Command{"MONITOR", NO_ARG, VIRTUAL}
// TODO SORT (RequestType.MULTI_KEY, ResponseType.MULTI_BULK),
PUBLISH Command = Command{"PUBLISH", KEY_VALUE, NUMBER}
SUBSCRIBE Command = Command{"SUBSCRIBE", MULTI_KEY, MULTI_BULK}
UNSUBSCRIBE Command = Command{"UNSUBSCRIBE", MULTI_KEY, MULTI_BULK}
)
13 changes: 13 additions & 0 deletions synchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,3 +930,16 @@ func (c *syncClient) Lastsave() (result int64, err Error) {
return result, err

}

// Redis PUBLISH command.
func (c *syncClient) Publish(arg0 string, arg1 []byte) (rcvCnt int64, err Error) {
arg0bytes := []byte(arg0)
arg1bytes := arg1

var resp Response
resp, err = c.conn.ServiceRequest(&PUBLISH, [][]byte{arg0bytes, arg1bytes})
if err == nil {
rcvCnt = resp.GetNumberValue()
}
return rcvCnt, err
}

0 comments on commit 711ab51

Please sign in to comment.