Skip to content

Commit 63d76ac

Browse files
committed
Add exponential backoff example
1 parent 4010a1c commit 63d76ac

File tree

1 file changed

+97
-0
lines changed
  • examples/exponential_backoff

1 file changed

+97
-0
lines changed

examples/exponential_backoff/main.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package main
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/json"
6+
"flag"
7+
"fmt"
8+
"io"
9+
"log"
10+
"math"
11+
"net/http"
12+
"os"
13+
"strings"
14+
)
15+
16+
var (
17+
url = "https://localhost:8080"
18+
topic = "test_topic"
19+
)
20+
21+
func main() {
22+
flag.Parse()
23+
24+
publish()
25+
consume(0)
26+
}
27+
28+
func publish() {
29+
log := log.New(os.Stdout, "producer: ", 0)
30+
31+
msg := "hello_world"
32+
res, err := http.Post(fmt.Sprintf("%s/publish/%s", url, topic), "application/json", strings.NewReader(msg))
33+
if err != nil {
34+
log.Fatalf("failed to publish: %v\n", err)
35+
}
36+
if res.StatusCode != http.StatusCreated {
37+
log.Fatalf("failed to publish, received status code: %d\n", res.StatusCode)
38+
}
39+
40+
log.Printf("published message %s\n", msg)
41+
}
42+
43+
type subRes struct {
44+
Msg string `json:"msg"`
45+
Error string `json:"error"`
46+
DackCount int `json:"dackCount"`
47+
}
48+
49+
func consume(id int) {
50+
for {
51+
log := log.New(os.Stdout, fmt.Sprintf("consumer-%d: ", id), 0)
52+
53+
reader, writer := io.Pipe()
54+
enc := json.NewEncoder(writer)
55+
go func() {
56+
_ = enc.Encode("INIT")
57+
}()
58+
59+
res, err := http.Post(fmt.Sprintf("%s/subscribe/%s", url, topic), "application/json", reader)
60+
if err != nil {
61+
log.Fatalf("failed to consume: %v", err)
62+
}
63+
if res.StatusCode != http.StatusOK {
64+
log.Fatalf("failed to consume, received status code: %d", res.StatusCode)
65+
}
66+
67+
dec := json.NewDecoder(res.Body)
68+
69+
for {
70+
var out subRes
71+
if _ = dec.Decode(&out); err != nil {
72+
log.Fatalf("failed decode response body: %v\n", err)
73+
}
74+
75+
if out.Error != "" {
76+
log.Fatalf("received error: %s\n", out.Error)
77+
}
78+
79+
delay := int(math.Pow(2, float64(out.DackCount)))
80+
81+
strMsg := mustBase64Decode(out.Msg)
82+
log.Printf("consumed message: %s\n", strMsg)
83+
log.Printf("delaying message: %s by %ds\n", strMsg, delay)
84+
85+
_ = enc.Encode(fmt.Sprintf("DACK %d", delay))
86+
}
87+
}
88+
}
89+
90+
func mustBase64Decode(b string) string {
91+
s, err := base64.StdEncoding.DecodeString(string(b))
92+
if err != nil {
93+
log.Fatal(err)
94+
}
95+
96+
return string(s)
97+
}

0 commit comments

Comments
 (0)