Skip to content

Commit

Permalink
wip: support grpc for talking to leader
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jun 23, 2024
1 parent 1d60ae6 commit de05722
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 129 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,33 @@ Test the `Put()` API:

```sh
# Open a terminal and run:
$ kubepfm --target deployment/hedgedemo:8081:8081
$ kubepfm --target deployment/hedgedemo:9090:9090

# Open another terminal and run:
$ curl localhost:8081/put -d "samplekey samplevalue"
$ curl localhost:9090/put -d "samplekey samplevalue"

# To ensure a non-leader sender, you can also specify a
# non-leader pod for the kubepfm command above:
$ kubepfm --target hedgedemo-6b5bcd4998-n95n7:8081:8081
$ kubepfm --target hedgedemo-6b5bcd4998-n95n7:9090:9090
```

Test the `Get()` API:

```sh
# While kubepfm is running on a different terminal, run:
$ curl localhost:8081/get -d "samplekey"
$ curl localhost:9090/get -d "samplekey"
```

Test the `Send()` API:

```sh
# While kubepfm is running on a different terminal, run:
$ curl localhost:8081/send -d "hello-world"
$ curl localhost:9090/send -d "hello-world"
```

Test the `Broadcast()` API:

```sh
# While kubepfm is running on a different terminal, run:
$ curl localhost:8081/broadcast -d "hello-all"
$ curl localhost:9090/broadcast -d "hello-all"
```
47 changes: 41 additions & 6 deletions cmd/demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math/rand"
"log/slog"
"net/http"
"os"
"os/signal"
Expand All @@ -16,6 +17,7 @@ import (

"cloud.google.com/go/spanner"
"github.com/flowerinthenight/hedge"
protov1 "github.com/flowerinthenight/hedge/proto/v1"
)

var (
Expand All @@ -27,13 +29,30 @@ var (

func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
client, err := spanner.NewClient(context.Background(), *dbstr)
ctx, cancel := context.WithCancel(context.Background())
client, err := spanner.NewClient(ctx, *dbstr)
if err != nil {
log.Println(err)
slog.Error("NewClient failed:", "err", err)
return
}

in := make(chan *hedge.StreamMessage)
out := make(chan *hedge.StreamMessage)
go func(_ctx context.Context) {
for {
select {
case <-_ctx.Done():
return
case m := <-in:
b, _ := json.Marshal(m)
slog.Info("input stream:", "val", string(b))
out <- &hedge.StreamMessage{Payload: &protov1.Payload{Data: []byte("one")}}
out <- &hedge.StreamMessage{Payload: &protov1.Payload{Data: []byte("two")}}
out <- nil // end
}
}
}(context.WithValue(ctx, struct{}{}, nil))

defer client.Close()
op := hedge.New(client, ":8080", *spindleTable, *lockName, *logTable,
hedge.WithGroupSyncInterval(time.Second*5),
Expand Down Expand Up @@ -95,10 +114,10 @@ func main() {
// return nil, nil
},
),
hedge.WithLeaderStreamChannels(in, out),
)

log.Println(op)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go op.Run(ctx, done)

Expand Down Expand Up @@ -171,6 +190,22 @@ func main() {
w.Write([]byte(out))
})

mux.HandleFunc("/streamsend", func(w http.ResponseWriter, r *http.Request) {
in, out, err := op.StreamToLeader(context.Background())
if err != nil {
w.Write([]byte(err.Error()))
return
}

in <- &hedge.StreamMessage{Payload: &protov1.Payload{Data: []byte("test")}}
close(in) // we're done with input
for m := range out {
slog.Info("reply:", "out", string(m.Payload.Data))
}

w.Write([]byte("OK"))
})

mux.HandleFunc("/broadcast", func(w http.ResponseWriter, r *http.Request) {
hostname, _ := os.Hostname()
msg := "hello" // default
Expand Down Expand Up @@ -211,7 +246,7 @@ func main() {
w.Write([]byte(strings.Join(outs, "\n")))
})

s := &http.Server{Addr: ":8081", Handler: mux}
s := &http.Server{Addr: ":9090", Handler: mux}
go s.ListenAndServe()

// Interrupt handler.
Expand Down
65 changes: 65 additions & 0 deletions grpcsvc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package hedge

import (
"io"
"sync"

protov1 "github.com/flowerinthenight/hedge/proto/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type service struct {
op *Op

protov1.UnimplementedHedgeServer
}

func (s *service) Send(hs protov1.Hedge_SendServer) error {
ctx := hs.Context()
var w sync.WaitGroup
w.Add(1)
go func() {
defer w.Done()
for {
select {
case <-ctx.Done():
return
default:
}

in, err := hs.Recv()
if err == io.EOF {
return
}

s.op.streamIn <- &StreamMessage{Payload: in}
}
}()

w.Add(1)
go func() {
defer w.Done()
for {
select {
case <-ctx.Done():
return
default:
}

out := <-s.op.streamOut
if out == nil {
return
}

hs.Send(out.Payload)
}
}()

w.Wait()
return nil
}

func (s *service) Broadcast(hs protov1.Hedge_BroadcastServer) error {
return status.Errorf(codes.Unimplemented, "method Broadcast not implemented")
}
Loading

0 comments on commit de05722

Please sign in to comment.