Skip to content

Commit

Permalink
chore(example48): add publish and subscribe.
Browse files Browse the repository at this point in the history
Signed-off-by: Bo-Yi Wu <[email protected]>
  • Loading branch information
appleboy committed Apr 30, 2022
1 parent 7cbccdb commit e215e06
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
26 changes: 26 additions & 0 deletions example48-pub-sub-pattern/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,42 @@ type hub struct {
subs map[*subscriber]struct{}
}

func (h *hub) publish(ctx context.Context, msg *message) error {
h.Lock()
for s := range h.subs {
s.publish(ctx, msg)
}
h.Unlock()

return nil
}

func (h *hub) subscribe(ctx context.Context, s *subscriber) error {
h.Lock()
h.subs[s] = struct{}{}
h.Unlock()

go func() {
select {
case <-ctx.Done():
h.Lock()
delete(h.subs, s)
h.Unlock()
}
}()

go s.run(ctx)

return nil
}

func (h *hub) Subscribers() int {
h.Lock()
c := len(h.subs)
h.Unlock()
return c
}

func newHub() *hub {
return &hub{
subs: map[*subscriber]struct{}{},
Expand Down
11 changes: 10 additions & 1 deletion example48-pub-sub-pattern/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package main

import "context"
import (
"context"
"time"
)

func main() {
ctx := context.Background()
Expand All @@ -12,4 +15,10 @@ func main() {
h.subscribe(ctx, sub01)
h.subscribe(ctx, sub02)
h.subscribe(ctx, sub03)

_ = h.publish(ctx, &message{data: []byte("test01")})
_ = h.publish(ctx, &message{data: []byte("test02")})
_ = h.publish(ctx, &message{data: []byte("test03")})

time.Sleep(2 * time.Second)
}
11 changes: 10 additions & 1 deletion example48-pub-sub-pattern/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s *subscriber) run(ctx context.Context) {
for {
select {
case msg := <-s.handler:
log.Println(msg.data)
log.Println(s.name, string(msg.data))
case <-s.quit:
return
case <-ctx.Done():
Expand All @@ -31,6 +31,15 @@ func (s *subscriber) run(ctx context.Context) {
}
}

func (s *subscriber) publish(ctx context.Context, msg *message) {
select {
case <-ctx.Done():
return
case s.handler <- msg:
default:
}
}

func newSubscriber(name string) *subscriber {
return &subscriber{
name: name,
Expand Down

0 comments on commit e215e06

Please sign in to comment.